Reprocessing Bad Events, EmrEtlRunner Error


#1

I’m attempting to reprocess bad events following the steps outlined here and load them to Redshift.

I’ve successfully reprocessed the bad events, with the results written to s3://q-snowplow-recovered/recovered. Within that prefix there are three files: _SUCCESS, part-00000, and part-00001.

I’ve updated the config file for the EmrEtlRunner to the following:

raw:
  in:                  # Multiple in buckets are permitted
    - s3://q-snowplow-does-not-exist          # IGNORED e.g. s3://my-in-bucket
  processing: s3://q-snowplow-recovered/recovered

When I then attempt to run the EmrEtlRunner as follows (hiding the full paths for brevity):

snowplow-emr-etl-runner --config config-recovery.yml --resolver iglu_resolver.json --enrichments enrichments --skip staging

I get the following errors during the Elasticity S3DistCp Step: Raw S3 -> HDFS step of the EMR job:

2017-08-21 18:20:14,146 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Running with args: --src s3://q-snowplow-recovered/recovered/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..* --targetSize 128 --outputCodec lzo 
2017-08-21 18:20:16,156 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): S3DistCp args: --src s3://q-snowplow-recovered/recovered/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..* --targetSize 128 --outputCodec lzo 
2017-08-21 18:20:33,157 INFO com.amazon.ws.emr.hadoop.fs.EmrFileSystem (main): Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation
2017-08-21 18:20:33,961 INFO amazon.emr.metrics.MetricsSaver (main): MetricsConfigRecord disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500 lastModified: 1503339465506 
2017-08-21 18:20:33,962 INFO amazon.emr.metrics.MetricsSaver (main): Created MetricsSaver j-LIBQ62MBKWUX:i-032d6420d929c9538:RunJar:06282 period:60 /mnt/var/em/raw/i-032d6420d929c9538_20170821_RunJar_06282_raw.bin
2017-08-21 18:20:36,908 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Using output path 'hdfs:/tmp/711f7e02-88b7-4d88-8568-b06119b03d32/output'
2017-08-21 18:20:37,879 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): DefaultAWSCredentialsProviderChain is used to create AmazonS3Client. KeyId: ASIAIXI7OEIJZACG2PFQ
2017-08-21 18:20:37,879 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): AmazonS3Client setEndpoint s3.amazonaws.com
2017-08-21 18:20:38,166 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Skipping key 'recovered/' because it ends with '/'
2017-08-21 18:20:38,166 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Created 0 files to copy 0 files 
2017-08-21 18:20:38,263 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Reducer number: 10
2017-08-21 18:20:38,964 INFO org.apache.hadoop.yarn.client.RMProxy (main): Connecting to ResourceManager at ip-172-31-40-121.ec2.internal/172.31.40.121:8032
2017-08-21 18:20:42,294 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): Cleaning up the staging area /tmp/hadoop-yarn/staging/hadoop/.staging/job_1503339444086_0001
2017-08-21 18:20:42,321 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Try to recursively delete hdfs:/tmp/711f7e02-88b7-4d88-8568-b06119b03d32/tempspace

I’ve tried several variants for raw.processing in the config, to no avail:

s3://q-snowplow-recovered/recovered
s3://q-snowplow-recovered/recovered/
s3://q-snowplow-recovered
s3://q-snowplow-recovered/

Thanks in advance for your help!


#2

@alex @BenFradet I see you’re the last two folks to update the Hadoop Event Recovery docs. Any chance you have insight into what I’m doing wrong?

Again, thanks in advance.


#3

Hey @tfinkel - the log lines you quoted are all INFO level. Do you have any errors you can share?

Alternatively, I can see:

Created 0 files to copy 0 files 

which suggests that somehow the files are not being picked up… You can always try changing the protocol on processing to s3n://, although I’m not sure that will fix it.
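
For example, that change would look something like this in your config (bucket path taken from your earlier post, exact prefix assumed):

raw:
  processing: s3n://q-snowplow-recovered/recovered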


#4

Thanks @alex .

It doesn’t appear the files are being picked up; there is also a line that says: Skipping key 'recovered/' because it ends with '/'.

I just tried running it again using the s3n protocol, and got the following output from the Elasticity S3DistCp Step: Raw S3 -> HDFS step, which failed.

Controller:

2017-08-22T21:53:21.259Z INFO Ensure step 2 jar file /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar
2017-08-22T21:53:21.259Z INFO StepRunner: Created Runner for step 2
INFO startExec 'hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src s3n://q-snowplow-recovered/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..* --targetSize 128 --outputCodec lzo'
INFO Environment:
  TERM=linux
  CONSOLETYPE=serial
  SHLVL=5
  JAVA_HOME=/etc/alternatives/jre
  HADOOP_IDENT_STRING=hadoop
  LANGSH_SOURCED=1
  XFILESEARCHPATH=/usr/dt/app-defaults/%L/Dt
  HADOOP_ROOT_LOGGER=INFO,DRFA
  AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
  UPSTART_JOB=rc
  MAIL=/var/spool/mail/hadoop
  EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
  PWD=/
  HOSTNAME=ip-172-31-37-100
  LESS_TERMCAP_se=[0m
  LOGNAME=hadoop
  UPSTART_INSTANCE=
  AWS_PATH=/opt/aws
  LESS_TERMCAP_mb=[01;31m
  _=/etc/alternatives/jre/bin/java
  LESS_TERMCAP_me=[0m
  NLSPATH=/usr/dt/lib/nls/msg/%L/%N.cat
  LESS_TERMCAP_md=[01;38;5;208m
  runlevel=3
  AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
  UPSTART_EVENTS=runlevel
  HISTSIZE=1000
  previous=N
  HADOOP_LOGFILE=syslog
  PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
  EC2_HOME=/opt/aws/apitools/ec2
  HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-28L51TC5DDLXF
  LESS_TERMCAP_ue=[0m
  AWS_ELB_HOME=/opt/aws/apitools/elb
  RUNLEVEL=3
  USER=hadoop
  HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-28L51TC5DDLXF/tmp
  PREVLEVEL=N
  HOME=/home/hadoop
  HISTCONTROL=ignoredups
  LESSOPEN=||/usr/bin/lesspipe.sh %s
  AWS_DEFAULT_REGION=us-east-1
  LANG=en_US.UTF-8
  LESS_TERMCAP_us=[04;38;5;111m
INFO redirectOutput to /mnt/var/log/hadoop/steps/s-28L51TC5DDLXF/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-28L51TC5DDLXF/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-28L51TC5DDLXF
INFO ProcessRunner started child process 6026 :
hadoop    6026  2187  0 21:53 ?        00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src s3n://q-snowplow-recovered/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..* --targetSize 128 --outputCodec lzo
2017-08-22T21:53:25.307Z INFO HadoopJarStepRunner.Runner: startRun() called for s-28L51TC5DDLXF Child Pid: 6026
INFO Synchronously wait child process to complete : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO waitProcessCompletion ended with exit code 1 : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO total process run time: 22 seconds
2017-08-22T21:53:45.565Z INFO Step created jobs: 
2017-08-22T21:53:45.566Z WARN Step failed with exitCode 1 and took 22 seconds

Syslog:

2017-08-22 21:53:24,439 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Running with args: --src s3n://q-snowplow-recovered/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..* --targetSize 128 --outputCodec lzo 
2017-08-22 21:53:25,108 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): S3DistCp args: --src s3n://q-snowplow-recovered/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..* --targetSize 128 --outputCodec lzo 
2017-08-22 21:53:34,892 INFO com.amazon.ws.emr.hadoop.fs.EmrFileSystem (main): Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation
2017-08-22 21:53:35,779 INFO amazon.emr.metrics.MetricsSaver (main): MetricsConfigRecord disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500 lastModified: 1503438653958 
2017-08-22 21:53:35,780 INFO amazon.emr.metrics.MetricsSaver (main): Created MetricsSaver j-3O3LQF8HBUIFM:i-05bb881797057d806:RunJar:06026 period:60 /mnt/var/em/raw/i-05bb881797057d806_20170822_RunJar_06026_raw.bin
2017-08-22 21:53:38,461 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Using output path 'hdfs:/tmp/23153453-e533-4c18-88f9-0cdc93f0368a/output'
2017-08-22 21:53:39,447 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): DefaultAWSCredentialsProviderChain is used to create AmazonS3Client. KeyId: ASIAJXNEEVUVDF2DZSPQ
2017-08-22 21:53:39,447 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): AmazonS3Client setEndpoint s3.amazonaws.com
2017-08-22 21:53:39,857 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Created 0 files to copy 0 files 
2017-08-22 21:53:39,902 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Reducer number: 10
2017-08-22 21:53:40,405 INFO org.apache.hadoop.yarn.client.RMProxy (main): Connecting to ResourceManager at ip-172-31-37-100.ec2.internal/172.31.37.100:8032
2017-08-22 21:53:43,287 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): Cleaning up the staging area /tmp/hadoop-yarn/staging/hadoop/.staging/job_1503438633886_0001
2017-08-22 21:53:43,303 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Try to recursively delete hdfs:/tmp/23153453-e533-4c18-88f9-0cdc93f0368a/tempspace

Stderr:

Exception in thread "main" java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:720)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-172-31-37-100.ec2.internal:8020/tmp/23153453-e533-4c18-88f9-0cdc93f0368a/files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
	at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
	... 10 more

#5

The likely cause of the above is that either there are no files in the source bucket, or none of their names match the specified --groupBy pattern.
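
A quick way to check is to list the recovered prefix and run the default pattern over the key names; this is just a sketch, assuming the AWS CLI is configured for the account:

# list the keys under the recovered prefix and keep only those matching the
# default --groupBy regex; no output means S3DistCp has nothing to copy
aws s3 ls s3://q-snowplow-recovered/recovered/ | awk '{print $4}' \
  | grep -E '.*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..*'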


#6

Thanks @BenFradet. So looking at the step, the groupBy argument defaults to: --groupBy .*\.([0-9]+-[0-9]+-[0-9]+)-[0-9]+\..*.

I’m wondering (1) whether there is an example of a filename that matches this pattern, and (2) whether it’s possible to pass an argument to the EmrEtlRunner to customize the groupBy regex.


#7

OK - So I think I’ve solved it by renaming the files created through Hadoop Event Recovery to match that regex.
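
In case it helps anyone else, the rename was roughly along these lines, giving each part file a dot-separated, date-stamped name that the default regex will match (the key name and timestamp below are illustrative):

# rename a part file so the default --groupBy regex picks it up
# (exact destination name and timestamp are illustrative)
aws s3 mv s3://q-snowplow-recovered/recovered/part-00000 \
          s3://q-snowplow-recovered/recovered/recovered.2017-08-22-21.part-00000.txt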

The second part of the question still stands: can we either customize the groupBy argument by passing a param to the EmrEtlRunner, or update Hadoop Event Recovery to generate files with a naming convention that matches this regex?

Or am I missing something?


#8

Unfortunately no: the only way to affect this --groupBy argument is through the collectors:format: configuration, and even then it’s only taken into account for the cloudfront format.
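
For reference, that’s this setting in config.yml, shown here only to illustrate the point above:

collectors:
  format: cloudfront    # per the above, the --groupBy regex is only applied for this format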