EmrEtlRunner fails at step [enrich] spark: Enriched HDFS -> S3 with error "Input path does not exist: hdfs://ip-172-31-10-133.us-east-2.compute.internal:8020/tmp/1d59dbe9-98f2-473f-8d65-288f5019fdca/files"

Status: FAILED

Reason: Input path does not exist.

Log File: s3://snowplow-logs.instacar.in/logs/j-Y2HH5UP2UD3S/steps/s-21T9RDMJ0CNJS/stderr.gz

Details: Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-172-31-10-133.us-east-2.compute.internal:8020/tmp/1d59dbe9-98f2-473f-8d65-288f5019fdca/files

JAR location: /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar

Main class: None

Arguments: --src hdfs:///local/snowplow/enriched-events/ --dest s3n://snowplow-databucket.instacar.in/good/run=2020-08-09-13-03-52/ --groupBy .*(part-)\d+-(.*) --targetSize 24 --s3Endpoint s3-us-east-2.amazonaws.com

Action on failure: Terminate cluster

I have not been able to get it working for the last two days.
Here is my config file:
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: <%= ENV['AWS_SNOWPLOW_ACCESS_KEY'] %>
  secret_access_key: <%= ENV['AWS_SNOWPLOW_SECRET_KEY'] %>
  s3:
    region: us-east-2
    buckets:
      assets: s3n://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3n://snowplow-logs.instacar.in/logs
      encrypted: false # Whether the buckets below are encrypted using server side encryption (SSE-S3)
      raw:
        in: # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          - s3n://snowplow-logs.instacar.in # e.g. s3://my-old-collector-bucket
        processing: s3n://snowplow-etlbucket.instacar.in/processing
        archive: s3n://snowplow-etlbucket.instacar.in/archive # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3n://snowplow-databucket.instacar.in/good # e.g. s3://my-out-bucket/enriched/good
        bad: s3n://snowplow-databucket.instacar.in/bad # e.g. s3://my-out-bucket/enriched/bad
        errors: s3n://snowplow-databucket.instacar.in/errors # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3n://snowplow-databucket.instacar.in/errors # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3n://snowplow-databucket.instacar.in/shredded/good # e.g. s3://my-out-bucket/shredded/good
        bad: s3n://snowplow-databucket.instacar.in/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
        errors: s3n://snowplow-databucket.instacar.in/shredded/errors # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3n://snowplow-databucket.instacar.in/shredded/archive # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: us-east-2 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using aws emr create-default-roles
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-2eAG0f47 # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: snowplow1
    security_configuration: # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: SnowplowETL # Give your job a name
      master_instance_type: m4.xlarge
      core_instance_count: 2
      core_instance_type: m4.xlarge
      core_instance_bid: 0.05
      core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
        volume_size: 100 # Gigabytes
        volume_type: "gp2"
        volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.05 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info: # Optional JSON string for selecting additional features
collectors:
  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  versions:
    spark_enrich: 1.18.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1 # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production

@ckrishnamoorthy, according to another post of yours, you are using Amazon Kinesis Firehose to upload the data to S3. I suspect your data, therefore, is not in the right format/location. I also do not follow the architecture shown below, if that is the case here as well.

Kinesis ScalaCollector -> EnrichEmrEtlrunner -> Amazon Kinesis Firehose -> S3 Enriched records -> PostgresqlLoader -> PostgresqlDB

Did you mean to show “Firehose” before EmrEtlRunner (EER)?

To upload the streamed data to S3, you would have to use the dedicated application designed to work with EmrEtlRunner: the Snowplow S3 Loader.
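
In case it helps, here is a minimal sketch of an S3 Loader configuration (HOCON) that sinks the enriched Kinesis stream to S3. The stream names, bucket and buffer values are placeholders, and the exact keys vary between loader versions, so treat this as a rough starting point and compare it against the sample config shipped with your release.

```
# Sketch only - placeholders throughout; check your loader version's sample config
source = "kinesis"                 # read records from a Kinesis stream
sink   = "kinesis"                 # records that fail to sink go to a bad Kinesis stream

aws {
  accessKey = "iam"                # use the instance's IAM role
  secretKey = "iam"
}

kinesis {
  initialPosition = "TRIM_HORIZON"
  maxRecords      = 10000
  region          = "us-east-2"
  appName         = "snowplow-s3-loader"   # placeholder KCL application/checkpoint name
}

streams {
  inStreamName  = "enriched-good"          # placeholder: enriched stream written by Stream Enrich
  outStreamName = "s3-loader-bad"          # placeholder: stream for failed records
  buffer {
    byteLimit   = 1000000
    recordLimit = 500
    timeLimit   = 60000
  }
}

s3 {
  region     = "us-east-2"
  bucket     = "snowplow-databucket.instacar.in/enriched/stream"  # placeholder: bucket EmrEtlRunner picks enriched data up from
  format     = "gzip"              # gzipped TSV for enriched events; lzo is used for raw Thrift records
  maxTimeout = 120000
}
```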

Your EER configuration file is very hard to read. To retain the indentation, could you place your YAML or other code between pairs of ``` (triple backticks, Markdown)?

Hi @ihor,
Thanks for the quick response. I meant this:

  1. Scala Collector -> outputs to a Kinesis stream
  2. Stream Enrich -> reads from the Kinesis stream in #1 and outputs an enriched stream to Kinesis
  3. Firehose delivers the enriched stream from Kinesis to S3.

Now I realise that Firehose is not supported, so I am using the Snowplow S3 Loader successfully.

The above error is not seen now and I see EmrEtlRunner running successfully, but the PostgreSQL DB is empty. I see records being sent as one single record instead of 100+ fields.

@ckrishnamoorthy, are you using RDB Loader to load the data to Postgres DB? Here’s the target configuration file sample to go with the Loader.
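
For illustration, here is a minimal sketch of what a PostgreSQL target file looks like: a self-describing JSON. The schema version, host, credentials and names below are placeholders (JSON cannot carry comments), so start from the sample target file that matches your RDB Loader version.

```
{
  "schema": "iglu:com.snowplowanalytics.snowplow.storage/postgresql_config/jsonschema/1-0-1",
  "data": {
    "name": "PostgreSQL enriched events storage",
    "host": "my-postgres-host.example.com",
    "database": "snowplow",
    "port": 5432,
    "sslMode": "DISABLE",
    "schema": "atomic",
    "username": "storageloader",
    "password": "CHANGE-ME",
    "purpose": "ENRICHED_EVENTS"
  }
}
```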

Do note that Postgres DB loading is restricted to the events table only: no self-describing events (whether Snowplow-authored, like link clicks, or custom events and entities). To get all your data into the warehouse you would use Redshift or Snowflake DB instead.

Thanks. Yes, I am using RDB Loader for Postgres, but it is not working: EmrEtlRunner completes without errors, yet there is no data. I shifted to Redshift but that gives errors at the enrich stage. I am posting that error in a separate post.