Using AWS Athena to query the shredded events

Recently, we looked into using AWS Athena to query the ‘good’ bucket on S3. The approach we outlined focused on querying the ‘enriched’, unshredded data, but we also wanted to see whether we could query the shredded events directly from S3.

Why use Athena?

We’ve been using Athena to query the ‘bad’ data (i.e. data that fails validation) directly on S3. However, it also holds great promise for querying the ‘good’ Snowplow data in S3, potentially as an alternative to, or alongside, querying it in Redshift. Athena is a particularly attractive querying tool for users who have very large volumes of Snowplow data but highly variable query workloads, for whom paying for a permanent Redshift cluster is cost-prohibitive.

In this post, we explore how to query your Snowplow data in S3 using Athena: specifically how to query the ‘shredded’ rather than the ‘enriched’ data. (The difference will be discussed below.)

Although this is possible, there are some limitations, which relate to the way Snowplow data is currently serialized in S3. We are looking into improving this so that working with Snowplow data in Athena becomes more efficient going forward.

A reminder on how Snowplow data is currently stored on S3

As standard, Snowplow outputs two copies of your processed data to S3:

  • An ‘enriched’ data set. The structure of the enriched data maps to the atomic.events table in Redshift as it was before we rolled out support for self-describing events and custom contexts in Snowplow (at which point we moved to storing the data in Redshift in a multi-table model, with one table per self-describing event and custom context). The format of this data is a tab-delimited text file. Self-describing events and custom contexts are stored in dedicated columns of the TSV as self-describing JSONs (see the sketch after this list).
  • A ‘shredded’ data set optimised for loading into Redshift. This data is in a newline-delimited JSON format. We call this the ‘shredded’ data because the different self-describing event and custom context JSONs present in the enriched data have been ‘shredded’ out into individual tables for efficient loading and querying in Redshift.
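
For illustration, here is roughly what a web_page context looks like inside the contexts column of the enriched TSV: a self-describing JSON wrapping an array of individual context JSONs. (The schema versions and the id value below are illustrative.)

{
	"schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1",
	"data": [
		{
			"schema": "iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0",
			"data": {
				"id": "ef499475-70c6-4ad9-a289-82b953adaa20"
			}
		}
	]
}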

In this post we explore querying the ‘shredded’ data with Athena.

Creating the shredded event table in Athena

Each shredded table requires its own table definition. For this post we are using the webPage context table, which most Snowplow users have enabled; you’ll have to come up with a DDL for each shredded table you want to query (a sketch of one for link_click follows the web_page DDL below). The documentation for the CREATE TABLE statement in Athena provides useful guidance on that.

Let’s first take a look at what an actual shredded event looks like. Here is an example row from the shredded files for www.snowplowanalytics.com:

{
	"schema": {
		"vendor": "com.snowplowanalytics.snowplow",
		"name": "web_page",
		"format": "jsonschema",
		"version": "1-0-0"
	},
	"data": {
		"id": "ef499475-70c6-4ad9-a289-82b953adaa20"
	},
	"hierarchy": {
		"rootId": "c1b6e9c2-b04f-4b80-a993-9e37503fde08",
		"rootTstamp": "2017-08-04 08:51:19.000",
		"refRoot": "events",
		"refTree": ["events", "web_page"],
		"refParent": "events"
	}
}

Based on that, we’ve come up with the following CREATE TABLE query:

CREATE EXTERNAL TABLE IF NOT EXISTS com_snowplowanalytics_snowplow_web_page (
  schema STRUCT    <
                    vendor:STRING,
                    name:STRING,
                    format:STRING,
                    version:STRING
                   >,

  data STRUCT      <
                    id:STRING
                   >,

  hierarchy STRUCT <
                    rootId:STRING,
                    rootTstamp:TIMESTAMP,
                    refRoot:STRING,
                    refTree:STRING,
                    refParent:STRING
                   >
)
PARTITIONED BY(run STRING)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( "ignore.malformed.json" = "true")
STORED AS TEXTFILE
LOCATION 's3://path/to/shredded/good/';
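
As noted above, every shredded type needs its own definition. For illustration, a similar DDL for the link_click event table might look something like the following; the fields in the data struct are taken from the link_click 1-0-1 JSON schema, so double-check them against your own shredded output:

CREATE EXTERNAL TABLE IF NOT EXISTS com_snowplowanalytics_snowplow_link_click (
  schema STRUCT    <
                    vendor:STRING,
                    name:STRING,
                    format:STRING,
                    version:STRING
                   >,

  data STRUCT      <
                    elementId:STRING,
                    elementClasses:ARRAY<STRING>,
                    elementTarget:STRING,
                    targetUrl:STRING,
                    elementContent:STRING
                   >,

  hierarchy STRUCT <
                    rootId:STRING,
                    rootTstamp:TIMESTAMP,
                    refRoot:STRING,
                    refTree:STRING,
                    refParent:STRING
                   >
)
PARTITIONED BY(run STRING)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( "ignore.malformed.json" = "true")
STORED AS TEXTFILE
LOCATION 's3://path/to/shredded/good/';

The rest of this post sticks with the web_page table.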

Then, load the partitions (the data is partitioned by run, i.e. etl_tstamp):

MSCK REPAIR TABLE com_snowplowanalytics_snowplow_web_page;

Be prepared for that command to take a long time to complete. The reason it takes so long is also one of the key limitations of this approach.

The way shredded Snowplow data is stored on S3 is usually a variation of:

com.example-company/shredded/good/run=TIMESTAMP/com.vendor/event_name/jsonschema/version=1-0-0/

This means that, for example, the run=2017-08-01-03-08-40 folder will contain the files for all shredded tables for that etl_tstamp: the webPage context files, but also files for link_click events, any custom self-describing events, and so on.

When we load the partitions, we are loading all these events into our table. The ones that are not webPage context records will end up creating empty rows in the com_snowplowanalytics_snowplow_web_page table (empty, that is, but for the partition column run, which will be populated).

Here is an example of what this looks like:

 	schema	data	hierarchy	run
1				2017-07-30-03-09-05
2				2017-07-30-03-09-05
3				2017-07-30-03-09-05
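
If you are only interested in a handful of runs, one alternative to MSCK REPAIR TABLE is to register those partitions explicitly, which is much quicker and limits subsequent queries to the runs you register. (The run value below is the example folder from above; the bucket path is illustrative.) Note that each run folder still mixes all event types, so the empty rows described above remain:

ALTER TABLE com_snowplowanalytics_snowplow_web_page ADD IF NOT EXISTS
  PARTITION (run = '2017-08-01-03-08-40')
  LOCATION 's3://path/to/shredded/good/run=2017-08-01-03-08-40/';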

“Cleaning up” the table

You can ‘manually’ filter out these empty rows:

SELECT *
FROM com_snowplowanalytics_snowplow_web_page
WHERE schema IS NOT NULL
  AND data IS NOT NULL
  AND hierarchy IS NOT NULL
LIMIT 10; -- reduce query time

However, now we can spot a further limitation: all events that match the table definition get loaded:

    schema	                                                                                    data	    hierarchy	                                   run
1	{vendor=com.pingdom, name=incident_notify_user, format=jsonschema, version=1-0-0}	        {id=null}	{rootId=361c5eea-2724-4bdd-a3a7-47b19d914b07, rootTstamp=2017-05-19 23:16:55.000, refRoot=events, refTree=["events","incident_notify_user"], refParent=events}	2017-05-20-03-09-17
2	{vendor=com.snowplowanalytics.snowplow, name=duplicate, format=jsonschema, version=1-0-0}	{id=null}	{rootId=6add8d30-5585-4591-9650-e23b57b92627, rootTstamp=2017-04-05 15:52:49.000, refRoot=events, refTree=["events","duplicate"], refParent=events}	            2017-04-06-03-08-47
3	{vendor=com.snowplowanalytics.snowplow, name=change_form, format=jsonschema, version=1-0-0}	{id=null}	{rootId=df74ce60-8198-47a9-9376-083135db6657, rootTstamp=2017-05-23 12:46:47.000, refRoot=events, refTree=["events","change_form"], refParent=events}	        2017-05-24-03-09-20
4	{vendor=com.snowplowanalytics.snowplow, name=link_click, format=jsonschema, version=1-0-1}	{id=null}	{rootId=017033df-e1b8-4651-a419-370a38735ee6, rootTstamp=2017-03-16 11:10:13.000, refRoot=events, refTree=["events","link_click"], refParent=events}	        2017-03-16-12-55-15

(That is not to say that all these events conform to the same schema, but they all have the fields we use in the table definition.)

Fortunately, we can filter by schema name:

SELECT *
FROM com_snowplowanalytics_snowplow_web_page
WHERE schema.name = 'web_page'
LIMIT 10;

which results in:

    schema	                                                                                    data	                                    hierarchy	   run
1	{vendor=com.snowplowanalytics.snowplow, name=web_page, format=jsonschema, version=1-0-0}	{id=658e01d2-12f1-46cd-b3ab-7fbe3d6debf7}	{rootId=0018fa47-434b-4f82-a7fb-76ac2fe74f97, rootTstamp=2017-04-29 14:36:51.000, refRoot=events, refTree=["events","web_page"], refParent=events}	2017-04-30-03-07-19
2   (...)

Querying the new table in Athena

You will have noticed by now that this table only has three columns (plus one for the partition). The value in each cell is in turn a JSON object with various key-value pairs. To extract a specific element, such as the page view ID, we can reference it by key:

SELECT data.id
FROM com_snowplowanalytics_snowplow_web_page
WHERE schema.name = 'web_page'
LIMIT 10;
    id
1	b362dcd1-4753-4358-b7a2-1fc7575934af
2	baf897ac-ec1d-49b6-9900-6085542c5923
3	e2116b12-5172-424a-ae0c-b4ae95c26fd7
4   (...) 

This Athena table looks very different from the corresponding table in Redshift. To bring the two closer, try this query:

SELECT
  schema.vendor AS schema_vendor,
  schema.name AS schema_name,
  schema.format AS schema_format,
  schema.version AS schema_version,
  hierarchy.rootId AS root_id,
  hierarchy.rootTstamp AS root_tstamp,
  hierarchy.refRoot AS ref_root,
  hierarchy.refTree AS ref_tree,
  hierarchy.refParent AS ref_parent,
  data.id AS id
FROM com_snowplowanalytics_snowplow_web_page
WHERE schema.name = 'web_page'
LIMIT 10;
 	schema_vendor  schema_name	schema_format	schema_version	root_id	root_tstamp	ref_root	ref_tree	ref_parent	id
1	com.snowplowanalytics.snowplow	web_page	jsonschema	1-0-0	7db1bde4-978c-4088-ae52-24c19fca8046	2017-05-04 19:22:27.000	events	["events","web_page"]	events	b5a7b823-1dbf-4586-b618-337b056703f2

Athena does not support CREATE TABLE AS SELECT statements, so we don’t have the option of creating a new table that matches the one in Redshift. But we can use a variation of the above ‘translation’ query in a WITH clause, to create a named subquery that can be referenced in the FROM clause.
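
For example, something along these lines (the web_page alias is just an illustrative name):

WITH web_page AS (
  SELECT
    schema.vendor AS schema_vendor,
    schema.name AS schema_name,
    schema.format AS schema_format,
    schema.version AS schema_version,
    hierarchy.rootId AS root_id,
    hierarchy.rootTstamp AS root_tstamp,
    hierarchy.refRoot AS ref_root,
    hierarchy.refTree AS ref_tree,
    hierarchy.refParent AS ref_parent,
    data.id AS id
  FROM com_snowplowanalytics_snowplow_web_page
  WHERE schema.name = 'web_page'
)
SELECT root_id, root_tstamp, id
FROM web_page
LIMIT 10;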

Joining the shredded event table onto atomic_events

Joining the shredded event table onto atomic.events in Redshift is a routine operation for any Snowplow user. Here is how it works in Athena:

SELECT
  a.event_id,
  wp.data.id AS page_view_id
FROM atomic_events AS a
JOIN com_snowplowanalytics_snowplow_web_page AS wp
  ON a.event_id = wp.hierarchy.rootId
WHERE wp.hierarchy.refTree = '["events","web_page"]' -- another way to filter out unwanted events
LIMIT 10;
    event_id	                            page_view_id
1	090cd7b1-bfd2-4e70-a917-0eb7d4ad4038	0f487907-47db-42e1-b447-a8fe88ab8642

Usually, we would advise joining on both event_id = root_id and collector_tstamp = root_tstamp. However, I got zero records back when I tried that here, suggesting there may be an issue with how timestamps are formatted in the two tables that needs further investigation.
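
For reference, this is roughly what that two-column join looks like. The CAST is only an assumption about where the mismatch might lie (for instance, if collector_tstamp is defined as a STRING in the atomic_events table while rootTstamp is parsed as a TIMESTAMP) and hasn’t been verified:

SELECT
  a.event_id,
  wp.data.id AS page_view_id
FROM atomic_events AS a
JOIN com_snowplowanalytics_snowplow_web_page AS wp
  ON a.event_id = wp.hierarchy.rootId
  AND CAST(a.collector_tstamp AS TIMESTAMP) = wp.hierarchy.rootTstamp
WHERE wp.hierarchy.refTree = '["events","web_page"]'
LIMIT 10;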

Next steps

The key limitation of the approach described in this post is that we can’t guarantee that only the desired data will end up in the Athena table we are creating. Because the data in S3 is partitioned first by run, then by vendor, then by event and so on, all events are scanned when the partitions are loaded. That increases cost and also means any event that matches the DDL gets loaded (while non-matching events produce redundant empty rows).

For our future experiments with Athena, we’d like to test a different way of structuring data in S3 to ensure that each run bucket we load only contains data about one event or context (something that has already been raised by the community).
