EmrEtlRunner Issues - taking too long on step 2


hi there!

I have been testing the platform for the past few weeks on a test system and everything had been working great!
Yesterday I connected it to a production server (average traffic of about 30,000 pageviews/day.

The collector, kinesis streams etc. worked well but I have reached a block on the EmrEtlRunner process. The process has been going on for about 10 hrs already and its still on step 2: Enrich Raw Events! There’s only one entry in stderr:

log4j:ERROR Failed to rename [/mnt/var/log/hadoop/steps/s-2O6T4H2PNZWLB/syslog] to [/mnt/var/log/hadoop/steps/s-2O6T4H2PNZWLB/syslog.2017-03-25-02].

The EMR has been setup with 2 cores (m1.medium boxes) and during testing, it ran for about an hr to process the thrift data and send to redshift. The data volume is significantly higher on production of course, so I am not clear where the issue lies. I am not sure if the process and crashed completely (the EMR monitor indicates both cores as running).

What would be the best approach at this time?

  1. Would increasing the cores numbers from 2 to 4 help?
  2. Shall I stop this process now and restart? If so, how can I move the events data back from raw-processing bucket into the raw-in bucket? Will that affect that events data already being collected there for the past 10 hrs?
  3. So far, I am not clear how many events has been tracked, but so far about 6000 raw thrift records were deposited into the ETL raw processing bucket folder.
  4. In my javascript tracker, I am also recording page ping events after every 30 seconds (repeats every 10). Would turning this off help? I understand I do need this to capture bounce data etc.

Here are the other logs from the EMR process:

  1. Controller log (end):

    INFO ProcessRunner started child process 7618 :
    hadoop 7618 2200 0 02:20 ? 00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /mnt/var/lib/hadoop/steps/s-2O6T4H2PNZWLB/snowplow-hadoop-enrich-1.8.0.jar com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob --hdfs --input_format thrift --etl_tstamp 1490407375035 --iglu_config ewogICJzY2hlbWEiOiAiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3MuaWdsdS9yZXNvbHZlci1jb25maWcvanNvbnNjaGVtYS8xLTAtMSIsCiAgImRhdGEiOiB7CiAgICAiY2FjaGVTaXplIjogNTAwLAogICAgInJlcG9zaXRvcmllcyI6IFsKICAgICAgewogICAgICAgICJuYW1lIjogIklnbHUgQ2VudHJhbCIsCiAgICAgICAgInByaW9yaXR5IjogMCwKICAgICAgICAidmVuZG9yUHJlZml4ZXMiOiBbICJjb20uc25vd3Bsb3dhbmFseXRpY3MiIF0sCiAgICAgICAgImNvbm5lY3Rpb24iOiB7CiAgICAgICAgICAiaHR0cCI6IHsKICAgICAgICAgICAgInVyaSI6ICJodHRwOi8vaWdsdS5pbnNpZGVob29rLmNvbSIKICAgICAgICAgIH0KICAgICAgICB9CiAgICAgIH0sCiAgICAgIHsKICAgICAgICAibmFtZSI6ICJJbnNpZGVIb29rIElnbHUgUmVwb3NpdG9yeSIsCiAgICAgICAgInByaW9yaXR5IjogNSwKICAgICAgICAidmVuZG9yUHJlZml4ZXMiOiBbICJjb20uaW5zaWRlaG9vay5pZ2x1IiBdLAogICAgICAgICJjb25uZWN0aW9uIjogewogICAgICAgICAgImh0dHAiOiB7CiAgICAgICAgICAgICJ1cmkiOiAiaHR0cDovL2lnbHUuaW5zaWRlaG9vay5jb20iCiAgICAgICAgICB9CiAgICAgICAgfQogICAgICB9CgogICAgXQogIH0KfQ== --enrichments eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9lbnJpY2htZW50cy9qc29uc2NoZW1hLzEtMC0wIiwiZGF0YSI6W3sic2NoZW1hIjoiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3cvdWFfcGFyc2VyX2NvbmZpZy9qc29uc2NoZW1hLzEtMC0wIiwiZGF0YSI6eyJ2ZW5kb3IiOiJjb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3ciLCJuYW1lIjoidWFfcGFyc2VyX2NvbmZpZyIsImVuYWJsZWQiOnRydWUsInBhcmFtZXRlcnMiOnt9fX0seyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9yZWZlcmVyX3BhcnNlci9qc29uc2NoZW1hLzEtMC0wIiwiZGF0YSI6eyJuYW1lIjoicmVmZXJlcl9wYXJzZXIiLCJ2ZW5kb3IiOiJjb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3ciLCJlbmFibGVkIjp0cnVlLCJwYXJhbWV0ZXJzIjp7ImludGVybmFsRG9tYWlucyI6WyJpbnNpZGVob29rLmNvbSIsInd3dy1zdGFnZS5pbnNpZGVob29rLmNvbSIsInd3dy1kZXYuaW5zaWRlaG9vay5jb20iLCJpaC53d3cubG9jYWwiXX19fV19 --input_folder hdfs:///local/snowplow/raw-events/ --output_folder hdfs:///local/snowplow/enriched-events/ --bad_rows_folder s3://ih-anlytics-data-dev1/enriched/bad/run=2017-03-25-02-02-55/
    2017-03-25T02:20:27.633Z INFO HadoopJarStepRunner.Runner: startRun() called for s-2O6T4H2PNZWLB Child Pid: 7618
    INFO Synchronously wait child process to complete : hadoop jar /mnt/var/lib/hadoop/steps/s-2O6T4H2P…
    INFO Process still running
    INFO Process still running
    INFO Process still running
    INFO Process still running


2017-03-25 02:20:55,507 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): [com.snowplowanalytics....]  sink: Hfs["TextDelimited[['app_id', 'platform', 'etl_tstamp', 'collector_tstamp', 'dvce_created_tstamp', 'event', 'event_id', 'txn_id', 'name_tracker', 'v_tracker', 'v_collector', 'v_etl', 'user_id', 'user_ipaddress', 'user_fingerprint', 'domain_userid', 'domain_sessionidx', 'network_userid', 'geo_country', 'geo_region', 'geo_city', 'geo_zipcode', 'geo_latitude', 'geo_longitude', 'geo_region_name', 'ip_isp', 'ip_organization', 'ip_domain', 'ip_netspeed', 'page_url', 'page_title', 'page_referrer', 'page_urlscheme', 'page_urlhost', 'page_urlport', 'page_urlpath', 'page_urlquery', 'page_urlfragment', 'refr_urlscheme', 'refr_urlhost', 'refr_urlport', 'refr_urlpath', 'refr_urlquery', 'refr_urlfragment', 'refr_medium', 'refr_source', 'refr_term', 'mkt_medium', 'mkt_source', 'mkt_term', 'mkt_content', 'mkt_campaign', 'contexts', 'se_category', 'se_action', 'se_label', 'se_property', 'se_value', 'unstruct_event', 'tr_orderid', 'tr_affiliation', 'tr_total', 'tr_tax', 'tr_shipping', 'tr_city', 'tr_state', 'tr_country', 'ti_orderid', 'ti_sku', 'ti_name', 'ti_category', 'ti_price', 'ti_quantity', 'pp_xoffset_min', 'pp_xoffset_max', 'pp_yoffset_min', 'pp_yoffset_max', 'useragent', 'br_name', 'br_family', 'br_version', 'br_type', 'br_renderengine', 'br_lang', 'br_features_pdf', 'br_features_flash', 'br_features_java', 'br_features_director', 'br_features_quicktime', 'br_features_realplayer', 'br_features_windowsmedia', 'br_features_gears', 'br_features_silverlight', 'br_cookies', 'br_colordepth', 'br_viewwidth', 'br_viewheight', 'os_name', 'os_family', 'os_manufacturer', 'os_timezone', 'dvce_type', 'dvce_ismobile', 'dvce_screenwidth', 'dvce_screenheight', 'doc_charset', 'doc_width', 'doc_height', 'tr_currency', 'tr_total_base', 'tr_tax_base', 'tr_shipping_base', 'ti_currency', 'ti_price_base', 'base_currency', 'geo_timezone', 'mkt_clickid', 'mkt_network', 'etl_tags', 'dvce_sent_tstamp', 'refr_domain_userid', 'refr_dvce_tstamp', 'derived_contexts', 'domain_sessionid', 'derived_tstamp', 'event_vendor', 'event_name', 'event_format', 'event_version', 'event_fingerprint', 'true_tstamp']]"]["hdfs:/local/snowplow/enriched-events"]
2017-03-25 02:20:55,510 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): [com.snowplowanalytics....]  parallel execution is enabled: true
2017-03-25 02:20:55,510 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): [com.snowplowanalytics....]  starting jobs: 3
2017-03-25 02:20:55,510 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): [com.snowplowanalytics....]  allocating threads: 3
2017-03-25 02:20:55,518 INFO cascading.flow.FlowStep (pool-5-thread-1): [com.snowplowanalytics....] starting step: (1/3)
2017-03-25 02:20:55,938 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-1): Connecting to ResourceManager at ip-10-0-1-153.ec2.internal/
2017-03-25 02:20:56,629 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-1): Connecting to ResourceManager at ip-10-0-1-153.ec2.internal/
2017-03-25 02:21:17,993 INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat (pool-5-thread-1): Total input paths to process : 6288
2017-03-25 02:21:17,994 INFO org.apache.hadoop.conf.Configuration.deprecation (pool-5-thread-1): mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive
2017-03-25 02:21:25,563 INFO cascading.util.Update (UpdateRequestTimer): newer Cascading release available: 2.6.3
2017-03-25 02:21:38,354 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-1): number of splits:3144
2017-03-25 02:21:38,879 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-1): Submitting tokens for job: job_1490407757382_0002
2017-03-25 02:21:39,285 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl (pool-5-thread-1): Submitted application application_1490407757382_0002
2017-03-25 02:21:39,426 INFO org.apache.hadoop.mapreduce.Job (pool-5-thread-1): The url to track the job: http://ip-    10-0-1-153.ec2.internal:20888/proxy/application_1490407757382_0002/
2017-03-25 02:21:39,435 INFO cascading.flow.FlowStep (pool-5-thread-1): [com.snowplowanalytics....] submitted hadoop job: job_1490407757382_0002
2017-03-25 02:21:39,437 INFO cascading.flow.FlowStep (pool-5-thread-1): [com.snowplowanalytics....] tracking url: http://ip-10-0-1-153.ec2.internal:20888/proxy/application_1490407757382_0002/

