EMR Shredding fails randomly

Hi,

I have a standard lambda architecture: the real-time pipeline saves enriched data to S3, and then the ETL EMR runner starts an EMR cluster, but occasionally the EMR job fails. I am able to push it forward with the resume_from: shred parameter, and nothing unusual shows up. Disk space is OK (HDFS is in fact only 2%-3% occupied). Run times also vary: usually I fit in 3 hours, but sometimes it takes over 5 (roughly once a week). I have YARN max attempts set to 3, so the retries are most likely the source of the extended run times. What puzzles me are the “over 1h idle times” between periods of high CPU usage on the EMR cluster.

I am not able to find the issue. I use c4.4xlarge machines as core instances.
Any ideas or suggestions?

Thanks

@grzegorzewald, you seem to be using a Spark job on EMR to shred the files. If so, you might want to replace the c4 instances with r4 and fine-tune the Spark configuration (rather than using the default settings from the release notes). Spark processing is done mostly in memory rather than on disk, unlike a Hadoop job.

To tune your Spark configuration, you can refer to this post.

Thanks @ihor.
Is there a better (read: more cost-effective) method to shred enriched events than the Spark job?

@grzegorzewald, the Spark job is more performant than Hadoop as there are far fewer writes to disk (HDFS) involved (again, the processing is done in memory). If configured properly, it should be less expensive than running a Hadoop job.

We have a rough mapping between the size of the files in the enriched format and the EMR cluster and Spark configuration required to handle the payload. If you let us know what the usual size of all the enriched files per batch is, we could share the appropriate EMR cluster size and Spark config with you.

Hi,

Enriched files (gzipped) take about 2.2 GB/day.
Last night I found that m4 instances seem to be more stable than c4 (although more expensive, as I used double the number of instances), so RAM is definitely the path I should follow. Today I will try r4 and see what happens.
I am still a bit concerned about the EMR cluster “dead time” (low load for about an hour). Is this the reduce stage? Can I somehow speed it up?

@grzegorzewald, to process (shred) compressed enriched files with a total size of 2.2 GB, we would recommend using 1x r4.16xlarge core instance and 1x m4.xlarge master instance. This might require up to 640 GB of EBS and 20 Spark executors.

To make it clearer, below is the relevant section of config.yml with the corresponding settings:

  . . .
  jobflow:
    master_instance_type: m4.xlarge
    core_instance_count: 1
    core_instance_type: r4.16xlarge
    core_instance_ebs:
      volume_size: 640
      volume_type: gp2
      ebs_optimized: true
    . . .
  configuration:
    yarn-site:
      yarn.nodemanager.vmem-check-enabled: "false"
      yarn.nodemanager.resource.memory-mb: "494592"
      yarn.scheduler.maximum-allocation-mb: "494592"
    spark:
      maximizeResourceAllocation: "false"
    spark-defaults:
      spark.dynamicAllocation.enabled: "false"
      spark.executor.instances: "20"
      spark.yarn.executor.memoryOverhead: "3072"
      spark.executor.memory: "20G"
      spark.executor.cores: "3"
      spark.yarn.driver.memoryOverhead: "3072"
      spark.driver.memory: "20G"
      spark.driver.cores: "3"
      spark.default.parallelism: "240"
  . . .

You can start with this config and adjust it depending on its performance. It should be good enough to process 2.2 GB of compressed files quickly.

I assume you are talking about a long data load to Redshift. It might happen due to the infamous eventual consistency of AWS S3: the S3 API reports that some files are still present when in fact they are not, which prevents the data load from commencing.

When it comes to low-volume data, it makes sense to keep retrying the data load after a short while. For this reason, the latest releases have built-in functionality to do exactly that; it can be skipped with the option --skip consistency_check. If you do skip it and accessing the files in the S3 bucket fails, the EMR job will also fail at the data load step (no retries to access the shredded files), and you would have to resume your job from the rdb_load step later on (hoping the eventual consistency has resolved itself by then).

Additionally, depending on your S3 references in config.yml (s3n or s3a), you might get either empty files or empty directories. We recommend deleting those manually from time to time. If not cleaned up, at some point they might cause a problem for the ETL process, as S3DistCp would have to scan an ever-accumulating number of files.

Thanks @ihor,

I will give r4 instances a try tomorrow.

I assume you are talking about a long data load to Redshift. It might happen due to the infamous eventual consistency of AWS S3: the S3 API reports that some files are still present when in fact they are not, which prevents the data load from commencing.

Nope. Redshift load takes 6 to 9 minutes. No delay. Everything goes flawlessly.

The waiting time is strictly connected with the Spark shred step. As per the images below, after over 90 minutes of waiting a small load appears (or a big one if there was a possible issue?). During the wait, the average load is quite small. This happened for both c4 and m4 core instances. I will check how it looks for r4…

[Screenshot: EMR cluster load graph showing the long low-load period before the spike]

Hi @grzegorzewald,

Are you using cross-batch deduplication? It might be that DynamoDB throughput is the bottleneck here, and by switching it to on-demand mode you could speed up the shred step.

Hi @egor,

No DynamoDB deduplication here. So this must be something else. I have enabled debugging - maybe I will find something.

BTW: @ihor, can you look at your cluster CPU usage during the Spark job? Maybe this is regular behaviour. Anybody from the Snowplow team (@antman/@BenFradet/@anton/anybody?) - is this normal, or am I having an issue?

Hello @grzegorzewald,

Looking at the graphs you shared, it is very possible that bots account for a big chunk of your events.

The problem with those bots is that they have a broken random number generator, which results in a lot of events sharing the same event ID and, as a result, ending up on the same node in your Spark cluster during in-batch deduplication (different from cross-batch deduplication).

To confirm this hypothesis, I’d look into the distribution of event ids in your events.

Hi @BenFradet,
Bots can be the issue. How can I check the distribution of event IDs? I am not able to search over Redshift (as they are deduplicated; duplicated event IDs are below 0.05%). I am also unable to query Elasticsearch over the _version field…

If the issue is caused by bots, which is quite possible given the spread of sites and the character of the business, what can I do to help the EMR Spark job finish quickly? In fact, it is more important for me to have a stable computation time than to fit within a particular time.

If the issue is because of duplicated events, I believe my Spark job failed several times because of out-of-memory issues on the c4 instances (r4 has about 4 times more RAM per core than c4).

[EDIT]
I have gone through the Spark job logs. What I found is a time “hole” in the driver logs:

19/02/20 00:46:59 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
19/02/20 01:07:45 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 10.16.0.166:44963 in memory (size: 38.5 KB, free: 6.3 GB)
19/02/20 01:07:45 INFO BlockManagerInfo: Removed broadcast_2_piece0 on ip-10-16-0-194.eu-west-1.compute.internal:37315 in memory (size: 38.5 KB, free: 8.9 GB)
19/02/20 01:07:45 INFO BlockManagerInfo: Removed broadcast_2_piece0 on ip-10-16-0-13.eu-west-1.compute.internal:36829 in memory (size: 38.5 KB, free: 9.0 GB)
19/02/20 01:07:45 INFO BlockManagerInfo: Removed broadcast_2_piece0 on ip-10-16-0-19.eu-west-1.compute.internal:44167 in memory (size: 38.5 KB, free: 6.8 GB)
19/02/20 01:44:05 INFO FileFormatWriter: Job null committed.

All the executors finished before 00:44, so this is not an extraordinarily big job. It looks more like the driver is doing something on its own. The executors are in an idle state (see 01:00-01:30):
[Screenshot: executor load graph showing the idle period between 01:00 and 01:30]
Is this deduplication, or something different?

You can query your enriched data on S3 through something like Athena :+1:
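
For instance, assuming you have already defined an external table over the enriched files on S3 (hypothetically named enriched_events here, with an event_id column - that table does not exist out of the box), a minimal sketch of such a check could look like:

  -- Hypothetical Athena (Presto SQL) sketch: enriched_events is an assumed
  -- external table over the enriched files on S3, exposing an event_id column.
  SELECT
    event_id,
    COUNT(*) AS occurrences
  FROM enriched_events
  GROUP BY event_id
  ORDER BY occurrences DESC
  LIMIT 20;

If a handful of event IDs dominate the top of that list, that would confirm the skew hypothesis.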

How can I check the distribution of event IDs? I am not able to search over Redshift (as they are deduplicated; duplicated event IDs are below 0.05%).

You can also look in the atomic.com_snowplowanalytics_snowplow_duplicate_1 table.

If these problematic events have different event_fingerprints, they won't be filtered out but will get new event_ids instead (https://snowplowanalytics.com/blog/2016/12/20/snowplow-r86-petra-released/#synthetic-dedupe).

By joining atomic.events with that table you will be able to reconstruct the original ID with something like NVL(d.original_event_id, e.event_id) and extract the useragent for the most frequent ones.
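
As a rough sketch (assuming the standard Snowplow Redshift layout, in particular that the duplicate table joins to atomic.events on root_id = event_id - do verify against your own schema):

  -- Sketch of a Redshift query to recover original event IDs and surface the
  -- useragents behind the most frequent ones. Assumes the standard shredded
  -- context layout where the table joins on root_id = event_id.
  SELECT
    NVL(d.original_event_id, e.event_id) AS original_event_id,
    e.useragent,
    COUNT(*) AS occurrences
  FROM atomic.events e
  LEFT JOIN atomic.com_snowplowanalytics_snowplow_duplicate_1 d
    ON d.root_id = e.event_id
  GROUP BY 1, 2
  ORDER BY occurrences DESC
  LIMIT 20;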