Error in "Elasticity S3DistCp Step: Raw S3 -> HDFS"



I am attempting to set up Snowplow using the CloudFront Collector, sending events to a Redshift database, but I am receiving this error while running EmrEtlRunner:

Exception in thread "main" java.lang.RuntimeException: Error running job
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(...)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(...)
	at java.lang.reflect.Method.invoke(...)
	at org.apache.hadoop.util.RunJar.main(...)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-10-13-1-73.ec2.internal:8020/tmp/bc831f91-0969-4863-a3bd-5932a9632619/files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(...)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(...)
	at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(...)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(...)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(...)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(...)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(...)
	at org.apache.hadoop.mapreduce.Job$...
	at org.apache.hadoop.mapreduce.Job$...
	at ...(Native Method)
	at org.apache.hadoop.mapreduce.Job.submit(...)

My .yml configuration is as follows:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: "*********"
  secret_access_key: "*************"
  s3:
    region: us-east-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://snowplow-emr-etl/logs
      raw:
        in:                  # Multiple in buckets are permitted
          - s3://snowplow-logs-collector       # e.g. s3://my-in-bucket
        processing: s3://etc/processing
        archive: s3://etc/raw    # e.g. s3://my-archive-bucket/in
      enriched:
        good: s3://etc/enriched/good       # e.g. s3://my-out-bucket/enriched/good
        bad: s3://etc/enriched/bad         # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://etc/enriched/errors   # Leave blank unless continue_on_unexpected_error: set to true below
        archive: s3://etc/enriched/archive # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://etc/shredded/good       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://etc/shredded/bad         # e.g. s3://my-out-bucket/shredded/bad
        errors: # Leave blank unless continue_on_unexpected_error: set to true below
        archive: s3://etc/shredded/archive # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 4.5.0       # Don't change this
    region: us-east-1        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement:               # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: ******    # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: *****
    bootstrap: []            # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase:                 # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:               # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    additional_info:         # Optional JSON string for selecting additional features
collectors:
  format: cloudfront # Or 'clj-tomcat' for the Clojure Collector, or 'thrift' for Thrift records, or 'tsv/' for Cloudfront access logs
enrich:
  job_name: GB Snowplow ETL # Give your job a name
  versions:
    hadoop_enrich: 1.7.0 # Version of the Hadoop Enrichment process
    hadoop_shred: 0.9.0 # Version of the Hadoop Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  continue_on_unexpected_error: false # Set to 'true' (and set out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: GZIP # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  download:
    folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
  targets:
    - name: "Snowplow-Cerebro"
      type: redshift
      host: **** # The endpoint as shown in the Redshift console
      database: snowplow # Name of database
      port: 5439 # Default Redshift port
      username: ********
      password: ********
      maxerror: 1 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
      ssl_mode: disable
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: gb-snowplow # e.g. snowplow
    collector: *** # e.g.

Things were actually working earlier with a different Redshift cluster, but I had to create a new one because the old cluster was configured with a private subnet and could not be reached from a client to view/query the database. I am sure the new host / port / database / username / password all match the new database, but I am still having the problem.

Any help would be great, thanks in advance!


Hi @gbakulgod,

The problem you encountered is not related to your new Redshift cluster. From the error
InvalidInputException: Input path does not exist: hdfs://ip-10-13-1-73.ec2.internal:8020/tmp/bc831f91-0969-4863-a3bd-5932a9632619/files, I conclude that your EMR cluster crashed while copying files from the processing bucket to HDFS, which is the EMR cluster's internal storage.

You could try rerunning EmrEtlRunner with the --skip staging option. If the problem persists, you might consider scaling up your cluster: your current instance type is m1.medium according to your configuration file.
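A resume invocation might look like the sketch below. The binary and file paths are assumptions to adjust to your installation, and the command is printed for review rather than executed:

```shell
# Hypothetical paths -- adjust to where EmrEtlRunner and its config live.
RUNNER=./snowplow-emr-etl-runner
CONFIG=config/config.yml
RESOLVER=config/resolver.json

# Staging already succeeded (the raw logs reached the processing bucket),
# so resume from the step after it. Remove the echo to actually run it.
CMD="$RUNNER --config $CONFIG --resolver $RESOLVER --skip staging"
echo "$CMD"
```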

Generally, to understand the flow and which steps to rerun after a failed job, please refer to the diagram on the wiki page Batch Pipeline Steps. In your case the failure took place at step 2.



Thanks for the help, Ihor!

The problem ended up being fixed once we successfully ran StorageLoader to empty the enriched/shredded buckets that had not been emptied previously. I'm assuming this somehow led to the problem copying files from the processing bucket.
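For anyone landing in the same state, a quick pre-flight sketch is below. The bucket names are taken from the config above, and the aws CLI and StorageLoader paths are assumptions; commands are printed for review rather than executed:

```shell
# The "good" buckets must be empty before EmrEtlRunner starts a new run.
# Remove the echo to actually list each bucket.
for BUCKET in s3://etc/enriched/good s3://etc/shredded/good; do
  echo "aws s3 ls $BUCKET/ --recursive"
done

# If they are not empty, drain them by running StorageLoader first
# (path is an assumption -- adjust to your installation):
echo "./snowplow-storage-loader --config config/config.yml"
```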