Replacing Amazon Redshift with Apache Spark for event data modeling [tutorial]


#1

A key data processing stage in a Snowplow pipeline is event data modeling:

Event data modeling is the process of using business logic to aggregate over event-level data to produce ‘modeled’ data that is simpler for querying.

Typically Snowplow users have relied on Amazon Redshift for their event data modeling. The Snowplow pipeline ingests enriched events into Redshift, and then a series of SQL scripts, perhaps orchestrated by SQL Runner, performs the aggregations, storing the results in new tables. A good example of this is Snowplow’s own web data model.

1. Challenges of SQL event data modeling with Redshift

Event data modeling in Redshift with SQL has worked well for many Snowplow users, particularly at small to medium event volumes, but there have been certain challenges, some around the use of SQL in general, and some around the use of Redshift itself:

1.1 Challenges of SQL

SQL is great for prototyping event data models, but it can be challenging to then put these models into production:

  1. SQL is difficult to unit test - leading to plenty of “debugging in production”
  2. SQL is difficult to modularise – leading to “copy-paste-itis”
  3. SQL is impossible to parameterise – you quickly end up with a user-specific fork of any given data model
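To make the contrast concrete, here is a minimal sketch of the same kind of aggregation written as an ordinary Python function. The function name count_by, its parameters and the sample events are assumptions made for illustration and are not part of any Snowplow library. The point is only that the event type and grouping keys become arguments, and that the logic can be checked with a plain assertion before it goes anywhere near production.

```python
from collections import Counter

def count_by(events, event_type, keys):
    """Count events of one type, grouped by the given keys.

    `events` is an iterable of dicts; `event_type` and `keys` are the
    values that a hand-edited SQL script would otherwise hard-code.
    """
    counts = Counter()
    for event in events:
        if event.get("event") == event_type:
            counts[tuple(event.get(k) for k in keys)] += 1
    return dict(counts)

# Hypothetical sample events, using canonical Snowplow field names
sample = [
    {"event": "page_view", "page_url": "/home", "geo_country": "GB"},
    {"event": "page_view", "page_url": "/home", "geo_country": "GB"},
    {"event": "page_view", "page_url": "/home", "geo_country": "DE"},
    {"event": "page_ping", "page_url": "/home", "geo_country": "GB"},
]

views = count_by(sample, "page_view", ["page_url", "geo_country"])
```

Changing the grouping (say, by country only) is a different argument rather than a forked script.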

1.2 Challenges of Redshift

Running data modeling processes in Amazon Redshift also comes with some challenges, particularly at very large event volumes:

  1. Loading all enriched events into Redshift and then running event data modeling processes can put a significant load on the Redshift database
  2. Redshift does not support elastically scaling compute independently of storage (unlike e.g. Snowflake), limiting options to tune the event data modeling performance
  3. Loading richly nested enriched event data into Redshift requires our shredding process, which is a costly operation in EMR, and leads to complex SQL JOINs in Redshift

2. An alternative approach with Apache Spark

This tutorial presents one possible solution, using Apache Spark, Dataflow Runner and a Snowplow Analytics SDK to perform event data modeling.

This figure compares the dataflows for the Redshift- and Spark-based approaches:

Let’s get started with this tutorial, by setting out the event data modeling that we want to migrate to Spark.

3. Our example data model

Let’s imagine that we want to collect page views from a website, group visitors by country, count how many times a particular page was viewed from each country, and then store the aggregated results for further analysis or visualization.

Suppose we had this SQL query:

DROP TABLE IF EXISTS atomic.pageview_geo_summary;

CREATE TABLE atomic.pageview_geo_summary AS
SELECT
  DATE_TRUNC('day', derived_tstamp) AS report_date,
  page_url AS page,
  geo_country AS country,
  COUNT(*) AS num_views,
  MAX(etl_tstamp) AS last_etl_tstamp
FROM
  atomic.events
WHERE
  event = 'page_view'
GROUP BY
  report_date,
  page,
  country;

In our “canonical” batch pipeline, this event data model could be executed via SQL Runner inside a Factotum job, straight after successful EMR jobflow completion.

Each time this query is executed, it drops the pageview_geo_summary table and then recreates it from the updated data in Redshift. It could be made incremental to improve performance, but in this tutorial we’ll aim for larger gains in both performance and flexibility.

4. Building blocks of Spark data modeling

First we will introduce the building blocks of our Spark-based event data modeling.

4.1 Event transformation

The Snowplow Analytics SDKs are powerful tools to perform data-modeling in environments such as Apache Spark. They allow you to validate, access and extract canonical Snowplow properties from Snowplow enriched events.

As an example, you can transform an enriched event (in TSV line format) into a convenient JSON object with property names and pre-validated values.

Here’s an example, using the Snowplow Python Analytics SDK:

>>> import snowplow_analytics_sdk.event_transformer

>>> example_tsv = "demo\tweb\t2015-12-01T08:32:35.048Z\t2015-12-01T04:00:54.000Z\t2015-12-01T03:57:08.986Z\tpage_view\tf4b8dd3c-85ef-4c42-9207-11ef61b2a46\tco\tjs-2.5.0..."
>>> # transform() returns a dict keyed by canonical property name
>>> event = snowplow_analytics_sdk.event_transformer.transform(example_tsv)
>>> event['v_tracker']
'js-2.5.0'

The event_transformer converts the enriched event into a flattened JSON object, where each column is assigned to a predefined key from the canonical event model. The flatness of this object allows us to easily map it to a table row.
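As a minimal sketch of why the flat shape is convenient, the snippet below maps a flat dict onto a row tuple. The COLUMNS tuple, the to_row helper and the hand-written flat_event are assumptions for illustration; flat_event only imitates a few keys of the transformer's output and was not produced by the SDK.

```python
# Column order of a hypothetical target table (an assumption for this sketch)
COLUMNS = ("app_id", "platform", "event", "v_tracker")

def to_row(flat_event, columns=COLUMNS):
    """Return a tuple ordered by `columns`; missing keys become None."""
    return tuple(flat_event.get(name) for name in columns)

# Hand-written stand-in for a transformed event, not real SDK output
flat_event = {
    "app_id": "demo",
    "platform": "web",
    "event": "page_view",
    "v_tracker": "js-2.5.0",
}

row = to_row(flat_event)
```

Because there is no nesting to walk, the mapping is a single lookup per column.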

4.2 Run manifests

Another feature we will make use of in this tutorial is Snowplow Run Manifests, as introduced in Scala Analytics SDK 0.2.0 and Python Analytics SDK 0.2.0.

Snowplow’s batch pipeline marks folders as processed by moving these folders around different locations on Amazon S3. But file moves are quite problematic:

  • They are time-consuming
  • They are network-intensive
  • They are error-prone - a failure to move a file will cause the job to fail and require manual intervention
  • They only support one use-case at a time - you can’t have two distinct jobs moving the same files at the same time

Therefore, we decided to introduce a new mechanism for optionally tracking a Snowplow pipeline’s progress: run manifests. Run manifests take the form of a simple AWS DynamoDB table, plus an API found in the Analytics SDKs to help you read and update the manifest with processed folders.

Here’s a simple example of the manifests in action, checking for new unprocessed folders in the Snowplow enriched event archive, and then processing them using a user-provided process function:

import sys

from snowplow_analytics_sdk.run_manifests import list_runids, RunManifests

# Create wrapper to access AWS DynamoDB table
run_manifests = RunManifests(dynamodb_client, 'acme-snowplow-run-manifests')

# Iterate folders with enriched events
# run_id takes form of 'enriched/good/run=2017-07-17-22-40-30'
for run_id in list_runids(s3_client, 's3a://acme-enriched-archive/enriched/good'):
    # If folder is not yet processed - process and add to run manifest table
    if not run_manifests.contains(run_id):
        process(sc, run_id, sys.argv[1])
        run_manifests.add(run_id)
    else:
        pass

Run manifests provide a lightweight and reliable mechanism to ensure our pipeline does not process a folder more than once and doesn’t miss unprocessed events. And the API provided by the Analytics SDKs lets you work with the manifest via three simple functions: list_runids, RunManifests.contains and RunManifests.add.
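The "process at most once" behaviour can be tried out without AWS. The sketch below uses a hypothetical InMemoryRunManifests class that only mirrors the contains and add calls used above; it is not the SDK's DynamoDB-backed implementation, and the second run id is made up for the example.

```python
class InMemoryRunManifests:
    """Hypothetical stand-in mirroring the contains/add calls used above."""

    def __init__(self):
        self._processed = set()

    def contains(self, run_id):
        return run_id in self._processed

    def add(self, run_id):
        self._processed.add(run_id)

def process_new_runs(run_ids, manifests, process):
    """Process each run id at most once; return the ids handled in this pass."""
    handled = []
    for run_id in run_ids:
        if not manifests.contains(run_id):
            process(run_id)
            manifests.add(run_id)  # recorded only after process() returns
            handled.append(run_id)
    return handled

manifests = InMemoryRunManifests()
folders = [
    "enriched/good/run=2017-07-17-22-40-30",
    "enriched/good/run=2017-07-18-22-40-30",  # made-up second run
]
seen = []
first_pass = process_new_runs(folders, manifests, seen.append)
second_pass = process_new_runs(folders, manifests, seen.append)
```

Note that a folder is added to the manifest only after processing returns, so a crash in between leaves it unrecorded and it will be picked up again on the next run.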

4.3 Spark DataFrames

Apache Spark is a powerful framework for distributed computation, providing us with three distinct APIs: DataFrames, DataSets and RDDs. These APIs share a lot of the same functionality - for this example we’re going to use the DataFrames API, which is straightforward to map onto existing SQL code.

First, we need to load all enriched JSONs from the enriched events folder into a Spark DataFrame, where each JSON key corresponds to a DataFrame column:

transformed = sc.textFile(enriched_folder).map(event_to_json)
df = spark.read.json(transformed)

Now df contains our DataFrame, with a SQL-like schema derived on the fly, ready for queries to be run.

The SQL query can easily be mapped to the DataFrame API with a few minor tweaks:

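from pyspark.sql.functions import to_date, max, count

# Parentheses let the method chain span several lines;
# filtering first mirrors the WHERE clause of the SQL query
aggregated_events = (
    df.where(df.event == 'page_view')
      .select(to_date(df.derived_tstamp).alias('report_date'),
              df.page_url.alias('page'),
              df.geo_country.alias('country'),
              df.etl_tstamp)
      .groupBy('report_date', 'page', 'country')
      .agg(count("*").alias('num_views'),
           max("etl_tstamp").astype('timestamp').alias("last_etl_tstamp"))
)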

To find out more about the DataFrame API, you can check out the Spark Programming Guide.

4.4 Writing data to PostgreSQL

When we are event data modeling in Redshift, our aggregation outputs are typically written back into new tables in Redshift. With Spark, we can choose any database or blob storage to write our outputs to.

Given that our aggregated data volumes will not be significant, we can simply use Postgres for our aggregate store. Spark supports writing to relational databases via the Spark JDBC API:

url = "jdbc:postgresql://postgres.acme.com:5432/summaries"
properties = {'user' : "snowplow-model",
              'password': password,
              'driver': "org.postgresql.Driver",
              'sslmode': "require"}
aggregated_events.write.jdbc(url, 'pageview_geo_summary', mode="append", properties=properties)

Having defined the DataFrame associated with the aggregated_events variable, we can “dump” it to any JDBC-compatible database. We will need a pre-existing database (summaries in our case) plus a table (pageview_geo_summary) with a schema matching the output of our SQL query:

CREATE TABLE "pageview_geo_summary" (
  "id"              SERIAL PRIMARY KEY,
  "report_date"     TIMESTAMP,
  "page"            VARCHAR(255),
  "country"         CHAR(2),
  "last_etl_tstamp" TIMESTAMP WITHOUT TIME ZONE,
  "num_views"       INTEGER,

  CONSTRAINT uniq_record UNIQUE(report_date, page, country, last_etl_tstamp)
);

The above DDL creates our table, which matches the output of our aggregation and has a defensive UNIQUE constraint that ensures the same data cannot be loaded twice, even if the Spark job failed before correctly updating the manifest.
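The effect of the constraint can be tried locally. The sketch below uses Python's built-in sqlite3 module purely as a stand-in for Postgres (an assumption for convenience: column types are simplified to TEXT and INTEGER and the id column is left out), and the sample row is made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE pageview_geo_summary (
        report_date     TEXT,
        page            TEXT,
        country         TEXT,
        last_etl_tstamp TEXT,
        num_views       INTEGER,
        UNIQUE (report_date, page, country, last_etl_tstamp)
    )
""")

# Made-up summary row for one run folder
row = ("2017-07-17", "/home", "GB", "2017-07-17 22:40:30", 42)
insert = "INSERT INTO pageview_geo_summary VALUES (?, ?, ?, ?, ?)"

conn.execute(insert, row)        # first load succeeds
try:
    conn.execute(insert, row)    # loading the same run again is rejected
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

row_count = conn.execute(
    "SELECT COUNT(*) FROM pageview_geo_summary"
).fetchone()[0]
```

The second insert fails instead of silently doubling num_views, which is the behaviour we want if a run folder is ever reprocessed.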

It’s worth highlighting an important difference between this data model and the original Redshift one. Redshift can group all page views into one day, using the derived_tstamp, but the Spark job is only reading from a single enriched event folder; two different enriched event folders can easily contain report_dates in common, which has two important consequences:

  1. We must have last_etl_tstamp in our UNIQUE constraint, otherwise it’ll be violated very soon
  2. The ultimate reporting layer in Postgres will need to perform one more aggregation

Also, it’s very important to note that this approach works only for additive data points, for example total views per day, but not for non-additive data points, such as unique website visitors.
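A small sketch of both points, using made-up numbers. The summary_rows list and the visitor sets are hypothetical: the first part shows the extra aggregation the reporting layer has to perform over per-run rows, and the second shows why the same trick fails for unique visitors.

```python
from collections import defaultdict

# Hypothetical per-run summary rows: (report_date, page, country, num_views)
summary_rows = [
    ("2017-07-17", "/home", "GB", 10),  # from one run folder
    ("2017-07-17", "/home", "GB", 5),   # same day, from a later run folder
    ("2017-07-17", "/home", "DE", 3),
]

def total_views(rows):
    """Sum num_views across run folders for each (date, page, country)."""
    totals = defaultdict(int)
    for report_date, page, country, num_views in rows:
        totals[(report_date, page, country)] += num_views
    return dict(totals)

totals = total_views(summary_rows)

# Non-additive case: visitors seen in each of two run folders
visitors_run_1 = {"alice", "bob"}
visitors_run_2 = {"bob", "carol"}

sum_of_uniques = len(visitors_run_1) + len(visitors_run_2)  # counts bob twice
true_uniques = len(visitors_run_1 | visitors_run_2)
```

Summing view counts gives the right daily total, while summing per-folder unique counts overstates the number of visitors whenever someone appears in more than one folder.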

5. Putting it all together: our Spark job

Putting all of the above together, we can write the following PySpark job:

import sys
import json
import argparse
import boto3
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import to_date, max, count

import snowplow_analytics_sdk.event_transformer
from snowplow_analytics_sdk.run_manifests import *

APP_NAME = "Snowplow Page Views Data Modeling"
DYNAMODB_RUN_MANIFESTS_TABLE = 'acme-run-manifests'
ENRICHED_EVENTS_ARCHIVE = 's3a://acme-enriched-archive/enriched/good/'

def process(sc, run_id, password):
    # Transform enriched events to JSON
    full_path = ENRICHED_EVENTS_ARCHIVE + run_id + '*'
    transformed = sc.textFile(full_path).map(event_to_json)

    # Import JSON to Spark Dataframe
    df = spark.read.json(transformed)

    # Port of SQL modeling step
    # Parentheses let the chain span lines; the result is named
    # aggregated_events because that is what gets written below
    aggregated_events = (
        df.where(df.event == 'page_view')
          .select(to_date(df.derived_tstamp).alias('report_date'),
                  df.page_url.alias('page'),
                  df.geo_country.alias('country'),
                  df.etl_tstamp)
          .groupBy('report_date', 'page', 'country')
          .agg(count("*").alias('num_views'),
               max("etl_tstamp").astype('timestamp').alias("last_etl_tstamp"))
    )

    # Connect to PSQL DB
    url = "jdbc:postgresql://postgres.acme.com:5432/summaries"
    properties = {'user' : "snowplow-model",
                  'password': password,
                  'driver': "org.postgresql.Driver",
                  "sslmode": "require"}

    aggregated_events.write.jdbc(url, 'pageview_geo_summary', mode="append", properties=properties)

def event_to_json(line):
    # Use a name other than `json` so the json module is not shadowed
    event = snowplow_analytics_sdk.event_transformer.transform(line)
    return json.dumps(event)

if __name__ == "__main__":
    # Initialize run manifests
    session = boto3.Session()
    s3_client = session.client('s3')
    dynamodb_client = session.client('dynamodb', region_name='us-east-1')
    run_manifests = RunManifests(dynamodb_client, DYNAMODB_RUN_MANIFESTS_TABLE)

    # Initialize Spark job
    conf = SparkConf().setAppName(APP_NAME).setMaster('local[*]')
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()

    # Traversing unprocessed run folders in enriched archive
    run_ids = list_runids(s3_client, ENRICHED_EVENTS_ARCHIVE)
    for run_id in run_ids:
        if not run_manifests.contains(run_id):
            process(sc, run_id, sys.argv[1])
            run_manifests.add(run_id)
        else:
            pass

This self-contained job can be saved to your S3 bucket as s3://acme-snowplow/scripts/snowplow.py (the path our playbook refers to in section 6.3) for further use in Elastic MapReduce.

6. Running our job: Dataflow Runner

The last ingredient in our data modeling implementation is Dataflow Runner, a Snowplow tool intended to replace EmrEtlRunner in the future, as per our RFC.

Dataflow Runner lets you create and orchestrate cloud computing clusters such as Elastic MapReduce. It’s a lightweight and very configurable tool, without any Snowplow-specific logic.

You can download Dataflow Runner or learn more about it on the project’s wiki.

6.1 Dataflow Runner cluster configuration

Our configuration consists of two files. The first one is the cluster configuration, cluster.json:

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
  "data": {
    "name": "Spark Pageview by country data-modeling",
    "logUri": "s3://acme-snowplow/logs/",
    "region": "us-east-1",
    "credentials": {
      "accessKeyId": "env",
      "secretAccessKey": "env"
    },
    "roles": {
      "jobflow": "EMR_EC2_DefaultRole",
      "service": "EMR_DefaultRole"
    },
    "ec2": {
      "amiVersion": "5.5.0",
      "location": {
        "vpc": {
          "subnetId": null
        }
      },
      "instances": {
        "master": {
          "type": "m3.xlarge"
        },
        "core": {
          "type": "c3.8xlarge",
          "count": 1
        },
        "task": {
          "type": "m1.medium",
          "count": 0,
          "bid": "0.015"
        }
      }
    },
    "tags": [ ], 
    "bootstrapActionConfigs": [
      {
        "name": "Installing Python dependencies",
        "scriptBootstrapAction": {
          "path": "s3://acme-snowplow/scripts/snowplow-pyspark-bootstrap.sh",
          "args": []
        }
      }
    ],
    "configurations": [
      {
        "classification": "core-site",
        "properties": {
          "io.file.buffer.size": "65536"
        }
      },
      {
        "classification": "mapred-site",
        "properties": {
          "mapreduce.user.classpath.first": "true"
        }
      },
      {
        "classification": "yarn-site",
        "properties": {
          "yarn.resourcemanager.am.max-attempts": "1"
        }
      },
      {
        "classification": "spark",
        "properties": {
          "maximizeResourceAllocation": "true"
        }
      }
    ],
    "applications": [ "Hadoop", "Spark" ]
  }
}

To use this cluster configuration file yourself, just change the bucket name in logUri and update the bootstrapActionConfigs. Notice that the credentials are written as env, which means they will need to be set as the AWS_ACCESS_KEY and AWS_SECRET_KEY environment variables; alternatively, they can be hardcoded in cluster.json.

6.2 PySpark bootstrap script

The “Installing Python dependencies” bootstrap action references a simple shell script, required to install dependencies (only the Analytics SDK in our case) on the Spark nodes:

#!/bin/sh

set -e

# Script to install necessary Python dependencies
sudo pip install snowplow_analytics_sdk==0.2.3

If your data-modeling step requires any other Python libraries, you should install them using this bootstrap-script.

The file needs to be uploaded to S3 as snowplow-pyspark-bootstrap.sh.

6.3 Dataflow Runner playbook

Our second Dataflow Runner configuration file, playbook.json, is responsible for running the actual data-modeling step:

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-0",
  "data": {
    "region": "us-east-1",
    "credentials": {
      "accessKeyId": "env",
      "secretAccessKey": "env"
    },

    "steps": [
      {
        "type": "CUSTOM_JAR",
        "name": "PySpark load",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "command-runner.jar",
        "arguments": [
          "spark-submit",
          "--deploy-mode",
          "cluster",
          "--packages",
          "org.postgresql:postgresql:42.0.0.jre7",

          "s3://acme-snowplow/scripts/snowplow.py",
          "{{.dbPassword}}"
        ]
      }
    ]
  }
}

This playbook tells Dataflow Runner to submit a jobflow step for our PySpark job to the EMR cluster.

6.4 Running the EMR jobflow via Dataflow Runner

After you have prepared your Dataflow Runner configurations and uploaded all the necessary files to S3, you can launch Dataflow Runner:

$ export AWS_ACCESS_KEY=ABCDEKIA
$ export AWS_SECRET_KEY=abcdefgh1234
$ DB_PASSWORD=secret
$ dataflow-runner run-transient --emr-config cluster.json --emr-playbook playbook.json --vars dbPassword,$DB_PASSWORD

We are passing the database password as the first argument to the PySpark job to avoid storing sensitive credentials on S3; environment variables and the --vars option allow you to avoid hard-coding credentials into the configuration.
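If you launch Dataflow Runner from a Python wrapper rather than a shell, the same idea can be sketched as below. The build_command helper is hypothetical; it only assembles the exact arguments shown in the command above and reads the password from the DB_PASSWORD environment variable, which is our own naming choice.

```python
import os

def build_command(cluster_config, playbook, db_password):
    """Assemble the argv list for the dataflow-runner call shown above."""
    return [
        "dataflow-runner", "run-transient",
        "--emr-config", cluster_config,
        "--emr-playbook", playbook,
        "--vars", "dbPassword," + db_password,
    ]

# Read the password from the environment instead of hard-coding it
password = os.environ.get("DB_PASSWORD", "")
command = build_command("cluster.json", "playbook.json", password)

# To launch it (assuming dataflow-runner is on the PATH):
#   import subprocess
#   subprocess.run(command, check=True)
```

Passing the arguments as a list rather than a single shell string also avoids quoting problems if the password contains special characters.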

Dataflow Runner can be scheduled via cron or even launched after the EmrEtlRunner step via Factotum.

7. Conclusion

Here is the overall process we have put together:

And that’s it! You now have all the necessary tools to perform event data modeling for your Snowplow event data using Apache Spark, storing the results in an inexpensive Postgres database for later analysis, visualization or processing.

Of course this pipeline is not limited to simple aggregations - with the full power of Apache Spark, you’re free to perform very complex event data modeling on Snowplow enriched events, all the way up to machine-learning-based approaches.



#3

Thanks @anton! This is great timing for us as we’re dealing with some expensive queries that would be a better fit for Spark - especially when we’re making changes to a data model and need to recalculate data from 2+ years of events.

Since enriched data can contain duplicates, does it make sense to use shredded data on the Spark job? Is there any particular reason why you decided not to use shredded/atomic-events for this example?

Cheers!
Bernardo


#4

Hello @bernardosrulzon,

This is actually a very very good question.

Short answer to your question: it’s because enriched data is supported by the Analytics SDK. But here are some more thoughts on this (everything is debatable, and we would like to hear objections!)

Right now we consider enriched data our canonical format: it is self-contained and sufficient (unlike shredded data, which is scattered across different files and folders). That’s why our Analytics SDKs work exclusively with enriched data. Shredded data, on the other hand, is just enriched data prepared for loading into a relational database. In future I can imagine us adding new NoSQL storage targets, where shredding data would not make much sense, but enrichment will be a part of Snowplow ETL for a very long time.

Also, it was a conscious decision to introduce de-duplication as far downstream as possible, which helps us to avoid re-introducing duplicates further downstream (e.g. due to Kinesis at-least-once processing). In this sense I’d still like to think about this process as data modeling.

But in the end, I think nothing prevents you from consuming shredded data (though the Analytics SDK won’t be useful here). Another piece of good news is that we’re planning to release our de-duplication logic as a separate library (the code is already duplicated across three components), which should allow you to use it in your own jobs.