EmrEtlRunner failed to start

Hi,

I have been checking a lot of the posts, but I still can’t get snowplow-emr-etl-runner to start.

The command I used is:
./snowplow-emr-etl-runner -c config.yml -f staging_stream_enrich

I want to use EmrEtlRunner just to shred the data and then load it into Redshift.

This is the error:
uri:classloader:/gems/avro-1.8.1/lib/avro/schema.rb:350: warning: constant ::Fixnum is deprecated
ReturnContractError: Contract violation for return value:
Expected: #<Contracts::Maybe:0x1939a394 @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x35e689a0 @vals=[String, nil]>, :log=>String, :encrypted=>Contracts::Bool, :raw=>#<Contracts::Maybe:0x69d2fb0a @vals=[{:in=>#<Contracts::CollectionOf:0x574218f @collection_class=Array, @contract=String>, :processing=>String, :archive=>String}, nil]>, :enriched=>{:good=>String, :bad=>#<Contracts::Maybe:0x69a90b81 @vals=[String, nil]>, :errors=>#<Contracts::Maybe:0x35329a05 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x17136390 @vals=[String, nil]>, :stream=>#<Contracts::Maybe:0x6f1d799 @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0x333e5fb6 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x3b7b0b57 @vals=[String, nil]>}}, :consolidate_shredded_output=>Contracts::Bool}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x1e0294a7 @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x15cee630 @vals=[String, nil]>, :ec2_key_name=>String, :security_configuration=>#<Contracts::Maybe:0x19647566 @vals=[String, nil]>, :bootstrap=>#<Contracts::Maybe:0x527d48db @vals=[#<Contracts::CollectionOf:0x4c000cc4 @collection_class=Array, @contract=String>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0x2f038d3c @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x39a8e2fa @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0x5fbae40 @vals=[{:volume_size=>#<Proc:0x1a4cbcc6@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, :volume_type=>#<Proc:0x7cecab19@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:40 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x56402642 
@vals=[#<Proc:0x1a4cbcc6@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, nil]>, :ebs_optimized=>#<Contracts::Maybe:0x1d5048d1 @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x709d6de5 @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x51f34185 @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x7645b7d @vals=[#<Contracts::HashOf:0x6d7e2795 @key=Symbol, @value=#<Contracts::HashOf:0x78743dd8 @key=Symbol, @value=String>>, nil]>}}, :collectors=>#<Contracts::Maybe:0x3ea84e01 @vals=[{:format=>String}, nil]>, :enrich=>{:versions=>#<Contracts::Maybe:0x986b619 @vals=[{:spark_enrich=>String}, nil]>, :continue_on_unexpected_error=>#<Contracts::Maybe:0x7c956dda @vals=[Contracts::Bool, nil]>, :output_compression=>#<Proc:0x26c6288d@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:39 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0x225e09f0 @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x238ad211 @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,
Actual: {:aws=>{:access_key_id=>“redacted”, :secret_access_key=>“redacted”, :s3=>{:region=>“us-east-2”, :buckets=>{:assets=>“s3://snowplow-hosted-assets”, :jsonpath_assets=>nil, :log=>“s3://sp-archive-acc/logs”, :encrypted=>false, :raw=>{:in=>[“s3://sp-archive-acc/events”], :processing=>“s3://sp-archive-acc/processing”, :archive=>“s3://sp-archive-acc/raw”}, :enriched=>{:good=>“s3://sp-archive-acc/enriched”, :bad=>“s3://sp-archive-acc/enriched/bad”, :errors=>nil, :archive=>“s3://sp-archive-acc/enriched/archive”, :stream=>“s3://sp-archive-acc/enriched/good”}, :shredded=>{:good=>“s3://sp-archive-acc/shredded/good”, :bad=>“s3://sp-archive-acc/shredded/bad”, :errors=>nil, :archive=>“s3://sp-archive-acc/shredded/archive”}}}, :emr=>{:ami_version=>“5.9.0”, :region=>“us-east-2”, :jobflow_role=>“EMR_EC2_DefaultRole”, :service_role=>“EMR_DefaultRole”, :placement=>nil, :ec2_subnet_id=>“subnet-a9f19bd3”, :ec2_key_name=>“snowplow00”, :bootstrap=>, :software=>{:hbase=>nil, :lingual=>nil}, :jobflow=>{:job_name=>“Snowplow ETL”, :master_instance_type=>“m4.large”, :core_instance_count=>1, :core_instance_type=>“m4.large”, :core_instance_ebs=>{:volume_size=>100, :volume_type=>“gp2”, :volume_iops=>400, :ebs_optimized=>false}, :task_instance_count=>0, :task_instance_type=>“m4.large”, :task_instance_bid=>0.015}, :bootstrap_failure_tries=>3, :additional_info=>nil, :configuration=>nil}}, :collectors=>{:format=>“thrift”}, :enrich=>{:versions=>{:spark_enrich=>“1.17.0”}, :continue_on_unexpected_error=>false, :output_compression=>“NONE”}, :storage=>{:versions=>{:rdb_shredder=>“0.13.1”, :rdb_loader=>“0.14.0”, :hadoop_elasticsearch=>“0.1.0”}}, :monitoring=>{:tags=>{:name=>“data-pipeline-enrichment”}, :logging=>{:level=>“DEBUG”}, :snowplow=>{:method=>“get”, :protocol=>“http”, :port=>80, :app_id=>“snowplow”, :collector=>nil}}}
Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config
With Contract: Maybe, String, Bool => Maybe
At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:211
failure_callback at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:32
call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:80
block in redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138
process_options at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:199
get_args_config_enrichments_resolver at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:173
send_to at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43
call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76
block in redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138
at uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:37
load at org/jruby/RubyKernel.java:994
at uri:classloader:/META-INF/main.rb:1
require at org/jruby/RubyKernel.java:970
(root) at uri:classloader:/META-INF/main.rb:1
at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1
ERROR: org.jruby.embed.EvalFailedException: (ReturnContractError) Contract violation for return value:
Expected: #<Contracts::Maybe:0x1939a394 @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x35e689a0 @vals=[String, nil]>, :log=>String, :encrypted=>Contracts::Bool, :raw=>#<Contracts::Maybe:0x69d2fb0a @vals=[{:in=>#<Contracts::CollectionOf:0x574218f @collection_class=Array, @contract=String>, :processing=>String, :archive=>String}, nil]>, :enriched=>{:good=>String, :bad=>#<Contracts::Maybe:0x69a90b81 @vals=[String, nil]>, :errors=>#<Contracts::Maybe:0x35329a05 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x17136390 @vals=[String, nil]>, :stream=>#<Contracts::Maybe:0x6f1d799 @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0x333e5fb6 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x3b7b0b57 @vals=[String, nil]>}}, :consolidate_shredded_output=>Contracts::Bool}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x1e0294a7 @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x15cee630 @vals=[String, nil]>, :ec2_key_name=>String, :security_configuration=>#<Contracts::Maybe:0x19647566 @vals=[String, nil]>, :bootstrap=>#<Contracts::Maybe:0x527d48db @vals=[#<Contracts::CollectionOf:0x4c000cc4 @collection_class=Array, @contract=String>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0x2f038d3c @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x39a8e2fa @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0x5fbae40 @vals=[{:volume_size=>#<Proc:0x1a4cbcc6@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, :volume_type=>#<Proc:0x7cecab19@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:40 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x56402642 
@vals=[#<Proc:0x1a4cbcc6@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, nil]>, :ebs_optimized=>#<Contracts::Maybe:0x1d5048d1 @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x709d6de5 @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x51f34185 @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x7645b7d @vals=[#<Contracts::HashOf:0x6d7e2795 @key=Symbol, @value=#<Contracts::HashOf:0x78743dd8 @key=Symbol, @value=String>>, nil]>}}, :collectors=>#<Contracts::Maybe:0x3ea84e01 @vals=[{:format=>String}, nil]>, :enrich=>{:versions=>#<Contracts::Maybe:0x986b619 @vals=[{:spark_enrich=>String}, nil]>, :continue_on_unexpected_error=>#<Contracts::Maybe:0x7c956dda @vals=[Contracts::Bool, nil]>, :output_compression=>#<Proc:0x26c6288d@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:39 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0x225e09f0 @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x238ad211 @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,
Actual: {:aws=>{:access_key_id=>“redacted”, :secret_access_key=>“redacted”, :s3=>{:region=>“us-east-2”, :buckets=>{:assets=>“s3://snowplow-hosted-assets”, :jsonpath_assets=>nil, :log=>“s3://sp-archive-acc/logs”, :encrypted=>false, :raw=>{:in=>[“s3://sp-archive-acc/events”], :processing=>“s3://sp-archive-acc/processing”, :archive=>“s3://sp-archive-acc/raw”}, :enriched=>{:good=>“s3://sp-archive-acc/enriched”, :bad=>“s3://sp-archive-acc/enriched/bad”, :errors=>nil, :archive=>“s3://sp-archive-acc/enriched/archive”, :stream=>“s3://sp-archive-acc/enriched/good”}, :shredded=>{:good=>“s3://sp-archive-acc/shredded/good”, :bad=>“s3://sp-archive-acc/shredded/bad”, :errors=>nil, :archive=>“s3://sp-archive-acc/shredded/archive”}}}, :emr=>{:ami_version=>“5.9.0”, :region=>“us-east-2”, :jobflow_role=>“EMR_EC2_DefaultRole”, :service_role=>“EMR_DefaultRole”, :placement=>nil, :ec2_subnet_id=>“subnet-a9f19bd3”, :ec2_key_name=>“snowplow00”, :bootstrap=>, :software=>{:hbase=>nil, :lingual=>nil}, :jobflow=>{:job_name=>“Snowplow ETL”, :master_instance_type=>“m4.large”, :core_instance_count=>1, :core_instance_type=>“m4.large”, :core_instance_ebs=>{:volume_size=>100, :volume_type=>“gp2”, :volume_iops=>400, :ebs_optimized=>false}, :task_instance_count=>0, :task_instance_type=>“m4.large”, :task_instance_bid=>0.015}, :bootstrap_failure_tries=>3, :additional_info=>nil, :configuration=>nil}}, :collectors=>{:format=>“thrift”}, :enrich=>{:versions=>{:spark_enrich=>“1.17.0”}, :continue_on_unexpected_error=>false, :output_compression=>“NONE”}, :storage=>{:versions=>{:rdb_shredder=>“0.13.1”, :rdb_loader=>“0.14.0”, :hadoop_elasticsearch=>“0.1.0”}}, :monitoring=>{:tags=>{:name=>“data-pipeline-enrichment”}, :logging=>{:level=>“DEBUG”}, :snowplow=>{:method=>“get”, :protocol=>“http”, :port=>80, :app_id=>“snowplow”, :collector=>nil}}}
Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config
With Contract: Maybe, String, Bool => Maybe
At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:211

This is my conf.yml

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: "XXXXX"
  secret_access_key: "XXXXX"

  s3:
    region: us-east-2
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://sp-archive-acc/logs
      encrypted: false # Whether the buckets below are enrcrypted using server side encryption (SSE-S3)
      raw:
        in:                  # Multiple in buckets are permitted
          - s3://sp-archive-acc/events
        processing: s3://sp-archive-acc/processing
        archive: s3://sp-archive-acc/raw
      enriched:
        good: s3://sp-archive-acc/enriched
        bad: s3://sp-archive-acc/enriched/bad
        errors:      # Leave blank unless continue_on_unexpected_error: set to true below
        archive: s3://sp-archive-acc/enriched/archive   # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        stream: s3://sp-archive-acc/enriched/good
      shredded:
        good:  s3://sp-archive-acc/shredded/good       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://sp-archive-acc/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors:      # Leave blank unless continue_on_unexpected_error: set to true below
        archive:  s3://sp-archive-acc/shredded/archive    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 5.9.0      # Don't change this
    region: us-east-2        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement:     # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-a9f19bd3 # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: snowplow00
    bootstrap: []           # Set this to specify custom boostrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m4.large
      core_instance_count: 1
      core_instance_type: m4.large
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m4.large
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    additional_info:        # Optional JSON string for selecting additional features
    configuration:
collectors:
  format: thrift # Or 'clj-tomcat' for the Clojure Collector, or 'thrift' for Thrift records, or 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs
enrich:
  versions:
    spark_enrich: 1.17.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_shredder: 0.13.1        # Version of the Relational Database Shredding process
    rdb_loader: 0.14.0          # Version of the Relational Database Loader app
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {name: "data-pipeline-enrichment"} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    protocol: http
    port: 80
    app_id: snowplow # e.g. snowplow
    collector:  # e.g. d3rkrsqld9gmqf.cloudfront.net

Any help, please.
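
A ReturnContractError like the one above prints the expected shape and the actual value, so one way to narrow it down is to compare the two key by key. The sketch below is a minimal Python illustration, not EmrEtlRunner's own validation. `EXPECTED` and `ACTUAL` are hand-copied subsets of the two dumps, `Maybe` is a hypothetical stand-in for `Contracts::Maybe`, and the sketch assumes that keys not named in the contract are ignored.

```python
class Maybe:
    """Hypothetical stand-in for Contracts::Maybe: the value may be nil (None)."""

    def __init__(self, kind):
        self.kind = kind


def violations(expected, actual, path=""):
    """Return the dotted paths where `actual` does not match `expected`."""
    if isinstance(expected, Maybe):
        if actual is None:
            return []
        return violations(expected.kind, actual, path)
    if isinstance(expected, dict):
        if not isinstance(actual, dict):
            return [path]
        found = []
        for key, sub in expected.items():
            child = f"{path}.{key}" if path else key
            # A missing key is treated like nil, hence .get()
            found += violations(sub, actual.get(key), child)
        return found
    # Leaf: `expected` is a plain type such as str or bool
    return [] if isinstance(actual, expected) else [path]


# Subset of the "Expected" dump from the error message
EXPECTED = {
    "aws": {"s3": {"region": str,
                   "consolidate_shredded_output": bool,
                   "buckets": {"jsonpath_assets": Maybe(str), "log": str}}},
    "monitoring": {"snowplow": Maybe({"method": str, "collector": str, "app_id": str})},
}

# Matching subset of the "Actual" dump
ACTUAL = {
    "aws": {"s3": {"region": "us-east-2",
                   "buckets": {"jsonpath_assets": None,
                               "log": "s3://sp-archive-acc/logs"}}},
    "monitoring": {"snowplow": {"method": "get", "protocol": "http", "port": 80,
                                "app_id": "snowplow", "collector": None}},
}

print(violations(EXPECTED, ACTUAL))
```

With these subsets the sketch reports `aws.s3.consolidate_shredded_output` and `monitoring.snowplow.collector`. That suggests, without proving it, that the missing `consolidate_shredded_output` setting and the blank `collector` are worth checking first.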

Hi @mosan,

Your config is malformed. Please verify it. I took a quick look but did not find any obvious issues. Also check whether you have mixed spaces and tabs anywhere.
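
YAML does not allow tabs for indentation, so a mixed file can fail to parse or parse into an unexpected structure. A minimal Python sketch for spotting such lines follows; the function name `find_tab_lines` and the sample text are illustrative only.

```python
def find_tab_lines(text):
    """Return 1-based numbers of lines whose leading whitespace contains a tab."""
    bad = []
    for number, line in enumerate(text.splitlines(), start=1):
        # Leading whitespace is whatever lstrip() would remove
        indent = line[: len(line) - len(line.lstrip())]
        if "\t" in indent:
            bad.append(number)
    return bad


# Illustrative sample: the third line is indented with a tab
sample = "aws:\n  s3:\n\tregion: us-east-2\n"
print(find_tab_lines(sample))
```

To check a real file, pass its contents instead, for example `find_tab_lines(open("config.yml").read())`.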

Hi @grzegorzewald
Thank you! I have created another config file from scratch and it seems to work!

Now I get this error when I run:
./snowplow-emr-etl-runner run -c config.yml -f staging_stream_enrich -r iglu_resolver.json --debug -t targets

Please, help!

uri:classloader:/gems/avro-1.8.1/lib/avro/schema.rb:350: warning: constant ::Fixnum is deprecated
uri:classloader:/gems/json-schema-2.7.0/lib/json-schema/util/array_set.rb:18: warning: constant ::Fixnum is deprecated
D, [2019-04-15T15:08:55.424229 #15989] DEBUG -- : Initializing EMR jobflow
E, [2019-04-15T15:08:56.616238 #15989] ERROR -- : No run folders in [s3://sp-archive-acc/shredded/] found
F, [2019-04-15T15:08:56.617764 #15989] FATAL -- :

Snowplow::EmrEtlRunner::UnexpectedStateError (No run folders in [s3://sp-archive-acc/shredded/] found):
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:931:in `get_latest_run_id'
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:665:in `initialize'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:135:in `run'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:41:in `<main>'
    org/jruby/RubyKernel.java:994:in `load'
    uri:classloader:/META-INF/main.rb:1:in `<main>'
    org/jruby/RubyKernel.java:970:in `require'
    uri:classloader:/META-INF/main.rb:1:in `(root)'
    uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'

Can you share your config.yml? I’m stuck on a similar issue too.

@mosan, you are skipping the staging_stream_enrich step. Is there a reason for that? By doing so, you are attempting to start at the shred step, which implies that the enriched files are already present in s3://sp-archive-acc/enriched/run=....

The staging_stream_enrich step moves the enriched files to a dedicated run=... folder for processing. I don’t think it’s a good idea for the enriched/stream and enriched/good buckets to be the same.
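
The "No run folders" error can be pictured with a small Python sketch of the idea behind the `get_latest_run_id` call in the stack trace. This is not the actual Ruby implementation. It assumes run folders are named `run=` followed by a timestamp that sorts chronologically, and the folder names below are made up for illustration.

```python
def latest_run_id(folder_names):
    """Pick the newest `run=...` folder name, or raise if there is none."""
    runs = sorted(name for name in folder_names if name.startswith("run="))
    if not runs:
        # Mirrors the situation reported in the log: nothing has been staged yet
        raise LookupError("No run folders found")
    return runs[-1]


# Hypothetical listing of a prefix that already contains staged runs
staged = ["run=2019-04-14-10-00-00", "run=2019-04-15-15-08-55", "_SUCCESS"]
print(latest_run_id(staged))

# Hypothetical listing of a prefix written by a stream loader: no run= folders
try:
    latest_run_id(["2019-04-15-part-0001.gz"])
except LookupError as error:
    print(error)
```

When a step is skipped, nothing creates the `run=...` folder that the following step expects, which is the second case above.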

Yes @roy,
Here it is!
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: XXX
  secret_access_key: XXXXX
  s3:
    region: us-east-2
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://sp-archive-acc/logs
      encrypted: false # Whether the buckets below are enrcrypted using server side encryption (SSE-S3)
      raw:
        in: # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          - s3://sp-archive-acc/events # e.g. s3://my-old-collector-bucket
        processing: s3://sp-archive-acc/processing
        archive: s3://sp-archive-acc/raw # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://sp-archive-acc/enriched/good # e.g. s3://my-out-bucket/enriched/good
        bad: s3://sp-archive-acc/enriched/bad # e.g. s3://my-out-bucket/enriched/bad
        errors: # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://sp-archive-acc/enriched/archive # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        stream: s3://sp-archive-acc/enriched/good
      shredded:
        good: s3://sp-archive-acc/shredded/good # e.g. s3://my-out-bucket/shredded/good
        bad: s3://sp-archive-acc/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
        errors: continue_on_unexpected_error # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://sp-archive-acc/shredded/archive # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: us-east-2 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using aws emr create-default-roles
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-a9f19bd3 # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: snowplow00
    security_configuration: # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: # Set this to specify custom boostrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m4.large
      core_instance_count: 2
      core_instance_type: m4.large
      core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
        volume_size: 100 # Gigabytes
        volume_type: "gp2"
        volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info: # Optional JSON string for selecting additional features
collectors:
  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.con$
enrich:
  versions:
    spark_enrich: 1.17.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1 # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {name: "data-pipeline-enrichment"} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    protocol: http
    port: 80
    app_id: snowplow # e.g. snowplow
    collector: d3rkrsqld9gmqf.cloudfront.net # e.g. d3rkrsqld9gmqf.cloudfront.net

Hi @ihor, Thank you very much!
My system goes like this.
The “Elasticsearch part” is working fine. Now I’m setting up the “Redshift leg”.
So my plan is to load the enriched files from Kinesis into an S3 bucket via the Snowplow S3 Loader 0.6.0. Then I need to shred those files via EmrEtlRunner in enriched_stream_mode and load them into Redshift.


Should I try to load raw files from the AnalyticsCollector-Good Kinesis stream to S3 and then enrich them there with EmrEtlRunner?
Thanks!

Your flow is exactly the same as mine. Because we enrich with Stream Enrich rather than EmrEtlRunner, we do not have the run folders that EmrEtlRunner creates, so we hit this error during shredding.

@ihor, we are enriching through Stream Enrich instead of EmrEtlRunner, so no run folders are created during the process. However, EmrEtlRunner always asks for run folders when we try to move the data to Redshift by running it with the “--skip staging_stream_enrich” or “-f shred” option.

Is there a way we can tell EmrEtlRunner, “Our data are enriched by Stream Enrich, not EmrEtlRunner; please let us proceed without the run folders”?

Thanks!

@roy / @mosan,

EmrEtlRunner determines that the pipeline is in Stream Enrich mode from the presence of the enriched/stream bucket in your configuration file. The staging_stream_enrich step must be part of your workflow. It stages (moves) the files from the enriched/stream bucket to the enriched/good bucket (into a run=... folder).

This is also the reason why it is not a good idea to share the same bucket for stream enriched data and staged data. See the dataflow diagram to understand the workings behind the scenes: https://github.com/snowplow/snowplow/wiki/Batch-pipeline-steps#dataflow-diagram-for-stream-enrich-mode.
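
The mode detection and staging described above can be sketched in a few lines of Python. This is an illustration under assumptions, not EmrEtlRunner's code: the function names are hypothetical, the config is a plain dict shaped like the YAML files in this thread, and the `run=` timestamp format is assumed.

```python
def enriched_buckets(config):
    """Return the aws.s3.buckets.enriched section of a parsed config."""
    return config["aws"]["s3"]["buckets"]["enriched"]


def is_stream_enrich_mode(config):
    """Stream Enrich mode is inferred from a non-empty enriched.stream entry."""
    return bool(enriched_buckets(config).get("stream"))


def shares_bucket(config):
    """True when enriched.stream and enriched.good point at the same location."""
    enriched = enriched_buckets(config)
    stream = (enriched.get("stream") or "").rstrip("/")
    good = (enriched.get("good") or "").rstrip("/")
    return bool(stream) and stream == good


def stage(stream_keys, run_id):
    """Map each file in enriched/stream to its key under enriched/good/run=<run_id>/."""
    return {key: f"run={run_id}/{key.rsplit('/', 1)[-1]}" for key in stream_keys}


# Bucket values copied from the two configs posted in this thread
shared = {"aws": {"s3": {"buckets": {"enriched": {
    "good": "s3://sp-archive-acc/enriched/good",
    "stream": "s3://sp-archive-acc/enriched/good"}}}}}
separate = {"aws": {"s3": {"buckets": {"enriched": {
    "good": "s3://snowplow-s3-to-redshift/enriched/good",
    "stream": "s3://snowplow-s3storage"}}}}}

print(is_stream_enrich_mode(shared), shares_bucket(shared))
print(is_stream_enrich_mode(separate), shares_bucket(separate))
# The file name and run id below are made up for illustration
print(stage(["2019/04/17/part-0001.gz"], "2019-04-17-04-42-29"))
```

A check like `shares_bucket` only catches identical locations. It does not detect one location nested inside the other.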

@ihor / @mosan,

First of all, thanks for the advice. I used -f staging_stream_enrich and the job started.

However, although the job claimed to have finished successfully, the enriched/stream folder still contains all the gz files, there are no new records in Redshift, and there is nothing in shredded/good.

If I run it without the --skip or -f options, even though it will re-enrich the data in the enriched/stream folder, at least new records are created in Redshift.

Is there anything I have done wrong that stops -f staging_stream_enrich from working?

The EmrEtlRunner version used is R104.

Here is the command and message:

ubuntu@ip-172-xx-xx-xx:~$ java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar snowplow-emr-etl-runner_r104 run -c config/r102_config.yml -r config/iglu_resolver.json -t config/targets -f staging_stream_enrich
D, [2019-04-17T03:30:57.481000 #18304] DEBUG -- : Initializing EMR jobflow
D, [2019-04-17T03:31:01.561000 #18304] DEBUG -- : EMR jobflow j-2RZ6P4NC0OCKE started, waiting for jobflow to complete...
I, [2019-04-17T03:45:05.141000 #18304]  INFO -- : No RDB Loader logs
D, [2019-04-17T03:45:05.141000 #18304] DEBUG -- : EMR jobflow j-2RZ6P4NC0OCKE completed successfully.
I, [2019-04-17T03:45:05.142000 #18304]  INFO -- : Completed successfully

Here is the screenshot from AWS:

Here is the config:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: xxxxxxxxxxxxxxxxxxxxxxxxx
  secret_access_key: xxxxxxxxxxxxxxxxxxx
  s3:
    region: ap-southeast-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://snowplow-s3-to-redshift-log
      encrypted: false
      enriched:
        good: s3://snowplow-s3-to-redshift/enriched/good       # e.g. s3://my-out-bucket/enriched/good
        archive: s3://snowplow-s3-to-redshift/archive    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        stream: s3://snowplow-s3storage     # S3 Loader's output folder with enriched data. If present raw buckets will be discarded
      shredded:
        good: s3://snowplow-s3-to-redshift/shredded/good       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://snowplow-s3-to-redshift/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors: # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://snowplow-s3-to-redshift/shredded/archive    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: ap-southeast-1        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement:      # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-xxxxxxx # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: xxxxxxxx
    security_configuration:  # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: []           # Set this to specify custom boostrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Data Load - S3 to Redshift # Give your job a name
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:        # Optional JSON string for selecting additional features
enrich:
  versions:
    spark_enrich: 1.17.0 # Version of the Spark Enrichment process
  output_compression: GZIP # Stream mode supports only GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
#  snowplow:
#    method: get
#    app_id: ADD HERE # e.g. snowplow
#    collector: ADD HERE # e.g. d3rkrsqld9gmqf.cloudfront.net
#    protocol: http
#    port: 80

Here is the result if I run it without --skip staging_stream_enrich:

ubuntu@ip-172-xx-xx-xxx:~$ java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar snowplow-emr-etl-runner_r104 run -c config/r102_config.yml -r config/iglu_resolver.json -t config/targets
D, [2019-04-17T04:42:29.038000 #18426] DEBUG -- : Initializing EMR jobflow
D, [2019-04-17T04:42:37.545000 #18426] DEBUG -- : EMR jobflow j-2IGKJE6W7SWRL started, waiting for jobflow to complete...
I, [2019-04-17T05:22:44.954000 #18426]  INFO -- : RDB Loader logs
D, [2019-04-17T05:22:44.959000 #18426] DEBUG -- : Downloading s3://snowplow-s3-to-redshift-log/rdb-loader/2019-04-17-04-42-29/0489a8a5-f686-4136-ade3-20518cc33e64 to /tmp/rdbloader20190417-18426-1ktfcoc
I, [2019-04-17T05:22:46.944000 #18426]  INFO -- : AWS Redshift enriched events storage
I, [2019-04-17T05:22:46.945000 #18426]  INFO -- : RDB Loader successfully completed following steps: [Discover, Load, Analyze]
D, [2019-04-17T05:22:46.946000 #18426] DEBUG -- : EMR jobflow j-2IGKJE6W7SWRL completed successfully.
I, [2019-04-17T05:22:46.946000 #18426]  INFO -- : Completed successfully

@roy, I think you misunderstood Stream Enrich mode. You should use neither the --skip (-x) nor the --resume-from (-f) flag under normal circumstances. Again, EmrEtlRunner “understands” that you are running the batch pipeline in Stream Enrich mode. It will not enrich the data and will start by shredding it instead (again, see the dataflow diagram).

Thus, your command line should contain only the -c, -r, and -t flags.
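
As a quick guard for a wrapper script, the advice above can be expressed as a check on the argument list. This is a minimal Python sketch. The helper name `step_flags_used` is hypothetical, and the file names are the ones used earlier in this thread.

```python
# Flags that skip steps or resume part-way through, as named in the reply above
STEP_FLAGS = {"-x", "--skip", "-f", "--resume-from"}


def step_flags_used(argv):
    """Return the arguments that are step-selection flags (also matches --flag=value)."""
    return [arg for arg in argv if arg.split("=", 1)[0] in STEP_FLAGS]


# Normal Stream Enrich mode invocation: only -c, -r and -t
normal = ["./snowplow-emr-etl-runner", "run",
          "-c", "config.yml", "-r", "iglu_resolver.json", "-t", "targets"]
# The earlier attempt from this thread, with a resume flag added
earlier = normal + ["-f", "staging_stream_enrich"]

print(step_flags_used(normal))
print(step_flags_used(earlier))
```

An empty result means the command runs the full Stream Enrich mode flow. Anything else is worth a second look before launching the job.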

@ihor, thank you so much for pointing this out :grin: :+1:. I now have a much clearer understanding of the process.

Thank you!!