Emr-etl-runner works with LZO but not with GZ


#1

Hi everyone,

We here at GetNinjas have successfully implemented the real-time pipeline, and now we want to connect it to the batch pipeline we previously had. What we are trying to do is this:

(Scala-Stream-Collector) -> (Kinesis-good) -> (Kinesis-LZO-S3-Sink) -> (Emr-Etl-Runner) -> (Redshift)

Everything was going okay until we reached (Kinesis-LZO-S3-Sink) -> (Emr-Etl-Runner). As it wasn’t working, we simplified the run with: --skip rdb_load,analyze

but we are still getting an error:

Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-2J3ZVYQFL0YNV failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
Snowplow ETL: TERMINATED_WITH_ERRORS [STEP_FAILURE] ~ 00:03:48 [2017-09-01 15:22:25 -0300 - 2017-09-01 15:26:14 -0300]
 - 1. Elasticity S3DistCp Step: Raw s3://bucket-in/ -> Raw Staging S3: COMPLETED ~ 00:00:42 [2017-09-01 15:22:27 -0300 - 2017-09-01 15:23:09 -0300]
 - 2. Elasticity S3DistCp Step: Raw S3 -> Raw HDFS: COMPLETED ~ 00:00:34 [2017-09-01 15:23:09 -0300 - 2017-09-01 15:23:43 -0300]
 - 3. Elasticity Spark Step: Enrich Raw Events: COMPLETED ~ 00:00:56 [2017-09-01 15:23:45 -0300 - 2017-09-01 15:24:41 -0300]
 - 4. Elasticity S3DistCp Step: Enriched HDFS -> S3: FAILED ~ 00:00:06 [2017-09-01 15:24:43 -0300 - 2017-09-01 15:24:49 -0300]
 - 5. Elasticity S3DistCp Step: Shredded S3 -> Shredded Archive S3: CANCELLED ~ elapsed time n/a [ - ]
 - 6. Elasticity S3DistCp Step: Enriched S3 -> Enriched Archive S3: CANCELLED ~ elapsed time n/a [ - ]
 - 7. Elasticity S3DistCp Step: Raw Staging S3 -> Raw Archive S3: CANCELLED ~ elapsed time n/a [ - ]
 - 8. Elasticity S3DistCp Step: Shredded HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]
 - 9. Elasticity S3DistCp Step: Shredded HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
 - 10. Elasticity Spark Step: Shred Enriched Events: CANCELLED ~ elapsed time n/a [ - ]
 - 11. Elasticity Custom Jar Step: Empty Raw HDFS: CANCELLED ~ elapsed time n/a [ - ]
 - 12. Elasticity S3DistCp Step: Enriched HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]):
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:586:in `run'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:103:in `run'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:41:in `<main>'
    org/jruby/RubyKernel.java:979:in `load'
    uri:classloader:/META-INF/main.rb:1:in `<main>'
    org/jruby/RubyKernel.java:961:in `require'
    uri:classloader:/META-INF/main.rb:1:in `(root)'
    uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'

stderr:

Exception in thread "main" java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:705)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-10-0-49-130.ec2.internal:8020/tmp/eced2b80-e65a-45e1-bb4d-54f6f348641b/files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
	at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
	... 10 more

syslog:

2017-09-04 19:57:27,690 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Running with args: --src hdfs:///local/snowplow/enriched-events/ --dest s3://bucket-2/enriched/good/run=2017-09-04-16-49-49/ --srcPattern .*part-.* --s3Endpoint s3.amazonaws.com --outputCodec gz 
2017-09-04 19:57:27,972 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): S3DistCp args: --src hdfs:///local/snowplow/enriched-events/ --dest s3://bucket-2/enriched/good/run=2017-09-04-16-49-49/ --srcPattern .*part-.* --s3Endpoint s3.amazonaws.com --outputCodec gz 
2017-09-04 19:57:27,992 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Using output path 'hdfs:/tmp/eced2b80-e65a-45e1-bb4d-54f6f348641b/output'
2017-09-04 19:57:28,899 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Created 0 files to copy 0 files 
2017-09-04 19:57:31,982 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Reducer number: 23
2017-09-04 19:57:32,037 INFO org.apache.hadoop.yarn.client.RMProxy (main): Connecting to ResourceManager at ip-10-0-49-130.ec2.internal/10.0.49.130:8032
2017-09-04 19:57:32,471 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): Cleaning up the staging area /tmp/hadoop-yarn/staging/hadoop/.staging/job_1504554745802_0004
2017-09-04 19:57:32,480 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Try to recursively delete hdfs:/tmp/eced2b80-e65a-45e1-bb4d-54f6f348641b/tempspace

controller:

2017-09-04T19:57:25.307Z INFO Ensure step 5 jar file /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar
2017-09-04T19:57:25.307Z INFO StepRunner: Created Runner for step 5
INFO startExec 'hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src hdfs:///local/snowplow/enriched-events/ --dest s3://bucket-2/enriched/good/run=2017-09-04-16-49-49/ --srcPattern .*part-.* --s3Endpoint s3.amazonaws.com --outputCodec gz'
INFO Environment:
  PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
  LESS_TERMCAP_md=[01;38;5;208m
  LESS_TERMCAP_me=[0m
  HISTCONTROL=ignoredups
  LESS_TERMCAP_mb=[01;31m
  AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
  UPSTART_JOB=rc
  LESS_TERMCAP_se=[0m
  HISTSIZE=1000
  HADOOP_ROOT_LOGGER=INFO,DRFA
  JAVA_HOME=/etc/alternatives/jre
  AWS_DEFAULT_REGION=us-east-1
  AWS_ELB_HOME=/opt/aws/apitools/elb
  LESS_TERMCAP_us=[04;38;5;111m
  EC2_HOME=/opt/aws/apitools/ec2
  TERM=linux
  XFILESEARCHPATH=/usr/dt/app-defaults/%L/Dt
  runlevel=3
  LANG=en_US.UTF-8
  AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
  MAIL=/var/spool/mail/hadoop
  LESS_TERMCAP_ue=[0m
  LOGNAME=hadoop
  PWD=/
  LANGSH_SOURCED=1
  HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-UIIQKDOYMDW8/tmp
  _=/etc/alternatives/jre/bin/java
  CONSOLETYPE=serial
  RUNLEVEL=3
  LESSOPEN=||/usr/bin/lesspipe.sh %s
  previous=N
  UPSTART_EVENTS=runlevel
  AWS_PATH=/opt/aws
  USER=hadoop
  UPSTART_INSTANCE=
  PREVLEVEL=N
  HADOOP_LOGFILE=syslog
  PYTHON_INSTALL_LAYOUT=amzn
  HOSTNAME=ip-10-0-49-130
  NLSPATH=/usr/dt/lib/nls/msg/%L/%N.cat
  HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-UIIQKDOYMDW8
  EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
  SHLVL=5
  HOME=/home/hadoop
  HADOOP_IDENT_STRING=hadoop
INFO redirectOutput to /mnt/var/log/hadoop/steps/s-UIIQKDOYMDW8/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-UIIQKDOYMDW8/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-UIIQKDOYMDW8
INFO ProcessRunner started child process 9509 :
hadoop    9509  4212  0 19:57 ?        00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src hdfs:///local/snowplow/enriched-events/ --dest s3://bucket-2/enriched/good/run=2017-09-04-16-49-49/ --srcPattern .*part-.* --s3Endpoint s3.amazonaws.com --outputCodec gz
2017-09-04T19:57:29.314Z INFO HadoopJarStepRunner.Runner: startRun() called for s-UIIQKDOYMDW8 Child Pid: 9509
INFO Synchronously wait child process to complete : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO waitProcessCompletion ended with exit code 1 : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO total process run time: 6 seconds
2017-09-04T19:57:33.388Z INFO Step created jobs: 
2017-09-04T19:57:33.388Z WARN Step failed with exitCode 1 and took 6 seconds

But when I change the compression of the kinesis-s3-sink files:

$ aws s3 mv s3://bucket-1 ./ --recursive
$ gzip -d *
$ lzop -U *
$ aws s3 mv ./ s3://bucket-1 --recursive

and run emr-etl-runner again, it works!

What is the issue with gzip? Or am I missing something?

Thanks in advance,
Caio Salgado


#2

You need to use the LZO output format defined in your S3 sink config. The GZIP option is intended for the enriched stream, which is not fully supported yet.
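
The syslog in the first post seems consistent with this: S3DistCp reports "Created 0 files to copy 0 files" just before the step fails, which suggests the enrich step wrote nothing to hdfs:///local/snowplow/enriched-events/. Below is a minimal Python sketch of scanning a step syslog for that summary line. The function names are hypothetical, and the regular expression is based only on the log line quoted in this thread, so treat it as a starting point rather than a complete diagnostic.

```python
import re

# Pattern taken from the S3DistCp line quoted in this thread:
# "... S3DistCp (main): Created 0 files to copy 0 files"
COPY_SUMMARY = re.compile(r"Created (\d+) files to copy (\d+) files")


def files_to_copy(syslog_text):
    """Return the 'files to copy' count from the summary line, or None if the line is absent."""
    match = COPY_SUMMARY.search(syslog_text)
    return int(match.group(2)) if match else None


def step_had_no_input(syslog_text):
    """True only when S3DistCp explicitly reported zero files to copy."""
    return files_to_copy(syslog_text) == 0


if __name__ == "__main__":
    sample = (
        "2017-09-04 19:57:28,899 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp "
        "(main): Created 0 files to copy 0 files"
    )
    print(step_had_no_input(sample))
```

If this returns True for the "Enriched HDFS -> S3" step, the step before it (enrich) is probably the one to investigate, even though it is shown as COMPLETED.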


#3

@marien is correct:

  • Scala Stream Collector needs to be configured to generate LZO
  • EmrEtlRunner needs to be configured to process Thrift records

The actual file format written when the SSC is configured for LZO is pretty complex: it involves LZO, Protocol Buffers (courtesy of ElephantBird) and Thrift. I am very surprised that a simple re-compression of the files from GZIP -> LZO was enough to make processing work…
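
As a quick sanity check before running EmrEtlRunner, it can help to confirm which outer compression container the sink actually wrote. The sketch below, in Python, inspects the leading magic bytes of a file: gzip streams start with 1f 8b, and files written by the lzop tool start with a fixed 9-byte signature. The function names are hypothetical, and this only identifies the outer container; it says nothing about whether the Protocol Buffers and Thrift framing inside is what the enrich step expects.

```python
# Leading bytes of the two outer container formats discussed in this thread.
GZIP_MAGIC = b"\x1f\x8b"
LZOP_MAGIC = b"\x89LZO\x00\r\n\x1a\n"


def sniff_compression(header):
    """Classify a file by its first bytes: 'gzip', 'lzop' or 'unknown'."""
    if header.startswith(GZIP_MAGIC):
        return "gzip"
    if header.startswith(LZOP_MAGIC):
        return "lzop"
    return "unknown"


def sniff_file(path):
    """Read just enough of a local file to classify its outer container."""
    with open(path, "rb") as handle:
        return sniff_compression(handle.read(len(LZOP_MAGIC)))
```

For example, calling sniff_file on a file downloaded from the raw input bucket should return "lzop" if the sink was writing LZO output, and "gzip" if it was still configured for GZIP.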


#4

Hey @alex!

Do you have any ideas on how to split/join LZO files to fully utilize Spark parallelism, per this thread? This seems non-trivial to do with a bash script given the complexity of generating the file format.

Thanks!
Bernardo