Error while running EmrEtlRunner

Hi @anton,

I have been getting “ERROR: org.jruby.embed.EvalFailedException: (ReturnContractError) Contract violation for return value:” while running EmrEtl.

EmrEtl version - snowplow_emr_1.0.2.

Command to run EmrEtlRunner: ./snowplow-emr-etl-runner run --config emretlrunner.yml --resolver resolver.json --resume-from shred

I am using NSQ for the collector and enrichment, then loading the data into S3.
The step above is for loading the data from S3 into Postgres.

Config file

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: XXXXXXXXX
  secret_access_key: XXXXXXXXXXXX
  s3:
    region: us-east-2
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: ADD HERE
      encrypted: false # Whether the buckets below are encrypted using server side encryption (SSE-S3)
      raw:
        in:                  # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          #- ADD HERE         # e.g. s3://my-old-collector-bucket
          #- ADD HERE         # e.g. s3://my-new-collector-bucket
        processing: ADD HERE
        archive: ADD HERE    # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://clickstreamstorages3       # e.g. s3://my-out-bucket/enriched/good
        bad: s3://clickstreamstorages3       # e.g. s3://my-out-bucket/enriched/bad
        errors: ADD HERE     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3:clickstreamstorages3    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://shredded/good      # e.g. s3://my-out-bucket/shredded/good
        bad: s3://shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors:      # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://shredded    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: us-east-2      # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement: ADD HERE     # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: ADD HERE # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: ADD HERE
    security_configuration: ADD HERE # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      core_instance_bid: 0.015
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: cloudfront # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  versions:
    spark_enrich: 1.18.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    protocol: http
    port: 80
    app_id: ADD HERE # e.g. snowplow
    collector: ADD HERE # e.g. d3rkrsqld9gmqf.cloudfront.net

Thanks :slight_smile:

Hi @Vipin,

Can you provide us with the whole error message? It should include the line number where the error was thrown. It might be a bug introduced in the 1.x series, so it is also worth trying out 0.38.1.

ReturnContractError: Contract violation for return value:
        Expected: #<Contracts::Maybe:0x1f013047 @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x1ab9c735 @vals=[String, nil]>, :log=>String, :raw=>#<Contracts::Maybe:0x3bcedb06 @vals=[{:in=>#<Contracts::CollectionOf:0x450f0d89 @contract=String, @collection_class=Array>, :processing=>String, :archive=>String}, nil]>, :enriched=>{:good=>String, :bad=>#<Contracts::Maybe:0x16d4024e @vals=[String, nil]>, :errors=>#<Contracts::Maybe:0x63d3c9dc @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x35d114f4 @vals=[String, nil]>, :stream=>#<Contracts::Maybe:0x4be460e5 @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0x6807989e @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0xc2e33 @vals=[String, nil]>}}}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x4f2544b0 @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x41ec4271 @vals=[String, nil]>, :ec2_key_name=>String, :bootstrap=>#<Contracts::Maybe:0x24f90aff @vals=[#<Contracts::CollectionOf:0x75c589f2 @contract=String, @collection_class=Array>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0x201a8ea7 @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x7b64bbad @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0x13662609 @vals=[{:volume_size=>#<Proc:0x451a4187@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:28 (lambda)>, :volume_type=>#<Proc:0x565c887e@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:27 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x5c215642 @vals=[#<Proc:0x451a4187@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:28 (lambda)>, nil]>, 
:ebs_optimized=>#<Contracts::Maybe:0x416c1b0 @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x4c000cc4 @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x45d46254 @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x1a4cbcc6 @vals=[#<Contracts::HashOf:0x176e839e @key=Symbol, @value=#<Contracts::HashOf:0x4ac0d49 @key=Symbol, @value=String>>, nil]>}}, :collectors=>#<Contracts::Maybe:0x56402642 @vals=[{:format=>String}, nil]>, :enrich=>{:versions=>#<Contracts::Maybe:0x5358c8fa @vals=[{:spark_enrich=>String}, nil]>, :continue_on_unexpected_error=>#<Contracts::Maybe:0x58b3eb1 @vals=[Contracts::Bool, nil]>, :output_compression=>#<Proc:0xce0bbd5@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:26 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0xe2c627e @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x59a2bed1 @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,
        Actual: {:aws=>{:access_key_id=>"...", :secret_access_key=>"...", :s3=>{:region=>"us-east-2", :buckets=>{:assets=>"s3://snowplow-hosted-assets", :jsonpath_assets=>nil, :log=>"ADD HERE", :encrypted=>false, :raw=>{:in=>nil, :processing=>"ADD HERE", :archive=>"ADD HERE"}, :enriched=>{:good=>"s3://clickstreamstorages3", :bad=>"s3://clickstreamstorages3", :errors=>"ADD HERE", :archive=>"s3:clickstreamstorages3"}, :shredded=>{:good=>"s3://shredded/good", :bad=>"s3://shredded/bad", :errors=>nil, :archive=>"s3://shredded"}}, :consolidate_shredded_output=>false}, :emr=>{:ami_version=>"5.9.0", :region=>"us-east-2", :jobflow_role=>"EMR_EC2_DefaultRole", :service_role=>"EMR_DefaultRole", :placement=>"ADD HERE", :ec2_subnet_id=>"ADD HERE", :ec2_key_name=>"ADD HERE", :security_configuration=>"ADD HERE", :bootstrap=>[\*], :software=>{:hbase=>nil, :lingual=>nil}, :jobflow=>{:job_name=>"Snowplow ETL", :master_instance_type=>"m1.medium", :core_instance_count=>2, :core_instance_type=>"m1.medium", :core_instance_bid=>0.015, :core_instance_ebs=>{:volume_size=>100, :volume_type=>"gp2", :volume_iops=>400, :ebs_optimized=>false}, :task_instance_count=>0, :task_instance_type=>"m1.medium", :task_instance_bid=>0.015}, :bootstrap_failure_tries=>3, :configuration=>{:"yarn-site"=>{:"yarn.resourcemanager.am.max-attempts"=>"1"}, :spark=>{:maximizeResourceAllocation=>"true"}}, :additional_info=>nil}}, :collectors=>{:format=>"cloudfront"}, :enrich=>{:versions=>{:spark_enrich=>"1.18.0"}, :continue_on_unexpected_error=>false, :output_compression=>"NONE"}, :storage=>{:versions=>{:rdb_loader=>"0.14.0", :rdb_shredder=>"0.13.1", :hadoop_elasticsearch=>"0.1.0"}}, :monitoring=>{:tags=>{}, :logging=>{:level=>"DEBUG"}, :snowplow=>{:method=>"get", :protocol=>"http", :port=>80, :app_id=>"ADD HERE", :collector=>"ADD HERE"}}}
        Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config
        With Contract: Maybe, String, Bool => Maybe
        At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:202 
                     block in Contract at uri:classloader:/gems/contracts-0.11.0/lib/contracts.rb:45
                      failure_callback at uri:classloader:/gems/contracts-0.11.0/lib/contracts.rb:154
                             call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:80
              block in redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138
                       process_options at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:191
  get_args_config_enrichments_resolver at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:167
                               send_to at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43
                             call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76
              block in redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138
                                <main> at uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:37
                                  load at org/jruby/RubyKernel.java:979
                                <main> at uri:classloader:/META-INF/main.rb:1
                               require at org/jruby/RubyKernel.java:961
                                (root) at uri:classloader:/META-INF/main.rb:1
                                <main> at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1
ERROR: org.jruby.embed.EvalFailedException: (ReturnContractError) Contract violation for return value:
        Expected: #<Contracts::Maybe:0x1f013047 @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x1ab9c735 @vals=[String, nil]>, :log=>String, :raw=>#<Contracts::Maybe:0x3bcedb06 @vals=[{:in=>#<Contracts::CollectionOf:0x450f0d89 @contract=String, @collection_class=Array>, :processing=>String, :archive=>String}, nil]>, :enriched=>{:good=>String, :bad=>#<Contracts::Maybe:0x16d4024e @vals=[String, nil]>, :errors=>#<Contracts::Maybe:0x63d3c9dc @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x35d114f4 @vals=[String, nil]>, :stream=>#<Contracts::Maybe:0x4be460e5 @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0x6807989e @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0xc2e33 @vals=[String, nil]>}}}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x4f2544b0 @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x41ec4271 @vals=[String, nil]>, :ec2_key_name=>String, :bootstrap=>#<Contracts::Maybe:0x24f90aff @vals=[#<Contracts::CollectionOf:0x75c589f2 @contract=String, @collection_class=Array>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0x201a8ea7 @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x7b64bbad @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0x13662609 @vals=[{:volume_size=>#<Proc:0x451a4187@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:28 (lambda)>, :volume_type=>#<Proc:0x565c887e@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:27 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x5c215642 @vals=[#<Proc:0x451a4187@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:28 (lambda)>, nil]>, 
:ebs_optimized=>#<Contracts::Maybe:0x416c1b0 @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x4c000cc4 @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x45d46254 @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x1a4cbcc6 @vals=[#<Contracts::HashOf:0x176e839e @key=Symbol, @value=#<Contracts::HashOf:0x4ac0d49 @key=Symbol, @value=String>>, nil]>}}, :collectors=>#<Contracts::Maybe:0x56402642 @vals=[{:format=>String}, nil]>, :enrich=>{:versions=>#<Contracts::Maybe:0x5358c8fa @vals=[{:spark_enrich=>String}, nil]>, :continue_on_unexpected_error=>#<Contracts::Maybe:0x58b3eb1 @vals=[Contracts::Bool, nil]>, :output_compression=>#<Proc:0xce0bbd5@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:26 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0xe2c627e @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x59a2bed1 @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,
        Actual: {:aws=>{:access_key_id=>"...", :secret_access_key=>"...", :s3=>{:region=>"us-east-2", :buckets=>{:assets=>"s3://snowplow-hosted-assets", :jsonpath_assets=>nil, :log=>"ADD HERE", :encrypted=>false, :raw=>{:in=>nil, :processing=>"ADD HERE", :archive=>"ADD HERE"}, :enriched=>{:good=>"s3://clickstreamstorages3", :bad=>"s3://clickstreamstorages3", :errors=>"ADD HERE", :archive=>"s3:clickstreamstorages3"}, :shredded=>{:good=>"s3://shredded/good", :bad=>"s3://shredded/bad", :errors=>nil, :archive=>"s3://shredded"}}, :consolidate_shredded_output=>false}, :emr=>{:ami_version=>"5.9.0", :region=>"us-east-2", :jobflow_role=>"EMR_EC2_DefaultRole", :service_role=>"EMR_DefaultRole", :placement=>"ADD HERE", :ec2_subnet_id=>"ADD HERE", :ec2_key_name=>"ADD HERE", :security_configuration=>"ADD HERE", :bootstrap=>[], :software=>{:hbase=>nil, :lingual=>nil}, :jobflow=>{:job_name=>"Snowplow ETL", :master_instance_type=>"m1.medium", :core_instance_count=>2, :core_instance_type=>"m1.medium", :core_instance_bid=>0.015, :core_instance_ebs=>{:volume_size=>100, :volume_type=>"gp2", :volume_iops=>400, :ebs_optimized=>false}, :task_instance_count=>0, :task_instance_type=>"m1.medium", :task_instance_bid=>0.015}, :bootstrap_failure_tries=>3, :configuration=>{:"yarn-site"=>{:"yarn.resourcemanager.am.max-attempts"=>"1"}, :spark=>{:maximizeResourceAllocation=>"true"}}, :additional_info=>nil}}, :collectors=>{:format=>"cloudfront"}, :enrich=>{:versions=>{:spark_enrich=>"1.18.0"}, :continue_on_unexpected_error=>false, :output_compression=>"NONE"}, :storage=>{:versions=>{:rdb_loader=>"0.14.0", :rdb_shredder=>"0.13.1", :hadoop_elasticsearch=>"0.1.0"}}, :monitoring=>{:tags=>{}, :logging=>{:level=>"DEBUG"}, :snowplow=>{:method=>"get", :protocol=>"http", :port=>80, :app_id=>"ADD HERE", :collector=>"ADD HERE"}}}
        Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config
        With Contract: Maybe, String, Bool => Maybe
        At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:202

@Vipin, we believe it’s a problem in your configuration, not in EmrEtlRunner itself. We encourage you to double-check that no required fields are missing.
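
One way to do that check is to compare the "Actual" value in the error against the "Expected" contract. For example, the contract lists `:in` under `:raw` as an array of strings, while the actual value shows `:in=>nil`, so that field is a likely candidate. Below is a rough Python sketch of such a check. It is not part of EmrEtlRunner: it assumes the YAML has already been parsed into a plain dict, the function names are hypothetical, and the list of required paths is only a hand-picked selection of keys that appear as plain `String` (not `Maybe`) in the contract above. It also flags leftover "ADD HERE" placeholders, which pass the type check but are almost certainly not intended.

```python
PLACEHOLDER = "ADD HERE"

# Illustrative selection of keys typed as plain String in the "Expected"
# contract from the error output; this list is an assumption, not exhaustive.
REQUIRED_STRING_PATHS = [
    ("aws", "s3", "buckets", "log"),
    ("aws", "s3", "buckets", "enriched", "good"),
    ("aws", "s3", "buckets", "shredded", "good"),
    ("aws", "s3", "buckets", "shredded", "bad"),
    ("aws", "emr", "ec2_key_name"),
]


def lookup(config, path):
    """Follow a tuple of keys through nested dicts; return None if absent."""
    node = config
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def find_problems(config):
    """Return human-readable descriptions of suspicious config values."""
    problems = []
    for path in REQUIRED_STRING_PATHS:
        name = ".".join(path)
        value = lookup(config, path)
        if not isinstance(value, str):
            problems.append(f"{name}: expected a string, got {value!r}")
        elif value == PLACEHOLDER:
            problems.append(f"{name}: still set to the template placeholder")

    # The contract allows `raw` to be absent altogether, but when it is
    # present its `in` entry has to be a list of strings.
    raw = lookup(config, ("aws", "s3", "buckets", "raw"))
    if raw is not None:
        raw_in = raw.get("in") if isinstance(raw, dict) else None
        is_string_list = isinstance(raw_in, list) and all(
            isinstance(item, str) for item in raw_in
        )
        if not is_string_list:
            problems.append(
                f"aws.s3.buckets.raw.in: expected a list of strings, got {raw_in!r}"
            )
    return problems


# Excerpt of the values shown under "Actual" in the error message.
SAMPLE_CONFIG = {
    "aws": {
        "s3": {
            "buckets": {
                "log": "ADD HERE",
                "raw": {"in": None, "processing": "ADD HERE", "archive": "ADD HERE"},
                "enriched": {"good": "s3://clickstreamstorages3"},
                "shredded": {"good": "s3://shredded/good", "bad": "s3://shredded/bad"},
            }
        },
        "emr": {"ec2_key_name": "ADD HERE"},
    }
}

if __name__ == "__main__":
    for problem in find_problems(SAMPLE_CONFIG):
        print(problem)
```

With the excerpt above, the sketch reports the `log` and `ec2_key_name` placeholders plus the empty `raw.in` list. Since the job is resumed from the shred step, removing the whole `raw:` section may be an alternative to filling it in, given that the contract marks `raw` as optional.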

Thanks for the reply.