Executors lost and disconnecting in EMR


#1

Hey!

For the past couple of days I’ve been hitting an issue I haven’t been able to resolve. My EMR job fails during the shredding step (which already takes significantly longer than the enrichment step).

The issues I’m seeing in the logs relate to executors being lost because of closed connections or timeouts.

Examples I can find:

18/02/03 14:40:03 WARN TransportChannelHandler: Exception in connection from /172.31.16.113:54972
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:748)
18/02/03 14:40:03 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 5.
18/02/03 14:40:03 INFO DAGScheduler: Executor lost: 5 (epoch 1)
18/02/03 14:40:03 INFO BlockManagerMasterEndpoint: Trying to remove executor 5 from BlockManagerMaster.
18/02/03 14:40:03 INFO BlockManagerMaster: Removed 5 successfully in removeExecutor
18/02/03 14:40:03 ERROR YarnClusterScheduler: Lost executor 5 on ip-172-31-16-113.ec2.internal: Container container_1517648233216_0006_01_041190 exited from explicit termination request.
...

Or another one:

18/02/05 16:01:00 INFO TransportClientFactory: Successfully created connection to ip-172-31-26-187.ec2.internal/172.31.26.187:7337 after 1 ms (0 ms spent in bootstraps)
18/02/05 16:01:00 INFO BlockManager: Initialized BlockManager: BlockManagerId(13, ip-172-31-26-187.ec2.internal, 44825, None)
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2584
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2585
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2586
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2587
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2588
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2589
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2590
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2591
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2592
18/02/05 16:01:00 INFO Executor: Running task 380.2 in stage 3.0 (TID 2591)
18/02/05 16:01:00 INFO Executor: Running task 2.3 in stage 3.0 (TID 2584)
18/02/05 16:01:00 INFO Executor: Running task 117.1 in stage 3.0 (TID 2586)
18/02/05 16:01:00 INFO Executor: Running task 74.3 in stage 3.0 (TID 2585)
org.apache.spark.SparkException: Exception thrown in awaitResult
	at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
	at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
	at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:538)
	at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:567)
	at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
	at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
	at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:567)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from /172.31.19.107:44075 closed
	at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128)
	at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
	at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
	at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:455)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	... 1 more
18/02/05 16:09:51 INFO Executor: Executor killed task 116.1 in stage 3.0 (TID 2643)
18/02/05 16:09:51 INFO Executor: Executor killed task 8.3 in stage 3.0 (TID 2622)
18/02/05 16:09:51 INFO Executor: Executor killed task 89.3 in stage 3.0 (TID 2635)
18/02/05 16:09:51 INFO Executor: Executor killed task 105.2 in stage 3.0 (TID 2641)
18/02/05 16:09:51 INFO Executor: Executor killed task 16.3 in stage 3.0 (TID 2590)
....

I’ve tried different instance types and counts, both for the whole pipeline and for the shredding step only (resume-from=shred), and it still fails. I’m running R92. My enriched volume is about 100 GB (800 MB per file on average, 1.5 GB max).

Has anyone encountered this issue before? Does anyone have a hint about a config setting I could try to change?

PS: It looks like the issue also shows up in successful jobs, but when the retries fail as well, it triggers timeouts and crashes the job (at least that’s what I think is happening).

Thanks for your help!


#2

Hi @Timmycarbone.

Is that the uncompressed size of the files? Do you use any Spark configuration options?

If it’s the uncompressed size, you can try the following configuration:

      master_instance_type: "m4.xlarge"
      core_instance_count: 3
      core_instance_type: "r4.8xlarge"
      core_instance_ebs:
        volume_size: 320
        volume_type: "gp2"
        ebs_optimized: true
    ...
    bootstrap_failure_tries: 3
    configuration:
      yarn-site:
        yarn.nodemanager.vmem-check-enabled: "false"
        yarn.nodemanager.resource.memory-mb: "245760"
        yarn.scheduler.maximum-allocation-mb: "245760"
      spark:
        maximizeResourceAllocation: "false"
      spark-defaults:
        spark.dynamicAllocation.enabled: "false"
        spark.executor.instances: "44"
        spark.yarn.executor.memoryOverhead: "3072"
        spark.executor.memory: "13G"
        spark.executor.cores: "2"
        spark.yarn.driver.memoryOverhead: "3072"
        spark.driver.memory: "13G"
        spark.driver.cores: "2"
        spark.default.parallelism: "352"
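The executor sizing above can be sanity-checked with some quick arithmetic. Here is a sketch in Python; the node count and memory figures are taken straight from the configuration above, and the "4 tasks per core" factor for parallelism is an assumption, a common rule of thumb rather than anything mandated by Spark:

```python
# Sanity check of the executor layout in the config above, assuming
# 3 r4.8xlarge core nodes and the YARN memory limit set in yarn-site.
yarn_mem_mb = 245760          # yarn.nodemanager.resource.memory-mb
executor_mem_mb = 13 * 1024   # spark.executor.memory = 13G
overhead_mb = 3072            # spark.yarn.executor.memoryOverhead
core_nodes = 3                # core_instance_count

# Each executor needs heap + overhead from YARN.
per_executor_mb = executor_mem_mb + overhead_mb       # 16384 MB
executors_per_node = yarn_mem_mb // per_executor_mb   # 15 per node
# Leave one slot free for the driver (running in cluster mode).
total_executors = core_nodes * executors_per_node - 1

print(total_executors)          # 44, matching spark.executor.instances
print(total_executors * 2 * 4)  # 352, matching spark.default.parallelism
                                # (2 cores per executor x ~4 tasks per core)
```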

Also, if you use the Clojure collector, you can upgrade the pipeline to R97, as it brings a significant speedup in the Spark Enrich job (link).
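Separately, if the connection resets persist, one commonly tried mitigation is to raise Spark's network timeout in spark-defaults. These are standard Spark settings, not specific to Snowplow, and the values below are a starting point rather than a recommendation:

```yaml
      spark-defaults:
        # Default is 120s; lost-executor errors under heavy shuffle
        # sometimes go away with a larger value.
        spark.network.timeout: "600s"
        # Must stay well below spark.network.timeout (default 10s).
        spark.executor.heartbeatInterval: "60s"
```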

Hope this helps.


#3

Thanks a lot for the suggested configuration; I’ll try it out tomorrow.
Re-running somehow worked this time …

I’ve upgraded to the latest version as well.

I don’t use any Spark configuration options (except maximizeResourceAllocation, which was set to “true”), so this helps a lot. Thanks again!


#4

I used your suggested settings and switched my environment to use a VPC, and it’s pure gold now. Runs also went down from 4-5 hours to 1 hour. Thanks a lot!

I have another issue now but it’s unrelated. Will search for a solution or post another topic.

Thanks thanks thanks!


#5

Hi @Timmycarbone,

Good news!

You may also find this article quite useful for Spark tuning. It contains a description of the process and, more importantly, a spreadsheet you can use if you decide to optimize or change the configuration.