Reprocessing Bad Events, EmrEtlRunner Error

I’m attempting to reprocess bad events following the steps outlined here and load them to Redshift.

I’ve successfully reprocessed the bad events, with the results sent to s3://q-snowplow-recovered/recovered. Within that location there are three files: _SUCCESS, part-00000, and part-00001.

I’ve updated the config file for the EmrEtlRunner to the following:

raw:
  in:                  # Multiple in buckets are permitted
    - s3://q-snowplow-does-not-exist          # IGNORED e.g. s3://my-in-bucket
  processing: s3://q-snowplow-recovered/recovered
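
(For context, this raw: fragment sits under the aws: s3: buckets: section of the EmrEtlRunner config; a minimal sketch with everything else elided:)

aws:
  s3:
    buckets:
      raw:
        in:
          - s3://q-snowplow-does-not-exist   # ignored when staging is skipped
        processing: s3://q-snowplow-recovered/recovered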

When I then attempt to run the EmrEtlRunner as follows (hiding the full paths for brevity):

snowplow-emr-etl-runner --config config-recovery.yml --resolver iglu_resolver.json --enrichments enrichments --skip staging

I get the following errors during the Elasticity S3DistCp Step: Raw S3 -> HDFS step of the EMR job:

2017-08-21 18:20:14,146 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Running with args: --src s3://q-snowplow-recovered/recovered/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..* --targetSize 128 --outputCodec lzo 
2017-08-21 18:20:16,156 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): S3DistCp args: --src s3://q-snowplow-recovered/recovered/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..* --targetSize 128 --outputCodec lzo 
2017-08-21 18:20:33,157 INFO com.amazon.ws.emr.hadoop.fs.EmrFileSystem (main): Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation
2017-08-21 18:20:33,961 INFO amazon.emr.metrics.MetricsSaver (main): MetricsConfigRecord disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500 lastModified: 1503339465506 
2017-08-21 18:20:33,962 INFO amazon.emr.metrics.MetricsSaver (main): Created MetricsSaver j-LIBQ62MBKWUX:i-032d6420d929c9538:RunJar:06282 period:60 /mnt/var/em/raw/i-032d6420d929c9538_20170821_RunJar_06282_raw.bin
2017-08-21 18:20:36,908 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Using output path 'hdfs:/tmp/711f7e02-88b7-4d88-8568-b06119b03d32/output'
2017-08-21 18:20:37,879 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): DefaultAWSCredentialsProviderChain is used to create AmazonS3Client. KeyId: ASIAIXI7OEIJZACG2PFQ
2017-08-21 18:20:37,879 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): AmazonS3Client setEndpoint s3.amazonaws.com
2017-08-21 18:20:38,166 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Skipping key 'recovered/' because it ends with '/'
2017-08-21 18:20:38,166 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Created 0 files to copy 0 files 
2017-08-21 18:20:38,263 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Reducer number: 10
2017-08-21 18:20:38,964 INFO org.apache.hadoop.yarn.client.RMProxy (main): Connecting to ResourceManager at ip-172-31-40-121.ec2.internal/172.31.40.121:8032
2017-08-21 18:20:42,294 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): Cleaning up the staging area /tmp/hadoop-yarn/staging/hadoop/.staging/job_1503339444086_0001
2017-08-21 18:20:42,321 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Try to recursively delete hdfs:/tmp/711f7e02-88b7-4d88-8568-b06119b03d32/tempspace

I’ve tried running with several variants of raw:processing in the config, to no avail:

s3://q-snowplow-recovered/recovered
s3://q-snowplow-recovered/recovered/
s3://q-snowplow-recovered
s3://q-snowplow-recovered/

Thanks in advance for your help!

@alex @BenFradet I see you’re the last two folks to update the Hadoop Event Recovery docs. Any chance you have insight into what I’m doing wrong?

Again, thanks in advance.

Hey @tfinkel - the log lines you quoted are all INFO level. Do you have any errors you can share?

Alternatively, I can see:

Created 0 files to copy 0 files 

which suggests that somehow the files are not being picked up… You can always try changing the protocol on processing to s3n://, although I’m not sure that will fix it.
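
A minimal sketch of that change, keeping your paths and only swapping the protocol:

raw:
  processing: s3n://q-snowplow-recovered/recovered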

Thanks @alex.

It doesn’t appear the files are being picked up; there is also a line that says: Skipping key 'recovered/' because it ends with '/'.
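
As a sanity check, the keys can be listed with the AWS CLI (assuming it’s available and has access to the bucket) to confirm what is actually under the prefix:

aws s3 ls s3://q-snowplow-recovered/recovered/
# should show _SUCCESS, part-00000 and part-00001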

I just tried running it again using the s3n protocol, and got the following output from the Elasticity S3DistCp Step: Raw S3 -> HDFS step, which failed.

Controller:

2017-08-22T21:53:21.259Z INFO Ensure step 2 jar file /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar
2017-08-22T21:53:21.259Z INFO StepRunner: Created Runner for step 2
INFO startExec 'hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src s3n://q-snowplow-recovered/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..* --targetSize 128 --outputCodec lzo'
INFO Environment:
  TERM=linux
  CONSOLETYPE=serial
  SHLVL=5
  JAVA_HOME=/etc/alternatives/jre
  HADOOP_IDENT_STRING=hadoop
  LANGSH_SOURCED=1
  XFILESEARCHPATH=/usr/dt/app-defaults/%L/Dt
  HADOOP_ROOT_LOGGER=INFO,DRFA
  AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
  UPSTART_JOB=rc
  MAIL=/var/spool/mail/hadoop
  EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
  PWD=/
  HOSTNAME=ip-172-31-37-100
  LESS_TERMCAP_se=[0m
  LOGNAME=hadoop
  UPSTART_INSTANCE=
  AWS_PATH=/opt/aws
  LESS_TERMCAP_mb=[01;31m
  _=/etc/alternatives/jre/bin/java
  LESS_TERMCAP_me=[0m
  NLSPATH=/usr/dt/lib/nls/msg/%L/%N.cat
  LESS_TERMCAP_md=[01;38;5;208m
  runlevel=3
  AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
  UPSTART_EVENTS=runlevel
  HISTSIZE=1000
  previous=N
  HADOOP_LOGFILE=syslog
  PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
  EC2_HOME=/opt/aws/apitools/ec2
  HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-28L51TC5DDLXF
  LESS_TERMCAP_ue=[0m
  AWS_ELB_HOME=/opt/aws/apitools/elb
  RUNLEVEL=3
  USER=hadoop
  HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-28L51TC5DDLXF/tmp
  PREVLEVEL=N
  HOME=/home/hadoop
  HISTCONTROL=ignoredups
  LESSOPEN=||/usr/bin/lesspipe.sh %s
  AWS_DEFAULT_REGION=us-east-1
  LANG=en_US.UTF-8
  LESS_TERMCAP_us=[04;38;5;111m
INFO redirectOutput to /mnt/var/log/hadoop/steps/s-28L51TC5DDLXF/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-28L51TC5DDLXF/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-28L51TC5DDLXF
INFO ProcessRunner started child process 6026 :
hadoop    6026  2187  0 21:53 ?        00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src s3n://q-snowplow-recovered/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..* --targetSize 128 --outputCodec lzo
2017-08-22T21:53:25.307Z INFO HadoopJarStepRunner.Runner: startRun() called for s-28L51TC5DDLXF Child Pid: 6026
INFO Synchronously wait child process to complete : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO waitProcessCompletion ended with exit code 1 : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO total process run time: 22 seconds
2017-08-22T21:53:45.565Z INFO Step created jobs: 
2017-08-22T21:53:45.566Z WARN Step failed with exitCode 1 and took 22 seconds

Syslog:

2017-08-22 21:53:24,439 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Running with args: --src s3n://q-snowplow-recovered/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..* --targetSize 128 --outputCodec lzo 
2017-08-22 21:53:25,108 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): S3DistCp args: --src s3n://q-snowplow-recovered/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..* --targetSize 128 --outputCodec lzo 
2017-08-22 21:53:34,892 INFO com.amazon.ws.emr.hadoop.fs.EmrFileSystem (main): Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation
2017-08-22 21:53:35,779 INFO amazon.emr.metrics.MetricsSaver (main): MetricsConfigRecord disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500 lastModified: 1503438653958 
2017-08-22 21:53:35,780 INFO amazon.emr.metrics.MetricsSaver (main): Created MetricsSaver j-3O3LQF8HBUIFM:i-05bb881797057d806:RunJar:06026 period:60 /mnt/var/em/raw/i-05bb881797057d806_20170822_RunJar_06026_raw.bin
2017-08-22 21:53:38,461 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Using output path 'hdfs:/tmp/23153453-e533-4c18-88f9-0cdc93f0368a/output'
2017-08-22 21:53:39,447 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): DefaultAWSCredentialsProviderChain is used to create AmazonS3Client. KeyId: ASIAJXNEEVUVDF2DZSPQ
2017-08-22 21:53:39,447 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): AmazonS3Client setEndpoint s3.amazonaws.com
2017-08-22 21:53:39,857 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Created 0 files to copy 0 files 
2017-08-22 21:53:39,902 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Reducer number: 10
2017-08-22 21:53:40,405 INFO org.apache.hadoop.yarn.client.RMProxy (main): Connecting to ResourceManager at ip-172-31-37-100.ec2.internal/172.31.37.100:8032
2017-08-22 21:53:43,287 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): Cleaning up the staging area /tmp/hadoop-yarn/staging/hadoop/.staging/job_1503438633886_0001
2017-08-22 21:53:43,303 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Try to recursively delete hdfs:/tmp/23153453-e533-4c18-88f9-0cdc93f0368a/tempspace

Stderr:

Exception in thread "main" java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:720)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-172-31-37-100.ec2.internal:8020/tmp/23153453-e533-4c18-88f9-0cdc93f0368a/files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
	at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
	... 10 more

The likely cause of the above is that either there are no files in the source bucket, or none of them match the specified --groupBy pattern.

Thanks @BenFradet. So looking at the step, the groupBy argument defaults to: --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..*.

I’m wondering (1) whether there is an example of a file name that meets this requirement, and (2) whether it’s possible to pass an argument to the EmrEtlRunner that would customize the groupBy pattern.
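
To make question (1) concrete, here is a quick sketch (using Python’s re module, with fullmatch as an approximation of how S3DistCp applies the pattern) of which file names would and wouldn’t satisfy the default groupBy; the renamed file name below is purely hypothetical:

import re

# Default groupBy pattern from the S3DistCp step arguments above
pattern = re.compile(r".*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..*")

names = [
    "part-00000",                    # what Hadoop Event Recovery writes -> no match
    "part-00000.2017-08-21-0.txt",   # hypothetical renamed file -> matches, grouped by "2017-08-21"
]

for name in names:
    m = pattern.fullmatch(name)
    print(name, "->", m.group(1) if m else "no match")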

OK - So I think I’ve solved it by renaming the files created through Hadoop Event Recovery to match that regex.
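
(For anyone following along, the rename can be done with the AWS CLI; the target names below are hypothetical, chosen only so that they satisfy the pattern:)

aws s3 mv s3://q-snowplow-recovered/recovered/part-00000 s3://q-snowplow-recovered/recovered/part-00000.2017-08-21-0.txt
aws s3 mv s3://q-snowplow-recovered/recovered/part-00001 s3://q-snowplow-recovered/recovered/part-00001.2017-08-21-0.txt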

The second part of the question still stands: can we either customize the groupBy argument by passing a parameter to the EmrEtlRunner, or update Hadoop Event Recovery to generate files with a naming convention that matches this regex?

Or am I missing something?

Unfortunately not: the only way to influence this --groupBy argument is through the collectors:format configuration setting, and the pattern above is only used for the cloudfront format.
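
(For reference, that setting lives at the top level of the EmrEtlRunner config; a minimal sketch:)

collectors:
  format: cloudfront   # the pattern above is only applied for this format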