Error in "Elasticity S3DistCp Step: Raw S3 -> HDFS

Hi,

I am attempting to setup Snowplow using the Cloudfront Collector and sending events to a Redshift database, but am receiving this error while running the EmrEtlRunner:

Exception in thread "main" java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:720)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-10-13-1-73.ec2.internal:8020/tmp/bc831f91-0969-4863-a3bd-5932a9632619/files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
	at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)

My .yml configuration is as follows:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: "*********"
  secret_access_key: "*************"
  s3:
    region: us-east-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://snowplow-emr-etl/logs
      raw:
        in:                  # Multiple in buckets are permitted
          - s3://snowplow-logs-collector       # e.g. s3://my-in-bucket
        processing: s3://etc/processing
        archive: s3://etc/raw    # e.g. s3://my-archive-bucket/in
      enriched:
        good: s3://etc/enriched/good       # e.g. s3://my-out-bucket/enriched/good
        bad: s3://etc/enriched/bad        # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://etc/enriched/errors     # Leave blank unless continue_on_unexpected_error: set to true below
        archive: s3://etc/enriched/archive   # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://etc/shredded/good       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://etc/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors: # Leave blank unless continue_on_unexpected_error: set to true below
        archive: s3:/etc/shredded/archive    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 4.5.0       # Don't change this
    region: us-east-1        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement:            # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: ******    # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: *****
    bootstrap: []           # Set this to specify custom boostrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: cloudfront # Or 'clj-tomcat' for the Clojure Collector, or 'thrift' for Thrift records, or 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs
enrich:
  job_name: GB Snowplow ETL # Give your job a name
  versions:
    hadoop_enrich: 1.7.0 # Version of the Hadoop Enrichment process
    hadoop_shred: 0.9.0 # Version of the Hadoop Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  continue_on_unexpected_error: false # Set to 'true' (and set out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: GZIP # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  download:
    folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
  targets:
    - name: "Snowplow-Cerebro"
      type: redshift
      host: ****.us-east-1.redshift.amazonaws.com:5439 # The endpoint as shown in the Redshift console
      database: snowplow # Name of database
      port: 5439 # Default Redshift port
      table: atomic.events
      username: ********
      password: ********
      maxerror: 1 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
      ssl_mode: disable
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: gb-snowplow # e.g. snowplow
    collector: ***.cloudfront.net # e.g. d3rkrsqld9gmqf.cloudfront.net

Things were actually working earlier when I was using a different Redshift cluster, but I had to create a new one because the last cluster was configured with a private subnet that could not be accessed through a client to view/query the database. I am sure that the new host / port / database / username / password have all been changed to match the new database but am still having the problem.

Any help would be great, thanks in advance!

Hi @gbakulgod,

The problem you encountered is not related to your new Redshift cluster. From the error
InvalidInputException: Input path does not exist: hdfs://ip-10-13-1-73.ec2.internal:8020/tmp/bc831f91-0969-4863-a3bd-5932a9632619/files, I conclude your EMR cluster crashed while copying files from the processing bucket to HDFS, which is an internal EMR cluster’s storage.

You could try rerunning the EMR-ETL Runner with --skip staging option. If still the same problem, you might consider bumping up your cluster. Your current instance type is m1.medium according to your configuration file.

Generally, to understand the flow and the steps to be taken to rerun the failed job, please, refer to the diagram on the wiki page Batch Pipeline Steps. In your case the failure took place at step 2.

Regards,
Ihor

Thanks for the help, Ihor!

The problem ended up being fixed once we successfully ran the storage loader to empty the enriched/shredded buckets that were not emptied before. I’m assuming this somehow led to a problem copying files from the processing bucket.