FlowException: EmrEtlRunner failing either at Step "Enrich Raw Events" or Step "Shred Enriched Events"


#1

Hi,

I am running snowplow-emr-etl-runner 0.21.0 in a batch (Kinesis S3) pipeline and have started having failures in steps: Enrich Raw Events and Shred Enriched Events

The above screenshot shows the failure information in EMR. I have marked the above image with red as sections 1 to 5 in order of their execution date/time.

As you can see in sections 1 to 5 of the screenshot above, I executed EmrEtlRunner application (which was failing in step: Enrich Raw events) by using larger core instance type configuration in config.yml file after every failure until I reach a point where the Enrich Raw events step completed successfully but the later step Shred Enriched Events failed.

I am not sure why this is failing and do I need to use even more larger instance types or increase instance count to get the job finished successfully.

The exception thrown by emr in section 4 of attached image where Enrich Raw Events failed is:

log4j:ERROR Failed to rename [/mnt/var/log/hadoop/steps/s-X3JTYF9RE73G/syslog] to [/mnt/var/log/hadoop/steps/s-X3JTYF9RE73G/syslog.2016-07-11-16].
Exception in thread "main" cascading.flow.FlowException: step failed: (1/3), with job id: job_1468252448340_0002, please see cluster logs for failure messages
	at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:221)
	at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:149)
	at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
	at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

The exception thrown by emr in section 5 of attached image where Shred Enriched Events failed is:

Exception in thread "main" cascading.flow.FlowException: step failed: (4/5) ...dded-events/atomic-events, with job id: job_1468268042604_0011, please see cluster logs for failure messages
	at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:261)
	at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:162)
	at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
	at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Contents of my EmrEtlRunner’s config.yml file are listed below:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: <%= ENV['AWS_SNOWPLOW_ACCESS_KEY'] %>
  secret_access_key: <%= ENV['AWS_SNOWPLOW_SECRET_KEY'] %>
  s3:
    region: us-east-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: s3://my-jsonpaths-files # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3n://my-out-bucket--etl-test/logs/
      raw:
        in:                  # Multiple in buckets are permitted
          - s3n://my-out-bucket-logs-test/
         # - ADD HERE          # e.g. s3://my-in-bucket
         # - ADD HERE
        processing: s3n://my-out-bucket-etl-test/processing/
        archive: s3://my-out-bucket-archive-test/raw    # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://my-out-bucket-data-test/enriched/good       # e.g. s3://my-out-bucket/enriched/good
        bad: s3://my-out-bucket-data-test/enriched/bad         # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://my-out-bucket-data-test/enriched/errors     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://my-out-bucket-data-test/enriched/archive    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://my-out-bucket-data-test/shredded/good        # e.g. s3://my-out-bucket/shredded/good
        bad: s3://my-out-bucket-data-test/shredded/bad         # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://my-out-bucket-data-test/shredded/errors      # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://my-out-bucket-data-test/shredded/archive     # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 4.5.0
    region: us-east-1         # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement: us-east-1b     # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: my-keypair
    bootstrap: []           # Set this to specify custom boostrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      master_instance_type: m3.xlarge
      core_instance_count: 3
      core_instance_type: c3.8xlarge
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: c3.xlarge
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    additional_info:        # Optional JSON string for selecting additional features
  iglu:
    schema: iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0
    data:
      cache_size: 500
      repositories:
        - name: "Iglu Central"
          priority: 0
          vendor_prefixes:
            - com.snowplowanalytics
          connection:
            http:
              uri: http://iglucentral.com
        - name: "Test Repository"
          priority: 5
          vendor_prefixes:
            - com.vendorname
          connection:
            http:
              uri: https://s3.amazonaws.com/repo-name
collectors:
  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/$
enrich:
  job_name:  ETL job name # Give your job a name
  versions:
    hadoop_enrich: 1.7.0 # Version of the Hadoop Enrichment process
    hadoop_shred: 0.9.0 # Version of the Hadoop Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  continue_on_unexpected_error: true # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: GZIP # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  download:
    folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
  targets:
    - name: "some name"
      type: redshift
      host: somehost.us-east-1.redshift.amazonaws.com # The endpoint as shown in the Redshift console
      database: snowplowdb # Name of database
      port: 5439 # Default Redshift port
      ssl_mode: disable # One of disable (default), require, verify-ca or verify-full
      table: atomic.events
      username: someuser
      password: somepassword
      maxerror: 1 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
#  snowplow:
#    method: get
#    app_id: snowplow # e.g. snowplow
#    collector: somecollector.cloudfront.net/ # e.g. d3rkrsqld9gmqf.cloudfront.net

Any idea on what is going wrong ? Or what I should look for.

Thanks,
Jasmeet


#2

The most common reason for failures so far into processing is the cluster running out of disk space. Can you share your I/O charts from the EMR Monitoring tab for the duration of the job?


#3

Hi Alex,

I/O Charts attached:

Regards,
Jasmeet


#4

Thanks @Jasmeet - you can see from the chart “HDFS Utilization” that your cluster reaches 90% utilization and then utilization drops steeply on cluster failure.

The solution is to re-run the job on a cluster with significantly more disk space attached.


#5

Hi @alex,

How do you attach more disk space?

Enrico


#6

You can’t currently - you have to bump your instance type to one with significantly more disk space.


#7

Thanks @alex – I created a ticket in GitHub as feature request and linked the AWS Article. https://github.com/snowplow/snowplow/issues/2950


#8

@alex I increased it to an instance size which makes more sense, but we’re still running out of disk space: