Learnings from using the new Spark EMR Jobs


#1

Overview

We’ve had some hiccups and now some success in getting the new Spark based EMR job running. In the end, we were able to process 160GB of collector logs in 10 hours with a 10 node r4 cluster (through r3 may be better due to having instance stores).

Snowplow thought that our experience could be useful to others on this forum, so here’s what we’ve learned.

With no custom configuration, Spark doesn’t run well on EMR.

Without any configuration, YARN does not fully allocate the entire cluster’s resources to your Spark job. Even if you spun up 20 nodes, you’ll find only 1 or 2 being utilized. Snowplow provided us with a custom binary that they’ll likely be releasing soon, that allowed us to pass through Spark configuration to the EMR job.

In doing so, I found this spreadsheet invaluable while tuning the spark configuration. As an example, for a 6 node r3.4xlarge cluster (5 executors per node):

spark.executor.instances: "30"
spark.yarn.executor.memoryOverhead: "3072"
spark.executor.memory: "21G"
spark.yarn.driver.memoryOverhead: "1034"
spark.driver.memory: "6G"
spark.executor.cores: "3"
spark.driver.cores: "1"
spark.default.parallelism: "180"
spark.dynamicAllocation.enabled: "false"

Consider disabling dynamic allocation or enabling maximizeResourceAllocation.

By default, EMR sets spark.dynamicAllocation.enabled to true. The impact of this is that you cannot explicitly configure the number of executors for your job, which could lead to many unused nodes.

As a work around (which I have not tried), you could enable maximizeResourceAllocation, and then each node will be fully utilized by a single executor per node. The draw back here is that this also means an entire node will be allocated to the driver. This solution could work with clusters of smaller machines (no more than an large or xlarge instance).

However, we went down the path of explicitly setting the number of exectors, and thus we needed to set spark.dynamicAllocation.enabled to false.

Don’t use task nodes.

Because the task nodes do not have local HDFS storage, they are effectively useless. Any gains from leveraging the spot prices are likely wasted by the time lost due to poor data locality. Only use Core nodes (either instance backed or EBS storage work, but the former may give better performance).

Parallelism is limited by the number of log files your collector produces.

If the amount of parallelism that your cluster can support (executors x core per executor x parallelism per core) is greater than the number of collected log files, you will be under utilizing the cluster. We were generating 1GB log files from our collector. We’ve resized the collector in a way that will instead generate four 250MB files for each of the former files, and that helped us to scale up the cluster in a way that could bring down the running time of the job. So, favor many small files instead of a few large ones.

Hopefully this was helpful!

Rick
OneSpot

References

  1. Apache Spark Config Cheatsheat
  2. Configure Spark

ETL very very slow in larger batches
Elasticity Spark Step: Shred Enriched Events: consistent failure without clear reason
Shredded/bad-rows output directory already exists
Shredded/bad-rows output directory already exists
Handling large volumes of duplicated event_ids
#2

Thanks a lot for this valuable piece @rbolkey!

It will be possible to configure the EMR cluster through the EmrEtlRunner configuration file starting with the next Snowplow release (R90 Lascaux): https://github.com/snowplow/snowplow/pull/3267.


#3

@rbolkey This is great - thanks so much for sharing!

Snowplowers - The logic on the spreadsheet that Rick shared is quite simple, and reduced our Enrich time by 50%. Given the config.yml file and the instance details (workers, memory & cores), it would be possible to generate sensible defaults out of the box with zero user intervention. Does that make sense for a future release?


#4

I don’t know that integrating some cluster sizing logic into emr etl runner is a path we want to take given that we want to transition away from it per our RFC.


#5

Perhaps this could live within the snowplowctl generate dataflow --config config.yml command (as described in the RFC)?


#6

Regarding the number of log files @BenFradet - does it make sense for the first Spark job to split the collector files into the optimal number for parallelism (given the number of executors, cores per executor and parallelism per core)?


#7

We can’t assign a particular partitionning when reading the log files since reading a gzip file cannot be done in parallel.

We could repartition according to spark.default.parallelism afterwards however.


#8

@BenFradet @alex @rbolkey

We just had incredible results splitting the log files to fully utilize the cluster!

We were able to enrich 2.3GB of compressed logs in 7 minutes using 72 similar-sized files, versus 2h11min on the original 24 files. My guess is that splitting the files and equalizing the file sizes (the collector generates a lot more logs during the day vs. the night) both played an important part in this optimization.

Here’s the bash script that does this pre-processing, if anyone is interested in trying it out: https://gist.github.com/bernardosrulzon/f426a7290ad3ebacd6dfee11bb523874#file-snowplow-process-before-run-sh-L12-L14

Cheers!
Bernardo


Emr-etl-runner works with LZO but not with GZ
#9

That’s hugely encouraging @bernardosrulzon - thanks for sharing!