Disk usage during the EMR stage

Hi everyone!

We are trying to predict, as far as possible, how much disk space we will need for the EMR process, because we want to improve our current cluster provisioning and forecast our needs for the near future.

We created a script to fetch how many files there are and how big they are before the wave starts. In short, this script sums the decompressed size of every file and saves the total for future comparison/analysis.
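
In rough terms it does something like this (a simplified sketch; it assumes gzip-compressed raw files already synced to a local directory, and the path is just a placeholder):

  total=0
  for f in /data/raw-staging/*.gz; do
    # gzip -l prints the uncompressed size in the second column of its second line
    size=$(gzip -l "$f" | awk 'NR==2 {print $2}')
    total=$(( total + size ))
  done
  echo "Estimated uncompressed size: $(( total / 1024 / 1024 )) MB"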

However, this value was not close to the numbers shown in Amazon’s EMR monitoring tools, so we started to investigate how much space Snowplow uses inside HDFS. So far, we have figured out that Snowplow creates the raw, enriched and shredded directories, and how much they consume inside Hadoop. For instance:

2.9 G  hdfs:///local/snowplow/enriched-events
1.1 G  hdfs:///local/snowplow/raw-events
2.9 G  hdfs:///local/snowplow/shredded-events
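
For reference, this listing comes from the HDFS shell on the master node, using the same command we list for monitoring further down:

  hdfs dfs -du -h hdfs:///local/snowplow/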

However, even if we add up the space used by the OS and system files (including the Hadoop JARs), we don’t reach the value shown in Amazon’s EMR monitoring data.

Is there any directory we are missing? Is there any process in which Snowplow creates temporary files on the EMR cluster that we did not trace? Does Snowplow do anything file-related during the Elasticity S3DistCp step (Shredded HDFS -> S3) that uses additional space before transferring to S3?

Thank you in advance!

@rcpp, the following diagram depicts the ETL process on EMR: https://github.com/snowplow/snowplow/wiki/Batch-pipeline-steps. Your assumptions about the data written to HDFS are correct. However, there is more to it when you configure your EMR cluster. In particular, with high-volume data, it is important to configure Spark correctly.

There are plenty of posts on the subject in this forum. We have a rough correlation between the size of uncompressed enriched data and the EMR/Spark config, and we used it as a guide when creating templates for various bands (ranges of volume).

If we take the volume you depicted, 2.9 GB of uncompressed enriched data, then the EmrEtlRunner config would include the following EMR and Spark settings:

  jobflow:
    master_instance_type: m4.large
    core_instance_count: 1
    core_instance_type: r4.2xlarge
    core_instance_ebs:
      volume_size: 80
      volume_type: gp2
      ebs_optimized: true
  configuration:
    yarn-site:
      yarn.nodemanager.vmem-check-enabled: "false"
      yarn.nodemanager.resource.memory-mb: "57344"
      yarn.scheduler.maximum-allocation-mb: "57344"
    spark:
      maximizeResourceAllocation: "false"
    spark-defaults:
      spark.dynamicAllocation.enabled: "false"
      spark.executor.instances: "6"
      spark.yarn.executor.memoryOverhead: "1024"
      spark.executor.memory: "7G"
      spark.executor.cores: "1"
      spark.yarn.driver.memoryOverhead: "1024"
      spark.driver.memory: "7G"
      spark.driver.cores: "1"
      spark.default.parallelism: "24"
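
A quick sanity check on how these memory figures hang together: each executor container is 7 GB of heap plus 1024 MB of overhead (8192 MB), the driver is sized the same way, and six executors plus the driver exactly fill the memory handed to YARN:

  # 6 executors * (7 GB heap + 1 GB overhead) + 1 driver * (7 GB + 1 GB), in MB
  echo $(( 6 * (7 * 1024 + 1024) + (7 * 1024 + 1024) ))   # 57344 = yarn.nodemanager.resource.memory-mb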

Again, this is a rough estimate, as there is a dependency on the complexity of the custom data being tracked. However, we are using this approach successfully on our managed services pipelines.

Hi, @ihor!

Sorry for the late feedback. We took a similar approach: we observed our cluster for a couple of days and measured how much space the event files were using during the process.

With the information we collected, we decided to define the amount of disk space like this:

  • 12 GB for the base system. Our system starts at 8 GB and stays below 9 GB, so we are already adding a safety margin.
  • The sum of the uncompressed file sizes from our raw bucket, downloaded before the cluster is started.
  • Three times that amount (one copy for each bucket: raw, enriched, shredded).
  • The calculated total plus a safety margin of 20% on top of it (see the sketch after this list).
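
A minimal sketch of that calculation as a script (it assumes whole-GB values; the script name and argument are illustrative):

  #!/bin/bash
  # usage: ./estimate_disk.sh <uncompressed-raw-size-in-GB>
  BASE_GB=12                                         # OS + Hadoop/system files, already padded
  RAW_GB=${1:?need the uncompressed raw size in GB}  # output of the pre-wave sizing script
  EVENTS_GB=$(( RAW_GB * 3 ))                        # raw + enriched + shredded copies in HDFS
  TOTAL_GB=$(( (BASE_GB + EVENTS_GB) * 120 / 100 ))  # plus the 20% safety margin
  echo "Provision at least ${TOTAL_GB} GB of disk"

For example, 3 GB of uncompressed raw data gives (12 + 9) * 1.2 ≈ 25 GB.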

To monitor the entire process, we used ssh to execute the remote commands below:
du -h 2>&1                                  # local disk usage on the instance, stderr merged into stdout
hdfs dfs -du -h hdfs:///local/snowplow/     # per-directory usage of the Snowplow working directories in HDFS
hdfs dfs -ls -h hdfs:///local/snowplow/     # file listing with human-readable sizes
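
We run them from outside the cluster over ssh, roughly like this (the key path and master address are placeholders; hadoop is the default EMR user):

  ssh -i ~/emr-key.pem hadoop@ec2-xx-xx-xx-xx.compute.amazonaws.com 'hdfs dfs -du -h hdfs:///local/snowplow/'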

With this process, we didn’t lose any of the output when the cluster was terminated.

Cheers!