Copy from S3 to Redshift with EmrEtlRunner (staging_stream_enrich) not working

Hi Fellows,

My current pipeline is: Scala Collector Instance → Kinesis → Enrich Instance → Kinesis → S3 Loader Instance → S3

I am now trying to copy the data from S3 to Redshift. However, no matter what I try, EmrEtlRunner fails with the error message below.

Please help! Thank you!!!

Here is the error result after executing:

java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar snowplow-emr-etl-runner run -c config/s3-to-redshift-config.yml -r config/iglu_resolver.json -t config/targets --skip staging_stream_enrich

ubuntu@ip-172-xx-xx-xxx:~$ java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar snowplow-emr-etl-runner run -c config/s3-to-redshift-config.yml -r config/iglu_resolver.json -t config/targets --skip staging_stream_enrich
uri:classloader:/gems/avro-1.8.1/lib/avro/schema.rb:350: warning: constant ::Fixnum is deprecated
ReturnContractError: Contract violation for return value:
Expected: #<Contracts::Maybe:0xf01fc6d @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x6591f8ea @vals=[String, nil]>, :log=>String, :encrypted=>Contracts::Bool, :raw=>#<Contracts::Maybe:0x136fece2 @vals=[{:in=>#<Contracts::CollectionOf:0x77476fcf @collection_class=Array, @contract=String>, :processing=>String, :archive=>String}, nil]>, :enriched=>{:good=>String, :bad=>#<Contracts::Maybe:0x5dd747c1 @vals=[String, nil]>, :errors=>#<Contracts::Maybe:0x7aa3857b @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x37806be6 @vals=[String, nil]>, :stream=>#<Contracts::Maybe:0x1d2046bb @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0xfee881 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x1ff463bb @vals=[String, nil]>}}, :consolidate_shredded_output=>Contracts::Bool}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x4ca0b9b1 @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x1be77a76 @vals=[String, nil]>, :ec2_key_name=>String, :security_configuration=>#<Contracts::Maybe:0x25f7cc38 @vals=[String, nil]>, :bootstrap=>#<Contracts::Maybe:0x53e28097 @vals=[#<Contracts::CollectionOf:0x7747cc1b @collection_class=Array, @contract=String>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0x2e1ba142 @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x595ec862 @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0x173b1af1 @vals=[{:volume_size=>#<Proc:0x2ce24a1a@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, :volume_type=>#<Proc:0x26bce60d@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:40 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x76eadc5a 
@vals=[#<Proc:0x2ce24a1a@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, nil]>, :ebs_optimized=>#<Contracts::Maybe:0x20914835 @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x6f68756d @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x5d9bb69b @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x7ffcb232 @vals=[#<Contracts::HashOf:0x1dd76982 @key=Symbol, @value=#<Contracts::HashOf:0x7e76a66f @key=Symbol, @value=String>>, nil]>}}, :collectors=>#<Contracts::Maybe:0x5feb82b3 @vals=[{:format=>String}, nil]>, :enrich=>{:versions=>#<Contracts::Maybe:0x54e0f76f @vals=[{:spark_enrich=>String}, nil]>, :continue_on_unexpected_error=>#<Contracts::Maybe:0x7462ba4b @vals=[Contracts::Bool, nil]>, :output_compression=>#<Proc:0x2b058bfd@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:39 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0x7ee64b53 @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x26d73519 @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,
Actual: {:aws=>{:access_key_id=>“redacted”, :secret_access_key=>“redacted”, :s3=>{:region=>“ap-southeast-1”, :buckets=>{:assets=>“s3://snowplow-hosted-assets”, :jsonpath_assets=>nil, :log=>“s3://snowplow-s3-to-redshift-log”, :encrypted=>false, :enriched=>{:good=>nil, :archive=>“s3://snowplow-s3-to-redshift/archive”, :stream=>“s3://snowplow-s3storage”}, :shredded=>{:good=>“s3://snowplow-s3-to-redshift/shredded/good”, :bad=>“s3://snowplow-s3-to-redshift/shredded/bad”, :errors=>“ADD HERE”, :archive=>“s3://snowplow-s3-to-redshift/shredded/archive”}}, :consolidate_shredded_output=>false}, :emr=>{:ami_version=>“5.9.0”, :region=>“ap-southeast-1”, :jobflow_role=>“EMR_EC2_DefaultRole”, :service_role=>“EMR_DefaultRole”, :placement=>“ADD HERE”, :ec2_subnet_id=>“subnet-6aa9a123”, :ec2_key_name=>“conroychan2”, :security_configuration=>nil, :bootstrap=>, :software=>{:hbase=>nil, :lingual=>nil}, :jobflow=>{:job_name=>“Data Load from S3 to Redshift”, :master_instance_type=>“m1.medium”, :core_instance_count=>2, :core_instance_type=>“m1.medium”, :core_instance_ebs=>{:volume_size=>100, :volume_type=>“gp2”, :volume_iops=>400, :ebs_optimized=>false}, :task_instance_count=>0, :task_instance_type=>“m1.medium”, :task_instance_bid=>0.015}, :bootstrap_failure_tries=>3, :configuration=>{:“yarn-site”=>{:“yarn.resourcemanager.am.max-attempts”=>“1”}, :spark=>{:maximizeResourceAllocation=>“true”}}, :additional_info=>nil}}, :enrich=>{:versions=>{:spark_enrich=>“1.17.0”}, :output_compression=>“GZIP”}, :storage=>{:versions=>{:rdb_loader=>“0.14.0”, :rdb_shredder=>“0.13.1”, :hadoop_elasticsearch=>“0.1.0”}}, :monitoring=>{:tags=>{}, :logging=>{:level=>“DEBUG”}}}
Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config
With Contract: Maybe, String, Bool => Maybe
At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:211
failure_callback at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:32
call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:80
block in redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138
process_options at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:199
get_args_config_enrichments_resolver at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:173
send_to at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43
call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76
block in redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138
at uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:37
load at org/jruby/RubyKernel.java:994
at uri:classloader:/META-INF/main.rb:1
require at org/jruby/RubyKernel.java:970
(root) at uri:classloader:/META-INF/main.rb:1
at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1
ERROR: org.jruby.embed.EvalFailedException: (ReturnContractError) Contract violation for return value:
Expected: #<Contracts::Maybe:0xf01fc6d @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x6591f8ea @vals=[String, nil]>, :log=>String, :encrypted=>Contracts::Bool, :raw=>#<Contracts::Maybe:0x136fece2 @vals=[{:in=>#<Contracts::CollectionOf:0x77476fcf @collection_class=Array, @contract=String>, :processing=>String, :archive=>String}, nil]>, :enriched=>{:good=>String, :bad=>#<Contracts::Maybe:0x5dd747c1 @vals=[String, nil]>, :errors=>#<Contracts::Maybe:0x7aa3857b @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x37806be6 @vals=[String, nil]>, :stream=>#<Contracts::Maybe:0x1d2046bb @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0xfee881 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x1ff463bb @vals=[String, nil]>}}, :consolidate_shredded_output=>Contracts::Bool}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x4ca0b9b1 @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x1be77a76 @vals=[String, nil]>, :ec2_key_name=>String, :security_configuration=>#<Contracts::Maybe:0x25f7cc38 @vals=[String, nil]>, :bootstrap=>#<Contracts::Maybe:0x53e28097 @vals=[#<Contracts::CollectionOf:0x7747cc1b @collection_class=Array, @contract=String>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0x2e1ba142 @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x595ec862 @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0x173b1af1 @vals=[{:volume_size=>#<Proc:0x2ce24a1a@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, :volume_type=>#<Proc:0x26bce60d@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:40 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x76eadc5a 
@vals=[#<Proc:0x2ce24a1a@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, nil]>, :ebs_optimized=>#<Contracts::Maybe:0x20914835 @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x6f68756d @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x5d9bb69b @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x7ffcb232 @vals=[#<Contracts::HashOf:0x1dd76982 @key=Symbol, @value=#<Contracts::HashOf:0x7e76a66f @key=Symbol, @value=String>>, nil]>}}, :collectors=>#<Contracts::Maybe:0x5feb82b3 @vals=[{:format=>String}, nil]>, :enrich=>{:versions=>#<Contracts::Maybe:0x54e0f76f @vals=[{:spark_enrich=>String}, nil]>, :continue_on_unexpected_error=>#<Contracts::Maybe:0x7462ba4b @vals=[Contracts::Bool, nil]>, :output_compression=>#<Proc:0x2b058bfd@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:39 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0x7ee64b53 @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x26d73519 @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,
Actual: {:aws=>{:access_key_id=>“redacted”, :secret_access_key=>“redacted”, :s3=>{:region=>“ap-southeast-1”, :buckets=>{:assets=>“s3://snowplow-hosted-assets”, :jsonpath_assets=>nil, :log=>“s3://snowplow-s3-to-redshift-log”, :encrypted=>false, :enriched=>{:good=>nil, :archive=>“s3://snowplow-s3-to-redshift/archive”, :stream=>“s3://snowplow-s3storage”}, :shredded=>{:good=>“s3://snowplow-s3-to-redshift/shredded/good”, :bad=>“s3://snowplow-s3-to-redshift/shredded/bad”, :errors=>“ADD HERE”, :archive=>“s3://snowplow-s3-to-redshift/shredded/archive”}}, :consolidate_shredded_output=>false}, :emr=>{:ami_version=>“5.9.0”, :region=>“ap-southeast-1”, :jobflow_role=>“EMR_EC2_DefaultRole”, :service_role=>“EMR_DefaultRole”, :placement=>“ADD HERE”, :ec2_subnet_id=>“subnet-6aa9a123”, :ec2_key_name=>“conroychan2”, :security_configuration=>nil, :bootstrap=>, :software=>{:hbase=>nil, :lingual=>nil}, :jobflow=>{:job_name=>“Data Load from S3 to Redshift”, :master_instance_type=>“m1.medium”, :core_instance_count=>2, :core_instance_type=>“m1.medium”, :core_instance_ebs=>{:volume_size=>100, :volume_type=>“gp2”, :volume_iops=>400, :ebs_optimized=>false}, :task_instance_count=>0, :task_instance_type=>“m1.medium”, :task_instance_bid=>0.015}, :bootstrap_failure_tries=>3, :configuration=>{:“yarn-site”=>{:“yarn.resourcemanager.am.max-attempts”=>“1”}, :spark=>{:maximizeResourceAllocation=>“true”}}, :additional_info=>nil}}, :enrich=>{:versions=>{:spark_enrich=>“1.17.0”}, :output_compression=>“GZIP”}, :storage=>{:versions=>{:rdb_loader=>“0.14.0”, :rdb_shredder=>“0.13.1”, :hadoop_elasticsearch=>“0.1.0”}}, :monitoring=>{:tags=>{}, :logging=>{:level=>“DEBUG”}}}
Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config
With Contract: Maybe, String, Bool => Maybe
At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:211

Here is my config file:

aws:

Credentials can be hardcoded or set in environment variables

access_key_id: XXXXXXXXXXXX
secret_access_key: ZZZZZZZZZZZZZZ

s3:
region: ap-southeast-1
buckets:
assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
log: s3://snowplow-s3-to-redshift-log
encrypted: false
enriched:
good: # e.g. s3://my-out-bucket/enriched/good
archive: s3://snowplow-s3-to-redshift/archive # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
stream: s3://snowplow-s3storage # S3 Loader’s output folder with enriched data. If present raw buckets will be discarded
shredded:
good: s3://snowplow-s3-to-redshift/shredded/good # e.g. s3://my-out-bucket/shredded/good
bad: s3://snowplow-s3-to-redshift/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
errors: ADD HERE # Leave blank unless :continue_on_unexpected_error: set to true below
archive: s3://snowplow-s3-to-redshift/shredded/archive # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
emr:
ami_version: 5.9.0
region: ap-southeast-1 # Always set this
jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
service_role: EMR_DefaultRole # Created using $ aws emr create-default-roles
placement: ADD HERE # Set this if not running in VPC. Leave blank otherwise
ec2_subnet_id: subnet-6aa9a123 # Set this if running in VPC. Leave blank otherwise
ec2_key_name: conroychan2
security_configuration: # Specify your EMR security configuration if needed. Leave blank otherwise
bootstrap: # Set this to specify custom bootstrap actions. Leave empty otherwise
software:
hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
# Adjust your Hadoop cluster below
jobflow:
job_name: Data Load from S3 to Redshift
master_instance_type: m1.medium
core_instance_count: 2
core_instance_type: m1.medium
core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
volume_size: 100 # Gigabytes
volume_type: "gp2"
volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
ebs_optimized: false # Optional. Will default to true
task_instance_count: 0 # Increase to use spot instances
task_instance_type: m1.medium
task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
configuration:
yarn-site:
yarn.resourcemanager.am.max-attempts: "1"
spark:
maximizeResourceAllocation: "true"
additional_info: # Optional JSON string for selecting additional features
enrich:
versions:
spark_enrich: 1.17.0 # Version of the Spark Enrichment process
output_compression: GZIP # Stream mode supports only GZIP
storage:
versions:
rdb_loader: 0.14.0
rdb_shredder: 0.13.1 # Version of the Spark Shredding process
hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
tags: {} # Name-value pairs describing this job
logging:
level: DEBUG # You can optionally switch to INFO for production

Here is the iglu_resolver.json:

{

"schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",

"data": {

"cacheSize": 500,

"repositories": [

  {

    "name": "Iglu Central",

    "priority": 0,

    "vendorPrefixes": [ "com.snowplowanalytics" ],

    "connection": {

      "http": {

        "uri": "http://iglucentral.com"

      }

    }

  },

  {

    "name": "Iglu Central - GCP Mirror",

    "priority": 1,

    "vendorPrefixes": [ "com.snowplowanalytics" ],

    "connection": {

      "http": {

        "uri": "http://mirror01.iglucentral.com"

      }

    }

  }

]

}

}

Here is the redshift.json:

{

"schema": "iglu:com.snowplowanalytics.snowplow.storage/redshift_config/jsonschema/2-1-0",

"data": {

    "name": "AWS Redshift enriched events storage",

    "host": "xxxxxxx.ap-southeast-1.redshift.amazonaws.com",

    "database": "snowplowstream",

    "port": 5439,

    "sslMode": "DISABLE",

    "username": "storageloader",

    "password": "xxxxxxxxx",

    "roleArn": "arn:aws:iam::xxxxxxxxxxxx:role/RedshiftS3Access",

    "schema": "atomic",

    "maxError": 1,

    "compRows": 20000,

    "sshTunnel": null,

    "purpose": "ENRICHED_EVENTS"

}

}

@roy, the error message indicates that the format of your s3-to-redshift-config.yml does not match what your version of EmrEtlRunner expects, hence the ReturnContractError: Contract violation.

Your configuration file appears to correspond to the latest release, R113. Which version of EmrEtlRunner are you using? If the version is correct, make sure the config file has all the required properties in place, with correct indentation. The indentation is lost in your snippet, so I cannot check that from the post.
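One rough way to narrow this down is to compare the "Expected" contract with the "Actual" hash in the error output. The sketch below is only an illustration in Python and is not part of EmrEtlRunner: the helper names are hypothetical, the list of required paths is my reading of the fields the posted contract shows as plain String rather than Maybe, and the values are copied (trimmed) from the posted "Actual" hash. It suggests that enriched.good, which is blank in the config, may be what triggers the violation, and it also flags the "ADD HERE" placeholders that were left in place.

```python
# Sketch only: required paths are read off the "Expected" contract in the
# error output of this thread; helper names are hypothetical.

# Values copied (trimmed) from the "Actual" hash in the error output.
config = {
    "aws": {
        "s3": {
            "buckets": {
                "assets": "s3://snowplow-hosted-assets",
                "log": "s3://snowplow-s3-to-redshift-log",
                "enriched": {
                    "good": None,  # left blank in the config file
                    "archive": "s3://snowplow-s3-to-redshift/archive",
                    "stream": "s3://snowplow-s3storage",
                },
                "shredded": {
                    "good": "s3://snowplow-s3-to-redshift/shredded/good",
                    "bad": "s3://snowplow-s3-to-redshift/shredded/bad",
                    "errors": "ADD HERE",
                },
            }
        },
        "emr": {"placement": "ADD HERE"},
    }
}

# Paths the contract shows as String (not Maybe), so a nil value is rejected.
REQUIRED_STRING_PATHS = [
    "aws.s3.buckets.assets",
    "aws.s3.buckets.log",
    "aws.s3.buckets.enriched.good",
    "aws.s3.buckets.shredded.good",
    "aws.s3.buckets.shredded.bad",
]


def lookup(cfg, dotted_path):
    """Return the value at a dotted path, or None if any key is missing."""
    node = cfg
    for key in dotted_path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def missing_required(cfg):
    """List required paths whose value is not a string (e.g. blank/nil)."""
    return [p for p in REQUIRED_STRING_PATHS if not isinstance(lookup(cfg, p), str)]


def leftover_placeholders(cfg, prefix=""):
    """List paths that still hold the template placeholder 'ADD HERE'."""
    found = []
    for key, value in cfg.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            found.extend(leftover_placeholders(value, path))
        elif value == "ADD HERE":
            found.append(path)
    return found


if __name__ == "__main__":
    print("missing required strings:", missing_required(config))
    print("placeholders left in place:", leftover_placeholders(config))
```

If enriched.good does turn out to be the culprit, setting it to an S3 path (the template comment gives s3://my-out-bucket/enriched/good as an example) would be the first thing to try. The "ADD HERE" values are strings and so pass the contract, but the template comments say to leave those fields blank in this setup.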

Hint: to retain the indentation when pasting on this forum, put the snippet between a pair of ``` fences.

Thank you, Ihor!
That's very helpful!