RFC: making the Snowplow pipeline real-time end-to-end and deprecating support for batch processing modules

As part of our drive to make the Snowplow community more collaborative and widen our network of open source contributors, we are regularly posting our proposals for new features, apps and libraries under the Request for Comments section of our forum. You are welcome to post your own proposals too!

Introduction

This RFC covers two related proposed goals:

  1. To make the entire Snowplow pipeline (i.e. every Snowplow microservice) real-time
  2. To deprecate support for batch versions of microservice components

Our assumption in writing this RFC is that the first goal is not controversial, but that the second one might be. However, we look forward to hearing from the Snowplow community whether this is correct.

A bit of history

Looking back, two things are very notable about the first version of Snowplow, launched way back in February 2012:

  1. The Snowplow pipeline was composed of discrete, independent microservices. (E.g. a collector, an enrich module.)
  2. All of these microservices processed data in batches

That each microservice processed data in batches is unsurprising: processing streaming data in real-time was still very uncommon in 2012 - Kafka had only been open sourced in early 2011, and Amazon Kinesis was not launched until November 2013.

As real-time processing technologies (including unified log and stream processing frameworks) matured, we added real-time versions of our core components: in the first instance, a real-time collector (the Scala Stream Collector) and Stream Enrich. New Snowplow users could therefore decide whether they wanted real-time processing or not, and choose the appropriate collector and enrichment microservice accordingly.

This was just the start of a journey. Soon, we started building new microservices that were real-time only - the first was our Elasticsearch Loader.

Then, when we built a version of Snowplow to run natively on GCP, we made each new GCP microservice real-time only.

Today, as we look across the Snowplow estate, we see:

  • Microservices that are available in multiple flavours: some real-time, some batch. (E.g. collectors, enrichers.)
  • Microservices that are batch-only (e.g. the Redshift and Snowflake loaders)
  • Microservices that are real-time only (e.g. the BigQuery Loader (although this can be run in a batch simulation mode), the Elasticsearch Loader, the Indicative Relay)

We believe that now is a good time to tidy up this estate, and evaluate where to focus our development efforts going forwards, to deliver the most for the Snowplow community.

Upgrading all our microservices to be runnable in real-time

With our core microservices available in batch and real-time flavours, Snowplow can be set up in a “Lambda Architecture” - as presented in Nathan Marz’s 2015 Big Data book. In this approach, batch-based processing is the reliable backbone of the big data pipeline, and real-time processing provides a “speed layer” for low-latency queries. It is common, for example, to stream Snowplow data into Elasticsearch to support real-time dashboards and APIs, and to load the same data in batch into Snowflake DB or Redshift to support more sophisticated but less time-critical analysis.

The Lambda Architecture made sense in a world where stream processing was inherently less reliable than batch processing. That assumption is not true today, and it has not been true for some time. Unified log technologies and stream processing frameworks have evolved to the point where they can be depended upon for business-critical processing of big data sets at low latencies. Given that, the case for building batch-based microservices is much weaker, which is why, when we ported Snowplow to GCP, we focused all our efforts on a real-time version.

Now we have the opportunity to look at the remaining microservices that only run in batch - notably our Redshift and Snowflake Loaders - and port them to run in real-time. This is something that we are looking to do between now and the end of the year.

Deprecating our batch technology

If it is possible to run Snowplow pipelines real-time, end-to-end, are there still compelling reasons why someone would want to run them in batch? After all, the benefits of real-time are very real:

  1. The data is available at very low latency. This means that not only can real-time reports and dashboards be delivered, but the data can be made available for real-time data applications like personalisation engines, marketing automation, health monitoring etc.
  2. The real-time pipeline can be set up to autoscale, gracefully handling traffic fluctuations without human intervention. (In comparison, it is typical for batch users to need to reconfigure the size of their EMR clusters to handle shifting traffic patterns.)
  3. The real-time pipeline is more stable - especially at very high volumes. One of the issues we have seen with companies running batch pipelines that process a billion or more events per day is that a problem on the EMR cluster can cause a whole batch to fail. The job then needs to be rerun from scratch, during which time data “backs up”, requiring progressively larger (and more expensive) EMR clusters to clear the backlog.
  4. At any sort of scale, real-time pipelines are typically much more resource efficient than batch pipelines: clusters don’t idle or spend time in bootstrap, and stream topologies can be configured so that each node is effectively utilised, with enough headroom to spin up new nodes in the event of a traffic spike. As a result, at higher volumes we find real-time pipelines more cost effective than their batch counterparts.

We think there are two reasons why some might prefer running the batch technology:

  1. Batch processes are typically easier to set up, manage and maintain. In the event of any issues, it is generally straightforward to reason about where a failure has occurred and resume the pipeline appropriately to avoid data loss or double processing of data.
  2. At low volumes in particular, batch processing may be more cost effective than real-time processing, which requires (at least in Snowplow’s case) the constant running of EC2 instances for the different microservices out of which a pipeline is composed.

Both of these issues are broader than batch vs stream processing, however:

  1. The Snowplow open source stack (both the real-time and batch versions) is too difficult to set up, manage and run on an ongoing basis.
  2. In both batch and real-time, Snowplow is architected for scale (using services like EMR on AWS or Dataflow on GCP), which makes it more expensive than it needs to be at low volumes.

We think we have really great opportunities to reduce the cost of running Snowplow at low volumes and to make Snowplow easier to run:

  1. We want to focus our efforts on making our real-time open source pipeline even easier to set up, run and manage. This will be a key focus in 2020; in particular, we would love to get to a one-click install. (Let’s see how close we can get.)
  2. We want to focus on dramatically improving Snowplow Mini (e.g. by adding Postgres support) so that it is a much more cost-effective option for companies that want to do more advanced digital analytics but don’t require big data scale.

Given the above, we think that not many people and companies would benefit from running Snowplow in batch mode, and that we would do better to concentrate our and the community’s engineering resources on our stream processing versions (and the other improvements we’ve highlighted above) than to continue supporting batch processing in Snowplow. It is for that reason that we would like to deprecate our batch technology.

Proposed next steps at Snowplow

Before we look at which microservices we will deprecate first, we should stress that we are committed to supporting in stream all the functionality currently supported in batch - that includes:

  • Enabling users to keep a persistent log of all the events ever recorded to disk (S3 or GCS) - see the sketch after this list
  • Enabling users to reprocess their entire event data set, if they wish
  • Enabling users to debug, recover and reprocess “bad rows” (i.e. events that were not successfully processed)
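To illustrate the first of those commitments, the sketch below shows the general archiving pattern: a consumer reads raw records off a stream and periodically flushes them to object storage so that a complete, replayable log of events is preserved. This is not the Snowplow S3 Loader itself, just a minimal Python sketch of the idea; the stream name, bucket and buffering threshold are hypothetical placeholders.

```python
# Minimal sketch of archiving a stream to S3 (hypothetical names, not the
# actual Snowplow S3 Loader): read records from Kinesis and periodically
# flush them, gzipped, to an S3 bucket so a replayable log is kept.
import gzip
import time

import boto3

STREAM_NAME = "enriched-good"        # hypothetical Kinesis stream name
BUCKET = "example-snowplow-archive"  # hypothetical S3 bucket name

kinesis = boto3.client("kinesis")
s3 = boto3.client("s3")

# Read from the first shard only, for simplicity; a real loader would track
# every shard and checkpoint its position.
shard_id = kinesis.describe_stream(StreamName=STREAM_NAME)["StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=STREAM_NAME, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)["ShardIterator"]

buffer = []
while True:
    response = kinesis.get_records(ShardIterator=iterator, Limit=500)
    iterator = response["NextShardIterator"]
    buffer.extend(record["Data"] for record in response["Records"])

    # Flush once the buffer is big enough; in practice you would also flush
    # on a time interval so quiet periods still get archived promptly.
    if len(buffer) >= 1000:
        key = f"archive/run={int(time.time())}.gz"
        s3.put_object(Bucket=BUCKET, Key=key, Body=gzip.compress(b"\n".join(buffer)))
        buffer = []

    time.sleep(1)
```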

The first two components we plan to deprecate support for are the Clojure Collector and Spark Enrich. We do not plan to release new versions of either of these microservices. That is particularly important to note for the Clojure Collector, because Amazon announced a few months ago that they were deprecating support for Tomcat 8 on Elastic Beanstalk, which the Clojure Collector uses under the hood. As we will not release an updated version of the Clojure Collector that removes this dependency, Clojure Collector users will need to migrate to running the Scala Stream Collector instead. We will shortly be posting guidance to help users migrating from batch -> real-time do so successfully.

We intend to formally deprecate support when we release the new version of Snowplow with the refactored bad row format, which we hope to publish by the end of Q3. As part of this we will be releasing new versions of:

  • The Scala Stream Collector
  • Stream Enrich (which runs on AWS)
  • Beam Enrich (which runs on GCP)

all of which will emit bad rows (failed events) in a new, highly structured format.
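To give a feel for what “highly structured” means, a failed event in the new format might look something like the envelope sketched below. To be clear, the schema URI and field names here are hypothetical placeholders for illustration, not the final format:

```python
# Hypothetical illustration of a structured bad row / failed event.
# The schema URI and field names are placeholders, not the final format.
failed_event = {
    "schema": "iglu:com.example/failed_event/jsonschema/1-0-0",  # hypothetical schema
    "data": {
        "processor": {"artifact": "stream-enrich", "version": "x.y.z"},
        "failure": {
            "timestamp": "2019-08-01T12:34:56Z",
            "messages": ["Field 'se_value' could not be parsed as a number"],
        },
        # The original payload travels with the failure details, so the event
        # can be debugged, recovered and reprocessed later.
        "payload": {
            "vendor": "com.snowplowanalytics.snowplow",
            "querystring": "e=se&se_ca=checkout&se_va=not-a-number",
        },
    },
}
```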

As part of this release we will move the Clojure Collector and Spark Enrich out of the core Snowplow/Snowplow repo into the Snowplow Archive. We hope this will make it clear to users that these microservices are no longer supported by Snowplow, while keeping the full source code easily accessible for anyone who wants it. From that point on we will no longer publish any releases for them, including maintenance releases. We will take the same approach with the other microservices we deprecate support for.

In parallel we plan to start work on porting RDB Loader and the Snowflake Loader to run as stream processing jobs: expect an RFC with our proposed architecture shortly.

Once these have been published, we plan to deprecate support for EmrEtlRunner, as it will no longer be necessary to run batch pipelines as part of a Snowplow setup. The proposed sequence of events is illustrated below:

Recommended next steps for companies running batch components (esp. the Clojure Collector and Spark Enrich)

We will be publishing documentation walking users of the Clojure Collector and Spark Enrich through the process of upgrading to the Scala Stream Collector and Stream Enrich shortly, and will post links in the thread below.

We would love your feedback!

What do you think? We’d love your feedback below :slightly_smiling_face:


YES!!! And running batch is expensive AWS-services-wise as well.

We have been eyeing a migration to GCP/RT for some time anyway. The Redshift pricing model is the primary driver. We’re at the point our dense compute nodes only hold 8-9 months of data (which is more than enough for day-to-day use, but not when it comes to comparing YoY etc). Adding new reserved instances that expire at different times throughout the year only makes it harder to switch away from these nodes.

Our main reservations around the real-time pipelines have been:

  • More servers and time required to keep the microservices running(?) - Mint Metrics currently only does a few million events/day
  • Lack of cost visibility over GCP vs AWS vs self-hosted (Is GCP really cheaper?)
  • Lack of knowledge (on my part) around how the RT microservices work together (i.e. Could we run SSC + Kafka on our own metal, but stream into BigQuery?)
  • Migrating to GCP seen as a nuclear option - haven’t seen anyone migrating over from AWS batch, so not sure what to expect

Ultimately, we just need to learn more about the RT setup as it compares to Batch. Despite that, deprecating batch would give us the impetus to migrate to RT sooner rather than later.

What sort of timeline is the team thinking?

Not sure if it’s in the scope of this RFC, but it would be nice to make it easier to add data from S3 as Spectrum tables. Right now, this is a bit painful because of the way the data is stored in S3.

If I had a vote on deprecating batch ETL, I guess I would vote “Well OK, if we must”. I recognize that the folks who actually write the application have limited time they can spend, and I can see that Stream Collector is more broadly useful and in general The Way Forward.

But our current Snowplow installation is great – I hardly ever have to think about it. The client applications are all fine with a data refresh every 24 hours. And the EMR job is not terribly expensive. (Maybe we’re just in the sweet spot with about 800K daily events.)

Anyway, thanks for a great toolset. If the bus is going to stream processing, we’ll get on the bus.


Hey @robkingston! Great to hear you’ve been thinking of moving to GCP / RT. To talk to your reservations:

  • The RT infrastructure does use a lot of microservices, which does mean that it can be costly at low volumes, where each service is minimally specced but still overspecced for the given event volumes.
  • At low volumes GCP is more cost effective because of the BigQuery pricing model: you can end up spending much less on BigQuery than on Redshift. On the flipside, the minimum specification for the different Dataflow jobs is quite chunky, which means the pipeline on GCP is not much cheaper than AWS at low volumes.
  • If you want the data ending up in BigQuery my strong recommendation would be to setup the end-to-end GCP pipeline using Pub/Sub. We have never even attempted to load BigQuery from Kafka.
  • There are different ways of migrating from batch -> RT and AWS -> GCP. Might be worth exploring on a separate thread. (Certainly we’re going to put together documentation to assist with the former.)

Timeline-wise we’d be looking at deprecating support for the Clojure Collector and Spark Enrich in the next 3-6 months.

Hope that helps!

Yali

Thanks for the feedback @wleftwich. Is there anything we can do around helping you from batch -> RT to persuade you to stay on the bus?

@rahulj51 out-of-the-box support for Spectrum is something that we’re looking to add after porting RDB Loader to run as a real-time process.

Awesome… that’s very helpful @yali.

Based on the docs, Google Pub/Sub doesn’t specify any limit on “outstanding messages” for pull delivery beyond the 7-day retention - in that sense, couldn’t you under-spec Beam Enrich for spikes in events and process accumulated events gradually throughout the day?

Even with Beam Enrich under-specced, you’ve got to run at least one streaming worker, which is around $70/month (per Dataflow job with the minimum configuration in terms of vCPUs/memory). Depending on your configuration you’ll want to be running at least 2 Dataflow jobs (enrichment, BigQuery loader), though more if you want redundancy for enriched bad rows and failed BigQuery writes.
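For a rough sense of scale based on those figures: a minimal two-job setup (enrichment plus the BigQuery loader) would come to roughly 2 × $70 ≈ $140/month of Dataflow cost alone, before adding any further jobs for enriched bad rows or failed BigQuery writes.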

Depending on your event volumes (and providing you are reusing the PubSub subscriptions) you could in theory spin up and terminate the streaming Dataflow jobs every few hours which would probably save money but you’d have to worry about backpressure.
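As a rough sketch of that idea (assuming the gcloud CLI is installed and authenticated; the job name and region below are hypothetical placeholders, and this ignores the backpressure caveat above), a scheduled script could drain the streaming job outside the hours you want it running:

```python
# Rough sketch: drain an active streaming Dataflow job by name via the gcloud
# CLI, so it can be relaunched later (e.g. from cron) against the same Pub/Sub
# subscription. Job name and region are hypothetical placeholders.
import json
import subprocess

REGION = "europe-west1"   # hypothetical region
JOB_NAME = "beam-enrich"  # hypothetical Dataflow job name


def active_job_id(name):
    """Return the id of the active Dataflow job with this name, or None."""
    out = subprocess.run(
        ["gcloud", "dataflow", "jobs", "list", "--status=active",
         f"--region={REGION}", "--format=json"],
        check=True, capture_output=True, text=True,
    ).stdout
    matches = [job for job in json.loads(out) if job.get("name") == name]
    return matches[0]["id"] if matches else None


def drain(job_id):
    """Ask Dataflow to drain the job: stop pulling new input, finish in-flight work."""
    subprocess.run(
        ["gcloud", "dataflow", "jobs", "drain", job_id, f"--region={REGION}"],
        check=True,
    )


if __name__ == "__main__":
    job_id = active_job_id(JOB_NAME)
    if job_id:
        drain(job_id)
    # Relaunching the job a few hours later would be a separate scheduled step,
    # reusing the same Pub/Sub subscription so no events are lost in between.
```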


From my perspective, equivalent real-time support for all supported Cloud providers sounds like a smart move.

Thanks @yali. I am definitely on the bus, just have to make the time to change over to RT. I’ll study the docs and holler for help when I get stuck.

Great to hear it’s still working well! We just configured the batch setup for AWS this month and continue to grow the analytics and research.

We are migrating our data warehousing to near real-time loading and analytics using Snowflake DW. A more cost-effective streaming process is aligned with our overall architecture. Our intent is to load Snowflake in “micro-batches” rather than “continuously” - I’m hoping that will be a feature.