EmrEtlRunner Config / environment variables not recognized

Hi,

We are trying to launch a Fargate task to trigger the EmrEtlRunner job. We chose to pass the AWS keys to the Fargate container as environment variables via the Dockerfile. However, when the binary reads the config file it fails to resolve the AWS keys and passes “nil” instead.

When we log in to our running containers locally (via Garden), the AWS keys are present as environment variables inside the container.
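
For reference, a check along these lines (the container name is a placeholder) is enough to confirm whether the variables are visible inside a running container:

# Print the AWS-related environment variables inside the running container
docker exec -it <container-name> sh -c 'env | grep ^AWS_'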

Dockerfile:

FROM snowplow/base-debian:0.2.2

WORKDIR /app

COPY src/ /app/

ENV AWS_ACCESS_KEY_ID $AWS_ACCESS_KEY_ID
ENV AWS_SECRET_ACCESS_KEY $AWS_SECRET_ACCESS_KEY

# hadolint ignore=DL3008,DL3016,DL3015,DL4001,DL4006,DL3013
RUN apt-get update && \
    apt-get install -yqq --no-install-recommends wget unzip git tar && \
    wget http://dl.bintray.com/snowplow/snowplow-generic/snowplow_emr_r97_knossos.zip && \
    unzip snowplow_emr_r97_knossos.zip && \
    apt-get clean && \
    rm -fr /var/lib/apt/lists/* /tmp/* /var/tmp/*


CMD ["./snowplow-emr-etl-runner", "run", "-c", "config.yml", "-r", "resolver.json"]

Config.yml:

aws:
  access_key_id: <%= ENV['AWS_ACCESS_KEY_ID'] %>
  secret_access_key: <%= ENV['AWS_SECRET_ACCESS_KEY'] %>
  s3:
    region: <%= ENV['AWS_DEFAULT_REGION'] %>
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: <%= ENV['SP_SCHEMA_JSONPATH_URI'] %> # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://sp-emretlrunner-log
      encrypted: true # Whether the buckets below are encrypted using server side encryption (SSE-S3)
      enriched:
        good: <%= ENV['SP_ENRICHED_GOOD_URI'] %>       # e.g. s3://my-out-bucket/enriched/good
        archive: <%= ENV['SP_ENRICHED_ARCHIVE_URI'] %>    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        stream: <%= ENV['SP_LOADER_URI'] %>
      shredded:
        good: <%= ENV['SP_SHREDDED_GOOD_URI'] %>      # e.g. s3://my-out-bucket/shredded/good
        bad: <%= ENV['SP_SHREDDED_BAD_URI'] %>        # e.g. s3://my-out-bucket/shredded/bad
        errors:      # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: <%= ENV['SP_SHREDDED_ARCHIVE_URI'] %>    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: true # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.29.0
    region: <%= ENV['AWS_DEFAULT_REGION'] %>        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement:      # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: <%= ENV['AWS_SUBNET_PUBLIC_ID'] %> # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: EmrEtlRunner-Snowplow
    security_configuration:  # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL Enriched TSV to Shredded TSV # Give your job a name
      master_instance_type: m4.large
      core_instance_count: 2
      core_instance_type: m4.large
      core_instance_bid: 0.015
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m4.large
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: cloudfront # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  versions:
    spark_enrich: 1.18.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: "GZIP" # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.17.0
    rdb_shredder: 0.16.0        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
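
A quick way to see what the ERB placeholders expand to, before EmrEtlRunner parses the file, is to render the template manually (a debugging sketch; it assumes Ruby's erb utility is available in the container):

# Render the ERB template and check whether the key placeholders resolve
erb config.yml | grep -E 'access_key_id|secret_access_key'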

Runner log:

unsupported Java version "11", defaulting to 1.7
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jruby.util.io.FilenoUtil (file:/tmp/jruby15679167902097831415extract/jruby-core-9.1.7.0-complete.jar) to method sun.nio.ch.SelChImpl.getFD()
WARNING: Please consider reporting this to the maintainers of org.jruby.util.io.FilenoUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
io/console on JRuby shells out to stty for most operations

F, [2021-01-27T08:28:56.609000 #1] FATAL -- : 

**ReturnContractError (Contract violation for return value:**
       
Expected: #<Contracts::Maybe:0x22d23c27 @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x702432cc @vals=[String, nil]>, :log=>String, :raw=>{:in=>#<Contracts::CollectionOf:0x1372696b @contract=String, @collection_class=Array>, :processing=>String, :archive=>String}, :enriched=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0x69d2c460 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x34a99d8 @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0x6c68f29c @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x3b3b2290 @vals=[String, nil]>}}}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x47fbf95e @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x6ac0fd1d @vals=[String, nil]>, :ec2_key_name=>String, :bootstrap=>#<Contracts::Maybe:0x6413eeb7 @vals=[#<Contracts::CollectionOf:0x5dfd4fac @contract=String, @collection_class=Array>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0x2305aad0 @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x7dc963be @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0x4c678a1f @vals=[{:volume_size=>#<Proc:0x18a538a0@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:28 (lambda)>, :volume_type=>#<Proc:0x70736b19@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:27 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x183e64a8 @vals=[#<Proc:0x18a538a0@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:28 (lambda)>, nil]>, :ebs_optimized=>#<Contracts::Maybe:0x4b8466c2 @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x54cce500 @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x229c4d34 @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x1c24639e @vals=[#<Contracts::HashOf:0x771b2b5b @key=Symbol, @value=#<Contracts::HashOf:0x1aa64998 @key=Symbol, @value=String>>, nil]>}}, :collectors=>{:format=>String}, :enrich=>{:versions=>{:spark_enrich=>String}, :continue_on_unexpected_error=>Contracts::Bool, :output_compression=>#<Proc:0x571b4bf6@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:26 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0x7a491a60 @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x19d3f4fb @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,
       
Actual: {:aws=>{:**access_key_id=>nil, :secret_access_key=>nil**, :s3=>{:region=>"eu-west-1", :buckets=>{:assets=>"s3://snowplow-hosted-assets", :jsonpath_assets=>***, :log=>"s3://sp-emretlrunner-log", :encrypted=>true, :enriched=>{:good=>"s3://sp-enriched-***-dev/good", :archive=>"s3://sp-enriched-***-dev/archive", :stream=>"s3://sp-loader-***-dev"}, :shredded=>{:good=>"s3://sp-shredded-***-dev/good", :bad=>"s3://sp-shredded-***-dev/bad", :errors=>nil, :archive=>"s3://sp-shredded-***-dev/archive"}}, :consolidate_shredded_output=>true}, :emr=>{:ami_version=>"5.29.0", :region=>"eu-west-1", :jobflow_role=>"EMR_EC2_DefaultRole", :service_role=>"EMR_DefaultRole", :placement=>nil, :ec2_subnet_id=>"subnet-043ef23e0a9818082", :ec2_key_name=>"EmrEtlRunner-Snowplow", :security_configuration=>nil, :bootstrap=>[], :software=>{:hbase=>nil, :lingual=>nil}, :jobflow=>{:job_name=>"Snowplow ETL Enriched TSV to Shredded TSV", :master_instance_type=>"m4.large", :core_instance_count=>2, :core_instance_type=>"m4.large", :core_instance_bid=>0.015, :core_instance_ebs=>{:volume_size=>100, :volume_type=>"gp2", :volume_iops=>400, :ebs_optimized=>false}, :task_instance_count=>0, :task_instance_type=>"m4.large", :task_instance_bid=>0.015}, :bootstrap_failure_tries=>3, :configuration=>{:"yarn-site"=>{:"yarn.resourcemanager.am.max-attempts"=>"1"}, :spark=>{:maximizeResourceAllocation=>"true"}}, :additional_info=>nil}}, :collectors=>{:format=>"cloudfront"}, :enrich=>{:versions=>{:spark_enrich=>"1.18.0"}, :continue_on_unexpected_error=>false, :output_compression=>"GZIP"}, :storage=>{:versions=>{:rdb_loader=>"0.17.0", :rdb_shredder=>"0.16.0", :hadoop_elasticsearch=>"0.1.0"}}, :monitoring=>{:tags=>{}, :logging=>{:level=>"DEBUG"}}}

Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config

With Contract: Maybe, String, Bool => Maybe

At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:202 ):
    uri:classloader:/gems/contracts-0.11.0/lib/contracts.rb:45:in `block in Contract'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts.rb:154:in `failure_callback'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:80:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:191:in `process_options'
    uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:167:in `get_args_config_enrichments_resolver'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
    uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
    uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:37:in `<main>'
    org/jruby/RubyKernel.java:979:in `load'
    uri:classloader:/META-INF/main.rb:1:in `<main>'
    org/jruby/RubyKernel.java:961:in `require'
    uri:classloader:/META-INF/main.rb:1:in `(root)'
    uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'

Hi @dadasami,

Sorry, I don't know exactly what the problem with EmrEtlRunner is here - it has always been a cryptic piece of software. But while this is not exactly what you're asking, can I recommend you have a look at the new RDB Loader R35? It does not depend on EmrEtlRunner (which is going to be deprecated very soon), has a much cleaner configuration, and integrates with Fargate (we provide an official Docker image). Let me know if you need any help with the setup - I'm very keen to polish the documentation if there are any issues.

Hi @anton,

Thanks for your recommendation. We use "stream-enrich-kinesis" and "snowplow-s3-loader" to write enriched events into an S3 bucket, and we planned to use EmrEtlRunner to shred the enriched TSV files (without Redshift loading). Does "RDB Loader R35" offer the same functionality?

Hi @dadasami,

Does "RDB Loader R35" offer the same functionality?

Yes.

EmrEtlRunner used to be Snowplow's orchestration tool. Back in the day it was responsible for launching enrichment, shredding, loading, archiving and so on, which was very sensible in the batch-first world of 2014.

But as the pipeline became more stream-oriented and less batch-oriented, we moved data collection and enrichment into separate components and made them independent of EmrEtlRunner. The last piece of functionality that could not work without EmrEtlRunner was shredding and loading into Redshift, and in R35 we made that independent as well.

To put it another way: if you set up EmrEtlRunner, you will still have to set up the RDB Loader and Shredder from R34 (or earlier), with EmrEtlRunner responsible for launching them. So you'd have EmrEtlRunner + Shredder + Loader R34. But if you go with R35 you can remove EmrEtlRunner from that chain. Not to mention it's cheaper and more efficient.

One problem that I foresee, though, is that the RDB Shredder and Loader accept their configuration as base64-encoded strings, in which you must put all settings as-is and cannot use environment variables.
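
For illustration, producing those strings is just a matter of piping each file through base64 (a sketch using GNU coreutils; the file names match the ones discussed below):

# Produce single-line base64 strings of the Shredder config and the Iglu resolver
base64 -w 0 config.hocon > config.hocon.b64
base64 -w 0 iglu-resolver.json > iglu-resolver.json.b64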

Hi @anton,

If I understood you correctly, and since we only need to deploy the Shredder, then out of (EmrEtlRunner + Shredder + Loader) we ignore EmrEtlRunner and the Loader, and create a playbook and a config file to submit our Shredder Spark job to AWS EMR via dataflow-runner. For this we will require these 4 files:

playbook.json
emr-config.json
iglu-resolver.json {base64}
config.hocon {base64}

In this case, where can we find a sample config.hocon for the Shredder ("s3://snowplow-hosted-assets-eu-central-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.19.0")?

From this dataflow-runner tutorial, I assume this is how we would submit our Shredder Spark job:

$ ./path/to/dataflow-runner run-transient --emr-config ./path/to/emr-config.json --emr-playbook ./path/to/playbook.json

Please correct me if I am wrong.

Hi @dadasami,

That's all correct! You can find an example config.hocon on our docs page. You can also find the playbook.json example there - bear in mind that before the Shredder you need an S3DistCp step to move data from the S3 Loader sink into the enriched data lake (the shredder.input location from config.hocon).
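
For orientation, the copy that step performs is roughly equivalent to the following (a sketch only: the bucket paths are placeholders, and in a playbook this runs as an EMR step against the s3-dist-cp jar rather than as a shell command):

# Move compressed enriched files from the S3 Loader sink into the enriched data lake
s3-dist-cp \
  --src s3://sp-loader-sink/enriched/ \
  --dest s3://sp-enriched-lake/ \
  --srcPattern '.*\.gz'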

Also, you need to run your Loader in the background somewhere - either Fargate or EC2.

In R34 the Loader would have run as an EMR step, costing you some money for an idling cluster.

Perfect! Does that mean that we can comment out the warehouse connection details (storage) in the config.hocon? Because we do not intend to use Redshift yet.

I assume by “Loader” you mean “s3-Loader”, not the RDB-Loader?

And lastly, is the current approach to running the Shredder step going to be deprecated some time soon?

Because we do not intend to use Redshift yet.

Ah, that changes a lot. But generally - yes, you don't need the connection details. I think the Shredder might fail if they are completely missing, but you can add fake details.

I assume by “Loader” you mean “s3-Loader”, not the RDB-Loader?

I meant RDB Loader, but since you said you’re not going to use Redshift, then yes, ignore that part - you don’t need it running.

People who need Redshift benefit from the new architecture a lot, but even if you don't need it, it still has benefits: it is easier to configure and less fragile (far fewer S3DistCp steps and fewer moving parts), and the latest Shredder has its own performance improvements (less partitioning).

And lastly, is the current approach to running the Shredder step going to be deprecated some time soon?

No, not really. We're going to implement another Shredder that works directly with a Kinesis stream (pretty much like the S3 Loader or Stream Enrich), but I think the two Shredders (Spark and Stream) will co-exist for at least several years. EmrEtlRunner, however, is only months away from complete retirement.

Also, bear in mind that there is a shredding_complete.json file in every folder produced by the Shredder. Once you decide to start loading into Redshift, you will need to start the RDB Loader app and send those files into an SQS queue - each file will trigger the loading of its folder.
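
As a rough illustration of that last step (the queue URL is a placeholder, and it is an assumption here that the file contents are sent verbatim as the message body):

# Send the contents of a shredding_complete.json as an SQS message (FIFO queues also need a message group id)
aws sqs send-message \
  --queue-url https://sqs.eu-west-1.amazonaws.com/123456789012/sp-rdb-loader.fifo \
  --message-group-id rdb-loader \
  --message-body file://shredding_complete.json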

Hi @anton. After setting up the Shredder, the EMR log says the Shredder job completed successfully, and the shredded good path run={{nowWithFormat "2006-01-02-15-04-05"}}/* contains many objects, including a _SUCCESS file in the root. However, there is no shredding_complete.json in any run={{nowWithFormat "2006-01-02-15-04-05"}}/ folder. Do you possibly know what may have gone wrong? It seems this JSON file is required to load data into Redshift.

Hi @dadasami,

It seems to be similar to @mgloel’s issue:

Can you check that your SQS queue is FIFO? Also is there anything in the SQS queue?
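
For reference, both checks can be done in one call (the queue URL is a placeholder); a FIFO queue has a name ending in ".fifo" and reports FifoQueue as true:

# Inspect the queue type and the approximate number of messages waiting in it
aws sqs get-queue-attributes \
  --queue-url https://sqs.eu-west-1.amazonaws.com/123456789012/sp-rdb-loader.fifo \
  --attribute-names FifoQueue ApproximateNumberOfMessages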
