Optimizing and reducing shredding/loading costs

I inherited a Snowplow pipeline from 2017, which includes EMR ETL Runner with the following config (copying only the relevant part):

emr:
    jobflow:
      master_instance_type: m4.large
      core_instance_count: 8
      core_instance_type: r4.xlarge
      core_instance_bid: # In USD. Adjust bid, or leave blank for on-demand core instances
      core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
        volume_size: 40 # Gigabytes
        volume_type: "gp2"
        # volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
        yarn.nodemanager.vmem-check-enabled: "false"
        yarn.nodemanager.resource.memory-mb: "28672"
        yarn.scheduler.maximum-allocation-mb: "28672"
      spark:
        maximizeResourceAllocation: "false"
      spark-defaults:
        spark.dynamicAllocation.enabled: "false"
        spark.executor.instances: "23"
        spark.yarn.executor.memoryOverhead: "2048"
        spark.executor.memory: "8G"
        spark.executor.cores: "1"
        spark.yarn.driver.memoryOverhead: "2048"
        spark.driver.memory: "8G"
        spark.driver.cores: "1"
        spark.default.parallelism: "46"

As you can see, it uses a lot of resources, at least compared to the sample config I saw on GitHub.

But, for context, we receive several million visits per day.

Now I have just updated the entire pipeline: installed Stream Enrich Kinesis, upgraded EMR ETL Runner to 1.0.4, switched it to Stream Enrich mode, updated the config for EMR 6.1, upgraded RDB Shredder/Loader to 0.18.1 and the Redshift JSON schema to 4.0.0, etc. I'm now running EMR ETL Runner every 2 hours.

So, since I removed the enriching step from EMR ETL Runner, and since RDB Shredder/Loader has received a lot of performance improvements over the years, I'm wondering whether we still need all the resources listed in that config.

From the enricher Kinesis stream monitoring, I see we receive ~12,000 records/min, i.e. ~1,440,000 records to shred and load every 2 hours. With the current config this takes 17 minutes.
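The per-run volume works out as follows (just the arithmetic from the monitoring numbers above):

```python
# Back-of-the-envelope throughput check, using the figures observed
# on the enricher Kinesis stream monitoring.
records_per_min = 12_000   # observed ingestion rate
run_interval_min = 120     # EMR ETL Runner runs every 2 hours

records_per_run = records_per_min * run_interval_min
print(records_per_run)  # 1440000 records to shred and load per run
```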

Anyone running EMR ETL Runner in a similar context? Any recommendations?

If not, what would be the way to start optimizing? Decreasing the number of instances? Downgrading the instance type? What metrics should I look at in the EMR job monitoring?

Thanks in advance.

@guillaume, the number of events alone is not a clear determining factor for the size of the EMR cluster and the corresponding Spark configuration. Not all events are the same: lots of custom events and contexts require more computing power than the standard Snowplow events (as opposed to self-describing events). The shredding step is more demanding than enrichment (when run in batch mode), so removing the enrichment step affects the size of the required EMR cluster only indirectly. That is, if removing enrichment allowed you to run EmrEtlRunner more frequently, you would need a smaller cluster after such a migration (as less data is processed per run). If the frequency of the batch job hasn't changed, then assuming the same data volume, I would say the required EMR cluster size stays the same.

There is only a rough correlation between the data being processed and the EMR cluster required to process it. The “formula” depends on the size of the enriched files. Let's say the maximum payload at some point during the day is 1 GB of gzipped enriched files per EmrEtlRunner run. That roughly translates into 10 GB of plain files (about a 10x expansion; the files get ungzipped when processed on the EMR cluster). Knowing the volume of data to process per run, and using this guide, you can come up with the corresponding EMR cluster size and its Spark configuration.
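The rule of thumb above can be sketched as a tiny helper (the 10x expansion factor is the approximation from this post, not an exact figure):

```python
# Rough sizing sketch: gzipped enriched files expand roughly 10x when
# decompressed on the EMR cluster. The factor is an approximation.
def uncompressed_estimate_gb(gzipped_gb: float, expansion_factor: float = 10.0) -> float:
    """Estimate the plain-file volume the Spark job actually has to process."""
    return gzipped_gb * expansion_factor

peak_gzipped_gb = 1.0  # max gzipped payload per EmrEtlRunner run
print(uncompressed_estimate_gb(peak_gzipped_gb))  # 10.0 GB of plain files
```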

Thus, for this 1 GB of gzipped enriched files, I would use the following configuration:

    jobflow:
      master_instance_type: m4.large
      core_instance_count: 1
      core_instance_type: r5.8xlarge
      core_instance_ebs:
        volume_size: 60
        volume_type: gp2
        ebs_optimized: true
    configuration:
      yarn-site:
        yarn.nodemanager.vmem-check-enabled: "false"
        yarn.nodemanager.resource.memory-mb: "256000"
        yarn.scheduler.maximum-allocation-mb: "256000"
      spark:
        maximizeResourceAllocation: "false"
      spark-defaults:
        spark.dynamicAllocation.enabled: "false"
        spark.executor.instances: "9"
        spark.yarn.executor.memoryOverhead: "3072"
        spark.executor.memory: "22G"
        spark.executor.cores: "3"
        spark.yarn.driver.memoryOverhead: "3072"
        spark.driver.memory: "22G"
        spark.driver.cores: "3"
        spark.default.parallelism: "108"
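As a cross-check (a sketch of the arithmetic only, assuming Spark's convention of 1G = 1024 MB), the memory settings above fit exactly into the YARN allocation on the single r5.8xlarge core node:

```python
# Verify that the Spark settings fit within
# yarn.nodemanager.resource.memory-mb = 256000 on the core node.
executor_memory_mb = 22 * 1024   # spark.executor.memory: 22G
executor_overhead_mb = 3072      # spark.yarn.executor.memoryOverhead
driver_memory_mb = 22 * 1024     # spark.driver.memory: 22G
driver_overhead_mb = 3072        # spark.yarn.driver.memoryOverhead
executors = 9                    # spark.executor.instances

total_mb = (executors * (executor_memory_mb + executor_overhead_mb)
            + driver_memory_mb + driver_overhead_mb)
print(total_mb)  # 256000, exactly the YARN allocation
```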

Note also the newer generation of EC2 nodes, r5. You can upgrade to these instance types if your AMI version has also been bumped to 6.1.0:

  emr:
    ami_version: 6.1.0

@ihor thanks a lot!

Does this number correspond to the size of the enriched files per minute? It's from the enricher-good stream.

If so, that would be (33 MB × 120 min) / 10, i.e. ~400 MB of gzipped enriched files every 2 hours?

Hi @guillaume, just to flag that, since this is a transient EMR cluster that runs for 17 minutes every 2 hours, you are only billed for the time it is running. If you halve the spec and the job then takes 34 minutes, you will still pay exactly the same amount for processing the data. You will only save on cost if you reduce the specification and the job continues to run in the same amount of time.

Amazon EMR has per second billing so there is likely not too much to worry about from this vantage point.
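To make that concrete: with per-second billing, cost is effectively node-minutes times the hourly rate, so halving the node count only saves money if the runtime does not double. A sketch with an illustrative placeholder rate (not a real EMR/EC2 price):

```python
# Illustrative only: hourly_rate_usd is a placeholder, not an actual price.
def run_cost(nodes: int, hourly_rate_usd: float, runtime_min: float) -> float:
    """Per-second billing: cost scales linearly with node-minutes."""
    return nodes * runtime_min * hourly_rate_usd / 60

full = run_cost(nodes=8, hourly_rate_usd=0.30, runtime_min=17)  # current cluster
half = run_cost(nodes=4, hourly_rate_usd=0.30, runtime_min=34)  # half spec, 2x runtime
print(full == half)  # True: same node-minutes, same cost
```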

@guillaume, your screenshot probably refers to streamed data. That data is in a different format from what is uploaded to the S3 bucket, and I cannot be sure of the correlation between the two. The way we determine the EMR cluster size is by examining the actual size of the files in the S3 bucket.

You run your batch job every 2 hours, which means your S3 archive bucket will contain run=... folders (one per EmrEtlRunner run). You can examine the size of the latest “run” folders to get the volume of data per run, to be used with the “formula” I described above.
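One way to measure this is to group object sizes by their run=... folder from an S3 listing. A sketch (the bucket keys below are hypothetical examples; against a real bucket you would feed it pages from boto3's `list_objects_v2` paginator):

```python
# Group S3 object sizes by run=... folder, as you would over a listing
# of the enriched archive. Keys below are hypothetical placeholders.
from collections import defaultdict

def sizes_by_run(objects):
    """objects: iterable of {'Key': str, 'Size': int} dicts, the shape
    returned in 'Contents' by S3 ListObjectsV2."""
    totals = defaultdict(int)
    for obj in objects:
        run = next((p for p in obj["Key"].split("/") if p.startswith("run=")), None)
        if run:
            totals[run] += obj["Size"]
    return dict(totals)

# Example with a fake listing:
listing = [
    {"Key": "enriched/archive/run=2021-01-01-00-00-00/part-0000.gz", "Size": 200_000_000},
    {"Key": "enriched/archive/run=2021-01-01-00-00-00/part-0001.gz", "Size": 210_000_000},
    {"Key": "enriched/archive/run=2021-01-01-02-00-00/part-0000.gz", "Size": 190_000_000},
]
print(sizes_by_run(listing))
# {'run=2021-01-01-00-00-00': 410000000, 'run=2021-01-01-02-00-00': 190000000}
```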

You can then correlate that with the data in your screenshot. Your suggestion may well be right, but I cannot confirm it myself.