How to setup a Lambda architecture for Snowplow


#1

What is the Lambda Architecture?

In the Big Data book, Nathan Marz came up with the term Lambda Architecture for a generic, scalable and fault-tolerant data processing architecture, based on his experience working on distributed data processing systems at Backtype and Twitter.

Lambda architecture is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch- and stream-processing methods. This approach to architecture attempts to balance latency, throughput, and fault-tolerance by using batch processing to provide comprehensive and accurate views of batch data, while simultaneously using real-time stream processing to provide views of online data. The two view outputs may be joined before presentation. The rise of lambda architecture is correlated with the growth of big data, real-time analytics, and the drive to mitigate the latencies of map-reduce.

Note: when we say real-time, we actually mean near-real-time and the time delay between the occurrence of an event and the availability of any processed data from that event. In the Lambda architecture, real-time is the ability to process the delta of data that has been captured after the start of the batch layers current iteration and now.

Here’s how it looks like, from a high-level perspective (ref: http://lambda-architecture.net/):

  1. All data entering the system is dispatched to both the batch layer and the speed layer for processing.
  2. The batch layer has two functions:
  • managing the master dataset (an immutable, append-only set of raw data)
  • pre-compute the batch views.
  1. The serving layer indexes the batch views so that they can be queried in low-latency, ad-hoc way.
  2. The speed layer compensates for the high latency of updates to the serving layer and deals with recent data only.
  3. Any incoming query can be answered by merging results from batch views and real-time views.

Snowplow approach to Lambda Architecture

At Snowplow, the Real-Time pipeline is built on Lambda Architecture. Below is the diagram depicting the data flow and the components used to implement it.

To be more specific, the list of the Snowplow components is outlined below:

Note, that

  • Raw Events Stream is common to both Real-Time and Batch pipelines
  • Batch layer is implemented as a standard Snowplow batch pipeline with raw:in bucket (in config.yml) pointing to the Amazon S3 bucket to which the data is sinked with Kinesis S3
  • Real-Time layer:
  • you would need at least 2 streams (for enriched and bad events)
  • you could add a custom component, a stream job such as in AWS Lambda for example

Please, feel free to express your thoughts and ideas.

Regards,
Ihor


Stream vs Batch
Redshift setup not working
Enriched event stream into Redshift using StorageLoader: Contract violation error
Error using StorageLoader to load data into Redshift
Error using StorageLoader to load data into Redshift
Replacing the Clojure Collector
Scala Kinesis Enrich
Is this Lambda Architechture possible
Using Snowplow data to feed other applications
Does the Kinesis LZO S3 Sink support reading from an "enriched" stream?
How to store kinesis enriched stream to redshift
How to shred events into Redshift from the real-time pipeline?
Enriched event stream into Redshift using Kinesis Firehose
Is my version of snowplow lambda architecture correct
Kinesis + EMR ETL (R89)
Porting Snowplow to Google Cloud Platform
Encoded bad rows in Elasticsearch - advanced debugging support
Skip archive_enriched
Reading raw events from kinesis-s3 log
Trying to use StorageLoader with Stream Enrich without AWS
Shredding to Redshift in the Scala Collector Flow
Data modeling using Map Reduce
EmrEtlRunner::EmrExecutionError while storing the events in redshift database
Porting Snowplow to Microsoft Azure
'Serverless' Snowplow architecture
Snowplow Mini - Two Kinesis sinks to Elasticsearch?
On-premise Snowplow Realtime Pipeline with Spark Streaming Enrich
Unable to push stream enrich data into Redshift using "copy"
Help to solve EmrEtlRunner HDFS > S3
#2

Hi @ihor,

I’m in the process of setting up a snowplow setup on AWS and had most of the realtime side of things in place when I found this article which is right in line with our setup plans.

I was hoping to get clarification on one thing before I assume anything. The terminology of lambda architecture in this context refers to the Nathan Marz coined description of the overall combined architecture of the real time and batch flows, and doesn’t indicate that the various application instances (collector, enrich, etc.) run as AWS lambda functions. Is this understanding correct?

From following the setup guides for the collector, enrich, etc. It seems the typical approach is to run these application instances from an EC2, but the lambda terminology made me pause and I’ve started to question our current setup.

Thanks in advance for any insight!


#3

Hi @stperona,

You are right. Even though both terms (“lambda architecture” vs AWS Lambda) share the same word “lambda”, different meanings are implied. Though, on the high level, they both have the right to use it. Lambda is also used in programming languages as in lambda (anonymous) function (compare it with lambda function in terms of AWS Lambda). Also we have lambda in calculus. It is confusing…

Just to clarify the difference:

AWS Lambda is a serverless compute service that runs your code in response to events and automatically manages the underlying compute resources for you. You can use AWS Lambda to extend other AWS services with custom logic, or create your own back-end services that operate at AWS scale, performance, and security.

(AWS docs: Lambda)

As you can see from the diagram above you could use AWS Lambda to extend your Real-Time pipeline with an additional custom computation.

Generally speaking, lambda refers to ability to utilize some function (calculations) execution of which is triggered by some events.

The lambda architecture this post describes, on the other hand, has a specific meaning that is combining two processing approaches (real-time and batch) to benefit from both at the same time. When building real-time pipeline, however, you are not bound to having it integrated with the batch “branch”.

–Ihor


#4

Great! Just what I needed. Thanks for the quick response and clarifying/confirming my understanding.


#5

We’re looking into setting-up a lambda architecture and have a few questions.

The first area is around sensible/general configuration choices:

  • For the Scala Stream Collector, what are the recommended values for backoffPolicy?
  • For both the Scala Stream Collector and the S3 LZO Sink, what are the recommended values for buffer byte-limit, record-limit and time-limit?
  • The config file for the S3 LZO sink has an option for an output stream of events where the storage process fails, is there a recommended way of handling this stream? Same question for the bad stream from the Scala Stream Collector.
  • For the output format of the S3 LZO sink, is there a recommended format if we want to enrich and eventually load the data to Redshift using the batch pipeline?

The second area is around general architecture:

  • Are there recommended EC2 instance types for the Scala Stream Collector, S3 LZO Sink, Kinesis Enrich, and the Kinesis Elasticsearch Sink?
  • What is the best practice in-terms of which boxes to place each of these components on? I imagine the collector should be on it’s own box, but can we colocate the other components?

Thanks!


#6

Hi @tfinkel,

  • For the back-off we recommend: 2000 -> 600000 - this setting controls how long to backoff after a failure to write to Kinesis.

  • These options control how often you will initiate a flush to their respective endpoints. Depending on how often you want this flush to happen you will need to change them as appropriate.

    • For example if you want events available in the raw stream very quickly for processing by another application you would set these quite low. However the trade off is that you are making more network requests. You would tune these to match roughly how real-time you want your pipeline to be.
    • For the S3 LZO Sink we generally use a time_limit of ten minutes to ensure we only have one file being rotated to S3 every 10 minutes - there is no need to have these events in S3 straightaway as the Batch process is generally only run a few times per day.
  • The option to send failures to a Stream is generally coupled with the Kinesis Elasticsearch Sink and an Elasticsearch Cluster. This allows you to debug failures directly within Kibana or using the Elasticsearch API to quickly discover failures in the system.

    • You could also consume these failures with an AWS Lambda Service or your own Kinesis Consumer Application.
  • We use the LZO format for later consumption by EMR and loading into Redshift.

  • The instance type you intend to use completely depends on the amount of traffic you are expecting to send into the pipeline. Placing these instances within Autoscaling Groups you can start with a lower instance type and have it scale to meet demand - please be aware that the t2.* range is not suitable for autoscaling groups as the CPU will be throttled after a period of time rendering any CPU scaling unusable.

  • The recommended layout that we use is to have each application allocated within its own Autoscaling Group. This allows you to scale each application as required, which is nice to have when you are dealing with high amounts of load and applications which require different amounts of resources.

    • This is again dependant on the load you are expecting to get, under low throughput you could quite happily have applications together, but it makes it much harder to scale out if traffic increases.

There are a lot of questions here so if you feel like you need more detail on any of them please let me know!

Cheers,
Josh


#7

Thanks @josh, this has been extremely helpful so far.

I also came across the post below, which references a a Redshift sink Kinesis app. Is this still in the pipeline? And if so, are there any timelines associated with it’s release?

Thanks again!


#8

Hi @tfinkel,

There is a PR open with a work in progress Kinesis -> Redshift application but it is not currently scheduled for merging into snowplow/snowplow as of yet. You can check out the PR progress here: https://github.com/snowplow/snowplow/pull/1927

@alex might have a better idea of any timelines associated with this?


#9

Yes sure, happy to - Redshift dripfeeding is still planned, but our priority first is fully automating the batch based load of Redshift. At the moment that process is still far too manual (think table creation, table mutations, JSON Paths file uploads) to be safely operated in real-time.

Once we have that batch load fully automated, we’ll get back to the dripfeeder. It should be very cool - automatic creation of tables, upgrading of tables when new fields arrive etc, all in about a 5 minute latency…


#10

Thanks @alex and @josh, I really appreciate the level of responsiveness. This is helpful for planning on our end. I’m also very excited about automating the batch pipeline. Thanks again!