EmrExecutionError - Enriched HDFS -> S3: FAILED

Hi,
I’m trying to load events into Redshift from a Kinesis stream using a lambda architecture.
I use snowplow-s3-loader-0.6.0.jar, then run snowplow-emr-etl-runner (latest version) with this command:

./snowplow-emr-etl-runner run -c config.yml -r /home/centos/storage-loader/iglu_resolver.json --debug -t /home/centos/storage-loader/targets -n /home/centos/storage-loader/enrichments

Then I get this error:
Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-2VE5O4DFZOUGR failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
Snowplow_ETL_Lzo: TERMINATING [STEP_FAILURE] ~ elapsed time n/a [2019-04-25 13:29:52 UTC - ]

    1. Elasticity Setup Hadoop Debugging: COMPLETED ~ 00:00:04 [2019-04-25 13:29:54 UTC - 2019-04-25 13:29:58 UTC]
    2. [staging] s3-dist-cp: Raw s3://sp-archive-acc/stream/ -> Raw Staging S3: COMPLETED ~ 00:00:40 [2019-04-25 13:30:00 UTC - 2019-04-25 13:30:41 UTC]
    3. [enrich] s3-dist-cp: Raw S3 -> Raw HDFS: COMPLETED ~ 00:00:40 [2019-04-25 13:30:43 UTC - 2019-04-25 13:31:23 UTC]
    4. [enrich] spark: Enrich Raw Events: COMPLETED ~ 00:00:58 [2019-04-25 13:31:23 UTC - 2019-04-25 13:32:21 UTC]
    5. [enrich] spark: Enriched HDFS -> S3: FAILED ~ 00:00:06 [2019-04-25 13:32:21 UTC - 2019-04-25 13:32:28 UTC]
    6. [enrich] spark: Enriched HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]
    7. [shred] spark: Shred Enriched Events: CANCELLED ~ elapsed time n/a [ - ]
    8. [cleanup] Empty Raw HDFS: CANCELLED ~ elapsed time n/a [ - ]
    9. [shred] s3-dist-cp: Shredded atomic events HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
    10. [shred] s3-dist-cp: Shredded HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]
    11. [shred] s3-dist-cp: Shredded types HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
    12. [archive_raw] s3-dist-cp: Raw Staging S3 -> Raw Archive S3: CANCELLED ~ elapsed time n/a [ - ]
    13. [archive_shredded] s3-dist-cp: Shredded S3 -> Shredded Archive S3: CANCELLED ~ elapsed time n/a [ - ]
    14. [archive_enriched] s3-dist-cp: Enriched S3 -> Enriched Archive S3: CANCELLED ~ elapsed time n/a [ - ]
    15. [rdb_load] Load AWS Redshift enriched events storage Storage Target: CANCELLED ~ elapsed time n/a [ - ]):
      uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:783:in `run'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
      uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:138:in `run'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
      uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:41:in `<main>'
      org/jruby/RubyKernel.java:994:in `load'
      uri:classloader:/META-INF/main.rb:1:in `<main>'
      org/jruby/RubyKernel.java:970:in `require'
      uri:classloader:/META-INF/main.rb:1:in `(root)'
      uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in

It seems like EMR can’t write to S3, but there are events in the processing bucket s3://sp-archive-acc/raw/processing,
so I think EMR can access and write to S3.
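One sanity check I could run (a rough sketch; it assumes I can SSH to the EMR master node and that the AWS CLI there picks up the same instance role as the job) is a test write to the enriched bucket from the config below:

    # On the EMR master node: try writing and then removing a small test object
    echo "write test" > /tmp/write-test.txt
    aws s3 cp /tmp/write-test.txt s3://sp-rs-acc/enr/good/write-test.txt
    aws s3 rm s3://sp-rs-acc/enr/good/write-test.txt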
This is my config.yml file:
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: XXXXX
  secret_access_key: XXXXX
  s3:
    region: us-east-2
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://sp-archive-acc/logs
      encrypted: false # Whether the buckets below are encrypted using server side encryption (SSE-S3)
      raw:
        in: # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          - s3://sp-archive-acc/stream # e.g. s3://my-old-collector-bucket
        processing: s3://sp-archive-acc/raw/processing
        archive: s3://sp-archive-acc/raw/archive # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://sp-rs-acc/enr/good # e.g. s3://my-out-bucket/enriched/good
        bad: s3://sp-rs-acc/enr/bad # e.g. s3://my-out-bucket/enriched/bad
        errors: # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://sp-rs-acc/enr/archive # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://sp-archive-acc/shredded/good # e.g. s3://my-out-bucket/shredded/good
        bad: s3://sp-archive-acc/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
        errors: continue_on_unexpected_error # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://sp-archive-acc/shredded/archive # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: true # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: us-east-2 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using aws emr create-default-roles
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-a9f19bd3 # subnet-80dfe3e8 Set this if running in VPC. Leave blank otherwise
    ec2_key_name: snowplow00
    security_configuration: # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow_ETL_Lzo # Give your job a name
      master_instance_type: i2.xlarge
      core_instance_count: 2
      core_instance_type: i2.xlarge
      core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
        volume_size: 100 # Gigabytes
        volume_type: "gp2"
        volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: i2.xlarge
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info: # Optional JSON string for selecting additional features
collectors:
  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront acc$
enrich:
  versions:
    spark_enrich: 1.17.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: GZIP # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1 # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {name: "data-pipeline-enrichment"} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    protocol: http
    port: 80
    app_id: snowplow # e.g. snowplow
    collector: d3rkrsqld9gmqf.cloudfront.net # e.g. d3rkrsqld9gmqf.cloudfront.net

And my iglu_resolver.json
{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      },
      {
        "name": "Iglu Central - GCP Mirror",
        "priority": 1,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://mirror01.iglucentral.com"
          }
        }
      }
    ]
  }
}

This is the Amazon EMR error log:
Exception in thread "main" java.lang.RuntimeException: Error running job
at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:705)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-172-31-28-186.us-east-2.compute.internal:8020/tmp/98d7c334-9beb-48ca-a5f1-4727b35cd3b1/files
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)

Any help, please!

@mosan, it appears that despite using the latest pipeline version, your lambda architecture is still the “older” style, whereby you enrich the data on the EMR cluster. Is there a reason for this choice? Don’t you enrich the data in the real-time pipeline (Stream Enrich)? If you already enrich the data in real time, you might wish to switch to Stream Enrich mode.

Regardless, the error indicates that the enriched files have not been copied over from the EMR cluster to S3 (some kind of a glitch). Follow the instructions here (see the “Recovery steps” section). Note that if you had run the pipeline in Stream Enrich mode, the enrichment step on EMR would have been skipped altogether (there is no need to run enrichment twice, in real time and in batch).
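For reference, Stream Enrich mode is switched on in the EmrEtlRunner config by adding a stream bucket under enriched, pointing at wherever your S3 loader sinks the stream-enriched data. A rough sketch, reusing the bucket paths from your config:

    enriched:
      stream: s3://sp-archive-acc/stream # stream-enriched output written by the S3 loader
      good: s3://sp-rs-acc/enr/good
      bad: s3://sp-rs-acc/enr/bad
      archive: s3://sp-rs-acc/enr/archive

With enriched:stream set, EmrEtlRunner stages those already-enriched files and skips the raw staging and enrichment steps.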

On a side note, I wouldn’t use i2 instance types in the EMR cluster (expensive!). Since you are using Spark Enrich as opposed to Hadoop Enrich, you might not need as much storage, but you do need lots of memory. We typically use r4 instances in a similar configuration.
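As a rough sketch only (the master instance type here is just an example, and sizes depend on your volumes), the jobflow section could look something like:

    jobflow:
      job_name: Snowplow_ETL_Lzo
      master_instance_type: m4.large # the master node does little heavy lifting
      core_instance_count: 2
      core_instance_type: r4.xlarge # memory-optimised instead of i2
      core_instance_ebs:
        volume_size: 100
        volume_type: "gp2"
      task_instance_count: 0
      task_instance_type: r4.xlarge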

Thank you @ihor!
So I have emptied all buckets and switched to Stream Enrich.
Then I run:
./snowplow-emr-etl-runner run -c config.yml -r /home/centos/storage-loader/iglu_resolver.json --debug -t /home/centos/storage-loader/targets

I get this error:
Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-2IWUR76IPK8IU failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
Snowplow_ETL_Lzo: TERMINATING [STEP_FAILURE] ~ elapsed time n/a [2019-04-26 09:30:32 UTC - ]

    1. Elasticity Setup Hadoop Debugging: COMPLETED ~ 00:00:04 [2019-04-26 09:30:34 UTC - 2019-04-26 09:30:38 UTC]
    2. [staging_stream_enrich] s3-dist-cp: Stream Enriched s3://sp-archive-acc/stream/ -> Enriched Staging S3: COMPLETED ~ 00:00:32 [2019-04-26 09:30:40 UTC - 2019-04-26 09:31:12 UTC]
    3. [shred] s3-dist-cp: Enriched S3 -> HDFS: COMPLETED ~ 00:00:28 [2019-04-26 09:31:14 UTC - 2019-04-26 09:31:42 UTC]
    4. [shred] spark: Shred Enriched Events: FAILED ~ 00:00:56 [2019-04-26 09:31:44 UTC - 2019-04-26 09:32:41 UTC]
    5. [archive_shredded] s3-dist-cp: Shredded S3 -> Shredded Archive S3: CANCELLED ~ elapsed time n/a [ - ]
    6. [archive_enriched] s3-dist-cp: Enriched S3 -> Enriched Archive S3: CANCELLED ~ elapsed time n/a [ - ]
    7. [rdb_load] Load AWS Redshift enriched events storage Storage Target: CANCELLED ~ elapsed time n/a [ - ]
    8. [shred] s3-dist-cp: Shredded HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]
    9. [shred] s3-dist-cp: Shredded types HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
    10. [shred] s3-dist-cp: Shredded atomic events HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]):
      uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:783:in `run'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
      uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:138:in `run'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
      uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:41:in `<main>'
      org/jruby/RubyKernel.java:994:in `load'
      uri:classloader:/META-INF/main.rb:1:in `<main>'
      org/jruby/RubyKernel.java:970:in `require'
      uri:classloader:/META-INF/main.rb:1:in `(root)'
      uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in

So I’ve followed the Recovery steps for Stream Enrich mode:
Delete shredded:good [D], then rerun the EmrEtlRunner with the --skip staging_stream_enrich option.
I didn’t have any shredded:good
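In practice the recovery boils down to something like this (bucket and paths taken from my config; the delete was effectively a no-op in my case since shredded:good was already empty):

    # Step [D]: empty shredded:good
    aws s3 rm --recursive s3://sp-archive-acc/shredded/good/

    # Rerun, skipping the staging of the stream-enriched data
    ./snowplow-emr-etl-runner run --skip staging_stream_enrich \
      -c config.yml \
      -r /home/centos/storage-loader/iglu_resolver.json \
      -t /home/centos/storage-loader/targets --debug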
So I reran the EmrEtlRunner, and now I get:
Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-BV2QPBNDYCL9 failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
Snowplow_ETL_Lzo: TERMINATING [STEP_FAILURE] ~ elapsed time n/a [2019-04-26 09:49:39 UTC - ]

    1. Elasticity Setup Hadoop Debugging: COMPLETED ~ 00:00:04 [2019-04-26 09:49:40 UTC - 2019-04-26 09:49:45 UTC]
    2. [shred] s3-dist-cp: Enriched S3 -> HDFS: COMPLETED ~ 00:00:32 [2019-04-26 09:49:47 UTC - 2019-04-26 09:50:19 UTC]
    3. [shred] spark: Shred Enriched Events: FAILED ~ 00:00:54 [2019-04-26 09:50:21 UTC - 2019-04-26 09:51:15 UTC]
    4. [shred] s3-dist-cp: Shredded HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]
    5. [shred] s3-dist-cp: Shredded types HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
    6. [shred] s3-dist-cp: Shredded atomic events HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
    7. [archive_shredded] s3-dist-cp: Shredded S3 -> Shredded Archive S3: CANCELLED ~ elapsed time n/a [ - ]
    8. [archive_enriched] s3-dist-cp: Enriched S3 -> S3 Enriched Archive: CANCELLED ~ elapsed time n/a [ - ]
    9. [rdb_load] Load AWS Redshift enriched events storage Storage Target: CANCELLED ~ elapsed time n/a [ - ]):
      uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:783:in `run'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
      uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:138:in `run'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
      uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
      uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:41:in `<main>'
      org/jruby/RubyKernel.java:994:in `load'
      uri:classloader:/META-INF/main.rb:1:in `<main>'
      org/jruby/RubyKernel.java:970:in `require'
      uri:classloader:/META-INF/main.rb:1:in `(root)'
      uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in

So I’ve followed the Recovery steps for Stream Enrich mode:
Rerun the EmrEtlRunner with the --skip staging_stream_enrich option.
But I get the same error…
Any ideas?
Thanks!!

From what I can see, the job fails as soon as shredding starts. There seems to be an issue with your data. Is it enriched events that you have in s3://sp-archive-acc/stream/ (or raw)? Are there any valid events at all in there?
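One quick way to check (a rough sketch, assuming gzip output from the S3 loader; swap gunzip for lzop -dc if you kept LZO) is to pull one object down and count the tab-separated fields of its first record. Enriched events are wide TSV rows (well over 100 fields), whereas raw Thrift payloads will just look like binary noise:

    # <some-object>.gz is a placeholder - list the bucket first and pick a real key
    aws s3 ls s3://sp-archive-acc/stream/
    aws s3 cp s3://sp-archive-acc/stream/<some-object>.gz - | gunzip | head -n 1 | awk -F'\t' '{print NF}'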

Yes @ihor, it seems that the job fails at the beginning of shredding. There are enriched events in s3://sp-archive-acc/stream/
And yes, I think there are valid events…
Any idea?
Thanks!

Hi @ihor,
If I move the events from:
sp-enrich-acc/good/run=2019-05-02-08-32-16/run_yr=2019/run_mo=05/run_dy=122
to a new folder:
sp-enrich-acc/good/run=2019-05-02
and then I run:
./snowplow-emr-etl-runner run --skip staging_stream_enrich -c config.yml -r iglu_resolver.json --debug -t /home/ubuntu/storage/targets
Then the process continues through the shredding.
I’ve checked the logs and I see this line:
19/05/02 09:08:56 WARN RoleMappings: Found no mappings configured with 'fs.s3.authorization.roleMapping', credentials resolution may not work as expected

Any ideas?
Thank you!

@mosan, right. Those extra folders definitely mess things up. The enriched files are expected to sit in the run folder directly, e.g. sp-enrich-acc/good/run=2019-05-02-08-32-16/<enriched_files>. You might need to redo your upstream configuration to adhere to this format, or come up with a custom staging step to get rid of those extra folders (run_yr=2019/run_mo=05/run_dy=122); see the example below.
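A minimal sketch of such a staging step, using plain AWS CLI and the paths from your example (run it before kicking off EmrEtlRunner):

    # Hypothetical run ID - substitute the real one, or loop over all run= prefixes
    RUN="run=2019-05-02-08-32-16"
    aws s3 mv --recursive \
      "s3://sp-enrich-acc/good/$RUN/run_yr=2019/run_mo=05/run_dy=122/" \
      "s3://sp-enrich-acc/good/$RUN/"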

Thanks @ihor, I’ve redone my upstream configuration and it seems to work fine!