Enriched event stream into Redshift using StorageLoader: Contract violation error

Hi,

I am having issues with the StorageLoader in the following setup:
– StreamCollector > StreamEnrich > kinesisS3SinkLZO > StorageLoader > Redshift

The enriched LZO-formatted events are arriving correctly in s3://kinesis-sink via StreamEnrich and kinesisS3SinkLZO, and Redshift is set up. However, I am having trouble getting the enriched events from S3 into Redshift using the StorageLoader.

A (probably) related question: since kinesisS3Sink deposits the enriched LZO-formatted events into s3://kinesis-sink, does the StorageLoader config require aws.s3.buckets.enriched.archive: s3://kinesis-sink? If not, how should this be configured?

I am using the snowplow-storage-loader available in snowplow_emr_r83_bald_eagle to run the following:
$ ./snowplow-storage-loader --config emr.config --skip analyze

I get the following error:

Loading Snowplow events and shredded types into My Redshift database (Redshift cluster)...
Unexpected error: Contract violation for argument 2 of 3:
        Expected: String,
        Actual: nil
        Value guarded in: Sluice::Storage::S3::new_fog_s3_from
        With Contract: String, String, String => Fog::Storage::AWS::Real
        At: uri:classloader:/gems/sluice-0.4.0/lib/sluice/storage/s3/s3.rb:46
uri:classloader:/gems/contracts-0.11.0/lib/contracts.rb:48:in `block in Contract'
uri:classloader:/gems/contracts-0.11.0/lib/contracts.rb:154:in `failure_callback'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:143:in `block in redefine_method'
uri:classloader:/storage-loader/bin/snowplow-storage-loader:54:in `block in (root)'
uri:classloader:/storage-loader/bin/snowplow-storage-loader:51:in `<main>'
org/jruby/RubyKernel.java:973:in `load'
uri:classloader:/META-INF/main.rb:1:in `<main>'
org/jruby/RubyKernel.java:955:in `require'
uri:classloader:/META-INF/main.rb:1:in `(root)'
uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'

The following is my config:

aws:
  # Credentials can be hardcoded or set in environment variables
  access-key: XXXXXXXXXXX
  secret-key: XXXXXXXXXXX
  s3:
    region: us-east-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://emretlrunner/log

      raw: 
        in:
          - s3://kinesis-sink # e.g. s3://my-in-bucket
        processing: s3://emretlrunner/raw/processing
        archive: s3://emretlrunner/archive/raw # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://emretlrunner/enrich/good       # e.g. s3://my-out-bucket/enriched/good
        bad: s3://emretlrunner/enrich/bad # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://emretlrunner/enrich/errors # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://kinesis-sink # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://emretlrunner/shredded/good # e.g. s3://my-out-bucket/shredded/good
        bad: s3://emretlrunner/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://emretlrunner/shredded/errors # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://emretlrunner/archive/shredded # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded

  emr:
    ami_version: 4.5.0
    region: us-east-1 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: XXXXXXXXXXX # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: bw-emr
    bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase:  # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:  # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  job_name: snowplow-enrich # Give your job a name
  versions:
    hadoop_enrich: 1.8.0 # Version of the Hadoop Enrichment process
    hadoop_shred: 0.9.0 # Version of the Hadoop Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  download:
    folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
  targets: 
    - name: "My Redshift database"
      type: redshift
      host: XXXXXXXXXXX.us-east-1.redshift.amazonaws.com # The endpoint as shown in the Redshift console
      database: dev # Name of database
      port: 5439 # Default Redshift port
      ssl_mode: disable # One of disable (default), require, verify-ca or verify-full
      table: atomic.events
      username: XXXXXXXXXXX
      password: XXXXXXXXXXX
      maxerror: 10 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
monitoring:
  # tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
#   snowplow:
#     method: get
    # app_id: # e.g. snowplow
    # collector: XXXXXXXXX.compute-1.amazonaws.com # e.g. d3rkrsqld9gmqf.cloudfront.net

Any assistance or tips here would be appreciated.

Cheers!

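Resolved: the issue was that the aws.access-key and aws.secret-key settings should be named aws.access_key_id and aws.secret_access_key respectively. With the wrong key names the loader found no access key, which is presumably the nil reported for argument 2 of Sluice::Storage::S3::new_fog_s3_from.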
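
For anyone hitting the same contract violation, here is a minimal sketch of a pre-flight check on the credential keys. It is not StorageLoader's own code: the helper name missing_aws_keys is made up for illustration, and it assumes the config is read as plain YAML with string keys. The two key names are the ones from the resolution above.

```ruby
require "yaml"

# Key names taken from the resolution above; the helper itself is hypothetical.
REQUIRED_AWS_KEYS = %w[access_key_id secret_access_key].freeze

# Returns the required credential keys that are absent or not strings.
def missing_aws_keys(config)
  aws = config.fetch("aws", {}) || {}
  REQUIRED_AWS_KEYS.reject { |key| aws[key].is_a?(String) }
end

# The key names from the config as originally posted.
broken = YAML.safe_load(<<~CONFIG)
  aws:
    access-key: XXXXXXXXXXX
    secret-key: XXXXXXXXXXX
CONFIG

# The same section with the corrected key names.
fixed = YAML.safe_load(<<~CONFIG)
  aws:
    access_key_id: XXXXXXXXXXX
    secret_access_key: XXXXXXXXXXX
CONFIG

puts "broken config is missing: #{missing_aws_keys(broken).inspect}"
puts "fixed config is missing:  #{missing_aws_keys(fixed).inspect}"
```

Running something like this against your own file before invoking the loader should surface a misnamed key directly, rather than as a nil deep inside the S3 client setup.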

Glad you got it working @chipchip! Just a couple of notes for anybody else reading this:

Note 1. Going by your config.yml, I believe your setup is slightly different to what you described - rather than:

StreamCollector > StreamEnrich > kinesisS3SinkLZO > StorageLoader > Redshift

It looks like you are doing:

StreamCollector > raw stream > StreamEnrich
                             > kinesisS3SinkLZO > StorageLoader > Redshift

which is great - exactly per our Snowplow Lambda architecture.

Note 2. I don’t know why you would set the enriched archive to the same location as raw.in:

      raw:
        in:
          - s3://kinesis-sink # e.g. s3://my-in-bucket
...
      enriched:
...
        archive: s3://kinesis-sink # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched

That will archive enriched events right where your raw payloads are landing, which is going to cause a lot of confusion.
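
To catch this kind of overlap before a run, a rough sketch along these lines could help. The helper names each_leaf and shared_buckets are hypothetical, and it assumes the config is loaded as plain YAML with string keys. It only flags settings that point at exactly the same location (ignoring a trailing slash), not one path nested inside another.

```ruby
require "yaml"

# Walks a nested Hash/Array and yields [dotted_path, value] for each String leaf.
def each_leaf(node, path = [], &block)
  case node
  when Hash   then node.each { |k, v| each_leaf(v, path + [k.to_s], &block) }
  when Array  then node.each { |v| each_leaf(v, path, &block) }
  when String then block.call(path.join("."), node)
  end
end

# Returns { location => [setting paths] } for every S3 location
# that more than one bucket setting points at.
def shared_buckets(config)
  buckets = config.dig("aws", "s3", "buckets") || {}
  uses = Hash.new { |hash, key| hash[key] = [] }
  each_leaf(buckets) { |path, uri| uses[uri.chomp("/")] << path }
  uses.select { |_uri, paths| paths.size > 1 }
end

# A trimmed-down copy of the bucket settings quoted above.
config = YAML.safe_load(<<~CONFIG)
  aws:
    s3:
      buckets:
        raw:
          in:
            - s3://kinesis-sink
          archive: s3://emretlrunner/archive/raw
        enriched:
          good: s3://emretlrunner/enrich/good
          archive: s3://kinesis-sink
CONFIG

shared_buckets(config).each do |uri, paths|
  puts "#{uri} is used by: #{paths.join(', ')}"
end
```

With the settings from this thread, the only location it reports is s3://kinesis-sink, shared between raw.in and enriched.archive.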