Storage run keeps failing at the 4th step (Elasticity S3DistCp Step: Enriched HDFS -> S3: FAILED)

Hi all,

I am using the following architecture:

JavaScript tracker -> Scala Stream Collector -> Kinesis S3 -> S3 -> EmrEtlRunner (shredding + enrich) -> Redshift

After the EmrEtlRunner (shredding + enrich) step, I run the storage step.
But the run fails at the 4th step of the process. Below is the error shown on the command line.

4. Elasticity S3DistCp Step: Enriched HDFS -> S3: FAILED ~ 00:00:06 [2017-11-06 06:27:38 +0000 - 2017-11-06 06:2700]

Sometimes the process completes, but often it fails at the 4th step of the storage process.

Below are the configuration details and the command used to run the steps.

./snowplow-emr-etl-runner run --config snowplow/4-storage/config/emretlrunner.yml --resolver snowplow/4-storage/config/iglu_resolver.json --targets snowplow/4-storage/config/targets/ --skip analyze

The emretlrunner.yml file is below:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: xxxxxxxxx
  secret_access_key: xxxxxxxxxx
  #keypair: Snowplowkeypair
  #key-pair-file: /home/ubuntu/snowplow/4-storage/config/Snowplowkeypair.pem
  region: us-east-1
  s3:
    region: us-east-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://snowplowdatabaseredshift/logs
      raw:
        in:                  # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          - s3://snowplowdatabaseredshift/      # e.g. s3://my-old-collector-bucket
        processing: s3://snowplowdatabaseredshift/raw/processing1
        archive: s3://snowplowdatabaseredshift/raw/archive1   # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://snowplowdatabaseredshift/enriched/good1        # e.g. s3://my-out-bucket/enriched/good
        bad: s3://snowplowdatabaseredshift/enriched/bad1       # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://snowplowdatabaseredshift/enriched/errors1     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://snowplowdatabaseredshift/enriched/archive1    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://snowplowdatabaseredshift/shredded/good1        # e.g. s3://my-out-bucket/shredded/good
        bad: s3://snowplowdatabaseredshift/shredded/bad1        # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://snowplowdatabaseredshift/shredded/errors1    # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://snowplowdatabaseredshift/shredded/archive1     # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 5.5.0
    region: us-east-1       # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement: us-east-1a      # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id:  # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: Snowplowkeypair
    bootstrap: []           # Set this to specify custom boostrap actions. Leave empty otherwise
    software:
      hbase:              # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m2.4xlarge
      core_instance_count: 2
      core_instance_type: m2.4xlarge
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m2.4xlarge
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  versions:
    spark_enrich: 1.9.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.13.0
    rdb_shredder: 0.12.0        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  #snowplow:
    #method: get
    #app_id: unilog # e.g. snowplow
    #collector: 172.31.38.39:8082 # e.g. d3rkrsqld9gmqf.cloudfront.net

The iglu_resolver.json file is below:

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
	"cacheSize": 500,
	"repositories": [
	  {
		"name": "Iglu Central",
		"priority": 0,
		"vendorPrefixes": [ "com.snowplowanalytics" ],
		"connection": {
		  "http": {
			"uri": "http://iglucentral.com"
		  }
		}
	  }
	]
  }
}

The redshift.json file is below:

{
	"schema": "iglu:com.snowplowanalytics.snowplow.storage/redshift_config/jsonschema/2-0-0",
	"data": {
		"name": "AWS Redshift enriched events storage",
		"host": "xxxxxxx",
		"database": "unilog",
		"port": 5439,
		"sslMode": "DISABLE",
		"username": "xx",
		"password": "xxx",
		"roleArn": "arn:aws:iam::302576851619:role/NewRedshiftRole",
		"schema": "atomic",
		"maxError": 1,
		"compRows": 20000,
		"purpose": "ENRICHED_EVENTS"
	}
}

The error I found in the stderr.gz file is below:

	Exception in thread "main" java.lang.RuntimeException: Error running job
		at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
		at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:705)
		at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
		at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
		at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
		at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
		at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
		at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
		at java.lang.reflect.Method.invoke(Method.java:498)
		at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
		at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
	Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-172-31-1-117.ec2.internal:8020/tmp/03063a2b-6318-40ca-bfd2-73c1928e39fd/files
		at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
		at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
		at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
		at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
		at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
		at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
		at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
		at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
		at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
		at java.security.AccessController.doPrivileged(Native Method)
		at javax.security.auth.Subject.doAs(Subject.java:422)
		at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
		at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
		at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
		... 10 more

I have given all the details.
Please help me resolve this error.

Hi all,

Please help me resolve this error.

Thanks,
Sandesh P

@sandesh,

The actual error is

Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-172-31-1-117.ec2.internal:8020/tmp/03063a2b-6318-40ca-bfd2-73c1928e39fd/files
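
In a long Java stack trace like the one in stderr.gz, the innermost "Caused by:" entry is usually the line worth reading first. As a rough sketch (the function name `root_cause` is just for illustration, not part of any Snowplow or EMR tooling), it can be pulled out of the decompressed log text like this:

```python
import re
from typing import Optional

# Matches lines such as "Caused by: some.Exception: message",
# allowing leading tabs/spaces as in the pasted trace.
CAUSED_BY = re.compile(r"^\s*Caused by:\s*(.+)$", re.MULTILINE)


def root_cause(stack_trace: str) -> Optional[str]:
    """Return the text of the last (innermost) 'Caused by:' entry, or None."""
    matches = CAUSED_BY.findall(stack_trace)
    return matches[-1].strip() if matches else None
```

Feeding it the contents of stderr.gz (after decompressing, for example with Python's `gzip` module) should return the `InvalidInputException: Input path does not exist: ...` line quoted above.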

I suppose it could be due to one of the following:

  • The EMR cluster crashed prematurely
  • There is no “enriched” data (for example, all events were rejected and ended up as “bad” data)
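
One way to check the second possibility is to compare how much data sits under the enriched "good" and "bad" locations for the failed run. The sketch below is only an illustration: it assumes you have saved the output of a recursive bucket listing (such as `aws s3 ls s3://snowplowdatabaseredshift/ --recursive`) and that each line has the form `date time size key`. The function name `summarize_listing` is made up for this example.

```python
from typing import Dict, Iterable, List, Tuple


def summarize_listing(lines: Iterable[str], prefixes: Iterable[str]) -> Dict[str, Tuple[int, int]]:
    """Map each key prefix to (object_count, total_bytes) found in the listing."""
    prefix_list: List[str] = list(prefixes)
    totals = {p: [0, 0] for p in prefix_list}
    for line in lines:
        # Assumed layout: "<date> <time> <size> <key>"; the key may contain spaces.
        parts = line.split(None, 3)
        if len(parts) != 4 or not parts[2].isdigit():
            continue  # skip anything that does not look like an object row
        size, key = int(parts[2]), parts[3].rstrip("\r\n")
        for p in prefix_list:
            if key.startswith(p):
                totals[p][0] += 1
                totals[p][1] += size
    return {p: (count, size) for p, (count, size) in totals.items()}
```

Calling it with the prefixes `enriched/good1/` and `enriched/bad1/` from the config above gives a quick picture: zero bytes under "good" alongside a non-trivial amount under "bad" would point towards all events being rejected during enrichment.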

Another point: you are using the m2.4xlarge instance type in your EMR cluster. How big is your normal payload (number of events processed per batch run)? Note that m2 is an older-generation instance type that does not require EBS storage. I would switch to m4 instead.

How do you recover from such a failure?