EmrEtlRunner::EmrExecutionError

Hi all,

I am using R92 of the storage loader to load events into a PostgreSQL database.
I am running the command below:

./snowplow-emr-etl-runner run --config snowplow/4-storage/config/emretlrunner.yml --resolver snowplow/4-storage/config/iglu_resolver.json --targets snowplow/4-storage/config/targets/ --skip analyze

I am getting the error below:

D, [2017-10-04T14:31:46.285000 #11914] DEBUG -- : Initializing EMR jobflow
D, [2017-10-04T14:32:01.675000 #11914] DEBUG -- : EMR jobflow j-IYEN5P6IYGJQ started, waiting for jobflow to complete...
I, [2017-10-04T14:44:05.143000 #11914]  INFO -- : No RDB Loader logs
F, [2017-10-04T14:44:05.478000 #11914] FATAL -- :

Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-IYEN5P6IYGJQ failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
Snowplow ETL: TERMINATING [STEP_FAILURE] ~ elapsed time n/a [2017-10-04 14:37:30 +0000 - ]
 - 1. Elasticity S3DistCp Step: Raw s3://unilogregion1/ -> Raw Staging S3: FAILED ~ 00:04:10 [2017-10-04 14:37:32 +0000 - 2017-10-04 14:41:42 +0000]
 - 2. Elasticity S3DistCp Step: Shredded S3 -> Shredded Archive S3: CANCELLED ~ elapsed time n/a [ - ]
 - 3. Elasticity S3DistCp Step: Enriched S3 -> Enriched Archive S3: CANCELLED ~ elapsed time n/a [ - ]
 - 4. Elasticity Custom Jar Step: Load PostgreSQL enriched events storage Storage Target: CANCELLED ~ elapsed time n/a [ - ]
 - 5. Elasticity S3DistCp Step: Raw Staging S3 -> Raw Archive S3: CANCELLED ~ elapsed time n/a [ - ]
 - 6. Elasticity S3DistCp Step: Shredded HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]
 - 7. Elasticity S3DistCp Step: Shredded HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
 - 8. Elasticity Spark Step: Shred Enriched Events: CANCELLED ~ elapsed time n/a [ - ]
 - 9. Elasticity Custom Jar Step: Empty Raw HDFS: CANCELLED ~ elapsed time n/a [ - ]
 - 10. Elasticity S3DistCp Step: Enriched HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]
 - 11. Elasticity S3DistCp Step: Enriched HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
 - 12. Elasticity Spark Step: Enrich Raw Events: CANCELLED ~ elapsed time n/a [ - ]
 - 13. Elasticity S3DistCp Step: Raw S3 -> Raw HDFS: CANCELLED ~ elapsed time n/a [ - ]):
	uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:586:in `run'
	uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
	uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
	uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
	uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:103:in `run'
	uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
	uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
	uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
	uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:41:in `<main>'
	org/jruby/RubyKernel.java:979:in `load'
	uri:classloader:/META-INF/main.rb:1:in `<main>'
	org/jruby/RubyKernel.java:961:in `require'
	uri:classloader:/META-INF/main.rb:1:in `(root)'
	uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'

Please help me store the shredded events (currently in the S3 bucket) into the PostgreSQL database.

Below is my emretlrunner.yml:

	aws:
	  # Credentials can be hardcoded or set in environment variables
	  access_key_id: xxxxx
	  secret_access_key: xxxx
	  #keypair: Snowplowkeypair
	  #key-pair-file: /home/ubuntu/snowplow/4-storage/config/Snowplowkeypair.pem
	  region: us-east-1
	  s3:
		region: us-east-1
		buckets:
		  assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
		  jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
		  log: s3://unilogregion1/logs
		  raw:
			in:                  # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
			  - s3://unilogregion1/      # e.g. s3://my-old-collector-bucket
			processing: s3://unilogregion1/raw/processing
			archive: s3://unilogregion1/raw/archive   # e.g. s3://my-archive-bucket/raw
		  enriched:
			good: s3://unilogregion1/enriched/good        # e.g. s3://my-out-bucket/enriched/good
			bad: s3://unilogregion1/enriched/bad       # e.g. s3://my-out-bucket/enriched/bad
			errors: s3://unilogregion1/enriched/errors     # Leave blank unless :continue_on_unexpected_error: set to true below
			archive: s3://unilogregion1/enriched/archive    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
		  shredded:
			good: s3://unilogregion1/shredded/good        # e.g. s3://my-out-bucket/shredded/good
			bad: s3://unilogregion1/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
			errors: s3://unilogregion1/shredded/errors     # Leave blank unless :continue_on_unexpected_error: set to true below
			archive: s3://unilogregion1/shredded/archive     # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
	  emr:
		ami_version: 5.5.0
		region: us-east-1       # Always set this
		jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
		service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
		placement: us-east-1a      # Set this if not running in VPC. Leave blank otherwise
		ec2_subnet_id:  # Set this if running in VPC. Leave blank otherwise
		ec2_key_name: Snowplowkeypair
		bootstrap: []           # Set this to specify custom boostrap actions. Leave empty otherwise
		software:
		  hbase:              # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
		  lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
		# Adjust your Hadoop cluster below
		jobflow:
		  job_name: Snowplow ETL # Give your job a name
		  master_instance_type: m2.4xlarge
		  core_instance_count: 2
		  core_instance_type: m2.4xlarge
		  core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
			volume_size: 100    # Gigabytes
			volume_type: "gp2"
			volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
			ebs_optimized: false # Optional. Will default to true
		  task_instance_count: 0 # Increase to use spot instances
		  task_instance_type: m2.4xlarge
		  task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
		bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
		configuration:
		  yarn-site:
			yarn.resourcemanager.am.max-attempts: "1"
		  spark:
			maximizeResourceAllocation: "true"
		additional_info:        # Optional JSON string for selecting additional features
	collectors:
	  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
	enrich:
	  versions:
		spark_enrich: 1.9.0 # Version of the Spark Enrichment process
	  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
	  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
	storage:
	  versions:
		rdb_loader: 0.12.0
		rdb_shredder: 0.12.0        # Version of the Spark Shredding process
		hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
	monitoring:
	  tags: {} # Name-value pairs describing this job
	  logging:
		level: DEBUG # You can optionally switch to INFO for production
	  #snowplow:
		#method: get
		#app_id: unilog # e.g. snowplow
		#collector: 172.31.38.39:8082 # e.g. d3rkrsqld9gmqf.cloudfront.net

My iglu_resolver.json file is below:

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
	"cacheSize": 500,
	"repositories": [
	  {
		"name": "Iglu Central",
		"priority": 0,
		"vendorPrefixes": [ "com.snowplowanalytics" ],
		"connection": {
		  "http": {
			"uri": "http://iglucentral.com"
		  }
		}
	  }
	]
  }
}

In the targets directory I have placed postgres.json; below is that file:

{
	"schema": "iglu:com.snowplowanalytics.snowplow.storage/postgresql_config/jsonschema/1-0-0",
	"data": {
		"name": "PostgreSQL enriched events storage",
		"host": "localhost",
		"database": "snowplow",
		"port": 5432,
		"sslMode": "DISABLE",
		"username": "power_user",
		"password": "hadoop",
		"schema": "atomic",
		"purpose": "ENRICHED_EVENTS"
	}
}

I am following the architecture below:

JavaScript Tracker --> Scala Stream Collector --> Stream Enrich --> Kinesis S3 --> S3 --> EmrEtlRunner (shredding) --> PostgreSQL

Please help me store the events in the PostgreSQL database.

Hi @sandesh,

Sorry, yesterday I missed the fact that you’re trying to build a pipeline with Stream Enrich instead of Spark Enrich. The command you posted launches the entire batch pipeline, and it is most likely failing because there is nothing to copy.

On the other hand, you already have data enriched via Kinesis, which means you just need to start the pipeline from the shredding step. You can do this by passing the --resume-from shred option to EmrEtlRunner, as shown below.
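
For example, reusing the paths from your original command, a resumed run could look something like this (a sketch only; adjust paths and options to your setup):

./snowplow-emr-etl-runner run --config snowplow/4-storage/config/emretlrunner.yml --resolver snowplow/4-storage/config/iglu_resolver.json --targets snowplow/4-storage/config/targets/ --resume-from shred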

Another important caveat is that PostgreSQL does not fully support shredded data: you will only be able to load the atomic.events table, which may be less than you want. There’s a ticket tracking this. But if you’re aware of this limitation, the above architecture should work.
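
Once a load does succeed, one quick way to sanity-check that events actually landed is to query atomic.events with psql, using the connection details from your postgres.json (adjust as needed):

psql -h localhost -p 5432 -U power_user -d snowplow -c "SELECT count(*) FROM atomic.events;"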

Thanks, Anton, for sharing the details of every step.
Do I need to run the command below, as you mentioned in the second part?

./snowplow-emr-etl-runner run --config snowplow/4-storage/config/emretlrunner.yml --resolver snowplow/3-enrich/config/iglu_resolver.json --enrichments snowplow/3-enrich/config/enrichments/ --resume-from shred

Is this the command I need to run?

Since PostgreSQL doesn’t fully support shredded data, should I use Redshift instead of PostgreSQL?
Please suggest which is the better storage target.

I ran the command below, as you suggested.

./snowplow-emr-etl-runner run --config snowplow/4-storage/config/emretlrunner.yml --resolver snowplow/3-enrich/config/iglu_resolver.json --enrichments snowplow/3-enrich/config/enrichments/ --resume-from shred

Below is the error.

D, [2017-10-05T07:40:06.265000 #16669] DEBUG -- : Initializing EMR jobflow
E, [2017-10-05T07:40:08.998000 #16669] ERROR -- : No run folders in [s3://unilogregion1/enriched/good/] found
F, [2017-10-05T07:40:09.004000 #16669] FATAL -- :

Snowplow::EmrEtlRunner::UnexpectedStateError (No run folders in [s3://unilogregion1/enriched/good/] found):
	uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:715:in `get_latest_run_id'
	uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:480:in `initialize'
	uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
	uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
	uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
	uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:100:in `run'
	uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
	uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
	uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
	uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:41:in `<main>'
	org/jruby/RubyKernel.java:979:in `load'
	uri:classloader:/META-INF/main.rb:1:in `<main>'
	org/jruby/RubyKernel.java:961:in `require'
	uri:classloader:/META-INF/main.rb:1:in `(root)'
	uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'

When will those run folders get created?
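
For reference, I can list what is currently under that path with the AWS CLI (assuming it is configured with the same credentials) to see whether any run folders exist:

aws s3 ls s3://unilogregion1/enriched/good/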