ETL very slow on larger batches

I suspect it’s because the DynamoDB table could get very large. At the moment ours is 3.6GB with a few months of data.

It also raises a very important consideration. @alex, is it possible to reprocess Cloudfront with global event de-duplication? Haven't all the events already been seen, and so would be considered duplicates?

I'm pretty sure I saw this in testing, and for our reprocessing we have the luxury of being able to delete the table and start from zero.

Quick update: after changing the write capacity on DynamoDB to 2500, shred is down to 6 minutes for 100 files, so that was definitely it. Now to figure out why load is taking almost 30 minutes :slight_smile:

Just for completeness, it was simply a lack of disk space causing my problem. The 7GB of compressed Cloudfront logs needed just under 200GB of disk space according to the EMR monitoring page. That was a test job I ran without the DynamoDB (DDB) event deduplication; it took 4hrs on 2 x m4.4xlarge, which had been configured with 3 executors according to the spreadsheet. Update: I ran over 5GB of data through the reprocessing pipeline using the DDB event deduplication and it took 9hrs, with continuous DDB throughput usage.

For comparison, it takes about a day to process 1GB of data with the same cluster and DDB-based event deduplication. That is a significant speed-up, but we'll have to see what the extra overhead from DDB looks like with a job running over 5GB of Cloudfront logs.

Spark and AWS are a lot pickier about running successfully than they were a few years ago when we started out with Snowplow. This is a lot of effort to process 7GB of data, which isn't really big data.

Summary:

  • I’ve found anything less than a gigabyte of data works out of the box.

    • The size of your cluster has an effect on the time, but only a small one.
    • You will have to manage your DynamoDB table throughput if you are doing global event deduplication. The peak throughput depends on the cluster parallelism and requires experimentation. You probably want to start in the 500-1000 range.
  • For more than 1GB

    • Read the Apache Spark Config Cheatsheet and use the spreadsheet to select the optimal configuration.
    • You will need plenty of scratch space. A 7GB compressed Cloudfront input, which wrote 7GB of atomic-events and 1.3GB of shredded-types compressed data to S3, used up 200GB of space on the cluster. Add an EBS volume! Compression for the win?
    • If you're doing global event deduplication with DynamoDB then you need to find the max throughput empirically - 100 is probably an order of magnitude out - and you can save a lot of money by scaling the table up and down for each run (see the sketch after this list).
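
Since the per-run throughput management came up a few times, here is a minimal sketch of scaling the manifest table up before a run and back down afterwards with boto3. The table name, region and capacity numbers are assumptions for illustration - they are not part of Snowplow itself.

```python
# Hypothetical helper: scale a provisioned DynamoDB manifest table up for a run
# and back down afterwards. Table name, region and capacity values are made up.
import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")
TABLE = "snowplow-duplicates-manifest"  # assumed name of the dedup manifest table

def set_capacity(write_units, read_units=100):
    """Update provisioned throughput and wait for the table to return to ACTIVE."""
    dynamodb.update_table(
        TableName=TABLE,
        ProvisionedThroughput={
            "ReadCapacityUnits": read_units,
            "WriteCapacityUnits": write_units,
        },
    )
    dynamodb.get_waiter("table_exists").wait(TableName=TABLE)

set_capacity(2500)   # scale up before kicking off the EMR job
# ... run EmrEtlRunner / the shred job here ...
set_capacity(25)     # scale back down once the run has finished, to save money
```

Note that DynamoDB limits how many times per day you can decrease provisioned capacity, so per-run scaling is fine for a handful of daily runs but not for very frequent ones.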

Hey @mjensen, one thing that significantly increases load time is RDB Loader's consistency check. Basically this is a hack to overcome the infamous S3 eventual consistency, which often results in failed loads due to ghost folders on S3. RDB Loader "solves" this problem by waiting some time until S3 starts to give consistent results. This time is specific to your dataset and is calculated by the formula ((atomicFiles.length * 0.1 * shreddedTypes.length) + 5) seconds, so if you have many shredded types this can easily reach 30 mins. The upcoming R29 aims to solve this problem in a more elegant way, so stay tuned.
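
To make that formula concrete, here is a quick back-of-the-envelope calculation; the file and shredded-type counts are hypothetical, just to show how the wait can reach roughly 30 minutes.

```python
# Worked example of the consistency check wait quoted above.
# The counts below are made up purely for illustration.
atomic_files = 1200     # number of files under atomic-events
shredded_types = 15     # number of distinct shredded types in the run

delay_seconds = (atomic_files * 0.1 * shredded_types) + 5
print(delay_seconds / 60)  # ~30.1 minutes of waiting before the load even starts
```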

Also, with R97 Knossos you can add --skip consistency_check to skip this stage, but the chance of failure increases significantly (you'll just have to re-run the load manually then, so maybe not critical).

I suspect it’s because the DynamoDB table could get very large.

@gareth, that's a valid suggestion; however, in our experience (most manifests we work with are 20-35GB) DynamoDB is very elastic in this sense - you only tweak the throughput, and that is the only thing that affects the time required to put data.

is it possible to reprocess Cloudfront with global event de-duplication

If I understand the question correctly, you want to de-duplicate historical data. If you didn't have de-duplication enabled at the time of the original processing, then it should be a safe thing to do - the pipeline knows nothing about those events. Obviously this is quite a big job and requires cleaning up the historical data in Redshift, but technically it is possible.

I am not sure why the Snowplow ETL can't just query Redshift, download the event IDs and then discard the common ones, like I do - that would save the DynamoDB lookup.

That's an interesting approach, @bhavin - please feel free to submit an issue at snowplow/snowplow so we can explore and elaborate on it. Here's what I think (mostly objections, but to keep the discussion going, not to reject the idea!):

  1. In the end we want our de-duplication to be DB-independent and environment-independent (batch/RT). While we still depend on DynamoDB, I believe having one lightweight external DB is easier to implement and maintain than many heavyweight ones.
  2. We could address the above point by not storing the manifest in a DB at all, but instead as an Avro file on S3, and then just joining these datasets during the ETL (see the sketch after this list). That could be quite a viable option.
  3. However, it still makes de-duplication RT-unfriendly. We cannot afford to keep this joined dataset in Kinesis (nor can we query Redshift for a single event ID).
  4. Even inside the batch pipeline, I believe JOINs would introduce unnecessary shuffling and implementation burden. For DynamoDB, on the other hand, we're planning to release the de-duplication logic as a separate testable and portable module.
  5. In the end, I would be very surprised if the DB-lookup/JOIN approach turned out to be significantly better, given that in our experience a sufficiently capacious DynamoDB table adds close-to-zero delay.
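
To illustrate point 2, here is a minimal PySpark sketch of anti-joining new events against a previously-seen-IDs dataset on S3. The paths and column names are assumptions, and it uses Parquet rather than the Avro mentioned above simply to keep the example self-contained; this is not how Snowplow's batch de-duplication works today.

```python
# Sketch: de-duplicate by anti-joining new events against a manifest of seen IDs on S3.
# All paths and column names are hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dedup-join-sketch").getOrCreate()

events = spark.read.parquet("s3://my-bucket/enriched/run=2018-04-01/")  # this run's events
seen = spark.read.parquet("s3://my-bucket/manifest/event-ids/")         # previously seen IDs

# Keep only events whose (event_id, event_fingerprint) pair has never been seen before
deduped = events.join(seen, ["event_id", "event_fingerprint"], "left_anti")
deduped.write.mode("overwrite").parquet("s3://my-bucket/deduped/run=2018-04-01/")

# Append this run's IDs to the manifest for the next run
deduped.select("event_id", "event_fingerprint") \
    .write.mode("append").parquet("s3://my-bucket/manifest/event-ids/")
```

As point 4 says, the join forces the whole manifest to be shuffled on every run, which is exactly the overhead a well-provisioned DynamoDB lookup avoids.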

Again - I would like to discuss and consider this approach; please don't take the above items as a refusal.

Thanks.


thanks @anton