Shredding & loading enriched events in near-real-time

Here’s the beginnings of a Dataflow Runner playbook that should help you guys:

```json
{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
  "data": {
    "region": "eu-west-1",
    "credentials": {
      "accessKeyId": "...",
      "secretAccessKey": "..."
    },
    "steps": [
      {
        "type": "CUSTOM_JAR",
        "name": "S3DistCp Step: Enriched events -> staging S3",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
          "--src","s3n://path-to-kinesis-s3-sink-output/",
          "--dest","s3n://my-working-bucket/enriched/good/run={{timeWithFormat .epochTime "2006-01-02-15-04-05"}}/",
          "--s3Endpoint","s3-eu-west-1.amazonaws.com",
          "--deleteOnSuccess"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "Hadoop Shred: shred enriched events for Redshift",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "s3://snowplow-hosted-assets/3-enrich/scala-hadoop-shred/snowplow-hadoop-shred-0.11.0-rc2.jar",
        "arguments": [
          "com.snowplowanalytics.snowplow.enrich.hadoop.ShredJob",
          "--hdfs",
          "--iglu_config",
          "{{base64File "/snowplow-config/iglu_resolver.json"}}",
          "--duplicate_storage_config",
          "{{base64File "/snowplow-config/targets/duplicate_dynamodb.json"}}",
          "--input_folder","s3n://my-working-bucket/enriched/good/run={{timeWithFormat .epochTime "2006-01-02-15-04-05"}}/",
          "--output_folder","s3n://my-working-bucket/shredded/good/run={{timeWithFormat .epochTime "2006-01-02-15-04-05"}}/",
          "--bad_rows_folder","s3n://my-working-bucket/shredded/bad/run={{timeWithFormat .epochTime "2006-01-02-15-04-05"}}/"
        ]
      }
    ],
    "tags": [
    ]
  }
}
```
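
The `--iglu_config` argument above base64-encodes a standard Iglu resolver file. As a rough sketch (the registry list here is an assumption — keep Iglu Central and add whichever registries host your own schemas), `/snowplow-config/iglu_resolver.json` could look like:

```json
{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
    ]
  }
}
```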

Invoked with:

```bash
--vars epochTime,`date +%s`
```
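
For completeness, a full transient-cluster run might look something like the sketch below — the `./dataflow-runner` binary location and the `./cluster.json` / `./playbook.json` filenames are assumptions, so check the command and flag names against your Dataflow Runner version:

```bash
./dataflow-runner run-transient \
  --emr-config ./cluster.json \
  --emr-playbook ./playbook.json \
  --vars epochTime,`date +%s`
```

The `epochTime` variable is what feeds the `{{timeWithFormat .epochTime ...}}` templates in the playbook, so both steps in a given run read from and write to the same `run=...` folder.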