EMR ETL stream_enrich mode

Hi Team,

I am confused about how to set up the stream enrich process in EmrEtlRunner. My current setup looks as follows:

  • Scala Stream Collector set up to receive events and write them to the Kinesis stream “snowplow-events-raw”.
  • Stream Enrich reading from the above Kinesis stream and writing good events to the “snowplow-events-good” and bad events to the “snowplow-events-bad” Kinesis streams.
  • S3 Loader reading from “snowplow-events-good” and writing the records to the S3 bucket “collector-logs”.
  • EmrEtlRunner reading the records from the “collector-logs” bucket and writing to the Redshift table (atomic.events).
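For reference, the S3 Loader step above is driven by a HOCON config along these lines. Field names vary between snowplow-s3-loader releases, and the appName, bad-stream name, and buffer limits below are assumptions, so treat this as an illustrative sketch rather than a drop-in file:

```hocon
source = "kinesis"
sink   = "kinesis"                 # where records that fail to sink are sent

aws {
  accessKey = "iam"                # use instance-profile credentials
  secretKey = "iam"
}

kinesis {
  initialPosition = "TRIM_HORIZON"
  region  = "ap-south-1"
  appName = "snowplow-s3-loader"   # KCL lease table name (assumed)
}

streams {
  inStreamName  = "snowplow-events-good"
  outStreamName = "snowplow-s3-loader-bad"   # hypothetical bad stream
  buffer {
    byteLimit   = 1048576
    recordLimit = 500
    timeLimit   = 60000
  }
}

s3 {
  region = "ap-south-1"
  bucket = "collector-logs"
  format = "gzip"                  # gzip rather than lzo for enriched events
  maxTimeout = 300000
}
```

The format setting matters here: EmrEtlRunner's stream enrich mode typically expects gzipped enriched files in the stream bucket, so gzip is the usual choice over lzo.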

After setting this up, the final stage of the pipeline which should write records to Redshift is not working. Is there something I’m missing in the setup?

Thanks in advance!

@shubhamg2208, how did you implement the last step, “EmrEtlRunner reading the records from the ‘collector-logs’ bucket and writing to the Redshift table (atomic.events)”? How did you configure the targets to let the pipeline know the data needs to be loaded into Redshift? What do the EMR logs (steps) show when you run the pipeline?

Your pipeline configuration file has to include the enriched/stream bucket pointing to collector-logs.
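In the R104 config layout, that section looks roughly like this (bucket paths here are placeholders):

```yaml
aws:
  s3:
    buckets:
      enriched:
        good: s3://my-out-bucket/enriched/good
        bad: s3://my-out-bucket/enriched/bad
        stream: s3://collector-logs   # the bucket the S3 Loader sinks enriched events to
```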

Hi @ihor,

My configuration for the EMR ETL Runner looks as follows:

  aws:
    # Credentials can be hardcoded or set in environment variables
    access_key_id: <%= ENV['AWS_SNOWPLOW_ACCESS_KEY'] %>
    secret_access_key: <%= ENV['AWS_SNOWPLOW_SECRET_KEY'] %>
    s3:
      region: ap-south-1
      buckets:
        assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
        jsonpath_assets: s3://scrpbx-snwplw-hosted-assets/jsonpaths
        # https://stackoverflow.com/questions/10569455/differences-between-amazon-s3-and-s3n-in-hadoop
        log: s3n://logs/
        enriched:
          good: s3://output/enriched/good       # e.g. s3://my-out-bucket/enriched/good
          bad: s3://output/enriched/bad         # e.g. s3://my-out-bucket/enriched/bad
          stream: s3://collector-logs
        shredded:
          good: s3://output/shredded/good       # e.g. s3://my-out-bucket/shredded/good
          bad: s3://output/shredded/bad         # e.g. s3://my-out-bucket/shredded/bad
          errors: s3://output/shredded/error    # Leave blank unless :continue_on_unexpected_error: set to true below
          archive: s3://output/shredded/archive # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    emr:
      ami_version: 5.9.0
      region: ap-south-1        # Always set this
      jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
      service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
      autoscaling_role: EMR_AutoScaling_DefaultRole
      placement:     # Set this if not running in VPC. Leave blank otherwise
      ec2_subnet_id: subnet-c9ed65a1 # Set this if running in VPC. Leave blank otherwise
      ec2_key_name: test@test.com
      bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise
      software:
        hbase:    # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
        lingual:  # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
      # Adjust your Hadoop cluster below
      jobflow:
        job_name: Snowplow Stream ETL # Give your job a name
        master_instance_type: c4.xlarge
        core_instance_count: 2
        core_instance_type: c4.xlarge
        core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
          volume_size: 100    # Gigabytes
          volume_type: "gp2"
          volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
          ebs_optimized: false # Optional. Will default to true
        task_instance_count: 0 # Increase to use spot instances
        task_instance_type: c4.xlarge
        task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
      bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
      configuration:
        yarn-site:
          yarn.resourcemanager.am.max-attempts: "1"
        spark:
          maximizeResourceAllocation: "true"
      additional_info:        # Optional JSON string for selecting additional features
  enrich:
    versions:
      spark_enrich: 1.13.0 # Version of the Spark Enrichment process
    continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
    output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
  storage:
    versions:
      rdb_loader: 0.14.0
      rdb_shredder: 0.13.0        # Version of the Spark Shredding process
      hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  monitoring:
    tags: {} # Name-value pairs describing this job
    logging:
      level: DEBUG # You can optionally switch to INFO for production
    snowplow:
      method: get
      app_id: snowplow # e.g. snowplow
      collector: collector.test.net # e.g. d3rkrsqld9gmqf.cloudfront.net

I’m currently running R104 (Stoplesteinan) of EmrEtlRunner. It is executed with the command:

AWS_SNOWPLOW_ACCESS_KEY=key AWS_SNOWPLOW_SECRET_KEY=secret snowplow-emr-etl-runner-r104 run -c emr-etl-runner-stream-enrich.config.yml --resolver iglu_resolver.json -t targets -n enrichments -f staging_stream_enrich

The targets folder contains the Redshift target with the required configuration. I’ve also added additional enrichments using the enrichment folder.
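For reference, the Redshift target in the targets folder is a self-describing JSON along these lines. The host, credentials, and tuning values below are placeholders, and the exact schema version depends on your release, so check the sample target shipped with R104 rather than copying this verbatim:

```json
{
  "schema": "iglu:com.snowplowanalytics.snowplow.storage/redshift_config/jsonschema/2-1-0",
  "data": {
    "name": "My Redshift database",
    "host": "redshift.example.internal",
    "database": "snowplow",
    "port": 5439,
    "sslMode": "DISABLE",
    "username": "storageloader",
    "password": "secret",
    "schema": "atomic",
    "maxError": 1,
    "compRows": 20000,
    "purpose": "ENRICHED_EVENTS"
  }
}
```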

When run this way, only one step is displayed in EMR: Elasticity S3DistCp Step: Shredded S3 -> S3 Shredded Archive. There are no error logs for this step, just the controller logs and syslog.

Controller logs:

2019-06-05T10:05:49.455Z INFO Ensure step 1 jar file /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar
2019-06-05T10:05:49.456Z INFO StepRunner: Created Runner for step 1
INFO startExec 'hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src s3://output/shredded/good/run=2018-09-12-20-00-21/ --dest s3://output/shredded/archive/run=2018-09-12-20-00-21/ --s3Endpoint s3-ap-south-1.amazonaws.com --deleteOnSuccess'
INFO Environment:
  LESSOPEN=||/usr/bin/lesspipe.sh %s
INFO redirectOutput to /mnt/var/log/hadoop/steps/s-3OM7XL0P3NKQ/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-3OM7XL0P3NKQ/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-3OM7XL0P3NKQ
INFO ProcessRunner started child process 8578 :
hadoop    8578  4184  0 10:05 ?        00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src s3://output/shredded/good/run=2018-09-12-20-00-21/ --dest s3://output/shredded/archive/run=2018-09-12-20-00-21/ --s3Endpoint s3-ap-south-1.amazonaws.com --deleteOnSuccess
2019-06-05T10:05:53.462Z INFO HadoopJarStepRunner.Runner: startRun() called for s-3OM7XL0P3NKQ Child Pid: 8578
INFO Synchronously wait child process to complete : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO waitProcessCompletion ended with exit code 0 : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO total process run time: 30 seconds
2019-06-05T10:06:21.722Z INFO Step created jobs: job_1559729030926_0001
2019-06-05T10:06:21.722Z INFO Step succeeded with exitCode 0 and took 30 seconds


Syslog:

2019-06-05 10:05:51,788 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Running with args: --src s3://output/shredded/good/run=2018-09-12-20-00-21/ --dest s3://output/shredded/archive/run=2018-09-12-20-00-21/ --s3Endpoint s3-ap-south-1.amazonaws.com --deleteOnSuccess 
2019-06-05 10:05:52,048 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): S3DistCp args: --src s3://output/shredded/good/run=2018-09-12-20-00-21/ --dest s3://output/shredded/archive/run=2018-09-12-20-00-21/ --s3Endpoint s3-ap-south-1.amazonaws.com --deleteOnSuccess 
2019-06-05 10:05:52,067 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Using output path 'hdfs:/tmp/a1ae3c19-b66f-4eac-9f0a-c2cd80144d47/output'
2019-06-05 10:05:54,021 WARN com.amazon.ws.emr.hadoop.fs.rolemapping.RoleMappings (main): Found no mappings configured with 'fs.s3.authorization.roleMapping', credentials resolution may not work as expected
2019-06-05 10:05:55,230 WARN com.amazonaws.profile.path.cred.CredentialsLegacyConfigLocationProvider (main): Found the legacy config profiles file at [/home/hadoop/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
2019-06-05 10:05:55,288 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): DefaultAWSCredentialsProviderChain is used to create AmazonS3Client. KeyId: ASIA2LHO7ZSGIVSQCLND
2019-06-05 10:05:55,288 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): AmazonS3Client setEndpoint s3-ap-south-1.amazonaws.com
2019-06-05 10:05:55,422 INFO com.amazon.elasticmapreduce.s3distcp.FileInfoListing (main): Opening new file: hdfs:/tmp/a1ae3c19-b66f-4eac-9f0a-c2cd80144d47/files/1
2019-06-05 10:05:55,500 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Created 1 files to copy 9 files 
2019-06-05 10:05:56,241 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Reducer number: 3
2019-06-05 10:05:56,501 INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl (main): Timeline service address: http://ip-172-31-50-196.ap-south-1.compute.internal:8188/ws/v1/timeline/
2019-06-05 10:05:56,508 INFO org.apache.hadoop.yarn.client.RMProxy (main): Connecting to ResourceManager at ip-172-31-50-196.ap-south-1.compute.internal/
2019-06-05 10:05:57,279 INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat (main): Total input paths to process : 1
2019-06-05 10:05:57,351 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): number of splits:1
2019-06-05 10:05:57,931 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): Submitting tokens for job: job_1559729030926_0001
2019-06-05 10:05:58,466 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl (main): Submitted application application_1559729030926_0001
2019-06-05 10:05:58,504 INFO org.apache.hadoop.mapreduce.Job (main): The url to track the job: http://ip-172-31-50-196.ap-south-1.compute.internal:20888/proxy/application_1559729030926_0001/
2019-06-05 10:05:58,505 INFO org.apache.hadoop.mapreduce.Job (main): Running job: job_1559729030926_0001
2019-06-05 10:06:04,583 INFO org.apache.hadoop.mapreduce.Job (main): Job job_1559729030926_0001 running in uber mode : false
2019-06-05 10:06:04,584 INFO org.apache.hadoop.mapreduce.Job (main):  map 0% reduce 0%
2019-06-05 10:06:10,625 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 0%
2019-06-05 10:06:17,654 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 33%
2019-06-05 10:06:18,658 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 67%
2019-06-05 10:06:19,662 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 100%
2019-06-05 10:06:20,673 INFO org.apache.hadoop.mapreduce.Job (main): Job job_1559729030926_0001 completed successfully
2019-06-05 10:06:20,748 INFO org.apache.hadoop.mapreduce.Job (main): Counters: 54
  File System Counters
    FILE: Number of bytes read=1133
    FILE: Number of bytes written=518685
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=3577
    HDFS: Number of bytes written=0
    HDFS: Number of read operations=13
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=6
    S3: Number of bytes read=0
    S3: Number of bytes written=0
    S3: Number of read operations=0
    S3: Number of large read operations=0
    S3: Number of write operations=0
  Job Counters 
    Launched map tasks=1
    Launched reduce tasks=3
    Data-local map tasks=1
    Total time spent by all maps in occupied slots (ms)=129976
    Total time spent by all reduces in occupied slots (ms)=1425952
    Total time spent by all map tasks (ms)=2954
    Total time spent by all reduce tasks (ms)=16204
    Total vcore-milliseconds taken by all map tasks=2954
    Total vcore-milliseconds taken by all reduce tasks=16204
    Total megabyte-milliseconds taken by all map tasks=4159232
    Total megabyte-milliseconds taken by all reduce tasks=45630464
  Map-Reduce Framework
    Map input records=9
    Map output records=9
    Map output bytes=4421
    Map output materialized bytes=1121
    Input split bytes=170
    Combine input records=0
    Combine output records=0
    Reduce input groups=9
    Reduce shuffle bytes=1121
    Reduce input records=9
    Reduce output records=0
    Spilled Records=18
    Shuffled Maps =3
    Failed Shuffles=0
    Merged Map outputs=3
    GC time elapsed (ms)=625
    CPU time spent (ms)=19230
    Physical memory (bytes) snapshot=1539936256
    Virtual memory (bytes) snapshot=16474624000
    Total committed heap usage (bytes)=1281359872
  Shuffle Errors
  File Input Format Counters 
    Bytes Read=3407
  File Output Format Counters 
    Bytes Written=0
2019-06-05 10:06:20,750 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Try to recursively delete hdfs:/tmp/a1ae3c19-b66f-4eac-9f0a-c2cd80144d47/tempspace
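The S3DistCp step in the syslog above only moves run folders from shredded:good to shredded:archive (and deletes the source on success); no shredding or loading happens. As a sketch of that path mapping, using the bucket names from the logs:

```python
# Sketch of the path mapping behind the "Shredded S3 -> S3 Shredded Archive"
# S3DistCp step: each run=... folder under shredded/good is copied to the
# same run=... folder under shredded/archive.
def archive_destination(src: str) -> str:
    """Map a shredded:good run folder to its shredded:archive counterpart."""
    prefix = "s3://output/shredded/good/"
    if not src.startswith(prefix):
        raise ValueError(f"unexpected source path: {src}")
    run_folder = src[len(prefix):]  # e.g. "run=2018-09-12-20-00-21/"
    return "s3://output/shredded/archive/" + run_folder

print(archive_destination("s3://output/shredded/good/run=2018-09-12-20-00-21/"))
# -> s3://output/shredded/archive/run=2018-09-12-20-00-21/
```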

Also, as a digression: would it be possible to write the records from the raw Kinesis stream to the collector-logs S3 bucket and use EmrEtlRunner directly? That way I could remove the Stream Enrich part of my pipeline.

@shubhamg2208, you should not be using the -f flag unless you are trying to recover from a failure, as per the guide here: https://github.com/snowplow/snowplow/wiki/Batch-pipeline-steps#recovery-steps-for-stream-enrich-mode.

By skipping the staging_stream_enrich step, you essentially skip staging files to the enriched:good bucket. As that bucket is empty, there are no good files to shred and hence no data to load into Redshift.

You are also missing the enriched:archive bucket in your configuration.
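In the R104 config layout, that means adding an archive entry under enriched, alongside good/bad/stream (the path here is a placeholder):

```yaml
aws:
  s3:
    buckets:
      enriched:
        archive: s3://output/enriched/archive   # where to archive enriched events to
```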

@ihor Yes, I realized the configuration options I was using were wrong. I added the enriched:archive bucket to the configuration and changed my command to:

AWS_SNOWPLOW_ACCESS_KEY=key AWS_SNOWPLOW_SECRET_KEY=secret snowplow-emr-etl-runner-r104 run -c emr-etl-runner-stream-enrich.config.yml --resolver iglu_resolver.json -t targets -n enrichments

It seems to be working now! All the data is now available in Redshift. Thanks for your help! :smiley: