Recovering pipelines with cross-batch deduplication enabled [tutorial]

In Snowplow R86 Petra we released DynamoDB-powered cross-batch natural deduplication, which effectively eliminated the problem of duplicates for the vast majority of Snowplow pipelines. While a powerful new feature, it also introduced some additional complexity around failed pipeline recovery, which Snowplow pipeline operators should be aware of.

In this post we will explain how to safely recover a Snowplow pipeline with cross-batch deduplication enabled, introducing a new Spark app we have written, Event Manifest Cleaner v0.1.0.

1. Recap on cross-batch event deduplication

Cross-batch event deduplication involves Spark Shred extracting two event properties which precisely identify an event (event_id and event_fingerprint), plus the etl_tstamp, which identifies the pipeline run, and storing these three properties (the “deduplication triple”) in a persistent AWS DynamoDB table. Together, these three properties let us build a simple and efficient algorithm that maintains state between loads and identifies duplicates. The algorithm works as follows:

  1. If an element with the same event_id and event_fingerprint but a different etl_tstamp exists in storage, then we have found a duplicate from an older run, and the event will be silently dropped from the Spark Shred output
  2. If an element with the same event_id, the same event_fingerprint and the same etl_tstamp exists in storage, then we are reprocessing the same enriched folder (due to a previous run failure), and the event should not be marked as a duplicate
  3. All other combinations (e.g. the same event_id with a different event_fingerprint) have no effect on the Spark Shred output, just like the previous case

The following figure illustrates the different scenarios:

In the above, we see that the algorithm has an effect, in the form of dropping data, if and only if the same event_id, the same event_fingerprint and a different etl_tstamp are found in the DynamoDB table.
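
To make the conditional-write mechanics concrete, here is a minimal sketch of how such a check could be implemented with boto3. The attribute names (eventId, fingerprint, etlTstamp) are illustrative assumptions rather than the exact schema Spark Shred uses; the table name matches the one used in the CloudWatch example later in this post.

import boto3
from botocore.exceptions import ClientError

# Assumed manifest schema: partition key "eventId", sort key "fingerprint",
# plus an "etlTstamp" attribute identifying the pipeline run (names are illustrative)
table = boto3.resource("dynamodb", region_name="us-east-1").Table("snowplow-event-manifest")

def is_cross_batch_duplicate(event_id, event_fingerprint, etl_tstamp):
    """Return True if the event must be dropped from the Spark Shred output."""
    try:
        # The write succeeds if no record exists for this (event_id, fingerprint) pair,
        # or if the existing record carries the same etl_tstamp (re-processed folder)
        table.put_item(
            Item={"eventId": event_id,
                  "fingerprint": event_fingerprint,
                  "etlTstamp": etl_tstamp},
            ConditionExpression="attribute_not_exists(eventId) OR etlTstamp = :etl",
            ExpressionAttributeValues={":etl": etl_tstamp},
        )
        return False
    except ClientError as error:
        if error.response["Error"]["Code"] == "ConditionalCheckFailedException":
            # Same event_id and fingerprint written by an earlier run: a true duplicate
            return True
        raise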

2. Impact on pipeline recovery

The problem with the above arises if you follow the standard recovery process, which involves deleting the enriched and shredded good data and reprocessing the raw events starting from the enrich step.

The issue is that the etl_tstamp is assigned when EmrEtlRunner launches the Spark Enrich job. Imagine the following scenario:

  • Spark Shred fails while processing folder X, but has already managed to write some events to the DynamoDB table with etl_tstamp A
  • The pipeline operator tries to reprocess the same folder X
  • A new etl_tstamp B is assigned to all events in the new pipeline run
  • Cross-batch deduplication drops the events whose event_ids and event_fingerprints are already in DynamoDB, but accompanied by etl_tstamp A
  • As a result, these events make it into Redshift in neither run

The following figure illustrates this:
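
It can also help to see this leftover state directly. The sketch below reuses the illustrative attribute names from the earlier snippet (the real manifest schema and the stored timestamp format may differ) and counts how many manifest entries still carry the failed run’s etl_tstamp A.

import boto3
from boto3.dynamodb.conditions import Attr

table = boto3.resource("dynamodb", region_name="us-east-1").Table("snowplow-event-manifest")

def count_entries_for_run(etl_tstamp):
    """Count manifest items written by a given pipeline run (paginated scan)."""
    total, kwargs = 0, {"FilterExpression": Attr("etlTstamp").eq(etl_tstamp), "Select": "COUNT"}
    while True:
        page = table.scan(**kwargs)
        total += page["Count"]
        if "LastEvaluatedKey" not in page:
            return total
        kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]

# etl_tstamp A of the failed run; the stored format is an assumption, check your table.
# Note that a scan reads the whole table, so use this for ad hoc inspection only.
print(count_entries_for_run("2017-08-17 22:48:00.000"))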

3. Introducing Event Manifest Cleaner

To address this problem, we have released Event Manifest Cleaner, a Spark job that cleans up a particular ETL run’s events from the DynamoDB table.

Event Manifest Cleaner takes as input the path to the enriched folder that failed processing, and then deletes from DynamoDB those records which match the events found in the input folder. The interface of this Spark job closely resembles that of Event Manifest Populator, but it deletes data from the manifest instead of inserting it.

There are three scenarios we need to discuss:

  1. Spark Shred failed, and processing has been paused pending recovery
  2. Spark Shred failed, but the enriched folder has already been deleted
  3. Spark Shred failed, but further runs have already happened

3.1 Processing has been paused pending recovery

This is the least serious scenario: the pipeline failed at the shredding step, and the pipeline operator immediately realized that Spark Shred had already populated the manifest with some data.

The pipeline operator needs to run Event Manifest Cleaner, pointing it to the single remaining folder in the enriched.good path from config.yml:

$ python run.py run_emr s3://snowplow-acme/enriched/good/run=2017-08-17-22-10-30/ /path/to/dynamodb_config.json /path/to/iglu_resolver.json

Here, s3://snowplow-acme/enriched/good/run=2017-08-17-22-10-30/ is the full path to the enriched folder that failed to shred, and the other two arguments are the usual duplicate storage config and Iglu resolver config.

Here are the effects of running this:

After this, the pipeline can be resumed as per our Troubleshooting Guide.

3.2 Enriched folder has already been deleted

Spark Shred failed, but unfortunately the pipeline operator did not realize that they needed to take extra steps to recover it - instead, they simply deleted the folders from the enriched and shredded locations. The data was not loaded into Redshift either.

The deleted folder makes recovery problematic because there is now no single source of truth to provide to Event Manifest Cleaner - the enriched folder that was re-processed carries a new etl_tstamp and therefore cannot be used on its own.

In this case you can provide an additional --time option to Event Manifest Cleaner, which specifies the timestamp of the first failed run (not the run with the same data that eventually succeeded). The timestamp can be retrieved from the EmrEtlRunner logs or the EMR console, and has exactly the same format as the timestamps in enriched folder names.

Here is an example:

$ python run.py run_emr s3://snowplow-acme/archive/enriched/run=2017-08-17-22-10-30/ /path/to/dynamodb_config.json /path/to/iglu_resolver.json --time 2017-08-17-22-48-00

Here, s3://snowplow-acme/archive/enriched/run=2017-08-17-22-10-30/ is the enriched folder that was successfully processed and archived to the archive/enriched S3 location. Event Manifest Cleaner will only use this data to retrieve the affected event_ids and event_fingerprints - the etl_tstamp will be taken from the --time 2017-08-17-22-48-00 argument.

As the next steps, the pipeline operator will need to:

  1. Move the s3://snowplow-acme/archive/enriched/run=2017-08-17-22-10-30/ folder out of the archive and back into the processing location
  2. Delete the s3://snowplow-acme/archive/shredded/run=2017-08-17-22-10-30/ folder
  3. Resume the pipeline as per the Troubleshooting Guide

3.3 Further runs have already happened

Spark Shred failed, the pipeline operator did not realize they needed to take extra steps to recover it, and simply deleted the folders from the enriched and shredded locations. Then the pipeline was restarted and data was loaded into Redshift. The problem is that this load remains incomplete, as part of the dataset was filtered out by the cross-batch deduplication algorithm, as explained in the problem description above.

This is the most complex scenario, and it can be discovered days or weeks after an incomplete dataset was loaded into the database. AWS provides several tools (explained below) to help you ensure that this scenario never happens, but if it’s too late - well, Snowplow never deletes your data, so you should be able to recover the complete dataset in Redshift:

  1. Pause your pipeline. We don’t want to make the state even more inconsistent
  2. Identify the timestamp of the failed run. This can be done through the EmrEtlRunner logs or the EMR console
  3. Identify the timestamp of the incomplete run. This is the recovery run launched right after the failed one, processing the same dataset. It should be visible as a huge spike in “Conditional Write Failures” in the AWS DynamoDB console
  4. Convert this timestamp from the Snowplow format (e.g. 2017-07-20-18-47-06) to the Redshift format (e.g. 2017-07-20 18:47:06.874). You will have to check atomic.events in Redshift to retrieve the milliseconds portion of the timestamp (see the sketch after this list)
  5. Delete data from every shredded Redshift table, one statement per table, for example: DELETE FROM atomic.com_snowplowanalytics_snowplow_change_form_1 USING atomic.events WHERE com_snowplowanalytics_snowplow_change_form_1.root_id = events.event_id AND events.etl_tstamp = '2017-07-20 18:47:06.874';
  6. Delete the atomic data from Redshift: DELETE FROM atomic.events WHERE etl_tstamp = '2017-07-20 18:47:06.874';
  7. Launch Event Manifest Cleaner:

$ python run.py run_emr s3://snowplow-acme/archive/enriched/run=2017-08-17-22-10-30/ /path/to/dynamodb_config.json /path/to/iglu_resolver.json --time 2017-08-17-22-48-00

  8. Run the pipeline against the raw data as per the Troubleshooting Guide.
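
For step 4 above, the milliseconds portion has to be read back from atomic.events. Here is a minimal sketch of that lookup using psycopg2, assuming you can connect to your Redshift cluster; the host, database and credentials are placeholders.

import psycopg2

snowplow_ts = "2017-07-20-18-47-06"                    # timestamp of the incomplete run, Snowplow format
parts = snowplow_ts.split("-")                         # ['2017', '07', '20', '18', '47', '06']
redshift_second = "-".join(parts[:3]) + " " + ":".join(parts[3:])  # '2017-07-20 18:47:06'

# Connection parameters are placeholders for your own Redshift cluster
conn = psycopg2.connect(host="acme.example.us-east-1.redshift.amazonaws.com", port=5439,
                        dbname="snowplow", user="storageloader", password="...")

with conn, conn.cursor() as cur:
    # Match on the truncated second to recover the stored milliseconds
    cur.execute("""
        SELECT DISTINCT etl_tstamp
        FROM atomic.events
        WHERE date_trunc('second', etl_tstamp) = %s::timestamp
    """, (redshift_second,))
    for (etl_tstamp,) in cur.fetchall():
        print(etl_tstamp)                              # e.g. 2017-07-20 18:47:06.874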

4. Detecting incomplete loads

While this recovery process is something every pipeline operator should be aware of, there’s also a way to have more confidence that your data is never being silently lost due to cross-batch de-duplication issues.

When Spark Shred tries to write a record to DynamoDB that must be marked as a duplicate (whether it is a real duplicate or an event left over from a failed run), the conditional write fails with a ConditionalCheckFailedException, which DynamoDB keeps track of and exposes for monitoring via CloudWatch as the ConditionalCheckFailedRequests metric.

If you typically see around 1,000 duplicates per run and the number of conditional write failures suddenly spikes to 400,000, you have a clear sign that almost 400,000 events have been mistakenly marked as duplicates.

To be notified about cases like this you can create a CloudWatch alarm:

$ aws cloudwatch put-metric-alarm \
    --alarm-name DeduplicationInconsistentStateAlarm \
    --alarm-description "Alarm when unexpectedly high amount of events marked as duplicates" \
    --namespace AWS/DynamoDB \
    --metric-name ConditionalCheckFailedRequests \
    --dimensions Name=TableName,Value=snowplow-event-manifest \
    --statistic Sum \
    --threshold 10000 \
    --comparison-operator GreaterThanThreshold \
    --period 60 \
    --unit Count \
    --evaluation-periods 1 \
    --alarm-actions arn:aws:sns:us-east-1:123456789012:deduplication-alarm

This creates a CloudWatch alarm that fires when the total number of conditional check failures exceeds 10,000 within one minute. There’s no silver bullet for calculating the threshold, but in our experience 5% of the event volume of an average load is a reasonable starting point - for example, if a typical run shreds around 200,000 events, a threshold of 10,000 fits.

5. A warning

In our experience, the EmrEtlRunner logs are not always reliable enough to recognize a Shred job failure. In particular, we have occasionally encountered the following picture in the logs:

 - 1. Elasticity Scalding Step: Enrich Raw Events: COMPLETED ~ 01:15:04 [2017-07-20 14:52:26 +0000 - 2017-07-20 16:07:31 +0000]
 - 2. Elasticity S3DistCp Step: Enriched HDFS -> S3: COMPLETED ~ 00:01:50 [2017-07-20 16:07:33 +0000 - 2017-07-20 16:09:23 +0000]
 - 3. Elasticity S3DistCp Step: Enriched HDFS _SUCCESS -> S3: FAILED ~ 00:00:40 [2017-07-20 16:09:25 +0000 - 2017-07-20 16:10:06 +0000]
 - 4. Elasticity Scalding Step: Shred Enriched Events: CANCELLED ~ elapsed time n/a [2017-07-20 16:10:06 +0000 - ]
 - 5. Elasticity S3DistCp Step: Raw S3 Staging -> S3 Archive: CANCELLED ~ elapsed time n/a [ - ]
 - 6. Elasticity S3DistCp Step: Shredded HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]

These logs imply that Spark Shred was cancelled and that therefore no data was written to DynamoDB. However, the DynamoDB console can reveal a different truth, with the Write Throughput chart showing that the job did in fact start writing to DynamoDB. This means that recovery steps must be taken.

Use the DynamoDB console or a CloudWatch alarm as the source of truth regarding data arriving in the event manifest that powers cross-batch deduplication.
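
As a sketch of what such a check could look like, the snippet below sums the ConsumedWriteCapacityUnits metric for the manifest table over the window in which the “cancelled” Shred step ran, using boto3. The table name comes from the alarm example above; the region and time window are placeholders to adjust for your pipeline.

from datetime import datetime, timezone
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

# Window around the suspect Shred step (taken from the EMR step timings above)
response = cloudwatch.get_metric_statistics(
    Namespace="AWS/DynamoDB",
    MetricName="ConsumedWriteCapacityUnits",
    Dimensions=[{"Name": "TableName", "Value": "snowplow-event-manifest"}],
    StartTime=datetime(2017, 7, 20, 16, 0, tzinfo=timezone.utc),
    EndTime=datetime(2017, 7, 20, 17, 0, tzinfo=timezone.utc),
    Period=300,
    Statistics=["Sum"],
)

total_writes = sum(point["Sum"] for point in response["Datapoints"])
# A non-zero total means the "cancelled" Shred job did write to the manifest,
# so the recovery steps described earlier are required
print(total_writes)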

6. Conclusion

Cross-batch deduplication is a powerful and popular feature of the Snowplow pipeline - operating it safely requires some specific care and attention.

This post has introduced the various failure scenarios you might encounter, provided ways of monitoring the status of the manifest, and documented a new Event Manifest Cleaner Spark job which makes recovery much safer and more ergonomic.

Great tutorial @anton!

Quick comment on 3.3: it’s not sufficient to delete data only from atomic.events - one would want to delete data from the self-describing event tables too, right?

Cheers!
Bernardo

Hi @bernardosrulzon,

You’re totally right, thanks - the shredded data needs to be deleted before the atomic data. I’ll add this to the post.
