Using AWS Athena to query the 'good' bucket on S3


#1

Since AWS launched Athena late last year, we’ve been using it to query the ‘bad’ data (i.e. data that fails validation) directly on S3. (Check out our tutorial on how to do that: Debugging bad rows in Athena.)

Athena also holds great promise for querying “good” Snowplow data in S3, potentially as an alternative to, or alongside, querying it in Redshift. Athena is a particularly attractive querying tool for users who have very large volumes of Snowplow data but highly variable query workloads, for whom paying for a permanent Redshift cluster is cost-prohibitive.

In this post, we explore how to query your Snowplow data in S3 using Athena: specifically how to query the ‘enriched’ rather than the ‘shredded’ data. (The difference will be described below.) Do note that although this is possible, there are some limitations to doing so related to the way that Snowplow data is currently serialized in S3. We are looking to improve this so that working with Snowplow data in Athena becomes more efficient going forwards: this is explored towards the end of this post.

A note on how Athena works

All tables created in Athena are external tables. In effect, whenever you create a table in Athena, you are actually simply creating the schema for that table. When you then run queries against it, that schema gets applied to the underlying data on S3 and the query is run against the resulting table.

This means that when you drop a table in Athena, you’re only dropping that particular schema. The underlying data is still safely sitting in your S3 bucket. So you can safely experiment with different table definitions and if they don’t work out, you can just drop them with no underlying data loss.
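
As a small illustration of the point above, assuming the atomic_events table name used later in this post, dropping a table definition is a single statement:

```sql
-- Removes only the table definition from the Athena catalog.
-- The files under the table's S3 LOCATION are left untouched,
-- so the table can be re-created later with a different schema.
DROP TABLE IF EXISTS atomic_events;
```

After this, re-running a CREATE EXTERNAL TABLE statement against the same S3 location gives you a fresh table over the same data.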

Understanding the way Snowplow data is currently stored in S3: enriched and shredded data sets

As standard, Snowplow outputs two copies of your processed data to S3:

  1. An ‘enriched’ data set. The structure of the ‘enriched’ data maps to the atomic.events table as it was in Redshift before we rolled out support for self-describing events and custom contexts in Snowplow (at which point we moved to storing the data in Redshift in a multi-table model, with one table for each self-describing event and custom context). The format of this data is a tab-delimited text file. Self-describing events and custom contexts are stored in dedicated columns in the TSV as self-describing JSONs.
  2. A ‘shredded’ data set optimised for loading Redshift. This data is in a new-line delimited JSON format. We call this the ‘shredded’ data because the different self-describing event and custom context JSONs present in the enriched data have been ‘shredded’ out into individual tables for efficient loading and querying in Redshift.

In this post we explore querying the ‘enriched’ data with Athena. In a follow-up post we’ll explain how to query the shredded data.

Creating the atomic.events table in Athena

Create the following table in Athena:

CREATE EXTERNAL TABLE atomic_events ( 
app_id STRING, 
platform STRING, 
etl_tstamp TIMESTAMP, 
collector_tstamp TIMESTAMP, 
dvce_tstamp TIMESTAMP, 
event STRING, 
event_id STRING, 
txn_id INT, 
name_tracker STRING, 
v_tracker STRING, 
v_collector STRING, 
v_etl STRING, 
user_id STRING, 
user_ipaddress STRING, 
user_fingerprint STRING, 
domain_userid STRING, 
domain_sessionidx INT, 
network_userid STRING, 
geo_country STRING, 
geo_region STRING, 
geo_city STRING, 
geo_zipcode STRING, 
geo_latitude STRING, 
geo_longitude STRING, 
geo_region_name STRING, 
ip_isp STRING, 
ip_organization STRING, 
ip_domain STRING, 
ip_netspeed STRING, 
page_url STRING, 
page_title STRING, 
page_referrer STRING, 
page_urlscheme STRING, 
page_urlhost STRING, 
page_urlport INT, 
page_urlpath STRING, 
page_urlquery STRING, 
page_urlfragment STRING, 
refr_urlscheme STRING, 
refr_urlhost STRING, 
refr_urlport INT, 
refr_urlpath STRING, 
refr_urlquery STRING, 
refr_urlfragment STRING, 
refr_medium STRING, 
refr_source STRING, 
refr_term STRING, 
mkt_medium STRING, 
mkt_source STRING, 
mkt_term STRING, 
mkt_content STRING, 
mkt_campaign STRING, 
contexts STRING, 
se_category STRING, 
se_action STRING, 
se_label STRING, 
se_property STRING, 
se_value STRING, 
unstruct_event STRING, 
tr_orderid STRING, 
tr_affiliation STRING, 
tr_total STRING, 
tr_tax STRING, 
tr_shipping STRING, 
tr_city STRING, 
tr_state STRING, 
tr_country STRING, 
ti_orderid STRING, 
ti_sku STRING, 
ti_name STRING, 
ti_category STRING, 
ti_price STRING, 
ti_quantity INT, 
pp_xoffset_min INT, 
pp_xoffset_max INT, 
pp_yoffset_min INT, 
pp_yoffset_max INT, 
useragent STRING, 
br_name STRING, 
br_family STRING, 
br_version STRING, 
br_type STRING, 
br_renderengine STRING, 
br_lang STRING, 
br_features_pdf STRING, 
br_features_flash STRING, 
br_features_java STRING, 
br_features_director STRING, 
br_features_quicktime STRING, 
br_features_realplayer STRING, 
br_features_windowsmedia STRING, 
br_features_gears STRING, 
br_features_silverlight STRING, 
br_cookies STRING, 
br_colordepth STRING, 
br_viewwidth INT, 
br_viewheight INT, 
os_name STRING, 
os_family STRING, 
os_manufacturer STRING, 
os_timezone STRING, 
dvce_type STRING, 
dvce_ismobile STRING, 
dvce_screenwidth INT, 
dvce_screenheight INT, 
doc_charset STRING, 
doc_width INT, 
doc_height INT, 
tr_currency STRING, 
tr_total_base STRING, 
tr_tax_base STRING, 
tr_shipping_base STRING, 
ti_currency STRING, 
ti_price_base STRING, 
base_currency STRING, 
geo_timezone STRING, 
mkt_clickid STRING, 
mkt_network STRING, 
etl_tags STRING, 
dvce_sent_tstamp TIMESTAMP, 
refr_domain_userid STRING, 
refr_dvce_tstamp TIMESTAMP, 
derived_contexts STRING, 
domain_sessionid STRING, 
derived_tstamp TIMESTAMP 
) 
PARTITIONED BY(run STRING) 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t' 
STORED AS TEXTFILE 
LOCATION 's3://bucket-name/path/to/enriched/good';

This schema follows the format of the unshredded event in S3. As you can see, this is pretty much the atomic.events table that you have in Redshift today, with three notable additions:

  • unstruct_event is a self-describing JSON with the data that is specific to the event, for example a link_click event;
  • contexts is an array of contexts sent with the event;
  • derived_contexts is an array of derived contexts sent with the event.

The actual format of the values of these fields is JSON but here we’re loading them as strings.

Next, load the partitions (the data is partitioned by etl_tstamp, i.e. by run):

MSCK REPAIR TABLE atomic_events;
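
To check what was loaded, or to register a single run folder by hand, something like the following sketch may help. The run value and its S3 path are hypothetical placeholders that follow the same LOCATION used in the table definition above. Athena runs one statement at a time, so execute these separately.

```sql
-- List the partitions Athena currently knows about for this table.
SHOW PARTITIONS atomic_events;

-- Register one partition explicitly instead of rescanning the whole location.
-- The run value and path below are placeholders, not real data.
ALTER TABLE atomic_events ADD IF NOT EXISTS
PARTITION (run = '2017-06-15-01-02-03')
LOCATION 's3://bucket-name/path/to/enriched/good/run=2017-06-15-01-02-03/';
```

Adding partitions explicitly can be quicker than MSCK REPAIR TABLE when only one new run has landed since the last repair.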

Running a few simple queries

Now that we have our atomic_events table in Athena, we can try running a few simple queries to confirm it’s working as expected. Try:

SELECT count(DISTINCT page_urlhost)
FROM atomic_events
WHERE run > '2017-06-14';

or

SELECT page_urlpath, count(*)
FROM atomic_events
WHERE run > '2017-06-14'
GROUP BY 1
ORDER BY 2 DESC
LIMIT 100;

It’s a good idea to limit your queries by run:

  1. This reduces the number of rows scanned in S3, which
  2. decreases query time and
  3. decreases the cost of each query.
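
As a sketch of that idea, bounding run on both sides restricts the scan to the runs from a single day. The dates are arbitrary examples, and the comparison relies on run being a string that starts with the date:

```sql
-- Only partitions whose run value falls on 14 June 2017 are read.
SELECT page_urlhost, count(*) AS events
FROM atomic_events
WHERE run >= '2017-06-14'
  AND run < '2017-06-15'
GROUP BY page_urlhost
ORDER BY events DESC;
```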

Querying data within the self-describing JSON fields

Much of the data that we want to query is likely to be in the unstruct_event field (for any self-describing event) and contexts field (for any custom context that we send to Snowplow).

Querying other fields in Athena is straightforward. Querying these fields is not, because the data stored in them is in self-describing JSON format. However, it is possible.

Let’s take a particular use case: we have a particular set of structured events that we capture, recognizable because se_category is set to advertisement. We know that for each of these events, a custom context is sent that contains the name of the campaign to which the ad belongs.

The JSON in the contexts field will look something like:

{
	"schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0",
	"data": [{
		"schema": "iglu:com.example/ad_context/jsonschema/1-0-0",
		"data": {
			"campaignName": "Example Campaign",
			"adName": "Example Ad",
			"adFormat": "Standard Banner",
			"campaignId": "12345",
			"adId": "67890"
		}
	}]
}

Now we can use the json_extract_scalar() function in Athena to extract the value of a specific element:

SELECT json_extract_scalar(contexts, '$.data[0].data.campaignName') AS campaign_name
FROM atomic_events
WHERE run > '2017-06-14'
AND se_category = 'advertisement';

That should return a result like:

 campaign_name
1   Example Campaign
2   ...
3   ...

We can extract the other properties of the context (adName, adFormat, etc) in the same way.

One thing to note is that the outer data element of the contexts JSON is an array (because potentially you are sending multiple contexts). So you have to specify which element of the array you want to extract: '$.data[0].data.campaignName'. In this case we only have one context, so we want the 0th element. If you don’t address that element by index, the path will not match and you will get an empty (NULL) result in return.
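
If events can carry several contexts, the position of a given context in the array is not guaranteed. One possible alternative, sketched here rather than taken from the original approach, is to unnest the array and pick the context by its schema. The aliases t and ctx are arbitrary names, and the schema string is the example one shown above:

```sql
-- Expand the contexts array into one row per context,
-- then keep only the rows for the ad_context schema.
SELECT json_extract_scalar(ctx, '$.data.campaignName') AS campaign_name
FROM atomic_events
CROSS JOIN UNNEST(
  CAST(json_extract(contexts, '$.data') AS ARRAY(JSON))
) AS t (ctx)
WHERE run > '2017-06-14'
  AND se_category = 'advertisement'
  AND json_extract_scalar(ctx, '$.schema') = 'iglu:com.example/ad_context/jsonschema/1-0-0';
```

Events whose contexts field is empty produce no rows here, because there is nothing to unnest.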

We can use this approach to query the unstruct_event field as well. Here is a link_click event:

{
	"schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0",
	"data": {
		"schema": "iglu:com.snowplowanalytics.snowplow/link_click/jsonschema/1-0-1",
		"data": {
			"targetUrl": "http://example.com",
			"elementId": "",
			"elementClasses": "",
			"elementTarget": ""
		}
	}
}

We can surface the value of the targetUrl property:

SELECT json_extract_scalar(unstruct_event, '$.data.data.targetUrl') AS target_URL
FROM atomic_events
WHERE run > '2017-06-14';

In this case the value of the outer data element is a single JSON object rather than an array, so we don’t have to use an index.
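
Because unstruct_event can hold any self-describing event, it may be worth filtering on the inner schema before extracting properties. A minimal sketch, using the link_click schema and property names from the example above:

```sql
-- Pull two properties from link_click events only.
SELECT
  json_extract_scalar(unstruct_event, '$.data.data.targetUrl') AS target_url,
  json_extract_scalar(unstruct_event, '$.data.data.elementId') AS element_id
FROM atomic_events
WHERE run > '2017-06-14'
  AND json_extract_scalar(unstruct_event, '$.data.schema')
      LIKE 'iglu:com.snowplowanalytics.snowplow/link_click/jsonschema/%';
```

The LIKE pattern matches any version of the link_click schema, so the query keeps working if the schema version changes.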

A better way to query enriched Snowplow data in Athena

We plan to develop Snowplow to make querying data stored in S3 easier with Athena. In particular, we want to:

  1. Update the way that data is serialized so that it is easy to directly query all the fields in the event, including any fields that belong to a self-describing event or custom context.
  2. Make querying the data faster and more efficient by serializing it in a more compact format, e.g. Avro with Parquet.
  3. Update our Iglu schema tech to elegantly handle updates to Athena table definitions that need to be made as new schema versions are released.

More on the above shortly!


#2

Nice, I added an issue to add support for Parquet:
https://github.com/snowplow/kinesis-s3/issues/87


#3

I’ve also added in a ticket for adding Avro support which should enable ‘external source’ querying for a variety of ecosystems:

Source / Service
S3 / Athena
Cloud Storage / BigQuery
Azure Storage / HDInsight

https://github.com/snowplow/kinesis-s3/issues/99

Previous discussion on how Avro/Parquet could fit into the pipeline here:
http://discourse.snowplowanalytics.com/t/optionally-support-avro-and-the-confluent-schema-registry-for-kafka/1127/9