EmrEtlRunner works with LZO but not with GZ

Hi everyone,

We here at GetNinjas have successfully implemented the real-time pipeline, and now we want to attach it to the batch pipeline we previously had. What we are trying to do is this:

(Scala-Stream-Collector) -> (Kinesis-good) -> (Kinesis-LZO-S3-Sink) -> (EmrEtlRunner) -> (Redshift)

Everything was going okay until we reached (Kinesis-LZO-S3-Sink) -> (EmrEtlRunner). As it wasn't working, we simplified things by skipping the load steps: --skip rdb_load,analyze
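For reference, our invocation looks roughly like this (a sketch only; the config and resolver paths are placeholders, and the run subcommand applies to recent EmrEtlRunner releases):

$ ./snowplow-emr-etl-runner run \
    -c config/config.yml \
    -r config/iglu_resolver.json \
    --skip rdb_load,analyze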

but we still get this error:

Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-2J3ZVYQFL0YNV failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
Snowplow ETL: TERMINATED_WITH_ERRORS [STEP_FAILURE] ~ 00:03:48 [2017-09-01 15:22:25 -0300 - 2017-09-01 15:26:14 -0300]
 - 1. Elasticity S3DistCp Step: Raw s3://bucket-in/ -> Raw Staging S3: COMPLETED ~ 00:00:42 [2017-09-01 15:22:27 -0300 - 2017-09-01 15:23:09 -0300]
 - 2. Elasticity S3DistCp Step: Raw S3 -> Raw HDFS: COMPLETED ~ 00:00:34 [2017-09-01 15:23:09 -0300 - 2017-09-01 15:23:43 -0300]
 - 3. Elasticity Spark Step: Enrich Raw Events: COMPLETED ~ 00:00:56 [2017-09-01 15:23:45 -0300 - 2017-09-01 15:24:41 -0300]
 - 4. Elasticity S3DistCp Step: Enriched HDFS -> S3: FAILED ~ 00:00:06 [2017-09-01 15:24:43 -0300 - 2017-09-01 15:24:49 -0300]
 - 5. Elasticity S3DistCp Step: Shredded S3 -> Shredded Archive S3: CANCELLED ~ elapsed time n/a [ - ]
 - 6. Elasticity S3DistCp Step: Enriched S3 -> Enriched Archive S3: CANCELLED ~ elapsed time n/a [ - ]
 - 7. Elasticity S3DistCp Step: Raw Staging S3 -> Raw Archive S3: CANCELLED ~ elapsed time n/a [ - ]
 - 8. Elasticity S3DistCp Step: Shredded HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]
 - 9. Elasticity S3DistCp Step: Shredded HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
 - 10. Elasticity Spark Step: Shred Enriched Events: CANCELLED ~ elapsed time n/a [ - ]
 - 11. Elasticity Custom Jar Step: Empty Raw HDFS: CANCELLED ~ elapsed time n/a [ - ]
 - 12. Elasticity S3DistCp Step: Enriched HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]):
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:586:in `run'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:103:in `run'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:41:in `<main>'
    org/jruby/RubyKernel.java:979:in `load'
    uri:classloader:/META-INF/main.rb:1:in `<main>'
    org/jruby/RubyKernel.java:961:in `require'
    uri:classloader:/META-INF/main.rb:1:in `(root)'
    uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'

stderr:

Exception in thread "main" java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:705)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-10-0-49-130.ec2.internal:8020/tmp/eced2b80-e65a-45e1-bb4d-54f6f348641b/files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
	at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
	... 10 more

syslog:

2017-09-04 19:57:27,690 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Running with args: --src hdfs:///local/snowplow/enriched-events/ --dest s3://bucket-2/enriched/good/run=2017-09-04-16-49-49/ --srcPattern .*part-.* --s3Endpoint s3.amazonaws.com --outputCodec gz 
2017-09-04 19:57:27,972 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): S3DistCp args: --src hdfs:///local/snowplow/enriched-events/ --dest s3://bucket-2/enriched/good/run=2017-09-04-16-49-49/ --srcPattern .*part-.* --s3Endpoint s3.amazonaws.com --outputCodec gz 
2017-09-04 19:57:27,992 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Using output path 'hdfs:/tmp/eced2b80-e65a-45e1-bb4d-54f6f348641b/output'
2017-09-04 19:57:28,899 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Created 0 files to copy 0 files 
2017-09-04 19:57:31,982 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Reducer number: 23
2017-09-04 19:57:32,037 INFO org.apache.hadoop.yarn.client.RMProxy (main): Connecting to ResourceManager at ip-10-0-49-130.ec2.internal/10.0.49.130:8032
2017-09-04 19:57:32,471 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): Cleaning up the staging area /tmp/hadoop-yarn/staging/hadoop/.staging/job_1504554745802_0004
2017-09-04 19:57:32,480 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Try to recursively delete hdfs:/tmp/eced2b80-e65a-45e1-bb4d-54f6f348641b/tempspace

controller:

2017-09-04T19:57:25.307Z INFO Ensure step 5 jar file /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar
2017-09-04T19:57:25.307Z INFO StepRunner: Created Runner for step 5
INFO startExec 'hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src hdfs:///local/snowplow/enriched-events/ --dest s3://bucket-2/enriched/good/run=2017-09-04-16-49-49/ --srcPattern .*part-.* --s3Endpoint s3.amazonaws.com --outputCodec gz'
INFO Environment:
  PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
  LESS_TERMCAP_md=[01;38;5;208m
  LESS_TERMCAP_me=[0m
  HISTCONTROL=ignoredups
  LESS_TERMCAP_mb=[01;31m
  AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
  UPSTART_JOB=rc
  LESS_TERMCAP_se=[0m
  HISTSIZE=1000
  HADOOP_ROOT_LOGGER=INFO,DRFA
  JAVA_HOME=/etc/alternatives/jre
  AWS_DEFAULT_REGION=us-east-1
  AWS_ELB_HOME=/opt/aws/apitools/elb
  LESS_TERMCAP_us=[04;38;5;111m
  EC2_HOME=/opt/aws/apitools/ec2
  TERM=linux
  XFILESEARCHPATH=/usr/dt/app-defaults/%L/Dt
  runlevel=3
  LANG=en_US.UTF-8
  AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
  MAIL=/var/spool/mail/hadoop
  LESS_TERMCAP_ue=[0m
  LOGNAME=hadoop
  PWD=/
  LANGSH_SOURCED=1
  HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-UIIQKDOYMDW8/tmp
  _=/etc/alternatives/jre/bin/java
  CONSOLETYPE=serial
  RUNLEVEL=3
  LESSOPEN=||/usr/bin/lesspipe.sh %s
  previous=N
  UPSTART_EVENTS=runlevel
  AWS_PATH=/opt/aws
  USER=hadoop
  UPSTART_INSTANCE=
  PREVLEVEL=N
  HADOOP_LOGFILE=syslog
  PYTHON_INSTALL_LAYOUT=amzn
  HOSTNAME=ip-10-0-49-130
  NLSPATH=/usr/dt/lib/nls/msg/%L/%N.cat
  HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-UIIQKDOYMDW8
  EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
  SHLVL=5
  HOME=/home/hadoop
  HADOOP_IDENT_STRING=hadoop
INFO redirectOutput to /mnt/var/log/hadoop/steps/s-UIIQKDOYMDW8/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-UIIQKDOYMDW8/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-UIIQKDOYMDW8
INFO ProcessRunner started child process 9509 :
hadoop    9509  4212  0 19:57 ?        00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src hdfs:///local/snowplow/enriched-events/ --dest s3://bucket-2/enriched/good/run=2017-09-04-16-49-49/ --srcPattern .*part-.* --s3Endpoint s3.amazonaws.com --outputCodec gz
2017-09-04T19:57:29.314Z INFO HadoopJarStepRunner.Runner: startRun() called for s-UIIQKDOYMDW8 Child Pid: 9509
INFO Synchronously wait child process to complete : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO waitProcessCompletion ended with exit code 1 : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO total process run time: 6 seconds
2017-09-04T19:57:33.388Z INFO Step created jobs: 
2017-09-04T19:57:33.388Z WARN Step failed with exitCode 1 and took 6 seconds

But when I change the compression of the kinesis-s3-sink files:

$ aws s3 mv s3://bucket-1/ ./ --recursive
$ gzip -d *
$ lzop *
$ aws s3 mv ./ s3://bucket-1/ --recursive

and run EmrEtlRunner again, it works!

What is the issue with gzip? Or am I missing something?

Thanks in advance,
Caio Salgado

You need to use the LZO output format defined in your S3 sink config. The GZIP option is meant for the enriched stream, which is not fully supported yet.
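Something along these lines in the sink's config.hocon (a minimal sketch only; key names can differ between sink versions, and the bucket/stream names here are placeholders):

sink {
  kinesis {
    in {
      stream-name: "kinesis-good"  # raw collector payloads from the Scala Stream Collector
    }
  }
  s3 {
    bucket: "bucket-in"
    # "lzo" produces the Thrift-in-LZO files EmrEtlRunner expects for raw events;
    # "gzip" is only intended for the (already enriched, line-based) stream.
    format: "lzo"
  }
}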


@marien is correct:

  • Scala Stream Collector needs to be configured to generate LZO
  • EmrEtlRunner needs to be configured to process Thrift records (see the config snippet below)
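For the second point, the relevant part of the EmrEtlRunner config.yml looks something like this (a minimal sketch, all other settings omitted):

collectors:
  format: thrift  # raw records written by the LZO S3 sink are Thrift, not line-based logs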

The actual file format written when the SSC is configured for LZO is pretty complex - it involves LZO, Protocol Buffers (courtesy of ElephantBird) and Thrift. I am very surprised that a simple re-compression of the files from GZIP -> LZO was enough to make processing work…


Hey @alex!

Do you have any ideas on how to split/join LZO files to fully utilize Spark parallelism, per this thread? This seems non-trivial to do with a bash script given the complexity of generating the file format.

Thanks!
Bernardo


In my case (and probably in any other) it won't really work; it just allows the job to get through EMR without errors. Nothing more.