As part of our drive to make the Snowplow community more collaborative and widen our network of open source contributors, we will be regularly posting our proposals for new features, apps and libraries under the Request for Comments section of our forum. You are welcome to post your own proposals too!
This Request for Comments is to port the Snowplow event pipeline to run natively on Google Cloud Platform (GCP). By natively, we mean that Snowplow should be tailored to make best use of GCP-specific products and services.
Background on Google Cloud Platform
Google Cloud Platform is a cloud computing service by Google, equivalent in scope and ambition to Amazon Web Services or Microsoft Azure. The GCP offering has been steadily built up over the last 9 years, with various compute- and data-oriented services being added since the original preview of Google App Engine (similar to Elastic Beanstalk) in April 2008.
GCP has been in the news recently, with high-profile tech startups including Spotify and Snapchat deciding to adopt the platform.
Snowplow platform compatibility so far
A brief recap on Snowplow’s current platform support:
- At the start of 2012 we launched the Snowplow batch pipeline running “cloud native” on AWS using EMR, S3 and associated services
- In February 2014 we launched the Snowplow real-time pipeline running on AWS, making extensive use of Amazon Kinesis
- In November 2016 we released an alpha of the Snowplow real-time pipeline ported to run on-premise using Apache Kafka
As you can see, Snowplow has been closely tied to AWS since its inception over five years ago. However, while we do make heavy use of leading AWS services such as Redshift, EMR and Kinesis, in fact little of the Snowplow processing logic is AWS-specific.
The cloud services market has changed over the past few years: while AWS continues to be the clear leader, GCP and Azure are growing fast, launching their own “hero” services such as Google BigQuery and Azure Data Lake Store. Meanwhile, a new generation of infrastructure technologies such as Kafka, Kubernetes and OpenShift is helping to define a kind of “common runtime” for on-premise software deployments.
What does all this mean for Snowplow? A key priority for us in 2017 and beyond is extending our software to run on these other core runtimes: Google Cloud Platform, Microsoft Azure and on-premise. Cross-platform Snowplow should help:
- Broaden Snowplow adoption - we want Snowplow to be available to companies on whichever cloud or on-premise architecture they have adopted
- Deliver “pipeline portability” - investing in Snowplow shouldn’t lock you into a specific cloud platform indefinitely. You should be able to bring the core capabilities - and data - of your Snowplow pipeline with you when you migrate from one cloud to another
- Enable “hybrid pipelines” - you use AWS everywhere but your data scientists love BigQuery? You want a lambda architecture with Azure HDInsight plus Kinesis Analytics? We want to support Snowplow pipelines which connect together best-in-class services from different clouds as required
For our planned extension of Snowplow onto Microsoft Azure, please check out our sibling RFC:
With all of this context in mind, let’s look specifically at Google Cloud Platform support for Snowplow.
Snowplow and GCP
Around 2 years ago we started getting questions from the community about the feasibility of loading Snowplow enriched events into BigQuery. Interestingly, these users were not set on running the whole pipeline on Google Cloud Platform - they were happy to continue using AWS, but wanted the events to be loaded cross-cloud into BigQuery for analysis.
To explore this we tasked Andrew Curtis, one of our 2015 winterns, with developing a prototype command-line BigQuery loader for Snowplow enriched events; his release post Uploading Snowplow events to Google BigQuery has some helpful background.
Since then, Snowplow users and partners have expressed an interest in running the whole Snowplow pipeline on GCP, and we were able to respond by engaging one of our 2017 winterns, Guilherme Grijó Pires, to explore this further. Gui’s work yielded two promising outputs:
- An example project for Google Cloud Dataflow, written in Scala and with the associated blog post Google Cloud Dataflow example project released
- A PR into Snowplow with initial GCP support - as yet unreleased, Gui’s experimental pull request extended the Scala Stream Collector and Stream Enrich to run on GCP
Fast forward to today, and this RFC is an opportunity to take stock of what we have learnt so far, and answer a few key questions:
- What cloud-native services does this cloud provide which could be used for the various components of Snowplow?
- What services shall we use?
- How do we implement a pipeline from these services?
- What components do we need to extend or create?
Answering these questions starts by taking an inventory of the relevant Google Cloud services.
5. Inventory of relevant GCP services
The relevant Google Cloud Platform services are:
| Snowplow component(s) | AWS service(s) | GCP service(s) | Description |
|---|---|---|---|
| Unified log | Amazon Kinesis | Google Cloud Pub/Sub | "Ingest event streams from anywhere, at any scale, for simple, reliable, real-time stream analytics" |
| Unified log & Storage | Amazon S3 | Google Cloud Storage | "Best in class performance, reliability, and pricing for all your storage needs" |
| | AWS Elastic Beanstalk | Google App Engine | "Build scalable web and mobile backends in any language on Google’s infrastructure" |
| | AWS Elastic Load Balancing & AWS Auto Scaling | Google Cloud Load Balancing | "High performance, scalable load balancing on Google Cloud Platform" |
| | Amazon Elastic MapReduce | Google Cloud Dataproc | "A faster, easier, more cost-effective way to run Spark and Hadoop" |
| | Kinesis Client Library | Google Cloud Dataflow | "Simplified stream and batch data processing, with equal reliability and expressiveness" |
| Event data modeling & Analytics | Amazon Redshift / AWS Athena / Redshift Spectrum | Google BigQuery | "A fast, economical and fully-managed enterprise data warehouse for large-scale data analytics" |
| Event data modeling & Analytics | AWS Lambda | Google Cloud Functions | "A serverless environment to build and connect cloud services" |
Sources: Google Cloud Platform Products & Services; Google Cloud Platform for AWS Professionals: Service comparison.
Some of the above services are relatively similar to their AWS counterparts - for example, Google Cloud Dataproc is a relatively close proxy to EMR, and Google Cloud Functions are behaviorally very similar to AWS Lambda Functions.
However, we also see some stand-out “hero” services here:
5.1 Google Cloud Dataflow
Google Cloud Dataflow is both the name of a unified model for batch and streaming data processing and a GCP managed service for executing data processing jobs using that model.
The unified processing model was introduced in a 2015 Google research paper, which started with a great call-to-action for our industry:
We as a field must stop trying to groom unbounded datasets into finite pools of information that eventually become complete, and instead live and breathe under the assumption that we will never know if or when we have seen all of our data, only that new data will arrive, old data may be retracted, and the only way to make this problem tractable is via principled abstractions that allow the practitioner the choice of appropriate tradeoffs along the axes of interest: correctness, latency, and cost.
Google Cloud Dataflow then evolved as a GCP managed service for running jobs - whether real-time or “historical” (batch) - against Google’s dataflow model. And then things got even more interesting: Google donated the programming model and the SDKs to Apache, where they have been rebranded as Apache Beam; Beam now supports several “runners”, including of course Cloud Dataflow but also Apache Spark and Flink.
So it’s an intriguing (and complex) picture - but what is clear is that:
- Google Cloud Dataflow has potential to unify the Snowplow real-time and batch pipelines
- Programming to the Beam APIs could allow reuse of Snowplow components outside of the GCP ecosystem (for example via Flink)
- The Google Cloud Dataflow fabric is a significant step forward relative to Amazon Elastic MapReduce
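To make the "unified model" idea a little more concrete, here is a minimal plain-Python sketch. It deliberately does not use the Apache Beam or Cloud Dataflow APIs: the `enrich` function, the event fields and the two sources are all hypothetical, and the only point is that the same processing logic can be applied unchanged to a bounded collection (batch) and to a lazily consumed stream.

```python
from typing import Dict, Iterable, Iterator

Event = Dict[str, str]


def enrich(event: Event) -> Event:
    """Hypothetical enrichment: copy the event and add one derived field."""
    enriched = dict(event)
    enriched["platform_upper"] = event.get("platform", "").upper()
    return enriched


def run_pipeline(source: Iterable[Event]) -> Iterator[Event]:
    """Apply the same logic to any source, bounded or not, one event at a time."""
    for event in source:
        yield enrich(event)


def streaming_source() -> Iterator[Event]:
    """Stand-in for an unbounded stream; a real stream would never finish."""
    for i in range(3):
        yield {"event_id": str(i), "platform": "web"}


# Batch: a finite archive of raw events.
batch_result = list(run_pipeline([{"event_id": "a", "platform": "mob"}]))

# Streaming: the same pipeline, consuming events lazily as they arrive.
stream_result = list(run_pipeline(streaming_source()))
```

A real Beam pipeline adds windowing, triggers and a runner on top of this, none of which is modeled here.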
5.2 Google BigQuery
BigQuery is a fully-managed, enterprise data warehouse for SQL-based analytics that scales to petabytes. Unlike Redshift, BigQuery is “serverless” - meaning that you don’t have to worry about provisioning and maintaining a specific database cluster. This brings some distinct advantages over Redshift:
- You don’t have to worry about running out of disk space
- You don’t have to worry about maintaining your data with VACUUMs, ANALYZEs and deep copies
- Query performance is not bounded by a fixed cluster size (BigQuery has so-called “elastic compute”)
BigQuery has another trick up its sleeve: streaming inserts, which should allow us to load Snowplow enriched events into BigQuery at much lower latency than is possible with Redshift.
Being serverless, BigQuery also has a different pricing model to Redshift: rather than paying for a cluster of database instances, with BigQuery you pay specifically for data storage, streaming inserts, and for querying data.
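As a rough illustration of how this pricing model composes, the sketch below adds up the three components named above. The rates are placeholder numbers chosen only to make the arithmetic easy to follow; they are not real GCP prices, and the `Rates` and `monthly_cost` names are our own.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rates:
    """Placeholder unit prices; these are NOT real GCP prices."""
    storage_per_gb_month: float
    streaming_per_gb: float
    query_per_tb_scanned: float


def monthly_cost(rates: Rates, stored_gb: float, streamed_gb: float,
                 scanned_tb: float) -> float:
    """Sum the three usage-based components: storage, streaming inserts, queries."""
    return (stored_gb * rates.storage_per_gb_month
            + streamed_gb * rates.streaming_per_gb
            + scanned_tb * rates.query_per_tb_scanned)


# Hypothetical rates and usage figures, for illustration only.
example_rates = Rates(storage_per_gb_month=0.5, streaming_per_gb=0.25,
                      query_per_tb_scanned=4.0)
estimate = monthly_cost(example_rates, stored_gb=100, streamed_gb=8, scanned_tb=2)
```

The contrast with Redshift is that none of these terms depends on a cluster size: an idle month with no queries and no new data costs only the storage term.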
5.3 Google Cloud Load Balancing
Although GCP’s load balancing offering has many similarities with Amazon’s, one feature is worth calling out: Google Cloud Load Balancing does not require any pre-warming.
Many Snowplow users will be familiar with the challenges of using AWS Elastic Load Balancing for event collection - pre-warming makes various scenarios more difficult, including:
- Pipeline load testing with Snowplow Avalanche
- Very sudden spikes in site or app usage (e.g. related to TV advertising campaigns)
- Event tracking of bulk offline or batch processes such as email mail-outs
As such, having a load balancer which does not require pre-warming is a huge step forward.
6. Pipeline implementation
When we look to adapt Snowplow to a new cloud, we want our new implementation to:
- Preserve the “essence” of Snowplow, including all of our core capabilities. At a bare minimum, it should be possible to migrate to or from the new cloud without losing access to your Snowplow event archive
- Be idiomatic to the new cloud’s capabilities, services and recommended design patterns - in other words, Snowplow must continue to be “cloud native”
- Make good use of the new cloud’s “hero services” - Snowplow should avoid using the “lowest common denominator” services across each cloud, except where necessary
Here is a proposed implementation on GCP which covers the real-time use case:
Of course this won’t allow a bulk re-processing of the raw event archive. For this we will want to introduce a batch version of the pipeline running natively on GCP, like this:
Finally, we will want some kind of “replay” capability to enable us to bring new consumers online that consume from the enriched event stream:
There is a set of assumptions implicit in these three designs:
6.1 Outgrowing the Lambda architecture
If you are familiar with the Snowplow architecture on AWS, or the proposed architecture for Azure, you will notice that our port to GCP focuses on the real-time pipeline rather than emphasizing an end-to-end batch pipeline or indeed a Lambda architecture.
As we are seeing with Snowplow users and customers on AWS, our industry is steadily moving from a batch-first to a real-time-first world, and this RFC reflects that.
To be clear, there are still some concrete use cases around batch processing, such as performant full re-processing of the Snowplow raw event archive - we are not yet convinced by a Kappa architecture as an alternative. But it turns out that those batch use cases are relatively niche, and that a real-time-first pipeline can meet most needs.
6.2 Using Google Cloud Pub/Sub as our unified log
The Snowplow batch pipeline on AWS uses S3 as its unified log; the Snowplow real-time pipeline uses Kinesis. If we were starting to build Snowplow on AWS today, it’s unlikely that we would use a blob store like S3 as our unified log again.
Google Cloud Pub/Sub is recommended by GCP as the closest alternative to Kinesis or Kafka. Unfortunately, Cloud Pub/Sub is strictly speaking a message queue, not a unified log. The Cloud Pub/Sub documentation on Subscribers makes the difference clear:
To receive messages published to a topic, you must create a subscription to that topic. Only messages published to the topic after the subscription is created are available to subscriber applications.
A Kinesis stream or a Kafka topic is a very special form of database - it exists independently of any consumers. By contrast, with Google Cloud Pub/Sub the event stream is not a first class entity - it’s just a consumer-tied message queue.
In practice we can work around these limitations by:
- Being careful at the ops level in how we define and maintain subscriptions
- Building an event “replay” app to recreate some of the power of a Kafka or Kinesis stream
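The difference between the two models can be shown with a toy in-memory sketch. Neither class below is a real client library: `Topic` only mimics the documented behavior that a subscription sees messages published after it was created, and `Log` mimics a Kinesis- or Kafka-style stream that any consumer can read from an earlier offset.

```python
from typing import Dict, List


class Topic:
    """Toy message queue: a subscription only receives messages published after it exists."""

    def __init__(self) -> None:
        self._subscriptions: Dict[str, List[str]] = {}

    def subscribe(self, name: str) -> None:
        self._subscriptions.setdefault(name, [])

    def publish(self, message: str) -> None:
        # Messages are copied into each existing subscription's backlog only.
        for backlog in self._subscriptions.values():
            backlog.append(message)

    def pull(self, name: str) -> List[str]:
        messages = self._subscriptions[name]
        self._subscriptions[name] = []
        return messages


class Log:
    """Toy unified log: records persist independently of any consumer."""

    def __init__(self) -> None:
        self._records: List[str] = []

    def append(self, record: str) -> None:
        self._records.append(record)

    def read_from(self, offset: int) -> List[str]:
        return self._records[offset:]


topic, log = Topic(), Log()
for event in ["e1", "e2"]:
    topic.publish(event)  # no subscription exists yet, so nothing retains these
    log.append(event)

topic.subscribe("late-consumer")
topic.publish("e3")
log.append("e3")

late_from_topic = topic.pull("late-consumer")  # only sees e3
late_from_log = log.read_from(0)               # sees e1, e2 and e3
```

This is why subscriptions have to be created before the events they need are published, and why a separate "replay" app is needed to serve consumers that arrive late.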
6.3 Extending the Scala Stream Collector, not the Clojure Collector
It should be relatively straightforward for us to extend the Scala Stream Collector to support a Google Cloud Pub/Sub sink. This is the simplest way of bringing Snowplow event collection to GCP, and is also compatible with our Replacing the Clojure Collector RFC.
We would likely run Scala Stream Collector in GCP using Google Cloud Load Balancing with Google Compute Engine servers, and avoid Google App Engine.
Google App Engine is a PaaS-level abstraction, somewhat similar to Elastic Beanstalk, and we have found that Beanstalk-level abstractions have too much “magic” for predictable event collection. When it comes to event collection, we prefer lower-level abstractions over which we have tighter control.
6.4 Using Google Cloud Storage for raw and enriched events
This is a straightforward decision: Google Cloud Storage is the GCP equivalent of Amazon S3 or Azure Blob Storage; we can use Google Cloud Storage to archive the Snowplow raw and enriched events.
For infrequently used data (such as the raw event archive), we can consider using Google Cloud Storage Coldline, GCP’s equivalent of Amazon Glacier.
A final note: many of Google Cloud Storage’s operations are strongly globally consistent, including bucket and object listing. This should improve the Snowplow pipeline robustness relative to Amazon S3, where we often face blocking eventual consistency errors (e.g. when loading Redshift).
6.5 Using Google BigQuery as our event warehouse
Another easy choice: BigQuery is the event data warehouse service in Google Cloud; this is quite unlike AWS, where SQL-based analytics use cases are increasingly fragmenting between Amazon Redshift, AWS Athena, Redshift Spectrum and Kinesis Analytics.
BigQuery supports a richer type system than Redshift (including arrays and objects), which opens the door to alternative ways of modeling Snowplow enriched events compared to the Redshift “shredding” approach. These are covered by @mike of Snowflake Analytics in his excellent BigQuery modeling RFC:
6.6 Using Google Cloud Dataflow as our data processing fabric
The unified model of Google Cloud Dataflow opens up the option of using Cloud Dataflow for both batch and real-time pipelines. This is hugely exciting: it should allow us to concentrate our development resources in a way that we cannot on either AWS or Azure, given that:
- On AWS, we are split between EMR for batch and building Kinesis-based applications for real-time
- On Azure, we will be split between HDInsight for batch and building Event Hubs-based applications for real-time
Similarly, Snowplow users on GCP ought to benefit from the higher-level abstraction that Google Cloud Dataflow offers, including some job monitoring, visualization and auto-scaling capabilities.
Because of the close integration between Google Cloud Dataflow and the Apache Beam project, there are some interesting implications of adopting Cloud Dataflow that go beyond GCP, potentially impacting on the data processing choices we make on other platforms, including on-premise - more on this below.
6.7 Avoiding Google Cloud Dataproc
Although it should in theory be possible to run our Spark Enrich job on Google Cloud Dataproc with some adjustments, we believe that focusing on Cloud Dataflow should allow us to leapfrog the more EMR-like Cloud Dataproc environment.
If that assumption turns out to be incorrect, we will potentially consider:
- Removing any AWS-isms around Spark Enrich’s configuration and file storage so that it can run on Cloud Dataproc
- Extending Dataflow Runner to support kicking off jobs on Cloud Dataproc (see Dataflow Runner issue #33)
7. New and extended components
With these assumptions in place, here are some implementation notes on the particular components we need to create or extend:
7.1 Stream Enrich
For the real-time pipeline, we will need to extend Stream Enrich to support reading raw events from a Google Cloud Pub/Sub topic, and writing enriched events back to another topic.
Unless we encounter any significant blockers, we will aim to implement this using the Apache Beam API, enabling Stream Enrich to run in a largely managed fashion on Google Cloud Dataflow.
7.2 Google Cloud Storage Loader
We will build an equivalent of our Snowplow S3 Loader which:
- Consumes records (in our case, raw or enriched events) from a Google Cloud Pub/Sub topic
- Writes those events to Google Cloud Storage in the appropriate format (e.g. the Twitter Elephant Bird Protobuf-based format that Snowplow uses for raw event archiving)
This seems to be a well-understood and recommended path - see the Spotify tutorial and Google Cloud article for further details.
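The core of such a loader is a buffer-and-flush loop. The sketch below is a simplified illustration under several assumptions: a local temporary directory stands in for a Cloud Storage bucket, records are written as newline-delimited bytes rather than the Elephant Bird format, flushing is triggered only by record count, and the `BufferedSink` class, object naming scheme and date prefix are all hypothetical.

```python
import os
import tempfile
from typing import List


class BufferedSink:
    """Buffers records and writes each full batch as one object."""

    def __init__(self, root: str, prefix: str, max_records: int) -> None:
        self.root = root                # stand-in for a bucket
        self.prefix = prefix            # stand-in for an object key prefix
        self.max_records = max_records
        self.buffer: List[bytes] = []
        self.written: List[str] = []

    def write(self, record: bytes) -> None:
        self.buffer.append(record)
        if len(self.buffer) >= self.max_records:
            self.flush()

    def flush(self) -> None:
        if not self.buffer:
            return
        name = f"{self.prefix}/part-{len(self.written):05d}.txt"
        path = os.path.join(self.root, name)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as handle:
            handle.write(b"\n".join(self.buffer) + b"\n")
        self.written.append(name)
        self.buffer = []


bucket_dir = tempfile.mkdtemp()
sink = BufferedSink(bucket_dir, "enriched/2017-06-01", max_records=2)
for payload in [b"event-1", b"event-2", b"event-3"]:
    sink.write(payload)
sink.flush()  # write out the final partial batch
```

A production loader would also flush on elapsed time and buffered bytes, and would only acknowledge messages to Cloud Pub/Sub once the corresponding object had been written.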
7.3 BigQuery Loader
We can identify two requirements for loading Google BigQuery:
- For real-time, loading incoming enriched events as fast as possible using streaming inserts from Cloud Pub/Sub
- For the event archive in Cloud Storage, loading each historical batch in sequential order
As discussed above, most of the complexity here will be in designing the event model in BigQuery, and then updating other core Snowplow components (such as Iglu and Iglu Central) to support this.
We will also need to explore the optimal strategy regarding BigQuery table partitioning for the events table.
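One candidate strategy, sketched below purely as a starting point for discussion, is day-based partitioning keyed on `collector_tstamp`, with each row routed to a BigQuery partition decorator of the form `table$YYYYMMDD`. The `events` table name, the timestamp format and the helper functions are assumptions for illustration; no BigQuery client calls are made.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List

Row = Dict[str, str]


def partition_for(row: Row, table: str = "events") -> str:
    """Return the day-partition decorator for a row, e.g. events$20170601."""
    day = datetime.strptime(row["collector_tstamp"], "%Y-%m-%d %H:%M:%S")
    return f"{table}${day:%Y%m%d}"


def group_by_partition(rows: List[Row]) -> Dict[str, List[Row]]:
    """Group rows so that each batch can be loaded into a single partition."""
    groups: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        groups[partition_for(row)].append(row)
    return dict(groups)


# Hypothetical enriched events, reduced to two fields.
rows = [
    {"event_id": "a", "collector_tstamp": "2017-06-01 23:59:59"},
    {"event_id": "b", "collector_tstamp": "2017-06-02 00:00:01"},
    {"event_id": "c", "collector_tstamp": "2017-06-01 08:30:00"},
]
grouped = group_by_partition(rows)
```

Whether to partition on the collector timestamp, another event timestamp or the load time is exactly the kind of question we still need to explore.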
7.4 SQL Runner
Many of our users use SQL Runner to perform event data modeling on the events once in Redshift.
In a GCP world, Snowplow users might want to run legacy or standard SQL scripts against BigQuery. To enable this, we would extend SQL Runner; see issue #92 in that project’s repo.
7.5 Snowplow Event “Replay” App
Above we have raised the limitations of Cloud Pub/Sub as a message queue rather than a stateful append-only unified log like Kinesis or Kafka: when a subscribing app is brought online, it can only start reading enriched events from that point forwards; it can’t “look back” and start processing from an earlier point.
For this reason, as a final phase we will add a “replay” engine that can read the enriched event archive from Google Cloud Storage and write these events back into a new enriched event topic in Cloud Pub/Sub. New or existing Cloud Dataflow apps or Cloud Functions can be pre-configured with subscriptions to this new topic, to process the replayed events as they appear.
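In outline, the replay engine is a loop over archived objects in order. The sketch below is a minimal illustration, assuming a local directory in place of the Cloud Storage archive, newline-delimited events, lexicographically ordered object names, and a plain `publish` callback in place of a real Cloud Pub/Sub publisher; all names are hypothetical.

```python
import os
import tempfile
from typing import Callable, List


def replay(archive_dir: str, publish: Callable[[bytes], None]) -> int:
    """Re-publish every archived event, oldest object first; return the count."""
    count = 0
    for name in sorted(os.listdir(archive_dir)):
        with open(os.path.join(archive_dir, name), "rb") as handle:
            for line in handle:
                publish(line.rstrip(b"\n"))
                count += 1
    return count


# Build a tiny fake archive, deliberately written out of order.
archive = tempfile.mkdtemp()
for name, lines in [("part-00001.txt", b"e3\n"), ("part-00000.txt", b"e1\ne2\n")]:
    with open(os.path.join(archive, name), "wb") as handle:
        handle.write(lines)

replayed: List[bytes] = []  # stand-in for the new enriched event topic
replayed_count = replay(archive, replayed.append)
```

Because of the subscription semantics discussed above, consumers must be subscribed to the new topic before the replay starts, otherwise the replayed events will be missed as well.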
7.6 “Dataflow Enrich”
To date Snowplow has maintained the separation of Stream Enrich and Spark Enrich, rather than trying for example to harmonize these into a single component, such as perhaps “Spark (Streaming) Enrich”; the Azure RFC has perpetuated this strategy. This ongoing separation has been driven by a few objectives, chiefly:
- Avoiding complecting the distinct semantics of batch-based processing (typically on files in cloud storage) and stream-based processing into one app
- Maintaining the event processing flexibility of the real-time pipeline - given that the micro-batching behavior of Spark Streaming is quite restrictive in how it can buffer events
- Reducing the operational complexity for the real-time pipeline - avoiding a dependency on a whole Spark runtime
However, when we need to add support for batch processing of the Snowplow raw event archive on Google Cloud Storage, our preference is to extend the existing Cloud Dataflow support in Stream Enrich to cover this; this would allow us to leverage the Cloud Dataflow capabilities for batch, and to avoid having to port Spark Enrich to run on Google Cloud Dataproc.
At this point, perhaps Stream Enrich evolves into “Dataflow Enrich”, or “Beam Enrich”; from here we may be able to deprecate Spark Enrich altogether, running Beam Enrich on Elastic MapReduce (and perhaps Azure HDInsight) using Beam’s Apache Flink runner. This is very hypothetical at this early stage - but we look forward to exploring further with our friends on the Apache Flink project in due course.
There is a huge amount of new development involved in the above proposal! We propose splitting the work into four phases:
- Real-time pipeline through into BigQuery
- Adding Google Cloud Storage storage to the real-time pipeline
- Batch pipeline through into BigQuery
- Enriched event “replay” into a new Cloud Pub/Sub topic
The rationale for splitting the work into these four phases is as follows:
8.1 Phase 1: Real-time pipeline through into BigQuery
This is clearly a broadly-scoped phase one for GCP, but delivering this allows Snowplow users to:
- Build their own real-time apps on top of the enriched event topic in Cloud Pub/Sub
- Do ad hoc data modeling and analytics on their Snowplow data in BigQuery
This covers two of the key Snowplow use cases from the start.
8.2 Phase 2: adding Google Cloud Storage storage to the real-time pipeline
This is a relatively small phase two, but crucially important given that it will enable:
- Full back-up of the raw event stream, allowing later processing by the batch pipeline components we build in Phase 3
- “Event replay” of the enriched events into a new Cloud Pub/Sub topic, as covered in Phase 4
8.3 Phase 3: batch pipeline through into BigQuery
In stark contrast to our approach with AWS, we will implement the batch pipeline on GCP as Phase 3. While less crucial than the real-time pipeline, the batch pipeline will enable us to:
- Migrate an existing raw event archive over from AWS or Azure to GCP, re-enriching it as necessary and loading it in bulk into BigQuery
- Do a full re-process of the raw event archive, for example if the required enrichment configuration changes significantly
8.4 Phase 4: enriched event “replay” into a new Cloud Pub/Sub topic
The Snowplow event “replay” app is an important Phase 4 for our GCP port. Adding this to our arsenal will let us:
- Bring up new stream processing apps that start consuming the enriched event stream from an earlier point than simply “now”
- Allow a running stream processing app to re-start from an earlier point in the stream, should that be necessary (for example if a bug is found in the stream processing app)
It’s crucial that we build data security and protection into every Snowplow port to a new cloud platform from the start.
As we proceed through each phase, we will work to make sure that:
- Public-facing components such as the Stream Collector are hardened against penetration
- Data in GCP databases and message queues is encrypted on the wire and at rest
- User permissions are as locked down as possible
- The end-to-end data flow through the system is comprehensively documented and understood
REQUEST FOR COMMENTS
This RFC represents a hugely promising new runtime for Snowplow, and as always we welcome any and all feedback from the community. Please do share comments, feedback, criticisms, alternative ideas in the thread below.
In particular, we would appreciate any experience or guidance you have from working with Google Cloud Platform in general, or services such as Cloud Dataflow or BigQuery in particular. We want to hear from you whether your experience comes from data engineering/analytics/science work on GCP, or general workloads!