'Elasticity Scalding Step: Shred Enriched Events' step failing


#1

Hi guys,

Last night the EMR ETL run failed for us on “Elasticity Scalding Step: Shred Enriched Events”, after spending 2 hours on that step.

The logs say:

Exception in thread "main" cascading.flow.FlowException: step failed: (5/5) …dded-events/atomic-events, with job id: job_1474417773341_0011, please see cluster logs for failure messages
at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:261)
at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:162)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I tried running it again with --skip staging,enrich. Then it failed on the same step but only after 48 seconds saying:

Exception in thread "main" cascading.flow.FlowException: unhandled exception
	at cascading.flow.BaseFlow.complete(BaseFlow.java:918)
	at com.twitter.scalding.Job.run(Job.scala:296)
	at com.twitter.scalding.Tool.start$1(Tool.scala:102)
	at com.twitter.scalding.Tool.run(Tool.scala:118)
	at com.twitter.scalding.Tool.run(Tool.scala:66)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at com.snowplowanalytics.snowplow.enrich.hadoop.JobRunner$.main(JobRunner.scala:29)
	at com.snowplowanalytics.snowplow.enrich.hadoop.JobRunner.main(JobRunner.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.io.IOException: Not a file: hdfs://ip-10-180-240-44.ec2.internal:8020/local/snowplow/enriched-events/run=2016-09-21-00-26-27
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
	at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:200)
	at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:134)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:328)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:320)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
	at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:575)
	at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:570)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
	at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:570)
	at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:561)
	at cascading.flow.hadoop.planner.HadoopFlowStepJob.internalNonBlockingStart(HadoopFlowStepJob.java:108)
	at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:236)
	at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:162)
	at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
	at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

We recently upgraded to R83. We’re running EMR 4.5.0.

Any help would be appreciated.

Thanks,
Daniel


#2

We debugged this in person at the Snowplow Meetup London: the cluster had run out of disk space. My suggestion was to bump the cluster to three or four i2.4xlarge instances.

When running it again with --skip staging,enrich, you have to temporarily update the enriched:good setting in config.yml so that it is suffixed with the run folder, in this case run=2016-09-21-00-26-27.
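For clarity, the change looks something like this (the bucket path below is illustrative, not your actual config; revert it once the rerun completes):

```yaml
aws:
  s3:
    buckets:
      enriched:
        # Temporarily point enriched:good at the specific run folder,
        # so the skipped-ahead run picks up that run's files directly.
        # Illustrative bucket path - substitute your own.
        good: s3://my-snowplow-bucket/enriched/good/run=2016-09-21-00-26-27
```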


#3

@alex, whenever our daily processing fails and we accumulate a few days' worth of data, we run into a similar issue. Provisioning the EMR cluster well over its default capacity has solved the issue in the past, but what's interesting now is that we have a failing shred step with the following HDFS utilization charts:

It could very well be that the final drop in HDFS utilization is the result of the job failing and YARN/MR cleaning up, so we will rerun with an even larger cluster. We've already had 5 EMR runs fail on the same data :frowning:

Why is it that whenever we need to process multiple days' worth of data, storage requirements increase by over an order of magnitude? Our daily cluster runs with 600GB of HDFS, and the failure above happened on a 3840GB cluster.


#4

Hey @rgabo, yes, the drop-off you are seeing in HDFS utilization after peaking at about 75% relates to the job entering its failure state.

We have seen similar issues when needing to process >36h of data for very high-volume pipelines. At this point, the cluster requirements do not scale linearly with event volumes, and the pain point seems to be particularly around Hadoop Shred. Recent versions of Hadoop Shred contain some reduce operations to handle the de-dupe (versus Hadoop Enrich, which is map-only), so that is likely to be playing a part.
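To make the de-dupe cost concrete, the core grouping idea can be sketched in plain Scala. This is only an illustration of the technique, not Snowplow's actual Scalding code; `Event` and its fields are hypothetical stand-ins:

```scala
// Illustrative sketch: de-duplication groups events by a natural key
// (here eventId plus a payload fingerprint) and keeps one representative
// per group. In a Scalding job this groupBy becomes a shuffle plus a
// reduce phase, which is why Shred's disk and network footprint grows
// much faster than the map-only Enrich step as input volume increases.
case class Event(eventId: String, fingerprint: String, payload: String)

def dedupe(events: Seq[Event]): Seq[Event] =
  events
    .groupBy(e => (e.eventId, e.fingerprint)) // the shuffle key
    .values
    .map(_.head)                              // keep one copy per group
    .toSeq

val sample = Seq(
  Event("e1", "f1", "page_view"),
  Event("e1", "f1", "page_view"), // duplicate
  Event("e2", "f2", "link_click")
)
```

All events sharing a key must be co-located on one reducer, so a skewed or very large key space drives up the shuffle cost disproportionately.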

If we were sticking with Hadoop Shred for the long term we would dive into this deeply, but we are getting close to putting the Spark rewrite of Hadoop Enrich and Hadoop Shred into test, and so our priority is to identify whether those ports exhibit the same behaviors under major event volumes.


#5

For what it’s worth, the i2.4xlarge instances do help, but they’re quite expensive and a previous generation. i3.* instances are not supported by EMR. c3.4xlarge instances are workhorses, but I found the cluster failing because they don’t come with enough ephemeral storage.

Running c4.2xlarge or c4.4xlarge instances with additional EBS volumes gives granular control over the compute-to-storage ratio. Provisioning a lot of EBS storage can hit the default account limit of 20TB, but a limit increase can be requested.
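As a sketch of what attaching EBS volumes to core nodes looks like at the EMR level, this is the instance-groups JSON you could pass to `aws emr create-cluster --instance-groups file://groups.json` (instance count and volume size are illustrative):

```json
[
  {
    "InstanceGroupType": "CORE",
    "InstanceType": "c4.4xlarge",
    "InstanceCount": 3,
    "EbsConfiguration": {
      "EbsOptimized": true,
      "EbsBlockDeviceConfigs": [
        {
          "VolumeSpecification": { "VolumeType": "gp2", "SizeInGB": 500 },
          "VolumesPerInstance": 1
        }
      ]
    }
  }
]
```

Each core node then contributes its EBS capacity to HDFS, so you can scale storage independently of instance type.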

It seems that the complexity of the deduplication is at least quadratic if not worse, and the problem is only amplified even when the cluster otherwise has plenty of compute resources. Any time we had to process more than 24 hours of data, we had to increase capacity by over an order of magnitude. We are already using early builds of the Spark Enrichment, so I look forward to the R9X release(s) that include Spark enrichment/shredding and the EmrEtlRunner improvements.

Any ETA on a first Spark-enabled release?


#6

Thanks for sharing that @rgabo.

The reduce-phase de-duplication code is here:

https://github.com/snowplow/snowplow/blob/master/3-enrich/scala-hadoop-shred/src/main/scala/com.snowplowanalytics.snowplow.enrich/hadoop/ShredJob.scala#L301-L318

If you have any suggestions for improving its performance, do please share!

We can’t commit publicly to release dates, but I can tell you that the first Spark release will be R89 Plain of Jars, which is highly likely to be the release-after-next.