Thanks very much for your help!


Hi @kjain,

Can you share the buffer settings for your Kinesis S3 app?




Thanks for your response Alex;

My kinesis buffer settings are:
buffer {
byte-limit: 64000
record-limit: 128
time-limit: 60000

Could this be a reason why the emretl processing is taking so long?


Yes, see e.g. this thread for details:



Thanks Alex!

I can update my buffer settings to kinesis-s3 and try again. But how would you recommend the best way to do that? I would need to rebuild my raw:in bucket I imagine. But how can I do that without losing all the data from before this started almost 22 hrs ago? I assume I cannot simply move all the files from raw:processing back into raw:in? If not, and once I update the kinesis-s3 settings, how can I rebuild the raw:in bucket with updated thrift files from before the processing started? As you can understand, I wouldn’t want to lose an entire day’s data if it can be avoided!

My traffic is not nearly as high as the case from your linked article so I can test with some settings in the middle. However, I am a little concerned about whether that situation is similar to this one. In my case, data was moved from raw:in to raw:processing and the EMR process moved to step 2 (Elasticity Scalding Step: Enrich Raw Events) but that step is what has been going on for the past 14 hrs.

Thanks again!


Assuming that you have e.g. 7 day expiry on your raw event stream, you can simply:

  • Delete all the events from S3
  • Delete the Kinesis S3 checkpoints from DynamoDB
  • Restart the Kinesis S3 app

It should then rebuild your S3 archive from the start.


Thanks Alex!

I have updated the buffer and reinitialized kinesis-lzo-s3 after deleting the checkpoints. I can see that the new thrift files are lesser and larger. I have also updated the EMR instance count to 4. I’ll run EmrEtlRunner after an hour and test its performance and report back.


Sounds promising - thanks for the update!


Here’s my update:

My buffer settings:

  buffer {
   byte-limit: 64000
   record-limit: 128
   time-limit: 60000

The Kinesis-s3 instance is now upgraded to m4.large (roughly 25,000 pageviews/day).
The EMR cores are now upgraded to m1.medium with 4 core counts.

The above setting was able to handle processing a day of data in about 4 hrs. The kinesis-lzo server shows its health as SERVER due to the initial influx of large network input data, though. Even though it dropped after the first few minutes, its health status remained at that level, so I’ll have to investigate into this further. But in general this configuration seems to run reasonably well.
What would you suggest as an optimal setting for activity logging? Currently I have at:

window.ihSp('enableActivityTracking', 30, 10);

Thanks again for your help!


That’s a fair question - but it deserves its own thread!


Thanks Alex!

I had one question regarding resetting the checkpoints. Once I clear the s3 buckets and clear the dynamodb entry, I assume all the data from the kinesis streams will get reloaded into the raw:in bucket. But this could mean data from up to 7 days ago which could overlap with data already processed and stored in redshift. How can we limit the amount to load into the raw bucket or is that something thats done intrinsically?
Also is there any thumb rule for calculating the rate of the data transfer from the kinesis stream to s3? ie once the kinesis-s3 loaders starts, how long could it take to transfer events into the bucket?



I don’t know of a way to do this, sorry - you would need to wipe Redshift too.[quote=“kjain, post:11, topic:1074”]

Also is there any thumb rule for calculating the rate of the data transfer from the kinesis stream to s3? ie once the kinesis-s3 loaders starts, how long could it take to transfer events into the bucket?

I can’t give you a formula but it’s pretty fast - the bottleneck is pulling the records down from Kinesis.


Thanks again Alex!

I ran cleared the checkpoint, restarted the kinesis-LZOs3 loader and as you said, the last 7 days of data got duplicated into redshift :frowning: I had hoped that there would be a way to avoid de-duplication of data within redshift. It would be almost impossible to clear the db a month down the line and if I need to do this process again for any reason! I suppose I could load the files from the archive but even one month of production data capture will require a massive process to get them back into redshift!

Perhaps this is not a typical situation (ie clearing the checkpoints at an advance period of data gathering) but it would be good to know that if it does happen, what steps could be taken to avoid the the above?

Regarding the 2nd question, I was asking because once I restarted the kinesis-lzo-s3 process, I saw the thrift files being created in the raw:in buckets at a gradual pace and I wanted to give the process enough time to get all the data in before I re-ran the ETL steps.



Right yes - I only suggested this tabula rasa approach because it was clear from your initial post that you were only on Day Four of loading production data in. This wipe-and-restart approach certainly wouldn’t work once you have been loading data into Redshift for some time.