On-premise Snowplow Realtime Pipeline with Spark Streaming Enrich


#1

This is a Request for Comments on the design and development of a Hosted Snowplow Realtime Analytics Pipeline, with a focus on a Stream Enrich app based on Spark Streaming and on independence from AWS-only components.

I’ve reviewed a number of discussions on Discourse and GitHub and have included a list of the more interesting ones at the end of this post. Reviewing them is a quick way to get up to speed with this discussion.

Please respond with your thoughts and comments in the thread below.

Background

Snowplow has been working great on AWS - with minimal modifications or no modifications at all, one can quickly set up a batch analytics pipeline on AWS. However, there are two things that should be considered:

  • It’s dependent on AWS. Many companies cannot deploy their analytics pipelines on AWS or any other public cloud provider and strictly prefer to run their own pipeline on their own machines and in their own data centers. There are even cases where Amazon has blocked access to AWS for users and companies from specific regions.
  • The realtime pipeline doesn’t get that much attention. Batch processing is awesome, but there are many cases where we need realtime analytics, realtime monitoring, and realtime decisions. Many Big Data Analytics teams start their work by quickly building and deploying a data collection pipeline and providing realtime dashboards to their organizations. These dashboards and metrics may be used for various purposes, from KPI monitoring to CI/CD pipelines where the value of these metrics determines whether a new release should be rolled back or not.

Snowplow R85 Metamorphosis - which added native Kafka support - was the first major step towards a robust on-premise realtime pipeline. However, when it comes to the enrichment phase, as far as I know there have not been many ongoing public efforts towards on-premise deployment without a dependency on AWS.

The very recent release of Snowplow R89 Plain of Jars provides a great opportunity for the community to move toward an on-premise realtime pipeline: we now have an official codebase for enrichments implemented on Spark, and because Spark batch processing code is reusable in Spark Streaming, the burden of writing Spark Streaming code based on the Kinesis Stream Enrich app has now faded away. A similar approach has been discussed in the RFC for R89, as @alex explained:

Our Proposal

For the first 4 steps of the pipeline, we’ve been thinking of something like this:

As for the latter part of the diagram, we can have topics for enriched and shredded events in Kafka so that storageLoader apps can simply act as consumers of these topics and load them to various sinks and databases, depending on the use cases. Thanks to Kafka’s rich ecosystem of clients, users can then develop their own consumers for loading shredded or enriched events into arbitrary targets.
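To make the "one consumer per storage target" idea concrete, here is a minimal sketch in plain Python. It is not Snowplow or Kafka code: `InMemoryTopic` and `Loader` are hypothetical names, and the in-memory list only stands in for a Kafka topic so that the example runs without a broker. The point it illustrates is that each loader keeps its own read position, so a slow or newly added target does not affect the others.

```python
from typing import Callable, List


class InMemoryTopic:
    """Stand-in for a Kafka topic: an append-only log that is read by offset."""

    def __init__(self) -> None:
        self._log: List[dict] = []

    def publish(self, record: dict) -> None:
        self._log.append(record)

    def read_from(self, offset: int, max_records: int) -> List[dict]:
        return self._log[offset:offset + max_records]


class Loader:
    """One loader per storage target (hypothetical design).

    Each loader tracks its own offset, much as an independent
    Kafka consumer group would.
    """

    def __init__(self, topic: InMemoryTopic, write: Callable[[dict], None]) -> None:
        self.topic = topic
        self.write = write  # e.g. an Elasticsearch or PostgreSQL writer
        self.offset = 0

    def poll_once(self, max_records: int = 100) -> int:
        """Read the next batch, hand it to the target, and advance the offset."""
        batch = self.topic.read_from(self.offset, max_records)
        for record in batch:
            self.write(record)
        self.offset += len(batch)
        return len(batch)
```

In a real deployment the `write` callable would wrap a client for the chosen target, and offset tracking would be left to Kafka's consumer groups rather than done by hand.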

DynamoDB Alternative

Stream Enrich / Shred needs an open-source / free alternative to DynamoDB for event de-duplication and identity stitching. Although Amazon provides a downloadable version of DynamoDB, I’m not sure it is suitable for production environments, and Amazon does not recommend using it that way.
Redis or RethinkDB may be good candidates, but care needs to be taken over their compatibility with Scala/Spark. RethinkDB has official support for Java, with an unofficial, inactive driver for Scala. For Redis, there are a number of third-party clients for Java or Scala. Some of them are recommended by Redis and have active communities (like Jedis). There’s even a connector for Spark called spark-redis, but its latest commit dates back to August 2016 and it has not been tested with Spark v2.
Also, a simple, scalable key/value store developed specifically for our requirements may be the answer, but this would require further design, development and integration effort and may not be justified at the moment.
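Whichever store is chosen, de-duplication mainly needs one atomic operation: "store this key only if it is not already present". Redis offers this through `SET` with the `NX` option and DynamoDB through conditional writes. The sketch below shows the idea in plain Python against an in-memory stand-in. The names `KeyValueStore`, `InMemoryStore`, `fingerprint` and `is_first_sighting` are hypothetical, and keying on an event ID plus a content hash is an assumption made for illustration, not a description of Snowplow's actual implementation.

```python
import hashlib
from typing import Dict, Protocol


class KeyValueStore(Protocol):
    """Minimal interface a de-duplication backend would need (assumed)."""

    def put_if_absent(self, key: str, value: str) -> bool:
        """Store the key and return True only if it was not present before."""
        ...


class InMemoryStore:
    """Dictionary-backed stand-in for Redis, Cassandra, HBase, etc."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def put_if_absent(self, key: str, value: str) -> bool:
        if key in self._data:
            return False
        self._data[key] = value
        return True


def fingerprint(event: dict) -> str:
    """Hash the event content, ignoring the event ID itself."""
    payload = "|".join(f"{k}={event[k]}" for k in sorted(event) if k != "event_id")
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def is_first_sighting(store: KeyValueStore, event: dict) -> bool:
    """Return True the first time an (event_id, fingerprint) pair is seen."""
    key = f"{event['event_id']}:{fingerprint(event)}"
    return store.put_if_absent(key, "seen")
```

Keeping the backend behind a small interface like this would let users plug in whichever store they already run, instead of adding a new cluster just for de-duplication.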

Storage Targets & Sinks

Snowplow’s current support for database and storage options in AWS is mature but we need support for a number of open-source / free software sinks, as we want the on-premise pipeline deployment to be possible too. Right now we are thinking of:

  1. Elasticsearch, as we have had great results in our current generation of analytics pipeline, which uses Divolte as the collector, Kafka as the storage for raw events from Divolte, Spark Streaming for ETL from Kafka to Elasticsearch and also on-the-fly analytics, and Elasticsearch/Kibana for storing the “enriched” events and visualizations. The nice thing about using the Elastic stack as a target is that users can have realtime dashboards for a number of important KPIs right out of the box (with Kibana).

  2. Other databases/datastores such as Cassandra, HBase, MongoDB, and of course PostgreSQL (for which Snowplow already provides great support). We need these storage targets for performing more sophisticated batch analytics and ML jobs.

To make batch processing possible, we can have a daily job that reads events from closed Kafka segments and stores raw / processed events on HDFS.
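One part of such a daily job is deciding where each event lands on HDFS. As a rough sketch, assuming each record carries an epoch-millisecond timestamp and that one directory per UTC day is wanted, the grouping step could look like the following. The function name `partition_by_day`, the `dt=YYYY-MM-DD` layout and the `/snowplow/raw` base path are all hypothetical, and the actual reads from Kafka and writes to HDFS are left out.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Tuple


def partition_by_day(
    records: Iterable[Tuple[int, str]],
    base_path: str = "/snowplow/raw",  # hypothetical HDFS base directory
) -> Dict[str, List[str]]:
    """Group (timestamp_ms, payload) records under one directory per UTC day."""
    partitions: Dict[str, List[str]] = defaultdict(list)
    for ts_ms, payload in records:
        day = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d")
        partitions[f"{base_path}/dt={day}"].append(payload)
    return dict(partitions)
```

Date-based directories of this kind make it straightforward for later batch jobs to pick up a single day or a range of days.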

We’ve been working on a PoC of Spark Streaming Enrich with a sink to Elasticsearch, based on the Kinesis Stream Enrich app structure. With the release of R89, we now have access to a robust Spark codebase and can make faster progress. However, decisions need to be made for the latter parts of the pipeline, especially the storage targets and DynamoDB alternatives.

On-premise Deployment Options

The whole pipeline can in turn be deployed easily as a set of Docker images. We also have good experience using HDP and Apache Ambari to deploy and manage our Big Data cluster. If we use Docker, we can get auto-scaling by using Kubernetes or Docker Swarm.

Request for Comments

Please share your thoughts, suggestions, and criticism (much obliged!) in the thread below.

List of relevant discussions:


  • On-Premise PostgreSQL storage. Still requires S3?
  • How to run StorageLoader in standalone system without connecting to external server eg: Amazon AWS
  • On-premise Realtime Pipeline

#2

Hi @ssheikholeslami - many thanks for this RFC! It’s great to see such comprehensive, thoughtful specification work coming from the Snowplow community.

A few comments and suggestions:

I think that’s a fair criticism. We have been focused for some time now on major refactoring of the batch pipeline - one of the major reasons for this refactoring has been to harmonize the batch pipeline components with the real-time pipeline components (e.g. by using Spark for batch).

This has meant less attention on the real-time pipeline recently, but expect velocity to pick up on the real-time pipeline now that a lot of the required rework on the batch pipeline has been done.

From R85, Stream Enrich operates as a simple Kafka worker, i.e. a consumer/publisher of Kafka topics. You can run that on-premise however you run your other Kafka micro-services - Docker Swarm, Kubernetes, Nomad, Mesos…

Interesting! This is something we’ve considered, but we haven’t identified a benefit of adapting our new Spark Enrich to also run as Spark Streaming Enrich, given that we already have Stream Enrich, which of course doesn’t have a Spark dependency.

Can you make a case for preferring Spark Streaming Enrich over the existing Stream Enrich app?

The closest on-prem equivalents to DynamoDB for event deduplication would perhaps be HBase or Cassandra. But I suspect it will largely depend on what people are running already - users won’t want to add an additional NoSQL database cluster just to support the Snowplow deduplication use-case.

Yes, Elasticsearch is pretty great. We currently maintain two Elasticsearch sinks:

  1. A KCL-based Kinesis Elasticsearch Sink for the real-time pipeline (supports enriched events and bad rows)
  2. A Scalding-based Hadoop Elasticsearch Sink for the batch pipeline (supports bad rows only)

We don’t yet have anything for loading Elasticsearch from Kafka.

It would be nice to harmonize all this and deliver all six use cases (S3, Kafka, Kinesis x enriched events and bad rows) from a single codebase, potentially powered by Spark Streaming.

Hope the above feedback is helpful! Interested to hear what the wider Snowplow community thinks…