Debugging bad rows in Athena [tutorial]
One of the features that makes Snowplow unique is that it is a non-lossy pipeline: any data that hits the pipeline but isn’t successfully processed is not dropped but preserved as ‘bad rows’, with (hopefully) helpful error messages indicating why those rows failed validation. This is invaluable when you’re rolling out new tracking, to test that the tracking is working properly, and it makes ongoing monitoring of your data quality manageable. (More about the importance of data quality and understanding bad data in this blog post.)
Several ways of querying bad rows have already been documented:
- If you’ve set up Snowplow to load bad rows into Elasticsearch, you can query them there, including via Kibana.
- It’s also possible to query the bad data using Spark on EMR, e.g. with Zeppelin.
In December last year Amazon launched Athena, which offers a particularly convenient route to querying bad data. The process for doing so is outlined below.
## 1. Log into Athena and create the bad rows table
In the AWS console navigate to Athena. Enter the following statement to create your bad rows table, being sure to update the S3 path to the location of your bad rows. (This should be identifiable from your EmrEtlRunner config.)
```sql
CREATE EXTERNAL TABLE IF NOT EXISTS bad_rows (
  line string,
  errors array<struct<level: string, message: string>>,
  failure_tstamp string
)
PARTITIONED BY (
  run string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
LOCATION 's3://bucket-name/path/to/enriched/bad/';
```
Now let’s load all the partitions:
```sql
-- Load all partitions
MSCK REPAIR TABLE bad_rows;
```
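As a quick check that the repair registered your runs, you can list the table’s partitions; you should see one entry per run folder. If nothing comes back, the `LOCATION` in the `CREATE TABLE` statement probably doesn’t point at the folder that holds the per-run subfolders.

```sql
-- List the partitions (one per run) that Athena now knows about
SHOW PARTITIONS bad_rows;
```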
## Count the number of bad rows by run
We can easily see how many bad rows are generated by each run:
```sql
SELECT
  run,
  count(*) AS bad_rows_count
FROM bad_rows
GROUP BY run
ORDER BY run;
```
This should make it easy to spot spikes in the number of bad rows.
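To make spikes stand out without eyeballing the whole list, a sketch like the following (using a standard Presto window function, which Athena supports) puts each run’s count next to the change from the previous run. The `2017-01-01` cut-off is only an example value that keeps the scan limited to recent partitions.

```sql
-- Bad rows per run next to the change from the previous run;
-- lag() returns NULL for the first run, which has no predecessor.
SELECT
  run,
  bad_rows_count,
  bad_rows_count - lag(bad_rows_count) OVER (ORDER BY run) AS change_from_previous_run
FROM (
  SELECT run, count(*) AS bad_rows_count
  FROM bad_rows
  WHERE run > '2017-01-01' -- example cut-off; prunes older partitions
  GROUP BY run
) AS per_run
ORDER BY run;
```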
## Viewing a selection of bad rows
We can view a selection of bad rows, e.g. those generated recently:
```sql
SELECT errors
FROM bad_rows
WHERE run > '2017-01-20'
LIMIT 100;
```
Note that filtering any query on `run` massively increases query speed and reduces query cost: the data in S3 is partitioned by run timestamp, so Athena only has to scan the partitions that match the filter rather than all of your bad rows.
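Because `run` values are timestamp strings (such as `2017-05-09-00-14-32`), the filter is a plain string comparison. One consequence worth knowing when you bound a window on both sides: a run on the last day sorts after that bare date, so the sketch below uses `<` on the following day rather than `BETWEEN`. The dates are illustrative.

```sql
-- Runs from 20 to 27 January 2017 inclusive.
-- run compares as a string, so '2017-01-27-03-15-00' > '2017-01-27';
-- hence the upper bound is the start of the next day.
SELECT run, count(*) AS bad_rows_count
FROM bad_rows
WHERE run >= '2017-01-20'
  AND run < '2017-01-28'
GROUP BY run
ORDER BY run;
```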
As discussed in our earlier post, there are a number of bad rows that we don’t need to worry about. We can easily filter these out:
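```sql
-- errors is an array of {level, message} structs, so we read the message of
-- its first element (Athena/Presto arrays are 1-indexed)
SELECT errors[1].message
FROM bad_rows
WHERE run > '2017-01-20'
  AND position('does not match (/)vendor/version(/) pattern nor is a legacy' in errors[1].message) = 0
  AND position('Unrecognized event [null]' in errors[1].message) = 0
LIMIT 100;
```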
Note that in the above example we’re filtering out bad rows with the two specific error messages listed in the earlier post using the `position` function, which returns 0 if the specified text cannot be found in the error message.
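Once the known, harmless errors are filtered out, it can help to see which of the remaining messages are most common, so you know where to start. This sketch assumes every bad row carries at least one error and only looks at `errors[1]` (Athena arrays are 1-indexed), so rows with several errors are grouped by their first one.

```sql
-- Rank the remaining (not yet explained) error messages by frequency
SELECT
  errors[1].message AS message,
  count(*) AS occurrences
FROM bad_rows
WHERE run > '2017-01-20'
  AND position('does not match (/)vendor/version(/) pattern nor is a legacy' in errors[1].message) = 0
  AND position('Unrecognized event [null]' in errors[1].message) = 0
GROUP BY 1
ORDER BY 2 DESC
LIMIT 20;
```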
## Identifying an error and measuring what impact it’s having
In the screenshot below you can see we’ve identified an error to do with one of our schemas: the data contains a field, `urgency`, which is not allowed by the schema.
We can inspect a set of lines associated with this error:
```sql
SELECT line
FROM bad_rows
WHERE run > '2017-01-20'
  AND position('object instance has properties which are not allowed by the schema: ["urgency"]' in errors[1].message) > 0
LIMIT 100;
```
By base64-decoding the payload body, we can see where this event came from and use that to home in on the tracking issue that generated it.
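Athena can do the decoding itself. The sketch below assumes the whole `line` value is a base64 string; depending on your collector, `line` may instead be a tab-separated log line with the payload in one field, in which case you’d decode that field (for example `split_part(line, chr(9), n)`, where `n` is a placeholder for your log format’s field position) rather than `line`. `try` returns NULL instead of failing the query for values that aren’t valid base64.

```sql
-- Show the raw line next to its decoded form.
-- Assumes line is entirely base64; try() yields NULL for values that are not,
-- and from_utf8() replaces invalid UTF-8 bytes rather than failing.
SELECT
  line,
  try(from_utf8(from_base64(line))) AS decoded_line
FROM bad_rows
WHERE run > '2017-01-20'
  AND position('object instance has properties which are not allowed by the schema: ["urgency"]' in errors[1].message) > 0
LIMIT 10;
```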
We can also look at how many bad rows this error has accounted for over time:
```sql
SELECT run, count(*)
FROM bad_rows
WHERE run > '2017-01-01'
  AND position('object instance has properties which are not allowed by the schema: ["urgency"]' in errors[1].message) > 0
GROUP BY 1
ORDER BY 1;
```
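To measure impact rather than just volume, it can be useful to compare this error against the total number of bad rows in each run. A minimal sketch using Presto’s `count_if` aggregate; the `is_urgency`, `urgency_rows` and `urgency_pct` names are just illustrative aliases.

```sql
-- Flag each bad row once, then aggregate per run:
-- rows caused by the "urgency" error, all bad rows, and the percentage.
SELECT
  run,
  count_if(is_urgency) AS urgency_rows,
  count(*) AS all_bad_rows,
  round(100.0 * count_if(is_urgency) / count(*), 1) AS urgency_pct
FROM (
  SELECT
    run,
    position('object instance has properties which are not allowed by the schema: ["urgency"]' in errors[1].message) > 0 AS is_urgency
  FROM bad_rows
  WHERE run > '2017-01-01'
) AS flagged
GROUP BY run
ORDER BY run;
```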
Finally, we can filter out these rows to continue debugging the remaining bad rows:
```sql
SELECT errors[1].message
FROM bad_rows
WHERE run > '2017-01-20'
  AND position('does not match (/)vendor/version(/) pattern nor is a legacy' in errors[1].message) = 0
  AND position('Unrecognized event [null]' in errors[1].message) = 0
  -- Adding the new condition to our filter so we can see what errors remain to be diagnosed
  AND position('object instance has properties which are not allowed by the schema: ["urgency"]' in errors[1].message) = 0
LIMIT 100;
```
## Manually loading partitions into Athena
In some cases, running an `MSCK REPAIR TABLE` statement can take a significant amount of time (e.g. if you are scanning terabytes of data). In this situation you might want to load a partition manually. This lets you add an individual run, rather than adding all missing partitions to your `bad_rows` table:
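```sql
-- The partition location is the folder for this run, not the bad rows root
ALTER TABLE bad_rows ADD PARTITION (run='2017-05-09-00-14-32')
LOCATION 's3://bucket-name/path/to/enriched/bad/run=2017-05-09-00-14-32/';
```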
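If you need several runs, Athena’s `ALTER TABLE ... ADD PARTITION` accepts more than one `PARTITION` clause in a single statement, and `IF NOT EXISTS` skips runs that are already registered instead of failing. This sketch assumes each run sits in its own `run=<timestamp>/` folder under the bad rows path (the layout that `MSCK REPAIR TABLE` relies on); the second timestamp is hypothetical, so substitute your own run IDs.

```sql
-- Register two runs at once; the second run ID is a hypothetical example.
ALTER TABLE bad_rows ADD IF NOT EXISTS
  PARTITION (run = '2017-05-09-00-14-32')
    LOCATION 's3://bucket-name/path/to/enriched/bad/run=2017-05-09-00-14-32/'
  PARTITION (run = '2017-05-10-00-12-05')
    LOCATION 's3://bucket-name/path/to/enriched/bad/run=2017-05-10-00-12-05/';
```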
To dive into the run that was added to your `bad_rows` table, you just need to specify the full timestamp:
```sql
SELECT COUNT(*), errors
FROM bad_rows
WHERE run = '2017-05-09-00-14-32'
GROUP BY 2
ORDER BY 1 DESC;
```