Debugging bad rows in Athena [tutorial]


#1

Debugging bad rows in Athena [tutorial]

One of the features that makes Snowplow unique is that it is a non-lossy pipeline: any data that hits the pipeline but isn’t successfully processed, rather than being dropped is preserved as ‘bad rows’ with (hopefully) helpful error messages indicating why those rows did not successfully validate. This is invaluable when you’re rolling out new tracking, to test that the tracking is working properly, and to make ongoing monitoring of your data quality over time manageable. (More about the importance of data quality and understanding bad data in this blog post.

There are a number of ways you can query the bad rows that have already been documented:

In December last year Amazon launched Athena and this is a particularly convenient route to querying bad data. The process for doing so is outlined below.

1. Log into Athena and create the bad rows table

In the AWS console navigate to Athena. Enter the following statement to create your bad rows table, being sure to update the S3 path to the location of your bad rows. (This should be identifiable from your EmrEtlRunner config.)

CREATE EXTERNAL TABLE IF NOT EXISTS bad_rows (
  line string,
  errors array<struct<level: string, message: string>>,
  failure_tstamp string
) PARTITIONED BY (
  run string 
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
LOCATION 's3://bucket-name/path/to/enriched/bad/';

Now let’s load all the partitions:

-- Load all partitions
MSCK REPAIR TABLE bad_rows;

Count the number of bad rows by run

We easily see how many bad rows are generated with each run:

SELECT 
run, 
count(*) AS bad_rows_count
FROM bad_rows
GROUP BY run
ORDER BY run;

This should make it easy to spot spikes in the number of bad rows.

Viewing a selection of bad rows

We can view a selection of bad rows, e.g. that have been recently generated:

SELECT
errors
FROM bad_rows
WHERE run > '2017-01-20'
LIMIT 100;

Note that filtering any query on run massively increases the query speed and reduces the query cost, as it saves Athena having to scan too much of the data in S3. (Which is partitioned by run timestamp.)

As discussed in our earlier post, there are a number of bad rows that we don’t need to worry about. We can easily filter these out:

SELECT
errors[1].message
FROM bad_rows
WHERE run > '2017-01-20' 
AND position('does not match (/)vendor/version(/) pattern nor is a legacy' in errors[1].message) = 0
AND position('Unrecognized event [null]' in errors[1].message) = 0
LIMIT 100;

Note that in the above example we’re filtering out bad rows with the two specific error messages listed in the earlier post using the position function, which returns 0 if the specified text cannot be found the error message.

Identifying an error and measuring what impact it’s having

In the below screenshot you can see we’ve identified an error to do with one of our schemas: the data contains two fields: resolve_reason and urgency which are not allowed by the schema:

We can inspect a set of lines associated with the event:

SELECT
line
FROM bad_rows
WHERE run > '2017-01-20' 
AND position('object instance has properties which are not allowed by the schema: ["urgency"]' in errors[1].message) > 0
LIMIT 100;

By base64 decoding the payload body we can understand where this event came from and use that to home in on the tracking issue that generated it.

We can also look at how many bad rows this error has accounted for over time:

SELECT
run,
count(*)
FROM bad_rows
WHERE run > '2017-01-01'
AND position('object instance has properties which are not allowed by the schema: ["urgency"]' in errors[1].message) > 0
GROUP BY 1
ORDER BY 1

Finally we can easily filter out these rows to continue debugging the remaining bad rows:

SELECT
errors[1].message
FROM bad_rows
WHERE run > '2017-01-20' 
AND position('does not match (/)vendor/version(/) pattern nor is a legacy' in errors[1].message) = 0
AND position('Unrecognized event [null]' in errors[1].message) = 0
-- Adding the new condition to our filter below so we can see what errors remain to be diagnosed
AND position('object instance has properties which are not allowed by the schema: ["urgency"]' in errors[1].message) = 0
LIMIT 100;

##Manually loading partitions into Athena

There are some cases where running a REPAIR TABLE statement can take a significant amount of time (e.g if you are scanning terabytes of data). In this situation you might want to manually load a partition. This allows you to select an individual run, rather than adding all missing partitions to your bad_rows table:

ALTER TABLE bad_rows ADD PARTITION (run='2017-05-09-00-14-32') location 's3://bucket-name/path/to/enriched/bad/'

To dive into the run that was added to your bad_rows table, you just need to specify the full timestamp:

SELECT
COUNT(*),
errors

FROM bad_rows

WHERE run = '2017-05-09-00-14-32'

GROUP BY 2
ORDER BY 1 DESC

Trouble sending bad rows to amazon elasticsearch service (EsHadoopInvalidRequest)
Using AWS Athena to query the 'good' bucket on S3
Tracking clicks on email clicks - using unstructured event
#2

This is really exciting @yali - looking forward to using it.

I noticed Athena is not available in all regions yet. I couldn’t get it working on our setup and I suspect we need to run Athena in the same region as our S3 bucket.


#3

Would be interesting to have a dashboard built on top to visualize volume of rejections over time and breakdown by category. A tutorial on fixing the problems and sending fixed data for re-processing would also be appreciated.


#4

@robkingston we’ve been running Athena in us-east-1 processing data stored in eu-west-1 and that’s been working fine across all the accounts we’ve tried it on, so I reckon you should be able to do something similar… (Can never be sure because there are weird differences between AWS regions…)


#5

@dashirov two great ideas!

For a dashboard I think the two most promising platforms are QuickSight which has Athena support and Redash, which similarly has Athena support. Hope to get a chance to build one in both and share the results here shortly.

In terms of a tutorial for recovering bad rows there are a couple:

  1. Using Hadoop Event Recovery to recover events with a missing schema
  2. The Hadoop Event Recovery wiki page includes a couple of examples.

Is there’s a specific type of bad row you’re looking to recover and are stuck / have questions on do open a thread with the relevant details. I’m keen that we start publishing the different JS recovery functions that we use as I suspect a handful of error types (and corresponding JS recovery functions) probably account for more than 80% of the events people want to recover.