Loading events into a data warehouse is almost never a goal in and of itself. The companies that take full advantage of their Snowplow data all have a data modeling step that transforms and aggregates events into a set of derived tables, which are then made available to end users in the business.
Most Snowplow users load their data into Redshift and implement their data models in SQL (using either SQL Runner or a BI tool such as Looker). These data models often work like this: each time new data is loaded into Redshift, the derived tables are dropped (or truncated) and rebuilt using all historical data.
This approach has several benefits. For instance, it makes it possible to iterate on the data model without having to update the existing derived tables. However, because the time it takes to build the derived tables goes up with the number of events in Redshift, there will be a point when rebuilding the tables from scratch each time will start to take too long.
One solution is to move the data models out of Redshift and rewrite them to run on Spark instead. We have written about this before and it’s something we will keep exploring. This tutorial instead focuses on improving the performance in Redshift – in particular on how to update a set of derived tables rather than rebuild them from scratch with each run.
1. The example data model
We’ll use a simple SQL data model to illustrate how one would go about updating a derived table rather than rebuilding it with each run. The data model itself is simple, but the queries that make it incremental also work for a more complex data model.
Let’s create a simple sessions table. Each time the Snowplow pipeline runs, we create a new sessions table using all events in atomic.events, drop the old table, and rename the new table:
    CREATE SCHEMA IF NOT EXISTS derived;

    CREATE TABLE derived.sessions_new
      DISTKEY(session_id)
      SORTKEY(tstamp)
    AS (
      SELECT
        domain_sessionid AS session_id,
        MIN(derived_tstamp) AS tstamp
      FROM atomic.events
      GROUP BY 1
    );

    -- IF EXISTS keeps the very first run from failing when there is no old table yet
    DROP TABLE IF EXISTS derived.sessions;
    ALTER TABLE derived.sessions_new RENAME TO sessions;
In the next section, we’ll make this example model incremental.
2. Moving to incremental updates
Rebuild the sessions table one last time
The sessions table needs to exist before we can start updating it, so build the table from scratch one last time.
Create a manifest
Next, create a table (derived.etl_tstamps) that will be used to keep track of which events have been processed. We use the etl_tstamp for this.
    CREATE SCHEMA IF NOT EXISTS scratch;

    -- manifest: one row per ETL timestamp (i.e. per batch) that has been processed
    CREATE TABLE derived.etl_tstamps (etl_tstamp timestamp encode lzo) DISTSTYLE ALL;
    INSERT INTO derived.etl_tstamps (SELECT DISTINCT etl_tstamp FROM atomic.events);

    -- staging tables with the same structure as the derived tables
    CREATE TABLE scratch.sessions (LIKE derived.sessions);
    CREATE TABLE scratch.etl_tstamps (LIKE derived.etl_tstamps);
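Steps 2a and 2b further down write to scratch.session_id and scratch.event_id, which the setup above does not create. A minimal sketch of one way to create them, assuming a single column named id (the name the later queries select from). Creating them from an empty SELECT is a choice made here so the column types are copied from atomic.events rather than guessed:

```sql
-- Assumption: each ID list has one column called id.
-- WHERE 1 = 0 returns no rows, so this only creates the table structure.
CREATE TABLE scratch.session_id AS (
  SELECT domain_sessionid AS id
  FROM atomic.events
  WHERE 1 = 0
);

CREATE TABLE scratch.event_id AS (
  SELECT event_id AS id
  FROM atomic.events
  WHERE 1 = 0
);
```

Like the other tables in scratch, these need to be emptied at the start of each run so that IDs from a previous run are not processed again.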
2.2 Incremental updates
The incremental update happens in a couple of steps.
Step 0: truncate the tables in scratch
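    TRUNCATE scratch.etl_tstamps;
    TRUNCATE scratch.sessions;
    -- the ID lists filled in steps 2a and 2b also need to start empty
    TRUNCATE scratch.session_id;
    TRUNCATE scratch.event_id;

    ANALYZE scratch.etl_tstamps;
    ANALYZE scratch.sessions;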
Step 1: select all ETL timestamps that are not in the manifest
    INSERT INTO scratch.etl_tstamps (

      WITH recent_etl_tstamps AS (

        -- return all recent ETL timestamps

        SELECT DISTINCT etl_tstamp
        FROM atomic.events
        WHERE collector_tstamp > DATEADD(week, -1, CURRENT_DATE) -- restrict table scan to the last week (SORTKEY)
        ORDER BY 1

      )

      -- return all ETL timestamps that are not in the manifest (i.e. that have NOT been processed)

      SELECT DISTINCT etl_tstamp
      FROM recent_etl_tstamps
      WHERE etl_tstamp NOT IN (SELECT etl_tstamp FROM derived.etl_tstamps ORDER BY 1)
      ORDER BY 1

    );
Note that we filter on collector_tstamp. This same filter will be used in subsequent queries as well. Because the atomic tables are sorted on collector_tstamp, Redshift will be able to skip almost all blocks because the events in them are older than 1 week, therefore returning the results much faster.
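The filter only helps if collector_tstamp really is the leading sort key column of the table being scanned. A quick way to check this, sketched here with Redshift's pg_table_def catalog view (which only lists tables in schemas that are on the search path, hence the SET):

```sql
-- pg_table_def only shows tables in schemas on the search path
SET search_path TO atomic, public;

-- list the sort key columns of atomic.events in sort key order;
-- sortkey = 0 means the column is not part of the sort key
SELECT "column", type, sortkey
FROM pg_table_def
WHERE schemaname = 'atomic'
  AND tablename = 'events'
  AND sortkey <> 0
ORDER BY sortkey;
```

If collector_tstamp does not come back with sortkey = 1, the one-week restriction is unlikely to let Redshift skip many blocks.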
The default in this case is 1 week, because this:
- is restrictive enough to speed things up
- provides enough margin if things break
If either the pipeline or the SQL breaks for some reason, the problem will need to be resolved within one week, or some batches that still need to be processed will fall outside the filter and be excluded (the filter can of course be widened if that were to happen).
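To find out whether any batches have slipped past the one-week window, one option is an occasional check without the usual restriction. This is a sketch, not part of the regular run: it scans events older than one week, so it will be slow on a large table, and it is most useful right after an update has completed (otherwise batches that are simply waiting for the next run can show up too).

```sql
-- batches with events outside the one-week window that are not in the manifest
SELECT
  etl_tstamp,
  COUNT(*) AS events_outside_window
FROM atomic.events
WHERE collector_tstamp <= DATEADD(week, -1, CURRENT_DATE)
  AND etl_tstamp NOT IN (SELECT etl_tstamp FROM derived.etl_tstamps)
GROUP BY 1
ORDER BY 1;
```

Any rows returned point to batches that the incremental queries will not pick up unless the filter is temporarily widened.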
Step 2a: create a list of session IDs
Select all session IDs that have at least one event in the batch (or batches) that we want to process.
    INSERT INTO scratch.session_id (

      -- atomic.events stores the session ID in domain_sessionid
      SELECT DISTINCT domain_sessionid AS id
      FROM atomic.events
      WHERE collector_tstamp > DATEADD(week, -1, CURRENT_DATE) -- restrict table scan to the last week (SORTKEY)
        AND etl_tstamp IN (SELECT etl_tstamp FROM scratch.etl_tstamps ORDER BY 1)
      ORDER BY 1

    );
Step 2b: create a list of event IDs
In some cases, it’s easier if we can restrict on the event ID, so we don’t need to join to atomic.events to get the session ID (e.g. in the unstructured event or context tables).
    INSERT INTO scratch.event_id (

      -- XX% are within 48 hours
      -- YY% are within the last week so we don't look at older events

      SELECT event_id AS id
      FROM atomic.events
      WHERE collector_tstamp > DATEADD(week, -1, CURRENT_DATE) -- restrict table scan to the last week (SORTKEY)
        AND domain_sessionid IN (SELECT id FROM scratch.session_id ORDER BY 1)
      ORDER BY 1

    );
There are a couple of important things to note here.
First, some sessions will have events that were processed in earlier runs. This will happen when a session was still active when the pipeline last ran, or if not all events had arrived yet. These sessions already have a row in derived.sessions, which we will drop just before we update the table. To prevent mistakes, the model recomputes sessions in full (i.e. using all events with that session ID, not just the events that haven’t been processed before), but only those sessions that have at least one unprocessed event.
However, we also restrict on collector_tstamp, which limits how far we can look back in time. This will introduce a small number of mistakes (for instance if a session lasts for more than 1 week, or if an event arrives more than 1 week late), but it can also speed up the query by 2 orders of magnitude.
We recommend running a quick analysis on the distribution to determine what is acceptable. Based on what we have seen with sessions, more than 99.99% of sessions fall entirely within the 1 week range.
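As a starting point for that analysis, here is a rough sketch that measures the time between the first and last event of each session and reports the share of sessions that fit within 48 hours and within 1 week. It is only a proxy for what the filter does (the filter counts back from the current date, not from the start of the session), and it scans all of atomic.events, so it is meant as a one-off:

```sql
WITH session_span AS (

  -- time between the first and last event of each session, in seconds
  SELECT
    domain_sessionid,
    DATEDIFF(second, MIN(collector_tstamp), MAX(collector_tstamp)) AS span_seconds
  FROM atomic.events
  WHERE domain_sessionid IS NOT NULL
  GROUP BY 1

)

SELECT
  COUNT(*) AS sessions,
  -- 172800 seconds = 48 hours, 604800 seconds = 1 week
  100.0 * SUM(CASE WHEN span_seconds <= 172800 THEN 1 ELSE 0 END) / NULLIF(COUNT(*), 0) AS pct_within_48_hours,
  100.0 * SUM(CASE WHEN span_seconds <= 604800 THEN 1 ELSE 0 END) / NULLIF(COUNT(*), 0) AS pct_within_1_week
FROM session_span;
```

If the share within 1 week is noticeably lower than expected, a wider window may be worth the extra scan time.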
Step 3: aggregate the sessions
    INSERT INTO scratch.sessions (

      -- same columns as derived.sessions (session_id, tstamp)
      SELECT
        domain_sessionid AS session_id,
        MIN(derived_tstamp) AS tstamp
      FROM atomic.events
      WHERE collector_tstamp > DATEADD(week, -1, CURRENT_DATE) -- restrict table scan to the last week (SORTKEY)
        AND domain_sessionid IN (SELECT id FROM scratch.session_id ORDER BY 1) -- restrict to session IDs we need to recalculate
      GROUP BY 1
      ORDER BY 1

    );
Step 4: commit into the derived table
    -- remove the sessions that were recomputed in this run (the column in derived.sessions is session_id)
    DELETE FROM derived.sessions WHERE session_id IN (SELECT id FROM scratch.session_id ORDER BY 1);

    ALTER TABLE derived.sessions APPEND FROM scratch.sessions;

    ...

    ALTER TABLE derived.etl_tstamps APPEND FROM scratch.etl_tstamps;
Because we do a DELETE FROM, the sessions table will need to be vacuumed.
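A minimal sketch of that maintenance step, assuming it runs on its own after the update has been committed (Redshift does not allow VACUUM inside a transaction block). How often to run it is a judgment call; it does not have to follow every update:

```sql
-- reclaim the space left by the deleted rows and re-sort the table
VACUUM derived.sessions;

-- refresh the statistics the query planner uses for this table
ANALYZE derived.sessions;
```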
2.3 Final notes
The restrictions on collector_tstamp (and root_tstamp) are, to a large extent, what keep the queries fast. However, they place some constraints on which tables can be updated like this. It works well for a sessions table because sessions never last for weeks at a time. It’s a bit different if the aggregation is happening across longer timeframes (e.g. at the visitor level). In that case, it’s still possible to move the model to incremental updates, but the table that is updated will have to be a bit different. For example, in the case of visitors, the table might have one row per visitor per day rather than one row per visitor.
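To make the visitor example more concrete, here is a sketch of what such a table could look like. The table name derived.visitor_days, its column names, and the use of domain_userid as the visitor ID are all assumptions made for illustration. The first statement is the one-off full build; the second rolls the daily rows up to one row per visitor at query time:

```sql
-- hypothetical table: one row per visitor per day
CREATE TABLE derived.visitor_days
  DISTKEY(visitor_id)
  SORTKEY(activity_date)
AS (
  SELECT
    domain_userid AS visitor_id,
    DATE_TRUNC('day', derived_tstamp) AS activity_date,
    COUNT(*) AS event_count
  FROM atomic.events
  GROUP BY 1, 2
);

-- roll up to one row per visitor when needed
SELECT
  visitor_id,
  MIN(activity_date) AS first_seen,
  MAX(activity_date) AS last_seen,
  SUM(event_count) AS event_count
FROM derived.visitor_days
GROUP BY 1;
```

Because each row covers a single day, an incremental run only has to recompute the visitor-days touched by the new batches, which keeps the look-back within the same one-week window used for sessions.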
2.4 Questions or feedback
Let us know below if we made any mistakes, missed anything, or if you have any feedback or questions!