No enrichment output

Hi there,

I've just started using Snowplow. I have the Scala collector -> Kinesis -> S3 pipeline working and am sending events with our JavaScript tracker; we are seeing .gz files in the S3 bucket as expected.

I am now trying to get the enrichment part working. I am running EmrEtlRunner:

./snowplow-emr-etl-runner --config /etc/snowplow/emretlrunner.conf --resolver /etc/snowplow/resolver.conf

where the config file has the following contents:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: <%= ENV['AWS_ACCESS_KEY_ID'] %>
  secret_access_key: <%= ENV['AWS_SECRET_ACCESS_KEY'] %>
  s3:
    region: us-west-2
    buckets:
      assets: s3n://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3n://snowplow-kinesis-logs
      raw:
        in:
          - s3n://snowplow-kinesis                # Multiple in buckets are permitted
        processing: s3n://snowplow-kinesis-processing/processed
        archive: s3n://snowplow-kinesis-archive/raw    # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3n://snowplow-kinesis-output/enriched/good       # e.g. s3://my-out-bucket/enriched/good
        bad: s3n://snowplow-kinesis-output/enriched/bad       # e.g. s3://my-out-bucket/enriched/bad
        errors:      # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3n://snowplow-kinesis-archive/enriched    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3n://snowplow-kinesis-output/shredded/good       # e.g. s3://my-out-bucket/shredded/good
        bad: s3n://snowplow-kinesis-output/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors:      # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3n://snowplow-kinesis-archive/shredded    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 4.5.0
    region: us-west-2        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement:      # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: i45643454 # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: keyname # ec2_key_name here...
    bootstrap:            # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase:         # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:          # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m3.xlarge
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.05 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/co
enrich:
  job_name: Snowplow ETL # Give your job a name
  versions:
    hadoop_enrich: 1.7.0 # Version of the Hadoop Enrichment process
    hadoop_shred: 0.9.0 # Version of the Hadoop Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  continue_on_unexpected_error: true # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: GZIP # Compression only for Redshift; set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  download:
    folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
  targets:
    - name: "snowplow"
      type: redshift
      host: #
      database: enriched # Name of database
      port: 5439 # Default Redshift port
      ssl_mode: disable # One of disable (default), require, verify-ca or verify-full
      table: atomic.events
      username: snowplow
      password: #
      maxerror: 1 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production

When I run this, it correctly copies the Thrift files (assuming there are an even number of them…) to the snowplow-kinesis-processing/processed bucket (I am generally just testing with a couple of small files) and spins up the EMR cluster. It then seems to copy the events to HDFS and enrich them, but it cannot write them back to S3:

Looking at the stderr file for the failing step (Elasticity S3DistCp Step: Enriched HDFS -> S3), I see:

Exception in thread "main" java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
...
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-10-2-1-227.us-west-2.compute.internal:8020/tmp/63be916c-da0a-46e7-b4b2-1855a00424cd/files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)

I’m not sure what this means; does it mean it tried to copy something back but there was nothing to copy? If so, and the enrichment didn’t generate anything, why is there nothing in the enriched/bad bucket?
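For reference, this is (more or less) how I’m checking the output buckets with the AWS CLI, using the bucket names from the config above; the bad bucket comes back empty:

aws s3 ls s3://snowplow-kinesis-output/enriched/good/ --recursive
aws s3 ls s3://snowplow-kinesis-output/enriched/bad/ --recursive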

All pointers to a solution much appreciated.

Brian

Hi @Brian,

I can see 2 possibilities here:

  1. The EMR cluster crashed midway, so there were no files left in HDFS; they were lost when the cluster terminated unexpectedly.
  2. No events actually made it through enrichment. This could be caused by the files in the processing bucket being in the wrong format or being incorrectly (re-)named, and thus ignored during enrichment.

Could you share with us your Kinesis S3 sink (HOCON) configuration file (with sensitive info removed)? Could you also check the files in your processing bucket (names, content)?
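A quick way to eyeball the file names is the AWS CLI, something along these lines (adjust to your own bucket):

aws s3 ls s3://snowplow-kinesis-processing/processed/ --recursive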

–Ihor

Thanks for your reply!

Yes, the main problem was that we were writing the Thrift records out gzipped rather than as LZO (this is also why it baulks at an odd number of files: it expects an .lzo file plus its .lzo.index file for each file).
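The relevant part of our Kinesis S3 sink config (HOCON) now looks roughly like this; treat it as a sketch, since the key names can differ between sink versions and the bucket name is just ours:

sink {
  s3 {
    region: "us-west-2"
    bucket: "snowplow-kinesis"
    format: "lzo"   # was "gzip"; the EMR enrichment expects LZO plus .lzo.index files
  }
}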

It works now (although we ran into the Java heap memory exception in EmrEtlRunner r82, so we needed to use r77).

Spoke too soon. It worked on one run and now doesn’t work any more. Running EmrEtlRunner r77 with the config file above, we are getting the following EMR error:

"On the master instance (i-6ab86dfe), bootstrap action 2 returned a non-zero return code"

even though there is only one bootstrap action location specified, which is:

s3://snowplow-hosted-assets/common/emr/snowplow-ami4-bootstrap-0.1.0.sh

We tried going back to the latest version, r82rc9, to see if we could get it to run with that, but are getting a different EMR error:

"Master instance (i-0729e393) failed attempting to download bootstrap action 1 file from S3"

when attempting to execute the following bootstrap action location:

s3://snowplow-hosted-assets-us-west-2/common/emr/snowplow-ami4-bootstrap-0.2.0.sh

This second issue looks like a file path thing. It’s all very frustrating! I have no idea how to proceed now. Any help greatly appreciated. Thanks!
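For what it’s worth, I’m also trying to pull the bootstrap-action output from the EMR log bucket (the log: bucket in the config above); if I have the EMR log layout right, it lives under something like this (cluster ID is a placeholder, instance ID from the error above):

aws s3 ls s3://snowplow-kinesis-logs/j-CLUSTERID/node/i-6ab86dfe/bootstrap-actions/ --recursive
aws s3 cp s3://snowplow-kinesis-logs/j-CLUSTERID/node/i-6ab86dfe/bootstrap-actions/2/stderr.gz - | gunzip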

An update: the issue was that an admin had changed the NAT configuration on our VPC, so we no longer had external access to download some of the necessary components.
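For anyone else who runs into this, it’s worth checking that the subnet EMR launches into still has a route out via a NAT gateway or instance; something like the following shows its route table (subnet ID is a placeholder):

aws ec2 describe-route-tables --region us-west-2 --filters Name=association.subnet-id,Values=subnet-XXXXXXXX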