EmrEtlRunner fails when copying enriched events to S3

Hello,

I am currently trying to set up batch enrichment using EmrEtlRunner.
The staging process is working fine and my logs are copied from their raw/in bucket to raw/processing.
The EMR cluster then starts and apparently finishes the Hadoop enrichment step, but the logs (see below) seem to indicate that nothing was processed.

Then when the enriched events should be copied back to S3, I get the following error:

Exception in thread "main" java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:720)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://<IP_ADDR:PORT>/tmp/03482dc5-7ec2-4884-9593-e196dd9fe755/files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
	at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
	... 10 more

You can find some logs attached to this post, and here is some more information about my configuration:

  • I am using the Clojure Collector, but we modified the logrotate configuration a bit to use a custom file name. I forked and edited the EmrEtlRunner project to add a file_pattern attribute to the collectors namespace, which lets me copy logs with a different naming scheme from the raw/in bucket to raw/processing (there is a quick sanity check after this list).
  • I do not have any custom enrichments set up yet.
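
As a sanity check that the staging step really copies my renamed logs, I count the objects in raw/processing whose keys match the custom pattern (the bucket and pattern below are just the placeholders from my configuration):

# count staged objects whose key matches the custom file_pattern (placeholder bucket/pattern)
aws s3 ls s3://my_bucket/raw/processing/ --recursive | awk '{print $4}' | grep -cE 'my_event_log_pattern.*'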

Do you have any idea why this error could happen?
Something else I should mention: if I try to run EmrEtlRunner with the --debug option, it fails because it cannot initialize Hadoop debugging (see the second console output below).

Thank you in advance for your response.

Console output - No debug

./snowplow-emr-etl-runner --skip staging --enrichments ../emr-etl-runner/enrichments/ --config ../emr-etl-runner/conf/dev-conf.yml --resolver ../emr-etl-runner/iglu/iglu_resolver.json
D, [2016-10-26T15:46:07.864000 #11714] DEBUG -- : Initializing EMR jobflow
D, [2016-10-26T15:46:15.102000 #11714] DEBUG -- : EMR jobflow j-3HMEWSLYEE8VW started, waiting for jobflow to complete...
F, [2016-10-26T15:58:18.325000 #11714] FATAL -- : 

Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-3HMEWSLYEE8VW failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
Snowplow ETL: TERMINATING [STEP_FAILURE] ~ elapsed time n/a [2016-10-26 15:55:05 +0200 - ]
 - 1. Elasticity Scalding Step: Enrich Raw Events: COMPLETED ~ 00:01:50 [2016-10-26 15:55:12 +0200 - 2016-10-26 15:57:02 +0200]
 - 2. Elasticity S3DistCp Step: Enriched HDFS -> S3: FAILED ~ 00:00:14 [2016-10-26 15:57:04 +0200 - 2016-10-26 15:57:19 +0200]
 - 3. Elasticity S3DistCp Step: Shredded HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
 - 4. Elasticity Scalding Step: Shred Enriched Events: CANCELLED ~ elapsed time n/a [ - ]
 - 5. Elasticity S3DistCp Step: Enriched HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]):
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:475:in `run'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:68:in `run'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:39:in `<main>'
    org/jruby/RubyKernel.java:973:in `load'
    uri:classloader:/META-INF/main.rb:1:in `<main>'
    org/jruby/RubyKernel.java:955:in `require'
    uri:classloader:/META-INF/main.rb:1:in `(root)'
    uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'

Console output - Debug

./snowplow-emr-etl-runner --debug --skip staging --enrichments ../emr-etl-runner/enrichments/ --config ../emr-etl-runner/conf/dev-conf.yml --resolver ../emr-etl-runner/iglu/iglu_resolver.json 
D, [2016-10-26T16:01:53.883000 #12879] DEBUG -- : Initializing EMR jobflow
D, [2016-10-26T16:01:59.577000 #12879] DEBUG -- : EMR jobflow j-YK85XVK0207O started, waiting for jobflow to complete...
F, [2016-10-26T16:12:02.353000 #12879] FATAL -- : 

Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-YK85XVK0207O failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
Snowplow ETL: TERMINATING [STEP_FAILURE] ~ elapsed time n/a [2016-10-26 16:10:33 +0200 - ]
 - 1. Elasticity Setup Hadoop Debugging: FAILED ~ 00:00:00 [2016-10-26 16:10:33 +0200 - 2016-10-26 16:10:33 +0200]
 - 2. Elasticity S3DistCp Step: Shredded HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
 - 3. Elasticity Scalding Step: Shred Enriched Events: CANCELLED ~ elapsed time n/a [ - ]
 - 4. Elasticity S3DistCp Step: Enriched HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]
 - 5. Elasticity S3DistCp Step: Enriched HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
 - 6. Elasticity Scalding Step: Enrich Raw Events: CANCELLED ~ elapsed time n/a [ - ]):
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:475:in `run'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:68:in `run'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:39:in `<main>'
    org/jruby/RubyKernel.java:973:in `load'
    uri:classloader:/META-INF/main.rb:1:in `<main>'
    org/jruby/RubyKernel.java:955:in `require'
    uri:classloader:/META-INF/main.rb:1:in `(root)'
    uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'

Configuration file

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: KEY_ID
  secret_access_key: KEY_SECRET
  s3:
    region: eu-west-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: s3://my_bucket/jsonpath/ # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://my_bucket/log/ # Bucket in which Amazon EMR will record processing information for this job run, including logging any errors
      raw:
        in:                  # Multiple in buckets are permitted
          - s3://my_bucket/raw/in/ # e.g. s3://my-in-bucket
          # - ADD HERE # Another bucket
        processing: s3://my_bucket/raw/processing/
        archive: s3://my_bucket/raw/archive/    # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://my_bucket/enriched/good/       # e.g. s3://my-out-bucket/enriched/good
        bad: s3://my_bucket/enriched/bad/        # e.g. s3://my-out-bucket/enriched/bad
        errors: #s3://my_bucket/enriched/errors     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://my_bucket/enriched/archive/    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://my_bucket/shredded/good/       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://my_bucket/shredded/bad/        # e.g. s3://my-out-bucket/shredded/bad
        errors: #s3://my_bucket/shredded/errors     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://my_bucket/shredded/archive/    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 4.5.0
    region: eu-west-1        # Always set this
    jobflow_role: SnowplowEMRInstanceRole # Created using $ aws emr create-default-roles
    service_role: SnowplowEMRRole     # Created using $ aws emr create-default-roles
    placement: eu-west-1a # Set this if NOT running in VPC. Leave blank otherwise
    ec2_subnet_id: # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: KEY_NAME
    bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: clj-tomcat # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
  file_pattern: "my_event_log_pattern.*"
enrich:
  job_name: Snowplow ETL # Give your job a name
  versions:
    hadoop_enrich: 1.8.0 # Version of the Hadoop Enrichment process
    hadoop_shred: 0.9.0 # Version of the Hadoop Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: GZIP # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  download:
    folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
  targets:
    - name: "My Redshift database"
      type: redshift
      host: MY_REDSHIFT_HOST # The endpoint as shown in the Redshift console
      database: MY_REDSHIFT_DATABASE # Name of database
      port: MY_REDSHIFT_PORT # Default Redshift port
      ssl_mode: disable # One of disable (default), require, verify-ca or verify-full
      table: atomic.events
      username: MY_REDSHIFT_USR
      password: MY_REDSHIFT_PWD
      maxerror: 1 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production

Logs of enrichment step

2016-10-24 13:44:54,119 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): [com.snowplowanalytics....]  parallel execution is enabled: true
2016-10-24 13:44:54,119 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): [com.snowplowanalytics....]  starting jobs: 3
2016-10-24 13:44:54,119 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): [com.snowplowanalytics....]  allocating threads: 3
2016-10-24 13:44:54,123 INFO cascading.flow.FlowStep (pool-5-thread-1): [com.snowplowanalytics....] starting step: (1/3)
2016-10-24 13:44:54,222 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-1): Connecting to ResourceManager at <IP_ADDR>
2016-10-24 13:44:54,717 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-1): Connecting to ResourceManager at <IP_ADDR>
2016-10-24 13:44:56,945 INFO com.hadoop.compression.lzo.GPLNativeCodeLoader (pool-5-thread-1): Loaded native gpl library
2016-10-24 13:44:56,949 INFO com.hadoop.compression.lzo.LzoCodec (pool-5-thread-1): Successfully loaded & initialized native-lzo library [hadoop-lzo rev 38958adfc6f8f191da8f76e5beafd1f11eaccab2]
2016-10-24 13:44:57,011 INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem (pool-5-thread-1): listStatus s3://my_bucket/raw/processing with recursive false
2016-10-24 13:44:57,089 INFO org.apache.hadoop.mapred.FileInputFormat (pool-5-thread-1): Total input paths to process : 0
2016-10-24 13:44:57,232 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-1): number of splits:0
2016-10-24 13:44:57,964 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-1): Submitting tokens for job: job_1477316512805_0001
2016-10-24 13:44:58,842 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl (pool-5-thread-1): Submitted application application_1477316512805_0001
2016-10-24 13:44:58,969 INFO org.apache.hadoop.mapreduce.Job (pool-5-thread-1): The url to track the job: http://<IP_ADDR>/proxy/application_1477316512805_0001/
2016-10-24 13:44:58,971 INFO cascading.flow.FlowStep (pool-5-thread-1): [com.snowplowanalytics....] submitted hadoop job: job_1477316512805_0001
2016-10-24 13:44:58,971 INFO cascading.flow.FlowStep (pool-5-thread-1): [com.snowplowanalytics....] tracking url: http://<IP_ADDR>/proxy/application_1477316512805_0001/
2016-10-24 13:45:24,127 INFO org.apache.hadoop.mapred.ClientServiceDelegate (pool-5-thread-1): Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-10-24 13:45:24,323 INFO cascading.util.Update (UpdateRequestTimer): newer Cascading release available: 2.6.3
2016-10-24 13:45:25,665 INFO cascading.flow.FlowStep (pool-5-thread-2): [com.snowplowanalytics....] starting step: (2/3) ...d/run=2016-10-24-15-35-02
2016-10-24 13:45:25,665 INFO cascading.flow.FlowStep (pool-5-thread-3): [com.snowplowanalytics....] starting step: (3/3) .../snowplow/enriched-events
2016-10-24 13:45:25,756 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-2): Connecting to ResourceManager at <IP_ADDR>
2016-10-24 13:45:25,863 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-3): Connecting to ResourceManager at <IP_ADDR>
2016-10-24 13:45:25,918 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-2): Connecting to ResourceManager at <IP_ADDR>
2016-10-24 13:45:25,991 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-3): Connecting to ResourceManager at <IP_ADDR>
2016-10-24 13:45:29,719 INFO org.apache.hadoop.mapred.FileInputFormat (pool-5-thread-3): Total input paths to process : 0
2016-10-24 13:45:29,851 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-3): number of splits:0
2016-10-24 13:45:30,016 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-3): Submitting tokens for job: job_1477316512805_0002
2016-10-24 13:45:30,051 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl (pool-5-thread-3): Submitted application application_1477316512805_0002
2016-10-24 13:45:30,058 INFO org.apache.hadoop.mapreduce.Job (pool-5-thread-3): The url to track the job: http://<IP_ADDR>/proxy/application_1477316512805_0002/
2016-10-24 13:45:30,058 INFO cascading.flow.FlowStep (pool-5-thread-3): [com.snowplowanalytics....] submitted hadoop job: job_1477316512805_0002
2016-10-24 13:45:30,058 INFO cascading.flow.FlowStep (pool-5-thread-3): [com.snowplowanalytics....] tracking url: http://<IP_ADDR>/proxy/application_1477316512805_0002/
2016-10-24 13:45:30,280 INFO org.apache.hadoop.mapred.FileInputFormat (pool-5-thread-2): Total input paths to process : 0
2016-10-24 13:45:30,345 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-2): number of splits:0
2016-10-24 13:45:30,430 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-2): Submitting tokens for job: job_1477316512805_0003
2016-10-24 13:45:30,467 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl (pool-5-thread-2): Submitted application application_1477316512805_0003
2016-10-24 13:45:30,472 INFO org.apache.hadoop.mapreduce.Job (pool-5-thread-2): The url to track the job: http://<IP_ADDR>/proxy/application_1477316512805_0003/
2016-10-24 13:45:30,472 INFO cascading.flow.FlowStep (pool-5-thread-2): [com.snowplowanalytics....] submitted hadoop job: job_1477316512805_0003
2016-10-24 13:45:30,473 INFO cascading.flow.FlowStep (pool-5-thread-2): [com.snowplowanalytics....] tracking url: http://<IP_ADDR>/proxy/application_1477316512805_0003/
2016-10-24 13:45:50,083 INFO org.apache.hadoop.mapred.ClientServiceDelegate (pool-5-thread-3): Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-10-24 13:46:06,286 INFO cascading.tap.hadoop.util.Hadoop18TapUtil (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): deleting temp path s3://my_bucket/enriched/bad/run=2016-10-24-15-35-02/_temporary
2016-10-24 13:46:06,334 INFO cascading.tap.hadoop.util.Hadoop18TapUtil (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): deleting temp path hdfs:/local/snowplow/enriched-events/_temporary

Hi @platuraze,

It sounds like your pipeline is not successfully writing out any enriched events, hence the S3DistCp failure (it's very fussy about there being no data to copy). Your enrichment step log also reports "Total input paths to process : 0", which suggests the Enrich job found nothing to read in raw/processing in the first place.

I would be surprised if the reason doesn’t lie somewhere in your fork - can I ask why you needed to support custom file names, and whether you also changed the contents of the files that are being processed by Snowplow?
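
Since the step that failed is the HDFS-to-S3 copy, the enriched "good" bucket on S3 will be empty either way, but bad rows are written straight to S3 by the Enrich job, so this listing is worth a look (using the placeholder bucket name from your config):

# bad rows written directly to S3 by the Enrich job; empty output here, together with the
# "Total input paths to process : 0" lines in your enrichment log, points to the job reading no raw logs at all
aws s3 ls s3://my_bucket/enriched/bad/ --recursive | head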