Spark memory woes


#1

Hey Snowplowers,

We’ve been experiencing repeated issues with the Spark job on EMR, as a result of memory management and garbage collection on the cluster. Typically the job either gets stuck on the final few tasks of a stage and hangs indefinitely, or executors are lost and the job fails completely.

The problem stages are mainly 4, 5 and 6 of the second job (map at ShredJob.scala:378, map at ShredJob.scala:384, and saveAsTextFile at ShredJob.scala:412).

The frustrating thing is that the problem isn’t consistent: we run the jobflow hourly, and it will occasionally fail one hour and then run without problems the next, even though that second run processes the combined input of both hours.

So far we have tried:

  • Setting explicit values for the Spark cluster config based on this spreadsheet, with various permutations of executors per node and memory overhead coefficient
  • Different formats for the input files (gz, lzo, raw) - we are currently decompressing the input .gz files onto HDFS and grouping by date to give raw text files of ~5 GB each
  • Disabling vmem checking in YARN

At this point I am really running out of ideas - if anyone has any experience to share I’d be very grateful!


#3

Hi @acgray,

Here’s a good method to start tuning your cluster.

For people running both enrich and shred, we’ll look at the volumes of the enriched archive since the shred job will be the most resource-hungry.

Let’s say that, on average, the runs in the enriched archive come in at around 2 GB of gzip-compressed files.

Let’s also say that we have a compression factor of around 10, which gives us 20 GB of raw data in HDFS.

By default, the HDFS block size is 128 MB on EMR, so 20 GB of raw data is 160 blocks. There is a direct mapping between the number of HDFS blocks to read and the number of Spark tasks, so we need a parallelism of at least 160 to read everything in parallel.

Usually, we will want between 2 and 4 tasks per core. As a result, we need between 40 and 80 cores for our cluster (160 / 4 and 160 / 2). Let’s go with 40 cores.
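
To make the arithmetic above easy to redo for your own volumes, here is a minimal sketch. The function name `size_cluster` and its parameters are made up for illustration, and the defaults (compression factor of 10, 128 MB blocks, 2 to 4 tasks per core) are just the assumptions from this post, so plug in your own numbers.

```python
import math

def size_cluster(compressed_gb, compression_factor=10, block_mb=128,
                 tasks_per_core=(2, 4)):
    """Estimate the HDFS block count and the range of cores needed.

    All defaults are the assumptions used in this post, not measured values.
    """
    # Raw size on HDFS once the gzip files are uncompressed, in MB.
    raw_mb = compressed_gb * compression_factor * 1024
    # One Spark read task per HDFS block.
    blocks = math.ceil(raw_mb / block_mb)
    low_tpc, high_tpc = tasks_per_core
    # More tasks per core means fewer cores, and vice versa.
    min_cores = math.ceil(blocks / high_tpc)
    max_cores = math.ceil(blocks / low_tpc)
    return blocks, min_cores, max_cores

blocks, min_cores, max_cores = size_cluster(2)
print(blocks, min_cores, max_cores)  # 160 40 80
```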

At this point, we have to choose the instance type. There are two choices, either c4 (with about 1.9 GB per core) or r4 (with about 7.6 GB per core). In our experience, c4 simply doesn’t cut it memory-wise. As a result, we’ll go with r4.

We’ll pick 6 r4.2xlarge, which gives us 48 cores. After that you can refer to the spreadsheet, but be careful: EMR doesn’t leave all of the box’s memory to YARN and actually takes a lot out (e.g. it makes 23 GB available on an r4.xlarge box, which has 30.5 GB of memory). That’s why we’re specifying yarn.nodemanager.resource.memory-mb and yarn.scheduler.maximum-allocation-mb below: they are the amount of RAM available (61440 MB) minus 3584 MB for everything that is not Spark (the OS, the datanode, etc.), i.e. 57856 MB.

    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
        yarn.nodemanager.vmem-check-enabled: "false"
        yarn.nodemanager.resource.memory-mb: "57856"
        yarn.scheduler.maximum-allocation-mb: "57856"
      spark:
        maximizeResourceAllocation: "false"
      spark-defaults:
        spark.dynamicAllocation.enabled: "false"
        spark.executor.instances: "41"
        spark.yarn.executor.memoryOverhead: "2048"
        spark.executor.memory: "6G"
        spark.executor.cores: "1"
        spark.yarn.driver.memoryOverhead: "2048"
        spark.driver.memory: "6G"
        spark.driver.cores: "1"
        spark.default.parallelism: "164"

Note that, in the spreadsheet, we further tuned the memory overhead coefficient to 0.15 (it gives better results in our experience) and the parallelism per core to 4, as discussed above.
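
If you don’t have the spreadsheet at hand, the sketch below is one way to arrive at the executor count and parallelism in the configuration above. The function name `executor_layout` is hypothetical, and leaving one container free for the driver is an assumption on our side: it happens to reproduce the 41 executors, but treat it as a reading of the numbers rather than the spreadsheet’s exact formula.

```python
def executor_layout(node_ram_mb, reserved_mb, nodes,
                    executor_mb, overhead_mb, tasks_per_core,
                    executor_cores=1):
    """Derive YARN memory, executors per node, executor count and parallelism."""
    # Memory handed to YARN on each node, after reserving room for the OS,
    # the datanode and everything else that is not Spark.
    yarn_mb = node_ram_mb - reserved_mb
    # Each executor container needs its heap plus the memory overhead.
    container_mb = executor_mb + overhead_mb
    per_node = yarn_mb // container_mb
    # Assumption: one container across the cluster is left for the driver.
    executors = per_node * nodes - 1
    parallelism = executors * executor_cores * tasks_per_core
    return yarn_mb, per_node, executors, parallelism

# 6 nodes with 61440 MB each, 3584 MB reserved, 6G executors + 2048 MB overhead.
layout = executor_layout(61440, 3584, 6, 6 * 1024, 2048, 4)
print(layout)  # (57856, 7, 41, 164)
```

The four values map to yarn.nodemanager.resource.memory-mb, executors per node, spark.executor.instances and spark.default.parallelism respectively.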

Once we have this baseline configuration running, we can further tune according to the information we gather while monitoring the job.

A final note on gzip: Spark shouldn’t have to read gzip files directly, because the format is not splittable, i.e. a 10 GB gzipped file will be processed by a single core. As a result, you need to either convert the files to LZO or uncompress them. In the upcoming R97, we have modified the s3-dist-cp step that moves the raw files collected by the Clojure collector (which are gzipped) to HDFS so that it uncompresses them.
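
To put a number on the splittability point, here is a small illustrative sketch. `read_tasks` is a hypothetical helper, not part of Spark, and it assumes the default 128 MB block size mentioned earlier.

```python
import math

def read_tasks(file_mb, splittable, block_mb=128):
    """Rough number of Spark tasks used to read a single file."""
    if not splittable:
        # A non-splittable file (e.g. plain gzip) is read by one task.
        return 1
    # A splittable file gets roughly one task per HDFS block.
    return math.ceil(file_mb / block_mb)

print(read_tasks(10 * 1024, splittable=False))  # 1
print(read_tasks(10 * 1024, splittable=True))   # 80
```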

Hope it helps.