Shredded/bad-rows output directory already exists


#1

Hi

I’ve had a second odd failure in the 0.12.0 version of the RDB shredder. The step failed after 7 minutes with the error message:

User class threw exception: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3://metail-dataeng-snowplow-dev-data/shredded/bad-rows/run=2017-09-11-08-28-37 already exists

The directory does exist, but it can’t have existed before the run started; this is just a standard Snowplow setup. Looking in that directory I can see part files 0–13, with 0–11 created at 09:04:47 and 12 and 13 at 09:04:50.

This is the second time I’ve had this happen, although that’s in probably more than 10 runs. The only thing I was obviously doing wrong this time was that my event deduplication DynamoDB table throughput was set to 1; I can’t remember whether it would have been underpowered the first time I encountered this problem (last week).

This is in a test setup, so it’s not running regularly at the moment.

Any thoughts on what I might have done wrong?

Thanks
Gareth


#2

I didn’t encounter this problem again during my testing, but our second live run failed with it. This was with the R92 Maiden Castle release.

To save me a bit of time wandering through the code base: how does the shredder write the bad rows? Would it try to overwrite the part files?

Thanks
Gareth


#3

No, it wouldn’t try to overwrite the contents of the run folder, because writing the bad rows is supposed to create it.

For reference, the code is here.


#4

I think I know what’s going on. An exception causes the container to die; YARN then restarts the container and it re-runs. However, the container died after the shredding process had created the part files, so on restart it tries to write them out again. That’s the best explanation I can come up with.
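For what it’s worth, this failure mode is easy to reproduce locally: Hadoop’s `FileOutputFormat.checkOutputSpecs` refuses to write into an output directory that already exists, so a restarted attempt that re-runs the same save will always fail. Here is a minimal local-filesystem analogue in plain Python (a sketch to illustrate the behaviour, not Snowplow’s actual code; the function and paths are made up):

```python
import os
import tempfile


def save_run_output(base_dir: str, run_id: str) -> str:
    """Mimic FileOutputFormat.checkOutputSpecs: refuse to write
    into an output directory that already exists."""
    out_dir = os.path.join(base_dir, f"run={run_id}")
    if os.path.exists(out_dir):
        # This is the FileAlreadyExistsException case.
        raise FileExistsError(f"Output directory {out_dir} already exists")
    os.makedirs(out_dir)
    # Attempt 1 writes its part files and then the container dies mid-job...
    with open(os.path.join(out_dir, "part-00000"), "w") as f:
        f.write("bad row\n")
    return out_dir


base = tempfile.mkdtemp()
save_run_output(base, "2017-09-11-08-28-37")      # attempt 1: succeeds
try:
    save_run_output(base, "2017-09-11-08-28-37")  # attempt 2 (restart): fails
except FileExistsError as e:
    print(e)
```

The second call fails exactly the way the restarted YARN application attempt does, because the first attempt left its part files behind.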

The cause is my DynamoDB (DDB) throughput. I wrote a small wrapper around the Snowplow runner to kick it off in response to an S3 event. It turns up the DDB throughput, runs the runner and then turns the throughput back down. I think there’s a race condition when runs start back-to-back (catching up with history, not normal running): DDB is still turning the throughput down, so it reports the run-time value and my code never tries to set it. Thus when the shredding tries to de-dup, there isn’t enough throughput on the table.
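To illustrate the race, here is a sketch of the wrapper’s decision logic (a hypothetical reconstruction, not my actual code; the function name and values are made up). The wrapper trusts the throughput the table reports, and skips the update when it already matches the desired run-time value:

```python
def plan_throughput_update(reported_capacity: int,
                           desired_run_capacity: int):
    """Return the capacity to set before a run, or None to skip the update.

    The buggy assumption: if the table already *reports* the run-time
    capacity, no update is needed. But if the previous run's scale-down
    is still in flight, the table still reports the high value, the
    update is skipped, and the pending decrease then lands mid-run.
    """
    if reported_capacity >= desired_run_capacity:
        return None  # looks fine already -- this is the race
    return desired_run_capacity


# Normal case: the table idles at 1, so we scale it up for the run.
assert plan_throughput_update(1, 100) == 100

# Back-to-back case: a scale-down to 1 is pending but the table still
# reports 100, so no update is made and the decrease starves the run.
assert plan_throughput_update(100, 100) is None
```

A fix would be to also check `DescribeTable`’s `TableStatus` and wait while it is `UPDATING` before comparing, so a pending throughput change can’t be mistaken for the steady state.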


#5

Mmmh, interesting. Are you sure about the container dying and being recreated?
Because, if so, it would make sense for us to overwrite what already exists.


#6

No, I’m not :slight_smile: However, if you like, I can send over the log files I’m looking at and see if you agree. It’ll have to be tomorrow morning, though, as I need to head off.

“Container” might also be the wrong word: I’ve been looking at stderr in directories with paths like containers/application_1506440142831_0006/container_1506440142831_0006_01_000001/.


#7

Yup, please do; I’m interested to see what the YARN container logs say.


#8

I am also getting the same error. If I delete the directory and rerun, it works fine, but it fails again on the next scheduled run.

Here is the stderr of the container that failed.

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/13/__spark_libs__5691570934057986767.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/09/28 04:21:28 INFO SignalUtils: Registered signal handler for TERM
17/09/28 04:21:28 INFO SignalUtils: Registered signal handler for HUP
17/09/28 04:21:28 INFO SignalUtils: Registered signal handler for INT
17/09/28 04:21:29 INFO ApplicationMaster: Preparing Local resources
17/09/28 04:21:30 INFO ApplicationMaster: ApplicationAttemptId: appattempt_1506569545779_0006_000001
17/09/28 04:21:30 INFO SecurityManager: Changing view acls to: yarn,hadoop
17/09/28 04:21:30 INFO SecurityManager: Changing modify acls to: yarn,hadoop
17/09/28 04:21:30 INFO SecurityManager: Changing view acls groups to:
17/09/28 04:21:30 INFO SecurityManager: Changing modify acls groups to:
17/09/28 04:21:30 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users  with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
17/09/28 04:21:30 INFO ApplicationMaster: Starting the user application in a separate Thread
17/09/28 04:21:30 INFO ApplicationMaster: Waiting for spark context initialization...
17/09/28 04:21:30 INFO SparkContext: Running Spark version 2.1.0
17/09/28 04:21:30 INFO SecurityManager: Changing view acls to: yarn,hadoop
17/09/28 04:21:30 INFO SecurityManager: Changing modify acls to: yarn,hadoop
17/09/28 04:21:30 INFO SecurityManager: Changing view acls groups to:
17/09/28 04:21:30 INFO SecurityManager: Changing modify acls groups to:
17/09/28 04:21:30 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users  with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
17/09/28 04:21:30 INFO Utils: Successfully started service 'sparkDriver' on port 42800.
17/09/28 04:21:30 INFO SparkEnv: Registering MapOutputTracker
17/09/28 04:21:30 INFO SparkEnv: Registering BlockManagerMaster
17/09/28 04:21:30 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/09/28 04:21:30 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/09/28 04:21:30 INFO DiskBlockManager: Created local directory at /mnt/yarn/usercache/hadoop/appcache/application_1506569545779_0006/blockmgr-c7a57b31-420c-47ae-af79-c82cc7a818b7
17/09/28 04:21:30 INFO MemoryStore: MemoryStore started with capacity 654.8 MB
17/09/28 04:21:30 INFO SparkEnv: Registering OutputCommitCoordinator
17/09/28 04:21:30 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
17/09/28 04:21:31 INFO Utils: Successfully started service 'SparkUI' on port 33462.
17/09/28 04:21:31 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.103.5.13:33462
17/09/28 04:21:31 INFO YarnClusterScheduler: Created YarnClusterScheduler
17/09/28 04:21:31 INFO SchedulerExtensionServices: Starting Yarn extension services with app application_1506569545779_0006 and attemptId Some(appattempt_1506569545779_0006_000001)
17/09/28 04:21:31 INFO Utils: Using initial executors = 2, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
17/09/28 04:21:31 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45952.
17/09/28 04:21:31 INFO NettyBlockTransferService: Server created on 10.103.5.13:45952
17/09/28 04:21:31 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/09/28 04:21:31 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.103.5.13, 45952, None)
17/09/28 04:21:31 INFO BlockManagerMasterEndpoint: Registering block manager 10.103.5.13:45952 with 654.8 MB RAM, BlockManagerId(driver, 10.103.5.13, 45952, None)
17/09/28 04:21:31 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.103.5.13, 45952, None)
17/09/28 04:21:31 INFO BlockManager: external shuffle service port = 7337
17/09/28 04:21:31 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.103.5.13, 45952, None)
17/09/28 04:21:31 INFO EventLoggingListener: Logging events to hdfs:///var/log/spark/apps/application_1506569545779_0006_1
17/09/28 04:21:31 INFO Utils: Using initial executors = 2, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
17/09/28 04:21:31 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
17/09/28 04:21:31 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark://YarnAM@10.103.5.13:42800)
17/09/28 04:21:32 INFO ApplicationMaster:
===============================================================================
YARN executor launch context:
  env:
    CLASSPATH -> /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*<CPS>{{PWD}}<CPS>{{PWD}}/__spark_conf__<CPS>{{PWD}}/__spark_libs__/*<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/*<CPS>$HADOOP_COMMON_HOME/lib/*<CPS>$HADOOP_HDFS_HOME/*<CPS>$HADOOP_HDFS_HOME/lib/*<CPS>$HADOOP_MAPRED_HOME/*<CPS>$HADOOP_MAPRED_HOME/lib/*<CPS>$HADOOP_YARN_HOME/*<CPS>$HADOOP_YARN_HOME/lib/*<CPS>/usr/lib/hadoop-lzo/lib/*<CPS>/usr/share/aws/emr/emrfs/conf<CPS>/usr/share/aws/emr/emrfs/lib/*<CPS>/usr/share/aws/emr/emrfs/auxlib/*<CPS>/usr/share/aws/emr/lib/*<CPS>/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar<CPS>/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar<CPS>/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar<CPS>/usr/lib/spark/yarn/lib/datanucleus-api-jdo.jar<CPS>/usr/lib/spark/yarn/lib/datanucleus-core.jar<CPS>/usr/lib/spark/yarn/lib/datanucleus-rdbms.jar<CPS>/usr/share/aws/emr/cloudwatch-sink/lib/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*<CPS>/usr/lib/hadoop-lzo/lib/*<CPS>/usr/share/aws/emr/emrfs/conf<CPS>/usr/share/aws/emr/emrfs/lib/*<CPS>/usr/share/aws/emr/emrfs/auxlib/*<CPS>/usr/share/aws/emr/lib/*<CPS>/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar<CPS>/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar<CPS>/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar<CPS>/usr/share/aws/emr/cloudwatch-sink/lib/*
    SPARK_YARN_STAGING_DIR -> hdfs://ip-10-103-5-222.ec2.internal:8020/user/hadoop/.sparkStaging/application_1506569545779_0006
    SPARK_USER -> hadoop
    SPARK_YARN_MODE -> true

  command:
    LD_LIBRARY_PATH="/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:$LD_LIBRARY_PATH" \
      {{JAVA_HOME}}/bin/java \
      -server \
      -Xmx512m \
      '-verbose:gc' \
      '-XX:+PrintGCDetails' \
      '-XX:+PrintGCDateStamps' \
      '-XX:+UseConcMarkSweepGC' \
      '-XX:CMSInitiatingOccupancyFraction=70' \
      '-XX:MaxHeapFreeRatio=70' \
      '-XX:+CMSClassUnloadingEnabled' \
      '-XX:OnOutOfMemoryError=kill -9 %p' \
      -Djava.io.tmpdir={{PWD}}/tmp \
      '-Dspark.history.ui.port=18080' \
      -Dspark.yarn.app.container.log.dir=<LOG_DIR> \
      org.apache.spark.executor.CoarseGrainedExecutorBackend \
      --driver-url \
      spark://CoarseGrainedScheduler@10.103.5.13:42800 \
      --executor-id \
      <executorId> \
      --hostname \
      <hostname> \
      --cores \
      2 \
      --app-id \
      application_1506569545779_0006 \
      --user-class-path \
      file:$PWD/__app__.jar \
      1><LOG_DIR>/stdout \
      2><LOG_DIR>/stderr

  resources:
    __app__.jar -> resource { scheme: "hdfs" host: "ip-10-103-5-222.ec2.internal" port: 8020 file: "/user/hadoop/.sparkStaging/application_1506569545779_0006/snowplow-rdb-shredder-0.12.0.jar" } size: 76600934 timestamp: 1506572483992 type: FILE visibility: PRIVATE
    __spark_libs__ -> resource { scheme: "hdfs" host: "ip-10-103-5-222.ec2.internal" port: 8020 file: "/user/hadoop/.sparkStaging/application_1506569545779_0006/__spark_libs__5691570934057986767.zip" } size: 196696785 timestamp: 1506572473554 type: ARCHIVE visibility: PRIVATE
    __spark_conf__ -> resource { scheme: "hdfs" host: "ip-10-103-5-222.ec2.internal" port: 8020 file: "/user/hadoop/.sparkStaging/application_1506569545779_0006/__spark_conf__.zip" } size: 74422 timestamp: 1506572484172 type: ARCHIVE visibility: PRIVATE

===============================================================================
17/09/28 04:21:32 INFO RMProxy: Connecting to ResourceManager at ip-10-103-5-222.ec2.internal/10.103.5.222:8030
17/09/28 04:21:32 INFO YarnRMClient: Registering the ApplicationMaster
17/09/28 04:21:32 INFO Utils: Using initial executors = 2, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
17/09/28 04:21:32 INFO YarnAllocator: Will request 2 executor container(s), each with 2 core(s) and 896 MB memory (including 384 MB of overhead)
17/09/28 04:21:32 INFO YarnAllocator: Submitted 2 unlocalized container requests.
17/09/28 04:21:32 INFO ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
17/09/28 04:21:33 INFO AMRMClientImpl: Received new token for : ip-10-103-5-9.ec2.internal:8041
17/09/28 04:21:33 INFO YarnAllocator: Launching container container_1506569545779_0006_01_000002 on host ip-10-103-5-9.ec2.internal
17/09/28 04:21:33 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
17/09/28 04:21:33 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
17/09/28 04:21:33 INFO ContainerManagementProtocolProxy: Opening proxy : ip-10-103-5-9.ec2.internal:8041
17/09/28 04:21:35 INFO YarnAllocator: Launching container container_1506569545779_0006_01_000003 on host ip-10-103-5-9.ec2.internal
17/09/28 04:21:35 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
17/09/28 04:21:35 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
17/09/28 04:21:35 INFO ContainerManagementProtocolProxy: Opening proxy : ip-10-103-5-9.ec2.internal:8041
17/09/28 04:21:41 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.103.5.9:38700) with ID 1
17/09/28 04:21:41 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.103.5.9:38702) with ID 2
17/09/28 04:21:41 INFO YarnClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
17/09/28 04:21:41 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook done
17/09/28 04:21:41 INFO ExecutorAllocationManager: New executor 1 has registered (new total is 1)
17/09/28 04:21:41 INFO ExecutorAllocationManager: New executor 2 has registered (new total is 2)
17/09/28 04:21:41 INFO SharedState: Warehouse path is 'hdfs:///user/spark/warehouse'.
17/09/28 04:21:41 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-103-5-9.ec2.internal:43276 with 117.2 MB RAM, BlockManagerId(2, ip-10-103-5-9.ec2.internal, 43276, None)
17/09/28 04:21:41 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-103-5-9.ec2.internal:41215 with 117.2 MB RAM, BlockManagerId(1, ip-10-103-5-9.ec2.internal, 41215, None)
17/09/28 04:21:42 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 299.5 KB, free 654.5 MB)
17/09/28 04:21:43 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 26.1 KB, free 654.5 MB)
17/09/28 04:21:43 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.103.5.13:45952 (size: 26.1 KB, free: 654.8 MB)
17/09/28 04:21:43 INFO SparkContext: Created broadcast 0 from textFile at ShredJob.scala:340
17/09/28 04:21:45 ERROR ApplicationMaster: User class threw exception: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3://our-s3-bucket/enriched/bad/run=2017-09-28-13-25-11 already exists
org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3://our-s3-bucket/enriched/bad/run=2017-09-28-13-25-11 already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1191)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1168)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1071)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1037)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1488)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1467)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1467)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1467)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob.run(ShredJob.scala:400)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.run(ShredJob.scala:78)
	at com.snowplowanalytics.snowplow.storage.spark.SparkJob$class.main(SparkJob.scala:32)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.main(ShredJob.scala:47)
	at com.snowplowanalytics.snowplow.storage.spark.ShredJob.main(ShredJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
17/09/28 04:21:45 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3://our-s3-bucket/enriched/bad/run=2017-09-28-13-25-11 already exists)
17/09/28 04:21:45 INFO SparkContext: Invoking stop() from shutdown hook
17/09/28 04:21:45 INFO SparkUI: Stopped Spark web UI at http://10.103.5.13:33462
17/09/28 04:21:45 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
17/09/28 04:21:45 INFO YarnClusterSchedulerBackend: Shutting down all executors
17/09/28 04:21:45 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
17/09/28 04:21:45 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
17/09/28 04:21:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/09/28 04:21:45 INFO MemoryStore: MemoryStore cleared
17/09/28 04:21:45 INFO BlockManager: BlockManager stopped
17/09/28 04:21:45 INFO BlockManagerMaster: BlockManagerMaster stopped
17/09/28 04:21:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/09/28 04:21:45 WARN Dispatcher: Message RemoteProcessDisconnected(10.103.5.9:38700) dropped. RpcEnv already stopped.
17/09/28 04:21:45 WARN Dispatcher: Message RemoteProcessDisconnected(10.103.5.9:38700) dropped. RpcEnv already stopped.
17/09/28 04:21:45 WARN Dispatcher: Message RemoteProcessDisconnected(10.103.5.9:38700) dropped. RpcEnv already stopped.
17/09/28 04:21:45 WARN Dispatcher: Message RemoteProcessDisconnected(10.103.5.9:38700) dropped. RpcEnv already stopped.
17/09/28 04:21:45 INFO SparkContext: Successfully stopped SparkContext
17/09/28 04:21:45 INFO ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3://our-s3-bucket/enriched/bad/run=2017-09-28-13-25-11 already exists)
17/09/28 04:21:45 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
17/09/28 04:21:45 INFO ApplicationMaster: Deleting staging directory hdfs://ip-10-103-5-222.ec2.internal:8020/user/hadoop/.sparkStaging/application_1506569545779_0006
17/09/28 04:21:45 INFO ShutdownHookManager: Shutdown hook called
17/09/28 04:21:45 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1506569545779_0006/spark-430e1580-e28d-4b97-b363-20456111af43

#9

Has it already been run?

ERROR ApplicationMaster: User class threw exception: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3://logistics.snowplow.enriched.bellroy/enriched/bad/run=2017-09-28-13-25-11 already exists

would suggest that it’s failing because the directory already exists.


#10

That directory was definitely created during the same run (probably by the same worker or a different one). I had the same issue earlier and deleted the whole bad directory before the run to make sure it wouldn’t fail.

Yeah, it’s failing because the directory exists, but the directory was not there before the run; something in the same run created it.

It also failed on the shredding step; I’m not sure whether that step is supposed to write anything in the enriched/bad directory.

Screenshot of the EMR console, if that helps:


#11

@BenFradet I tried to upload a tgz of the log files, but Discourse only allows images and does some checks that you’ve not just changed the extension on your archive/virus :wink: You can find the log archive here. I’ll explain what I see and where, so you can see what you think. This might also help @joycse06 check whether there’s a similar cause for his error, as I have the same output in containers/application_1506440142831_0006/container_1506440142831_0006_02_000001/stderr with the FileAlreadyExistsException.

I found my errors by doing grep -nr Exception ./ and reading through the results, but perhaps a faster pointer is that there is only one application folder that has only stderr files in some of its directories:

└── [4.0K]  application_1506440142831_0006
    ├── [   0]  container_1506440142831_0006_01_000001
    │   └── [120K]  stderr
    ├── [   0]  container_1506440142831_0006_01_000002
    │   ├── [ 38K]  stderr
    │   └── [198K]  stdout
    ├── [   0]  container_1506440142831_0006_01_000003
    │   ├── [ 17K]  stderr
    │   └── [130K]  stdout
    ├── [   0]  container_1506440142831_0006_01_000004
    │   ├── [ 21K]  stderr
    │   └── [121K]  stdout
    ├── [   0]  container_1506440142831_0006_01_000005
    │   ├── [ 15K]  stderr
    │   └── [119K]  stdout
    ├── [   0]  container_1506440142831_0006_01_000006
    │   ├── [ 21K]  stderr
    │   └── [ 23K]  stdout
    ├── [   0]  container_1506440142831_0006_01_000007
    │   ├── [ 21K]  stderr
    │   └── [ 19K]  stdout
    ├── [   0]  container_1506440142831_0006_01_000008
    │   ├── [ 17K]  stderr
    │   └── [ 17K]  stdout
    └── [   0]  container_1506440142831_0006_02_000001
        └── [ 16K]  stderr

The container_1506440142831_0006_02_000001/stderr contains the error message about the output directory already existing. All the other stderr files contain ProvisionedThroughputExceededException exceptions, except one that appears not to have completed: application_1506440142831_0006/container_1506440142831_0006_01_000001/stderr. There are a lot of messages about starting and losing tasks (grep -nr "Start task\|Lost task"), and there are a few like

ERROR YarnClusterScheduler: Lost executor 2 on ip-10-163-2-146.aws.metail.org: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

I’m not sure which of all these are relevant. I’m going to improve my code and see if the problem goes away.

Any thoughts on what might have happened are greatly appreciated; I’m still learning what normal log output looks like.

Thanks
Gareth


#12

It seems like your cluster is under-specced; you shouldn’t lose containers because of memory issues.
However, I do agree that if you do lose a container, we should be able to overwrite the files in the case of retried tasks.

I logged https://github.com/snowplow/snowplow/issues/3443 to further follow the issue.

I also found https://forums.databricks.com/questions/1489/why-do-i-get-javaioioexception-file-already-exists.html.
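The fix discussed above would presumably look something like this: clear out a leftover run directory from a dead attempt before writing, so a restarted application attempt becomes idempotent. A local-filesystem sketch (not the actual Snowplow change; the function name is made up):

```python
import os
import shutil


def save_run_output_idempotent(base_dir: str, run_id: str, rows) -> str:
    """Sketch of an idempotent save: drop any partial output left by a
    failed attempt, then write fresh part files. This is what would let
    a restarted YARN application attempt succeed instead of hitting
    FileAlreadyExistsException."""
    out_dir = os.path.join(base_dir, f"run={run_id}")
    if os.path.exists(out_dir):
        shutil.rmtree(out_dir)  # discard the dead attempt's partial output
    os.makedirs(out_dir)
    with open(os.path.join(out_dir, "part-00000"), "w") as f:
        f.writelines(row + "\n" for row in rows)
    return out_dir
```

Calling this twice with the same run ID succeeds both times, with the second call’s output replacing the first’s; the trade-off is that a concurrent writer to the same run directory would be silently clobbered, which is why Hadoop defaults to failing fast instead.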


#13

Thanks for having a look. It’s definitely my fault it’s happening; hopefully it will make Snowplow a bit more robust. :slight_smile:

I have experimented with the advice in the Discourse post on Spark optimisation but didn’t see any change in the step time, so I just went with the defaults, i.e. no Spark config changes. I didn’t do any log file analysis, though. I might post over on the Spark learnings topic to see if someone has further optimisation advice. We only have two files output from the enriched step, and I thought that might be limiting the shredding step’s parallelism.


#14

@gareth yes, this will limit the parallelism of your job: only two cores will be used.

By the way, wasn’t the original issue caused by the fact that this configuration wasn’t in your config.yml?


#15

@BenFradet yes, that line isn’t in my configuration. Well, it is, but it’s commented out. Why, I can’t say; probably I commented out the cluster configuration that I thought was related to Spark optimisation without reading everything properly.

Thanks for the info and your help.


#16

Hi,

We are facing the same issue. We are using Snowplow R89 Plain of Jars and getting the same error saying the bad directory already exists. Before starting a fresh run we deleted the bad directory, but the shred step still fails saying the output bad directory already exists.

Any help will be appreciated.


#17

We are continuously getting the same error. We had a few successful runs with the same config, but now it has suddenly started failing with this issue.

What could be the cause of this, and how can we ensure a successful EMR run?


#18

Reading back through the thread, our solution to this was to configure our cluster better. I recommend reading through the thread Learnings from using the new Spark EMR Jobs, particularly the references to the source spreadsheets.

You can also try setting the max retries to 1; that way you will probably stop masking the true error.
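For anyone landing here later: I’m assuming “max retries” refers to the number of application attempts YARN allows, which by default lets a dying ApplicationMaster restart and then fail on the already-created directory, hiding the original error. On Spark-on-YARN that is governed by `spark.yarn.maxAppAttempts` (itself capped by YARN’s `yarn.resourcemanager.am.max-attempts`); a spark-defaults fragment would look like this (check your own EMR configuration for the right place to set it):

```properties
spark.yarn.maxAppAttempts 1
```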