Question on EmrEtlRunner options

Hello,

Our setup is - StreamCollector > Raw Stream > Kinesis LZO S3 Sink > EmrEtlRunner > StorageLoader > Redshift
The collector format is “thrift”.

We’re getting a large volume of data into our collector, which then gets pushed into the “raw/in” S3 folder. In this scenario, when we run EmrEtlRunner, it is not able to move data from “raw/in” into “raw/processing” faster than new data arrives in “raw/in” from the collector. Hence, it is stuck in the “staging” step and is not able to progress to the EMR stage of the batch pipeline.

Question -

  1. Is this setup correct?
  2. The documentation on “2-Using-EmrEtlRunner” states that it can run in a timespan mode only for the “cloudfront” collector format, which doesn’t work for us (since we’re using “thrift”).
  • Can timespan mode be made to support the thrift format?
  • Can the timespan be made granular enough to include hours and minutes?

Thanks
Amit

Hi @amitkhanal - your pipeline setup sounds good.

Are you saying that your pipeline is stuck in moving files from raw/in to raw/staging? That’s not something I’ve seen before.

Hi @alex,

Thanks for your reply.

The EmrEtlRunner process runs but cannot proceed beyond step 1 of the batch pipeline because it is not able to catch up with the new records landing in “raw/in” from the collectors. I had to manually stop the EmrEtlRunner process and rerun it with the --skip staging option to start the EMR jobflow.
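For reference, the rerun was along these lines (the config and resolver paths are placeholders, and the exact binary name depends on the EmrEtlRunner release in use):

./snowplow-emr-etl-runner \
  --config config/config.yml \
  --resolver config/iglu_resolver.json \
  --skip staging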

Also, could you please let me know your thoughts on question #2.

We’re actually removing this functionality altogether soon.

It’s very rarely used - and I don’t believe that it would solve your problem here.

I’m a bit puzzled by your underlying problem - we run this exact setup for some very large users and haven’t encountered this.

Questions:

  1. What event volumes are you doing per day?
  2. How many EC2 instances is the Kinesis S3 sink running on?
  3. What are the buffer settings (from the config.hocon) for the Kinesis S3 sink?
  4. What is the specification of the box running EmrEtlRunner?

Thanks!

What event volumes are you doing per day?

  • We have 2 collectors that are writing 500 records every ~0.5 seconds. Here’s a snippet of the collector log -
18:08:05.518 [scala-stream-collector-akka.actor.default-dispatcher-14] INFO  c.s.s.c.s.sinks.KinesisSink - Writing 500 Thrift records to Kinesis stream collector-stream-good
18:08:05.993 [pool-1-thread-7] INFO  c.s.s.c.s.sinks.KinesisSink - Successfully wrote 500 out of 500 records
18:08:06.228 [scala-stream-collector-akka.actor.default-dispatcher-13] INFO  c.s.s.c.s.sinks.KinesisSink - Writing 500 Thrift records to Kinesis stream collector-stream-good
18:08:06.663 [pool-1-thread-2] INFO  c.s.s.c.s.sinks.KinesisSink - Successfully wrote 500 out of 500 records
18:08:06.950 [scala-stream-collector-akka.actor.default-dispatcher-10] INFO  c.s.s.c.s.sinks.KinesisSink - Writing 500 Thrift records to Kinesis stream collector-stream-good
18:08:07.410 [pool-1-thread-9] INFO  c.s.s.c.s.sinks.KinesisSink - Successfully wrote 500 out of 500 records
18:08:07.645 [scala-stream-collector-akka.actor.default-dispatcher-9] INFO  c.s.s.c.s.sinks.KinesisSink - Writing 500 Thrift records to Kinesis stream collector-stream-good
18:08:07.960 [pool-1-thread-1] INFO  c.s.s.c.s.sinks.KinesisSink - Successfully wrote 500 out of 500 records
18:08:08.354 [scala-stream-collector-akka.actor.default-dispatcher-10] INFO  c.s.s.c.s.sinks.KinesisSink - Writing 500 Thrift records to Kinesis stream collector-stream-good
18:08:08.674 [pool-1-thread-8] INFO  c.s.s.c.s.sinks.KinesisSink - Successfully wrote 500 out of 500 records
18:08:09.063 [scala-stream-collector-akka.actor.default-dispatcher-15] INFO  c.s.s.c.s.sinks.KinesisSink - Writing 500 Thrift records to Kinesis stream collector-stream-good
18:08:09.483 [pool-1-thread-1] INFO  c.s.s.c.s.sinks.KinesisSink - Successfully wrote 500 out of 500 records

How many EC2 instances is the Kinesis S3 sink running on?

  • One (Type - m4.large)

What are the buffer settings (from the config.hocon) for the Kinesis S3 sink?
I initially had it set to -
buffer {
  byte-limit: 1000000 # 1MB
  record-limit: 100
  time-limit: 120000
}

Changed to -

buffer {
  byte-limit: 4500000 # 4.5MB
  record-limit: 500 # 500 records
  time-limit: 60000 # 1 minute
}

What is the specification of the box running EmrEtlRunner?

c3.xlarge

EMR config -

emr:
  ami_version: 4.5.0
  region: us-west-2 # Always set this
  jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
  service_role: EMR_DefaultRole # Created using $ aws emr create-default-roles
  placement: # Set this if not running in VPC. Leave blank otherwise
  ec2_subnet_id: xxxx # Set this if running in VPC. Leave blank otherwise
  ec2_key_name: xxxx
  bootstrap: # Set this to specify custom bootstrap actions. Leave empty otherwise
  software:
    hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
    lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
  # Adjust your Hadoop cluster below
  jobflow:
    master_instance_type: c3.xlarge
    core_instance_count: 2
    core_instance_type: c3.xlarge
    core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
      volume_size: 100 # Gigabytes
      volume_type: "gp2"
      volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
      ebs_optimized: true # Optional. Will default to true
    task_instance_count: 3 # Increase to use spot instances
    task_instance_type: c3.xlarge
    task_instance_bid: xxxx # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
  bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
  additional_info: # Optional JSON string for selecting additional features

EMR version - emr_r87_chichen_itza

Couple of points -

  1. I started this setup only a couple of days ago with the latest version of EMR, so I have not yet completed one successful enrich and load into Redshift
  2. The EMR jobflow I started with --skip staging has been running for over 11 hours now, with almost all the time spent in “Elasticity Scalding Step: Enrich Raw Events”
    Total objects in S3 raw/processing - 72,603

Thanks!

Thanks for the additional detail @amitkhanal. I read that as you generating either 1,000 or 2,000 events per second; with the record limit in the Kinesis S3 sink set to 500 records, you are going to be generating either 120 or 240 files a minute, i.e. 7,200 or 14,400 files per hour.

This is going to massively overload not just EmrEtlRunner but also Hadoop.

My recommendation is to:

  1. Change the buffer settings so you are generating only double-digit files per hour, the lower the better (see the rough sizing sketch below this list)
  2. Delete the files you have so far
  3. Delete the Kinesis S3 checkpoint from DynamoDB so that you can generate much smaller files from the start of your stream
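As a rough sizing sketch (assuming the ~2,000 events per second figure above, and treating record-limit as the setting that fills first):

files per hour = events per second × 3,600 / record-limit

record-limit 500     → 2,000 × 3,600 / 500     = 14,400 files per hour
record-limit 600,000 → 2,000 × 3,600 / 600,000 = 12 files per hour

So the record-limit needs to be in the hundreds of thousands (with byte-limit and time-limit raised to match) before the file count drops into double digits.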

Let us know what buffer settings end up working for you!

Thanks for the recommendation @alex. I’ve been testing several configurations and here’s one that seems to work for now -

Kinesis S3 sink config.hocon

buffer: {
  byte-limit: 1000000000 # 1GB
  record-limit: 600000 # 600K
  time-limit: 10800000 # 180 mins
}

Java Heap size - 24GB

I’ve also tried setting the time limit to 4 hours, but that sometimes leads to out-of-memory errors. The raw file size in S3 ranges from 250 MB to 750 MB depending on traffic and time of day.
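(For context, the heap here is just the sink JVM’s -Xmx setting; the jar name below is illustrative and the launch flag may differ slightly by Kinesis S3 sink version.)

java -Xmx24g -jar snowplow-kinesis-s3-0.5.0.jar --config config.hocon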

EMR config.yml -

jobflow:
  master_instance_type: m1.medium
  core_instance_count: 3
  core_instance_type: c4.2xlarge
  core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
    volume_size: 500 # Gigabytes
    volume_type: "gp2"
    volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
    ebs_optimized: true # Optional. Will default to true
  task_instance_count: 2 # Increase to use spot instances
  task_instance_type: m4.xlarge
  task_instance_bid: x.xx # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances

Here are some entries from the load manifest with the configs above -

etl_tstamp                commit_tstamp                 event_count   shredded_cardinality
2017-03-08 18:07:12.698   2017-03-08 23:36:01.769959    4153081       26
2017-03-09 18:31:42.146   2017-03-09 22:37:26.136232    8443300       15
2017-03-11 14:11:17.092   2017-03-11 17:09:09.50273     13051161      16
2017-03-12 15:37:12.826   2017-03-12 23:06:16.329663    36738025      16
2017-03-13 03:55:09.428   2017-03-13 06:56:47.938757    13362932      15

We’re using all enrichments under - https://github.com/snowplow/snowplow/tree/master/3-enrich/config/enrichments

These configs will need to change for holiday traffic (Black Friday, Christmas, etc.) and we’ll be fine-tuning them further.

Please let me know if you have any thoughts.

Thanks

Hey @amitkhanal - thanks for sharing that. It sounds like a good first step - for a second phase you could consider adding a file compaction step at the front of the EMR job. This will reduce your reliance on the Kinesis S3 buffer settings to do all the heavy lifting of file # minimization.

Hey @alex, could you elaborate on the file compaction step?

Is there a reference/documentation I can look at for file compaction prior to EMR?

Thanks

I’m afraid there’s no formal documentation on the compaction approach, so let’s write some here :slight_smile:

  • You will need to delete the .lzo.index files before you do the compaction
  • You can perform the compaction using S3DistCp (see the sketch below this list); the jar is /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar
  • After the compaction, if you want, you can re-index the files using s3://snowplow-hosted-assets/third-party/twitter/hadoop-lzo-0.4.20.jar
  • Then you can run the Hadoop Enrich job
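To make that concrete, here is a rough sketch of the compaction run on the EMR master node (the bucket names, groupBy regex and target size are illustrative, not settings from this thread):

hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar \
  --src s3://my-raw-bucket/processing/ \
  --dest s3://my-raw-bucket/compacted/ \
  --groupBy '.*\.(lzo)' \
  --targetSize 512 \
  --outputCodec lzo

--groupBy concatenates all files whose names share the same captured group (here, everything ending in .lzo), and --targetSize caps each output file at roughly 512 MB.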

Hope this helps! These kinds of techniques are very valuable for running Snowplow at “mega-scale” - this is an ongoing program of R&D at Snowplow, which is why none of this has been formally documented / integrated into the codebase yet.

If you are responsible for operating a Snowplow at these kinds of volumes and the Snowplow Managed Service is not an option, then I would recommend checking out the Big Data on AWS three day training course.

Thanks for the instructions @alex, really appreciate it. Will look into the training course.

For now we’re looking into self-hosting Snowplow. I’ll get in touch with the Snowplow team if our plans change.

No worries @amitkhanal, any time!

If you are also running Stream Enrich, then another strategy you can consider is to add a second Kinesis S3 sink onto the Kinesis enriched event stream, and then run EMR with --skip enrich, so it only runs the Hadoop Shred step.
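A rough sketch of that invocation (paths are placeholders; depending on your bucket layout you may need to skip other steps as well, e.g. staging, since the enriched files would already be in place):

./snowplow-emr-etl-runner \
  --config config/config.yml \
  --resolver config/iglu_resolver.json \
  --skip enrich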