Help to solve EmrEtlRunner HDFS > S3

Hello friends,

I’m trying to set up Snowplow. The first steps went well, but now I’m stuck at the step that moves the enriched:good data from HDFS to the S3 bucket.

I’ve already read many similar posts and tried many of the suggested changes to my configs to get past this step, but none of them worked.

I’m using the Scala Stream Collector as the collector, and I can see in the console the events arriving in my Kinesis streams. I have set up an enrichment process (Scala Stream Enrich) that is running and processing data.

I have a Kinesis S3 Sink running; here is its run-loop output in my console:

Apr 06, 2017 3:24:34 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Syncing Kinesis shard info
Apr 06, 2017 3:24:34 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Starting LeaseCoordinator
Apr 06, 2017 3:24:44 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker run
INFO: Initialization complete. Starting worker loop.
Apr 06, 2017 3:24:44 PM com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable publishMetrics
INFO: Successfully published 16 datums.
Apr 06, 2017 3:24:54 PM com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable publishMetrics
INFO: Successfully published 4 datums.
Apr 06, 2017 3:25:04 PM com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable publishMetrics

… and many more loops like this.

My S3 bucket for ‘enriched:good’ is empty. Following the batch pipeline troubleshooting steps, where they say to clean the ‘enriched:good’ folder (Step 4), it is already clean. When I run EmrEtlRunner with --skip staging, the process crashes with this error:

    Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-2985V5JBVCX83 failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
    johnsnow: TERMINATING [STEP_FAILURE] ~ elapsed time n/a [2017-04-06 14:40:11 +0000 - ]
     - 1. Elasticity Setup Hadoop Debugging: COMPLETED ~ 00:00:13 [2017-04-06 14:40:12 +0000 - 2017-04-06 14:40:25 +0000]
     - 2. Elasticity S3DistCp Step: Raw S3 -> HDFS: COMPLETED ~ 00:04:48 [2017-04-06 14:40:27 +0000 - 2017-04-06 14:45:16 +0000]
     - 3. Elasticity Scalding Step: Enrich Raw Events: COMPLETED ~ 00:02:00 [2017-04-06 14:45:25 +0000 - 2017-04-06 14:47:26 +0000]
     - 4. Elasticity S3DistCp Step: Enriched HDFS -> S3: FAILED ~ 00:00:14 [2017-04-06 14:47:28 +0000 - 2017-04-06 14:47:42 +0000]
     - 5. Elasticity S3DistCp Step: Raw S3 Staging -> S3 Archive: CANCELLED ~ elapsed time n/a [ - ]
     - 6. Elasticity S3DistCp Step: Shredded HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
     - 7. Elasticity Scalding Step: Shred Enriched Events: CANCELLED ~ elapsed time n/a [ - ]
     - 8. Elasticity S3DistCp Step: Enriched HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]):
        uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:500:in `run'
        uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
        uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
        uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
        uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:74:in `run'
        uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
        uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
        uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
        uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:39:in `<main>'
        org/jruby/RubyKernel.java:973:in `load'
        uri:classloader:/META-INF/main.rb:1:in `<main>'
        org/jruby/RubyKernel.java:955:in `require'
        uri:classloader:/META-INF/main.rb:1:in `(root)'
        uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'

My config for emr-etl-runner is: https://pastebin.com/8CSb6zaU

Has anyone solved this problem who could help me?

Hi @paulorod7 - could you share the topology you have wired up using ASCII art? I’m particularly interested in what Kinesis S3 Sink is reading from and writing to…

Hello, Alex!

Thank you for the quick response!

My topology is:
Trackers (JS, PHP, Pixel) -> Collector (Scala Stream) -> Kinesis (streams good, bad) -> Scala Stream Enrich -> Kinesis S3 -> EmrEtlRunner -> StorageLoader -> Redshift

I’m really in doubt about the enrich process, where the flow takes the information received by the collector and transforms the data to save to the S3 bucket. Maybe I didn’t configure the flow correctly, or I misunderstood the process while reading the docs.

Thank you for your time and help!

Hello Friends,

I am working together with @paulorod7 and we are seeing an error on step 4 when we try to run this command:

./snowplow-emr-etl-runner --config configs/config_emr.yml --resolver resolvers/resolver-emr-etl.json --enrichments configs/enrichments --debug --skip staging

    Exception in thread "main" java.lang.RuntimeException: Error running job
    	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
    	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:720)
    	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
    	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:606)
    	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

    Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-172-31-45-177.ec2.internal:8020/tmp/30d0cfcd-77a6-481c-9c81-4ca87401d4b5/files
    	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
    	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
    	at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
    	... 10 more

We don’t know where we are going wrong.

Thank you for your time and help!

Hi @paulorod7, @Jeferson - it sounds like you have implemented a slightly altered version of the Lambda architecture that we propose:

http://discourse.snowplow.io/t/how-to-setup-a-lambda-architecture-for-snowplow/249

Instead of archiving the raw events to S3, you are archiving the enriched events to S3. Your EMR job is then failing because you are attempting to enrich the already-enriched events.

Try running the EmrEtlRunner with --skip staging,enrich to prevent this.
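
For example, reusing the config, resolver and enrichments paths from the command you shared above (adjust them to match your setup):

    ./snowplow-emr-etl-runner --config configs/config_emr.yml --resolver resolvers/resolver-emr-etl.json --enrichments configs/enrichments --debug --skip staging,enrich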

Hi @alex,

Thank you for the response! We’ve changed our topology to focus on the batch flow first.

Collector -> Kinesis (stream ‘raw’) -> Kinesis S3 (LZO S3 Sink) -> S3 Buckets -> EMR job -> Redshift

Now the records (LZO files) that our Kinesis S3 Sink generates from the Kinesis stream are being saved into the S3 bucket, and there are many files. But when the EMR job starts, we get the ‘FAILED’ status:

  • in step #4 (4. Elasticity S3DistCp Step: Enriched HDFS -> S3: FAILED ~ 00:00:14) when using --skip staging

  • in step #2 (2. Elasticity S3DistCp Step: Enriched S3 -> HDFS: FAILED ~ 00:00:14) when using --skip staging,enrich

The error generated is:

    Exception in thread "main" java.lang.RuntimeException: Error running job
    	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
    	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:720)
    	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
    	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:606)
    	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
    	Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-172-31-39-94.ec2.internal:8020/tmp/0cabc09d-2adb-4840-8335-eb664e328802/files
    	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
    	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
    	at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
    	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
    	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
    	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
    	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
    	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
    	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at javax.security.auth.Subject.doAs(Subject.java:415)
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
    	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
    	... 10 more 

And in the controller log folder for the failed step on S3 we got:

INFO redirectOutput to /mnt/var/log/hadoop/steps/s-1UOK3UZKGRZ9S/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-1UOK3UZKGRZ9S/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-1UOK3UZKGRZ9S
INFO ProcessRunner started child process 6227 :
hadoop    6227  2195  0 14:54 ?        00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src s3n://aff-snow/snow-tmp/ --dest hdfs:///local/snowplow/enriched-events/ --srcPattern .part-. --s3Endpoint s3.amazonaws.com
2017-04-10T14:54:04.792Z INFO HadoopJarStepRunner.Runner: startRun() called for s-1UOK3UZKGRZ9S Child Pid: 6227
INFO Synchronously wait child process to complete : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO waitProcessCompletion ended with exit code 1 : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO total process run time: 18 seconds
2017-04-10T14:54:21.104Z INFO Step created jobs: 
2017-04-10T14:54:21.104Z WARN Step failed with exitCode 1 and took 18 seconds

Thank you for your time and help!

Hello, @alex

I’m really stuck on these errors. I think it’s something related to S3DistCp when the process tries to move the files from my processing bucket to HDFS, but I can’t figure out what to do to fix it.

My log bucket shows me:

INFO redirectOutput to /mnt/var/log/hadoop/steps/s-31187MFU46TSL/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-31187MFU46TSL/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-31187MFU46TSL
INFO ProcessRunner started child process 6217 :
hadoop    6217  2185  0 13:47 ?        00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src s3://aff-snow/snow-processing/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com
2017-04-11T13:47:23.515Z INFO HadoopJarStepRunner.Runner: startRun() called for s-31187MFU46TSL Child Pid: 6217
INFO Synchronously wait child process to complete : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO waitProcessCompletion ended with exit code 1 : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO total process run time: 152 seconds
2017-04-11T13:49:53.926Z INFO Step created jobs: job_1491918266143_0001
2017-04-11T13:49:53.927Z WARN Step failed as jobs it created failed. Ids:job_1491918266143_0001

And my job log json shows:

{"type":"TASK_FAILED","event":{"org.apache.hadoop.mapreduce.jobhistory.TaskFailed":{"taskid":"task_1491918266143_0001_r_000006","taskType":"REDUCE","finishTime":1491918590971,"error":", Error: java.lang.RuntimeException: Reducer task failed to copy 1 files: s3://aff-snow/snow-processing/2017-04-11-49572160238853202733442753284482235255824138614794617074-49572160238853202733442753284514876252953734152267497714.us-east-1.snow-processing.us-east-1.snow-processing.us-east-1.snow-processing.us-east-1.snow-processing.us-east-1.snow-processing.us-east-1.snow-processing.lzo etc\n\tat com.amazon.elasticmapreduce.s3distcp.CopyFilesReducer.cleanup(CopyFilesReducer.java:75)\n\tat org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:195)\n\tat org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:656)\n\tat org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:394)\n\tat org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:172)\n\tat java.security.AccessController.doPrivileged(Native Method)\n\tat javax.security.auth.Subject.doAs(Subject.java:415)\n\tat org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)\n\tat org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:166)\n","failedDueToAttempt":{"string":"attempt_1491918266143_0001_r_000006_3"},"status":"FAILED","counters":{"org.apache.hadoop.mapreduce.jobhistory.JhCounters":{"name":"COUNTERS","groups":[]}}}}}

{"type":"JOB_FAILED","event":{"org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletion":{"jobid":"job_1491918266143_0001","finishTime":1491918590978,"finishedMaps":1,"finishedReduces":6,"jobStatus":"FAILED","diagnostics":{"string":"Task failed task_1491918266143_0001_r_000006\nJob failed as tasks failed. failedMaps:0 failedReduces:1"}}}}

I’ve tried emptying the buckets and re-running the jobs following the Pipeline table guide mentioned in a similar post, but that failed too.

Any help would be really great, thank you for your time! :slight_smile:

Hello @paulorod7 ,
I’ve run into the same problem (same topology – lambda focusing on batch first, same step failure, same log outputs). Have you by any chance solved the issue, and if so, could you please tell me how?

Thanks!

@paulorod7,

It appears you have confused two scenarios.

@alex provided instructions for your old topology. With the new one, you shouldn’t be using --skip staging,enrich (unless recovering from a failure).

Your new topology indicates you sink raw events to S3. You don’t want to skip those, but rather process/enrich them with EmrEtlRunner. Do make sure to specify the “thrift” format in your config.yml:

collectors:
  format: "thrift"

and run EmrEtlRunner without skipping anything.
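
For example (a sketch reusing the config, resolver and enrichments paths from the command shared earlier in this thread; substitute your own paths):

    ./snowplow-emr-etl-runner --config configs/config_emr.yml --resolver resolvers/resolver-emr-etl.json --enrichments configs/enrichments --debug

That way EmrEtlRunner stages the raw Thrift records from your sink bucket, enriches them on EMR, and the Enriched HDFS -> S3 step then has data to copy.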