EMR failing in enrich step

We’re using EmrEtlRunner with Spark enrich and have just started seeing errors in the enrich step. We’ve been using this same approach for quite a while; we recently had a case where the Shred step failed, but we were able to diagnose that as memory-related and fix it by changing the resources allocated to the cluster. This issue looks different, and I’m having difficulty diagnosing it. The fatal error on the enrich step is:

Exception in thread "main" org.apache.spark.SparkException: Application application_1559790052086_0002 finished with failed status

Digging further into the log bucket in S3, I was able to find more detailed logs among the application logs (e.g. “j-1UVX9U7OCPUBX/containers/application_1559790052086_0002/container_1559790052086_0002_01_000130/stderr” in my case). In this example there appear to be several distinct stack traces. Several like this:

org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /local/snowplow/enriched-events/_temporary/0/_temporary/attempt_20190606033148_0001_m_000151_0/part-00151-a20fa6a3-cf3f-4943-b528-bb000f968428-c000.csv could only be replicated to 0 nodes instead of minReplication (=1).  There are 4 datanode(s) running and no node(s) are excluded in this operation.

I was able to find some discussion of errors similar to this one, but the cases I saw seemed to be resolved by updating the EMR AMI to a version older than the one we’re currently running (we’re using 5.9.0, which matches the example config provided by Snowplow here: https://github.com/snowplow/snowplow/blob/master/3-enrich/emr-etl-runner/config/config.yml.sample#L30).

And several like this one, for which I’ve had trouble finding any other discussion that references similar errors:

com.univocity.parsers.common.TextWritingException: Error writing row.
Internal state when error was thrown: recordCount=50, recordData=[{redacted actual data here for now, but it's a csv row}]

Finally, there’s an error like this, which I’m also not finding much discussion of:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /local/snowplow/enriched-events/_temporary/0/_temporary/attempt_20190606033151_0001_m_000174_0/part-00174-a20fa6a3-cf3f-4943-b528-bb000f968428-c000.csv (inode 17358): File does not exist. Holder DFSClient_NONMAPREDUCE_-48490931_116 does not have any open files.

Our current cluster is an r4.8xlarge master, 4x r4.8xlarge core nodes, and 0 task nodes. Does anyone have advice on how to diagnose these, or other information I can provide to help clarify the problem?

@ben_matcha, are you using any Spark-related configuration in your configuration file? The Spark configuration in the sample config is OK for low data volumes, but it requires more thorough tuning as the volume increases.

There is a rough correlation between the size of the enriched files per ETL run and the Spark configuration that goes with it. For example, since you are using 4x r4.8xlarge core instances, I would expect the following Spark configuration to utilize the cluster’s resources to the maximum:

  jobflow:
    master_instance_type: m4.xlarge
    core_instance_count: 4
    core_instance_type: r4.8xlarge
    core_instance_ebs:
      volume_size: 320
      volume_type: gp2
      ebs_optimized: true
  configuration:
    yarn-site:
      yarn.nodemanager.vmem-check-enabled: "false"
      yarn.nodemanager.resource.memory-mb: "245760"
      yarn.scheduler.maximum-allocation-mb: "245760"
    spark:
      maximizeResourceAllocation: "false"
    spark-defaults:
      spark.dynamicAllocation.enabled: "false"
      spark.executor.instances: "59"
      spark.yarn.executor.memoryOverhead: "3072"
      spark.executor.memory: "13G"
      spark.executor.cores: "2"
      spark.yarn.driver.memoryOverhead: "3072"
      spark.driver.memory: "13G"
      spark.driver.cores: "2"
      spark.default.parallelism: "472"

Such a configuration would allow processing roughly up to 50 GB of uncompressed good logs.

Note that using an r4.8xlarge for the master is overkill; an m4.xlarge will do.
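
For reference, the executor numbers above seem to follow from simple arithmetic on the node resources. Here is a minimal sketch of that derivation (my own inference rather than an official sizing formula), assuming 245760 MB of YARN memory per r4.8xlarge core node (the yarn.nodemanager.resource.memory-mb value above), one container reserved for the driver, and roughly 4 tasks per executor core:

# Back-of-the-envelope derivation of the executor settings above
# (an inference from the config, not an official Snowplow sizing formula).
YARN_MEMORY_MB_PER_NODE = 245760  # yarn.nodemanager.resource.memory-mb per r4.8xlarge

def spark_sizing(core_nodes, executor_memory_gb, memory_overhead_mb, executor_cores,
                 tasks_per_core=4):
    container_mb = executor_memory_gb * 1024 + memory_overhead_mb  # memory per executor container
    per_node = YARN_MEMORY_MB_PER_NODE // container_mb             # containers that fit on one node
    executors = core_nodes * per_node - 1                          # one container reserved for the driver
    parallelism = executors * executor_cores * tasks_per_core      # ~4 tasks per executor core
    return executors, parallelism

print(spark_sizing(4, 13, 3072, 2))  # -> (59, 472), matching the configuration above

In other words, the executor memory plus its overhead is chosen so that a whole number of containers fits on each node, and parallelism is set to about four times the total number of executor cores.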

Thank you @ihor - this is extremely helpful. We’re looking at about 25 GB of uncompressed logs here, so it seems like this should work. I’ll give your suggestions a try now and see how it goes.

Oh, sorry - I forgot to answer that question. We were using a configuration similar to the sample one you mentioned:

  configuration:
    spark:
      maximizeResourceAllocation: "true"

@ihor Those changes definitely helped, thanks again. We were able to complete the enrich step after making them, but unfortunately we still had a failure at the Shred Enriched Events step.

The step failure is Exception in thread "main" org.apache.spark.SparkException: Application application_1559869727954_0002 finished with failed status. Unlike with the previous step, when I look at the stderr logs for this application’s containers (j-{job-id}/containers/application_1559869727954_0002/*/stderr) I don’t see any errors that are clear to me.

The enriched/good files that were produced amount to ~24 GB uncompressed, so similar in size to the raw inputs to the enrich step, and there are 242 of them, so around 100 MB each on average. I tried another run, picking up from the shred step using --resume-from shred as suggested in the Batch pipeline steps doc, but hit the same error. I do notice available memory going to zero pretty early on in that second attempt, which is the one thing that jumps out at me so far.

Should I expect the shred step to need more memory than enrich? Are there any other places you’d suggest I look as I try to get this up and running?

@ben_matcha, the file size is a very rough estimate. It depends on the volume of self-describing events and contexts and on their complexity. I would try a (much) beefier cluster. Here are two configurations with more r4.8xlarge instances, for 5 and for 10 core nodes. Do note that the default limit on an AWS account is 5x r4.8xlarge; you might need to raise the limit on your account to use more of those.

  jobflow:
    master_instance_type: m4.xlarge
    core_instance_count: 5
    core_instance_type: r4.8xlarge
    core_instance_ebs:
      volume_size: 320
      volume_type: gp2
      ebs_optimized: true
  configuration:
    yarn-site:
      yarn.nodemanager.vmem-check-enabled: "false"
      yarn.nodemanager.resource.memory-mb: "245760"
      yarn.scheduler.maximum-allocation-mb: "245760"
    spark:
      maximizeResourceAllocation: "false"
    spark-defaults:
      spark.dynamicAllocation.enabled: "false"
      spark.executor.instances: "74"
      spark.yarn.executor.memoryOverhead: "3072"
      spark.executor.memory: "13G"
      spark.executor.cores: "2"
      spark.yarn.driver.memoryOverhead: "3072"
      spark.driver.memory: "13G"
      spark.driver.cores: "2"
      spark.default.parallelism: "592"

  jobflow:
    master_instance_type: m4.xlarge
    core_instance_count: 10
    core_instance_type: r4.8xlarge
    core_instance_ebs:
      volume_size: 320
      volume_type: gp2
      ebs_optimized: true
  configuration:
    yarn-site:
      yarn.nodemanager.vmem-check-enabled: "false"
      yarn.nodemanager.resource.memory-mb: "245760"
      yarn.scheduler.maximum-allocation-mb: "245760"
    spark:
      maximizeResourceAllocation: "false"
    spark-defaults:
      spark.dynamicAllocation.enabled: "false"
      spark.executor.instances: "99"
      spark.yarn.executor.memoryOverhead: "4096"
      spark.executor.memory: "20G"
      spark.executor.cores: "3"
      spark.yarn.driver.memoryOverhead: "4096"
      spark.driver.memory: "20G"
      spark.driver.cores: "3"
      spark.default.parallelism: "1188"
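
Applying the same inferred arithmetic as above, these two configurations come out to 74 and 99 executors respectively (again just a sketch, assuming 245760 MB of YARN memory per r4.8xlarge core node and one container reserved for the driver):

# The same inferred sizing arithmetic, applied to the two larger clusters.
def spark_sizing(core_nodes, executor_memory_gb, memory_overhead_mb, executor_cores,
                 tasks_per_core=4):
    container_mb = executor_memory_gb * 1024 + memory_overhead_mb
    executors = core_nodes * (245760 // container_mb) - 1  # one container reserved for the driver
    return executors, executors * executor_cores * tasks_per_core

print(spark_sizing(5, 13, 3072, 2))    # -> (74, 592):  5x r4.8xlarge configuration
print(spark_sizing(10, 20, 4096, 3))   # -> (99, 1188): 10x r4.8xlarge configuration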

Got it - I figured the file sizes were a rough estimate, and we do make reasonably heavy use of self-describing events in our implementation. What I ended up doing, prior to seeing your last comment, was breaking the enriched output into two separate sets of files and running two batches of shred. My thinking was that I hadn’t yet ruled out bad data (as opposed to just memory) as the cause, so splitting would also let me narrow that down with something like a binary search.

This got the job done for now, but was probably an overcomplicated way of doing this compared to your suggestion. It’s also not something I can do every day when we run this job, so I’ll likely look to apply more of your feedback here for the next run and see if that solves the problem moving forward. I did bump into some of those EC2 limits earlier, and already got them raised, so I am good to go there for now.
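
For anyone wanting to do something similar, here is a rough sketch of how such a split could be scripted with boto3. The bucket and prefix names are placeholders rather than the actual paths from this pipeline, and the exact layout of the enriched output depends on your configuration:

# Move half of the enriched part files to a holding prefix so shred can be
# run in two batches. All names below are hypothetical placeholders.
import boto3

s3 = boto3.client("s3")
bucket = "my-snowplow-bucket"             # hypothetical bucket name
src_prefix = "enriched/good/run-folder/"  # hypothetical prefix of the enriched run
hold_prefix = "enriched/good-holding/"    # hypothetical holding prefix for the second batch

# Collect all enriched part files for this run.
paginator = s3.get_paginator("list_objects_v2")
keys = [obj["Key"]
        for page in paginator.paginate(Bucket=bucket, Prefix=src_prefix)
        for obj in page.get("Contents", [])]

# Move the second half of the files out of the way, run shred, then move them
# back (or into a fresh run folder) and run shred again.
for key in sorted(keys)[len(keys) // 2:]:
    dest = hold_prefix + key[len(src_prefix):]
    s3.copy_object(Bucket=bucket, CopySource={"Bucket": bucket, "Key": key}, Key=dest)
    s3.delete_object(Bucket=bucket, Key=key)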

Thank you again @ihor for being so helpful here!

Just a final follow-up here - I think things are stable now. We did not have to go up to the larger suggested cluster sizes after all; the original suggestion from @ihor kept us on track, but we’ll look to the larger configurations if we see memory issues in the future. We’re also considering running our EMR job twice daily instead of once in order to keep batches small, but we’re all set for now.

Thanks again for all the help.