Cannot start EmrEtlRunner due to Contract Violation



Hello,
I cannot run EmrEtlRunner because of a contract violation error.
The common Snowplow usage flow is described on


At the moment, the following part is done:
Web application => Scala Stream Collector => Stream Enrich => Amazon Firehose => Amazon S3 (bucket “raken-events-dev”).
My goal is to transfer (shred) the enriched data from S3:raken-events-dev into a Postgres RDB.
I am trying to use EmrEtlRunner (shred only), installed on Amazon EC2. Iglu is also installed there.
Version: http://dl.bintray.com/snowplow/snowplow-generic/snowplow_emr_r92_maiden_castle_rc3.zip

I get a contract violation error when I try to run it; please see Error.txt. I cannot find where the contract is violated. Could you please help me correct my emr-etl-runner.config.yaml?

```
[ec2-user@dev1 etl]$ snowplow-emr-etl-runner run -f shred --config /home/ec2-user/etl/emr-etl-runner.config.yaml --targets file:///home/ec2-user/etl/postgres-config.json --resolver file:///home/ec2-user/etl/iglu.resolver.json
F, [2017-09-06T17:05:56.846000 #2785] FATAL -- :

ReturnContractError (Contract violation for return value:
        Expected: #<Contracts::Maybe:0x7c9512c6 @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x53d87b2d @vals=[String, nil]>, :log=>String, :raw=>{:in=>#<Contracts::CollectionOf:0x118a4d5 @contract=String, @collection_class=Array>, :processing=>String, :archive=>String}, :enriched=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0xbc52a41 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x9f48198 @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0x3dc2f14 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x2e8986b6 @vals=[String, nil]>}}}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x18d22ecf @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x31443680 @vals=[String, nil]>, :ec2_key_name=>String, :bootstrap=>#<Contracts::Maybe:0x878feb2 @vals=[#<Contracts::CollectionOf:0xf98cff @contract=String, @collection_class=Array>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0x31b650e9 @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x5b5b8730 @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0x1ab9c735 @vals=[{:volume_size=>#<Proc:0x5a4e492c@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:28 (lambda)>, :volume_type=>#<Proc:0x1d33e72e@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:27 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x26a202ae @vals=[#<Proc:0x5a4e492c@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:28 (lambda)>, nil]>, :ebs_optimized=>#<Contracts::Maybe:0x6e12f38c @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x6f9a3325 @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x65b2ee36 @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x11e24688 @vals=[#<Contracts::HashOf:0x34e9de8d @key=Symbol, @value=#<Contracts::HashOf:0x33a8f553 @key=Symbol, @value=String>>, nil]>}}, :collectors=>{:format=>String}, :enrich=>{:versions=>{:spark_enrich=>String}, :continue_on_unexpected_error=>Contracts::Bool, :output_compression=>#<Proc:0x61ca5134@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:26 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0x7252bdb @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x7b64bbad @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,
        Actual: {:aws=>{:access_key_id=>"XXXXXX", :secret_access_key=>"XXXXXXXXXXXXX", :s3=>{:region=>"us-west-2", :buckets=>{:assets=>"s3://snowplow-hosted-assets", :jsonpath_assets=>nil, :log=>"s3://raken-analytics-dev-error/log", :raw=>{:in=>["s3://raken-analytics-dev-error/raw/in"], :processing=>"s3://raken-analytics-dev-error/raw/processing", :archive=>"s3://raken-analytics-dev-error/raw/archive"}, :enriched=>{:good=>"s3://raken-analytics-dev", :bad=>"s3://raken-analytics-dev-error/enriched/bad", :errors=>nil, :archive=>nil}, :shredded=>{:good=>"s3://raken-analytics-dev-temp/shredded/good", :bad=>"s3://raken-analytics-dev-temp/shredded/bad", :errors=>"s3://raken-analytics-dev-error/shredded", :archive=>"s3://raken-analytics-dev-temp/shredded/archive"}}}, :emr=>{:job_name=>"Snowplow ETL", :ami_version=>"5.5.0", :region=>"us-west-2", :jobflow_role=>"EMR_EC2_DefaultRole", :service_role=>"EMR_DefaultRole", :placement=>"us-west-2", :ec2_subnet_id=>nil, :ec2_key_name=>"development", :bootstrap=>nil, :software=>{:hbase=>nil, :lingual=>nil}, :jobflow=>{:master_instance_type=>"m1.medium", :core_instance_count=>2, :core_instance_type=>"m1.medium", :core_instance_ebs=>{:volume_size=>100, :volume_type=>"gp2", :volume_iops=>400, :ebs_optimized=>false}, :task_instance_count=>0, :task_instance_type=>"m1.medium", :task_instance_bid=>0.015}, :bootstrap_failure_tries=>3, :additional_info=>nil}}, :collectors=>{:format=>"thrift"}, :enrich=>{:versions=>{:spark_enrich=>"1.9.0"}, :continue_on_unexpected_error=>true, :output_compression=>"NONE"}, :storage=>{:versions=>{:rdb_shredder=>"0.12.0", :rdb_loader=>"0.12.0", :hadoop_elasticsearch=>"0.1.0"}}, :monitoring=>{:tags=>{}, :logging=>{:level=>"TRACE"}, :snowplow=>{:method=>"get", :app_id=>"raken-local", :collector=>"localhost:8088"}}}
        Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config
        With Contract: Maybe, String, Bool => Maybe
        At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:202 ):
    uri:classloader:/gems/contracts-0.11.0/lib/contracts.rb:45:in `block in Contract'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts.rb:154:in `failure_callback'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:80:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:191:in `process_options'
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:167:in `get_args_config_enrichments_resolver'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:37:in `<main>'
    org/jruby/RubyKernel.java:979:in `load'
    uri:classloader:/META-INF/main.rb:1:in `<main>'
    org/jruby/RubyKernel.java:961:in `require'
    uri:classloader:/META-INF/main.rb:1:in `(root)'
    uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'
```
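The Expected and Actual hashes in the log are hard to compare by eye. One way to narrow down a violation like this is to walk the expected structure key by key and report every path whose value does not match. Below is a minimal sketch in Python, assuming the `emr` branch of both hashes has been transcribed by hand into dicts. `Maybe`, `Num`, `matches` and `check` are hypothetical stand-ins written for illustration; they are not the contracts gem's classes, and this is not necessarily how the gem validates hashes. `core_instance_ebs` is simplified to "any hash or nil".

```python
from typing import Any


class Maybe:
    """Hypothetical stand-in for Contracts::Maybe(x): None or a value matching x."""

    def __init__(self, inner: Any) -> None:
        self.inner = inner


class Num:
    """Hypothetical stand-in for Contracts::Num: any int or float (bools excluded)."""


def matches(value: Any, spec: Any) -> bool:
    """Check one leaf value against a simplified type spec."""
    if isinstance(spec, Maybe):
        return value is None or matches(value, spec.inner)
    if spec is Num:
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    return isinstance(value, spec)


def check(actual: Any, expected: Any, path: str = "") -> list[str]:
    """Walk the expected spec and collect every key path that fails to match."""
    if not isinstance(expected, dict):
        if matches(actual, expected):
            return []
        return [f"{path}: got {actual!r}, which does not match the contract"]
    if not isinstance(actual, dict):
        return [f"{path}: expected a hash, got {actual!r}"]
    problems: list[str] = []
    for key, spec in expected.items():
        # A missing key is read as None, much like a missing key in a Ruby hash reads as nil.
        problems += check(actual.get(key), spec, f"{path}.{key}" if path else key)
    # Keys present in the config but not listed in the spec at this level.
    for key in sorted(actual.keys() - expected.keys()):
        child = f"{path}.{key}" if path else key
        problems.append(f"{child}: not in the contract (possibly misplaced)")
    return problems


# Hand-transcribed from the "Expected" hash in the log (emr branch only, simplified).
EXPECTED_EMR = {
    "ami_version": str, "region": str, "jobflow_role": str, "service_role": str,
    "placement": Maybe(str), "ec2_subnet_id": Maybe(str), "ec2_key_name": str,
    "bootstrap": Maybe(list),
    "software": {"hbase": Maybe(str), "lingual": Maybe(str)},
    "jobflow": {
        "job_name": str, "master_instance_type": str, "core_instance_count": Num,
        "core_instance_type": str, "core_instance_ebs": Maybe(dict),
        "task_instance_count": Num, "task_instance_type": str,
        "task_instance_bid": Maybe(Num),
    },
    "additional_info": Maybe(str), "bootstrap_failure_tries": Num,
    "configuration": Maybe(dict),
}

# Hand-transcribed from the "Actual" hash in the log (emr branch only).
ACTUAL_EMR = {
    "job_name": "Snowplow ETL", "ami_version": "5.5.0", "region": "us-west-2",
    "jobflow_role": "EMR_EC2_DefaultRole", "service_role": "EMR_DefaultRole",
    "placement": "us-west-2", "ec2_subnet_id": None, "ec2_key_name": "development",
    "bootstrap": None, "software": {"hbase": None, "lingual": None},
    "jobflow": {
        "master_instance_type": "m1.medium", "core_instance_count": 2,
        "core_instance_type": "m1.medium",
        "core_instance_ebs": {"volume_size": 100, "volume_type": "gp2",
                              "volume_iops": 400, "ebs_optimized": False},
        "task_instance_count": 0, "task_instance_type": "m1.medium",
        "task_instance_bid": 0.015,
    },
    "bootstrap_failure_tries": 3, "additional_info": None,
}

if __name__ == "__main__":
    for problem in check(ACTUAL_EMR, EXPECTED_EMR, "aws.emr"):
        print(problem)
```

On the transcribed data, the sketch reports `aws.emr.jobflow.job_name` as missing and `aws.emr.job_name` as a key the Expected hash does not list at that level; in the log, `:job_name` appears inside `:jobflow` in Expected but directly under `:emr` in Actual.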

Additional questions:

  1. Do I understand correctly that, for my goal, I only need the “shred” step and can ignore the others, because collection and enrichment are already done without EmrEtlRunner?

  2. I use “-f shred” to run the shred step without collect and enrich. Is this correct, or is there another way to start shred without collect and enrich?
     I run EmrEtlRunner with: `snowplow-emr-etl-runner run -f shred --config /home/ec2-user/etl/emr-etl-runner.config.yaml --targets file:///home/ec2-user/etl/postgres-config.json --resolver file:///home/ec2-user/etl/iglu.resolver.json`

  3. Below is a formatted version of my config, where I added the comment “# FAKE NOT EXPECTED TO USE” to the buckets and the collector that the contract requires but that, I suppose, are not needed. Is it correct that EmrEtlRunner does not use these buckets and this collector when I run only the shred step (-f shred)?

```yaml
aws:
  access_key_id: XXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXx
  s3:
    region: us-west-2
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://raken-analytics-dev-error/log
      raw:
        in:
          - s3://raken-analytics-dev-error/raw/in # FAKE NOT EXPECTED TO USE
        processing: s3://raken-analytics-dev-error/raw/processing # FAKE NOT EXPECTED TO USE
        archive: s3://raken-analytics-dev-error/raw/archive # FAKE NOT EXPECTED TO USE
      enriched:
        good: s3://raken-analytics-dev # SOURCE BUCKET WITH ENRICHED DATA
        bad: s3://raken-analytics-dev-error/enriched/bad # FAKE NOT EXPECTED TO USE
        errors: # Leave blank unless continue_on_unexpected_error: set to true below
        archive: # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://raken-analytics-dev-temp/shredded/good # OUTPUT BUCKET
        bad: s3://raken-analytics-dev-temp/shredded/bad # OUTPUT BUCKET
        errors: s3://raken-analytics-dev-error/shredded # OUTPUT BUCKET
        archive: s3://raken-analytics-dev-temp/shredded/archive # OUTPUT BUCKET
  emr:
    job_name: Snowplow ETL # Give your job a name
    ami_version: 5.5.0 # Don't change this
    region: us-west-2 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using aws emr create-default-roles
    placement: us-west-2 # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: development
    bootstrap: # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Spark cluster below
    jobflow:
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
        volume_size: 100 # Gigabytes
        volume_type: "gp2"
        volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    additional_info: # Optional JSON string for selecting additional features
collectors:
  format: thrift # Or 'clj-tomcat' for the Clojure Collector, or 'thrift' for Thrift records, or 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs
enrich:
  versions:
    spark_enrich: 1.9.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: true # Set to 'true' (and set out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_shredder: 0.12.0 # Version of the Relational Database Shredding process
    rdb_loader: 0.12.0 # Version of the Relational Database Loader app
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: TRACE # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: raken-local # e.g. snowplow
    collector: localhost:8088 # FAKE NOT EXPECTED TO USE
```

I would appreciate your help.
Thank you.