We welcome feedback on all of our RFCs! Please respond with your thoughts and comments in the thread below.
Because Hadoop’s MapReduce API was too low-level, we decided to use a framework on top of Hadoop. We chose Scalding, an idiomatic Scala API from Twitter, which itself sits on top of Cascading, a Java framework for building ETL pipelines on Hadoop.
Fast forward to today, and we now have three jobs written in Scalding which constitute our batch-based event data pipeline:
- Scala Hadoop Enrich
- Scala Hadoop Shred
- Hadoop Elasticsearch Sink
These are all designed to run on EMR, although the amount of AWS-specific code in these jobs is in fact minimal.
We have a further job, Hadoop Event Recovery, which is not part of the core Snowplow flow but is used in an ad hoc fashion on EMR to recover raw events which failed prior processing.
All four of these jobs use the following versions:
- Hadoop 2.4.1
- Cascading 2.6.0 or 2.7.0
- Scalding 0.11.2 or 0.15.0
2. Pain points
Our Scalding/Cascading/Hadoop stack has been extremely robust in production across many Snowplow customers and users, happily crunching through many billions of events each day. However, we have identified three major pain points with this stack, which are becoming more pronounced as time goes on:
2.1 Hadoop is excessively disk-bound
We soon learnt that reading lots of small files directly from S3 into Hadoop was a performance killer (see our blog post Dealing with Hadoop’s small files problem). So instead, we copied and compacted the files from S3 onto the transient EMR cluster’s local HDFS storage, ready for processing.
We then realised that writing out to S3 was similarly slow, and so we again updated our jobs to write to HDFS first, copying the data back out to S3 afterwards.
Finally, we realised that our jobs were effectively being run by Hadoop twice: once to generate the good output (enriched events), and a second time to generate the bad output (events failing validation). This was an artifact of the way Hadoop is designed around a single output for a given set of inputs. So once again we added a write to disk, this time caching the intermediate output (essentially an Either[Success, Failure]) to minimize the amount of work that had to be done twice.
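To make this concrete, here is a minimal sketch of that workaround in Scalding. This is illustrative only: the enrich function and the String-based row formats are stand-ins for our actual common-enrich code.

```scala
import com.twitter.scalding._

class EnrichJob(args: Args) extends Job(args) {

  // Stand-in for the real enrichment logic: Left is a bad row, Right a good one
  def enrich(raw: String): Either[String, String] =
    if (raw.contains("\t")) Right(raw) else Left(s"Malformed: $raw")

  // Enrich each raw event exactly once...
  val enriched: TypedPipe[Either[String, String]] =
    TypedPipe.from(TextLine(args("input"))).map(enrich)

  // ...then force the intermediate Either to disk, so that Hadoop does not
  // re-run the whole enrichment once per output
  val cached = enriched.forceToDisk

  cached.collect { case Right(good) => good }.write(TypedTsv[String](args("good")))
  cached.collect { case Left(bad)   => bad  }.write(TypedTsv[String](args("bad")))
}
```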
By stealth, we had ended up with an extremely disk-intensive workload - especially compared to our Stream Enrich worker for the real-time pipeline, which merrily processes huge event volumes without touching disk.
This disk-intensive workload causes pain in a few ways:
- We believe that we are having to significantly overprovision EMR clusters relative to our jobs’ true memory, CPU and I/O requirements
- With larger users we see fairly regular job failures due to the cluster’s HDFS storage filling up
- We have to use on-demand rather than cheaper spot EC2 instances in our EMR clusters, because we are dependent on those instances’ local storage for HDFS
2.2 Code reuse between our batch and real-time pipelines is impossible
Attempts to build abstraction layers on top of Hadoop and real-time engines, such as Twitter’s own Summingbird, have not gained much traction, largely because they are trying to abstract at a semantic level over processing frameworks which are operationally very different.
As a result, at Snowplow we have ended up with essentially two parallel implementations of our pipeline:
- Scala Hadoop Enrich vs Stream Enrich
- Hadoop Elasticsearch Sink vs Kinesis Elasticsearch Sink
- Scala Hadoop Shred plus StorageLoader have no equivalent in the real-time pipeline currently
We can somewhat minimize code duplication via careful extraction of libraries, but it still leads to having a lot of separate apps to manage.
2.3 Slow pace of innovation in Hadoop
At this point Hadoop is a very mature technology - operationally predictable and reliable, but evolving very slowly. Similarly, Cascading and Scalding are effectively “done” - mature technologies receiving only limited performance and correctness improvements.
By contrast, recent R&D and thoughtpieces from newer data processing frameworks (such as “Apache Spark as a Compiler” from Spark, or “Time and Order in Streams” from Flink) show that these platforms are rapidly advancing the state of the art for event data pipelines like Snowplow to build on top of.
3. Enter Apache Spark
We have been working with Spark in various capacities at Snowplow for several years:
- We have been maintaining a spark-example-project which runs on Amazon EMR for the past 2.5 years
- We released a spark-streaming-example-project last summer
- We have implemented event data modeling jobs for Snowplow customers in Spark, using our Snowplow Scala Analytics SDK and Snowplow Python Analytics SDK
Most importantly, Spark seems to address the three pain points set out above:
3.1 Excellent in-memory processing performance
There have been plenty of claims and counter-claims about Spark’s performance and reliability relative to Hadoop.
At Snowplow we have been rather cautious about these claims, believing that the best validation for us would be to actually port our jobs to Spark - but knowing that this itself would require committing serious resources on our side.
However, the evidence has been steadily mounting that a switch to Spark would reap serious performance benefits for Snowplow. I attended an excellent talk by Xavier Léauté from Metamarkets at Strata London - Metamarkets are an ad analytics business who process many billions of events each day; in his talk “Streaming analytics at 300 billion events per day with Kafka, Samza, and Druid”, Xavier explained that:
- Metamarkets had been long-term users of Hadoop/Cascading/Scalding, but had moved all their workloads to Spark
- They were extremely happy with the move
- They no longer used HDFS for their event processing at all - instead their Spark jobs read from and wrote directly to S3, and this was highly performant for them
- As a result, Metamarkets was able to switch to using cheaper spot instances for all Spark jobs
Combined with anecdotal evidence from other Spark users, there seems to finally be enough justification on performance grounds for going ahead with at least an exploratory port to Spark.
3.2 Single codebase for batch and micro-batch
Spark Streaming is well known to be a micro-batch system rather than a true streaming system: essentially, a receiver takes the incoming event stream and discretizes it into small micro-batches ready for the actual Spark engine to consume (see “Diving into Apache Spark Streaming’s Execution Model” for more information).
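As a toy illustration of this model (the socket source is simply the shortest receiver to demonstrate with; a real deployment would use Kinesis or Kafka):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MicroBatchExample {
  def main(args: Array[String]): Unit = {
    // The receiver discretizes the incoming stream into 10-second batches,
    // each handed to the Spark engine as an ordinary RDD
    val ssc = new StreamingContext(
      new SparkConf().setAppName("micro-batch-example").setMaster("local[2]"),
      Seconds(10))

    val events = ssc.socketTextStream("localhost", 9999)
    events.foreachRDD(rdd => println(s"Micro-batch with ${rdd.count()} events"))

    ssc.start()
    ssc.awaitTermination()
  }
}
```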
While this architecture has come under some criticism from proponents of competing frameworks, Spark’s approach is actually a good fit for cases where true streaming is not an option. A good example is loading databases like Redshift and Postgres, which for performance reasons need to be fed in batches, even if the source data arrives as a stream.
In the case of loading Redshift, rather than having one framework for loading Redshift from batch and another for loading it from stream, we could have a single Spark app, which works in two modes:
- A Spark configuration which reads a batch of enriched events from S3 and loads them into Redshift
- A Spark Streaming configuration which reads a micro-batch of enriched events from Kinesis and loads them into Redshift
So we have the potential for a single codebase to serve both Snowplow pipelines. To be clear, this won’t be suitable for all Snowplow jobs, but it should reduce some duplication; a minimal sketch of the idea follows below.
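This is hypothetical only: the app name, argument handling and loadIntoRedshift are placeholders rather than a finished design, and collecting events to the driver just keeps the sketch short (a real implementation would stage events to S3 and COPY from there).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

object ScalaRedshiftSink {

  def main(args: Array[String]): Unit = args.toList match {
    case "batch"  :: s3Path     :: Nil => runBatch(s3Path)
    case "stream" :: streamName :: Nil => runStream(streamName)
    case _ => sys.error("Usage: batch <s3-path> | stream <kinesis-stream>")
  }

  // Batch mode: one Spark job over a full run of enriched events in S3
  def runBatch(s3Path: String): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("redshift-sink"))
    loadIntoRedshift(sc.textFile(s3Path).collect())
    sc.stop()
  }

  // Streaming mode: every 60-second Kinesis micro-batch becomes one Redshift load
  def runStream(streamName: String): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("redshift-sink"), Seconds(60))
    KinesisUtils
      .createStream(ssc, "redshift-sink", streamName,
        "kinesis.us-east-1.amazonaws.com", "us-east-1",
        InitialPositionInStream.LATEST, Seconds(60), StorageLevel.MEMORY_AND_DISK_2)
      .map(bytes => new String(bytes, "UTF-8"))
      .foreachRDD(rdd => loadIntoRedshift(rdd.collect()))
    ssc.start()
    ssc.awaitTermination()
  }

  // Placeholder for the actual load logic
  def loadIntoRedshift(events: Seq[String]): Unit = ???
}
```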
3.3 Evolving fast
Spark is evolving quickly, reaching version 2.0.0 in July this year. The project is innovating on multiple fronts - not just core Spark and Spark Streaming, but also GraphX for graph computation, MLlib for machine learning, and Spark Datasets for type-safe processing over structured data.
While the framework needs of our batch processing jobs are relatively straightforward today, having access to these more advanced capabilities in Spark for the future is hugely interesting.
4. A proposal for moving to Spark
4.1 Phase 1: porting our batch enrichment process to Spark
The most obvious place to start is with our batch enrichment process. Once we have re-implemented this in Spark as Scala Spark Enrich, we can run load tests using Avalanche to compare its performance against our existing Hadoop version.
If performance is close to Hadoop, or better, then we can proceed to the next stage.
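For illustration, here is a rough sketch of the shape Scala Spark Enrich could take. The enrich function and row formats are again stand-ins for our actual common-enrich library; in particular, whether to cache the intermediate output in memory and read from and write to S3 directly is exactly what the load tests would settle.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ScalaSparkEnrich {
  def main(args: Array[String]): Unit = {
    val Array(input, goodOutput, badOutput) = args
    val sc = new SparkContext(new SparkConf().setAppName("spark-enrich"))

    // Read raw events straight from S3, enrich once, and keep the
    // intermediate Either in memory rather than spilling it to HDFS
    val enriched = sc.textFile(input).map(enrich).cache()

    enriched.collect { case Right(good) => good }.saveAsTextFile(goodOutput)
    enriched.collect { case Left(bad)   => bad  }.saveAsTextFile(badOutput)

    sc.stop()
  }

  // Stand-in for the real enrichment logic
  def enrich(raw: String): Either[String, String] =
    if (raw.contains("\t")) Right(raw) else Left(s"Malformed: $raw")
}
```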
4.2 Phase 2: porting our Redshift load process to Spark/Spark Streaming
Currently our Redshift load process involves running a job on Hadoop, called Scala Hadoop Shred, followed by StorageLoader, a JRuby app which runs on the server that orchestrates your pipeline.
Regardless of a potential port to Spark, we are planning on:
- Updating StorageLoader so that it is executed from the EMR cluster itself, rather than from your orchestration server. This should be more performant and more secure
- Rewriting StorageLoader in Scala. We want to deeply integrate StorageLoader with Iglu, for which we have an Iglu Scala Client
Once these are done, we can look at a full re-implementation of our Redshift load process using Spark:
- We would merge Scala Hadoop Shred and StorageLoader (by then rewritten in Scala) into a single app, Scala Redshift Sink
- Using Spark, this app would support loading enriched events from S3 into Redshift as part of the batch pipeline
- Using Spark Streaming, this app would also support loading enriched events from a Kinesis stream into Redshift, as part of our real-time pipeline
When all this was done, we would have a single app which could be used as part of our batch or real-time pipelines.
4.3 Phase 3: porting our remaining apps
With this successfully completed, we could look at porting the remaining apps to Spark:
- Hadoop Elasticsearch Sink
- Hadoop Event Recovery
Although the business rationale for porting these two apps is less clear-cut, there could be benefits in standardizing on Spark for all of our batch apps, and indeed in these apps being usable via Spark Streaming in our real-time pipeline.
4.4 Out of scope
One piece of work which is out of scope for now is replacing our Stream Enrich component with a Spark Streaming configuration of the new Scala Spark Enrich.
Today, Stream Enrich is a standalone app built around the Kinesis Client Library; it has been around for several years, has a minimum of moving parts and is easy to reason about.
There are some valid grounds for not migrating this app to Spark:
- Stream Enrich is a simple worker app using the KCL. Spark Streaming also uses the KCL, but has a lot of additional Spark-idiomatic complexity to manage on top of this
- Stream Enrich’s buffer options can be adjusted so that it works on one event at a time - this is more flexible than the micro-batch configuration available with Spark Streaming
Given these points, a port to Spark Streaming would increase the complexity of Stream Enrich, while also decreasing that component’s flexibility.
5. Request for comments
As always, please do share comments, feedback, criticisms, alternative ideas in the thread below.