Recover from EMR failures with deduplication?



This excellent troubleshooting guide recommends deleting the enriched:good files after a failure in step 6 - Shred, and rerunning the job.

I’m wondering how that works with the DynamoDB deduplication feature - does deleting the files and rerunning the job create a new etl_tstamp, making all the events look like duplicates? That would result in a lot of missing events.

On a side note: not deleting files and running the job with --skip staging,enrich makes the Shred step fail 1 minute into the job. Running with --process-shred works fine, but has the inconvenience of not archiving raw files.


Hi @bernardosrulzon,

The guide you are referring to is not up-to-date with the latest Snowplow releases. In particular, R87 moved the “archive_raw” step (step 10) to the EMR cluster:

we have migrated the archival code for raw collector payloads from EmrEtlRunner into the EMR cluster itself, where the work is performed by the S3DistCp distributed tool. This should reduce the strain on your server running EmrEtlRunner, and should improve the speed of that step. Note that as a result of this, the raw files are now archived in the same way as the enriched and shredded files, using run= sub-folders.

As a result of this change, the recovery step which includes --skip staging,emr should be --skip staging,enrich,shred, which should also work for earlier releases.

Running the pipeline with --skip staging,enrich fails at shredding because no files are available in HDFS once the EMR cluster is terminated. Hence, you need to delete “enriched:good” and spin up the EMR cluster with --skip staging.

As for deduplication handling, the events in the manifest table are tracked by means of

  • Event id - used to identify event
  • Event fingerprint - used in conjunction with event id to identify natural duplicates
  • ETL timestamp - used to check if previous Hadoop Shred was aborted and event is being reprocessed

and the conditional update feature of DynamoDB.
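
To make the three fields above concrete, here is a minimal sketch of how such a conditional write could behave. It is not the actual Snowplow code: the `Manifest` class, the `put_if_new` name and the in-memory dict are all hypothetical stand-ins for the DynamoDB table, and the exact condition used by Hadoop Shred is an assumption based on the description above.

```python
from dataclasses import dataclass, field


@dataclass
class Manifest:
    """Hypothetical in-memory stand-in for the DynamoDB manifest table."""
    # Maps (event_id, fingerprint) -> etl_tstamp of the run that first wrote it.
    rows: dict = field(default_factory=dict)

    def put_if_new(self, event_id: str, fingerprint: str, etl_tstamp: str) -> bool:
        """Return True if the event should be kept, False if it is a duplicate.

        Assumed condition: the write succeeds when the key is absent, or when
        it is present with the same ETL timestamp (the same run reprocessing).
        """
        key = (event_id, fingerprint)
        seen = self.rows.get(key)
        if seen is None or seen == etl_tstamp:
            self.rows[key] = etl_tstamp
            return True
        return False


manifest = Manifest()
first = manifest.put_if_new("e1", "fp1", "2017-05-01 10:00:00")
same_run = manifest.put_if_new("e1", "fp1", "2017-05-01 10:00:00")
later_run = manifest.put_if_new("e1", "fp1", "2017-05-02 10:00:00")
print(first, same_run, later_run)  # True True False
```

Under these assumptions, an event seen again with the same ETL timestamp is accepted, while the same event arriving with a different ETL timestamp is dropped as a duplicate.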

Thus, it is easy to determine if the same events (Event id and Event fingerprint) have been processed (ETL timestamp). Reprocessing the events does generate a new ETL timestamp. The deduplication process is described here:


That’s interesting. Let’s say I have a job that failed in the Shred step - that means some of the events were successfully written to DynamoDB.

If I just delete enriched:good and re-run with --skip staging, this will result in a different etl_tstamp and the events that were previously written will be treated as duplicates (same event_id, different etl_tstamp) – and this will result in loss of data.

I looked into the code and a new ETL timestamp will be created every time a new EMR cluster is fired up, meaning that fully recovering from a failed Shred job is impossible with deduplication enabled.

Maybe we could have a --shred-failure argument in Dataflow Runner that will inherit etl_tstamp from the successful Enrich step.
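
To illustrate the scenario, here is a rough simulation of a Shred run that crashes partway through and is then rerun, once with a fresh ETL timestamp and once with the original one inherited. Everything here is hypothetical: the `shred` and `rerun_after_failure` helpers, the `fail_after` parameter, the "run-1"/"run-2" labels and the dict used as a manifest are illustrative only, and the duplicate check is my assumption about how the manifest condition behaves.

```python
def shred(events, etl_tstamp, manifest, fail_after=None):
    """Return the event ids kept by one run.

    `fail_after` stops the run after that many events to mimic a crash;
    the manifest writes made before the crash are NOT rolled back.
    """
    kept = []
    for i, (event_id, fingerprint) in enumerate(events):
        if fail_after is not None and i >= fail_after:
            break
        seen = manifest.get((event_id, fingerprint))
        # Assumed rule: keep if unseen, or if seen under the same ETL timestamp.
        if seen is None or seen == etl_tstamp:
            manifest[(event_id, fingerprint)] = etl_tstamp
            kept.append(event_id)
    return kept


events = [(f"event-{n}", f"fp-{n}") for n in range(5)]


def rerun_after_failure(rerun_tstamp):
    """Crash the first run after 3 events, discard its output, then rerun."""
    manifest = {}
    shred(events, "run-1", manifest, fail_after=3)  # output thrown away
    return shred(events, rerun_tstamp, manifest)


kept_new_tstamp = rerun_after_failure("run-2")   # fresh etl_tstamp
kept_inherited = rerun_after_failure("run-1")    # inherited etl_tstamp
print(len(kept_new_tstamp), len(kept_inherited))  # 2 5
```

In this toy model the rerun with a new timestamp keeps only the two events the failed run never reached, while the rerun that inherits the timestamp keeps all five.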

Does that make sense @ihor @alex?


Hey @bernardosrulzon - I just wanted to let you know that we are not ignoring your question! It’s a really important one and raises some serious challenges around how we track progress / maintain usable manifests with batch/realtime Snowplow.

We are having some wide-ranging internal discussions about this and hope to have a follow-up to share soon.


I know that @bernardosrulzon is aware - but for anybody else coming across this post, we have published a full tutorial on recovering from a pipeline failure involving cross-batch deduplication, here: