How to set up the Shredder?

We have implemented the Snowplow pipeline to the point where “stream-enrich-kinesis” writes TSV files into the S3 Loader bucket. At this point we decided to shred our enriched events into separate entities using the RDB Shredder. It seems that the RDB Shredder is part of the EmrEtlRunner batch process; however, it is also mentioned here that it can be run manually. I now have the following questions regarding the Shredder setup:

  1. Does running the Shredder manually actually mean using this Snowplow-hosted asset?

  2. What is the difference between what “stream-enrich-kinesis” does and the enrichment performed by EmrEtlRunner?

  3. Can we actually run EmrEtlRunner (in case it is needed to do the shredding job) within a Fargate task? If not, what is the recommended implementation?

  4. How should the EmrEtlRunner config file be set up for the whole s3 block (below) in our case? (We have only one bucket, which collects the good enriched events.) Are all the buckets in the block required? Are they all outputs for the Shredder? If not, which one is required for which step?

s3:
    region: ADD HERE
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: ADD HERE
      encrypted: false # Whether the buckets below are encrypted using server side encryption (SSE-S3)
      raw:
        in:                  # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          - ADD HERE         # e.g. s3://my-old-collector-bucket
          - ADD HERE         # e.g. s3://my-new-collector-bucket
        processing: ADD HERE
        archive: ADD HERE    # e.g. s3://my-archive-bucket/raw
      enriched:
        good: ADD HERE       # e.g. s3://my-out-bucket/enriched/good
        bad: ADD HERE        # e.g. s3://my-out-bucket/enriched/bad
        errors: ADD HERE     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: ADD HERE    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: ADD HERE       # e.g. s3://my-out-bucket/shredded/good
        bad: ADD HERE        # e.g. s3://my-out-bucket/shredded/bad
        errors: ADD HERE     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: ADD HERE    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: true # Whether to combine files when copying from hdfs to s3

@dadasami, to shred your data you need to run EmrEtlRunner in Stream Enrich mode. It is different from processing the data in the standard (batch pipeline) mode. However, from the point of view of configuring and running EmrEtlRunner, it differs by only one line in the configuration file: it has to have the extra property “enriched: stream”. This points to the bucket that contains your Kinesis-enriched TSV files (uploaded by the S3 Loader). The mere presence of that line instructs EmrEtlRunner to run in Stream Enrich mode.
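For illustration, a minimal sketch of that change within the enriched block (the bucket names below are hypothetical placeholders, not values from this thread):

      enriched:
        good: s3://my-out-bucket/enriched/good
        bad: s3://my-out-bucket/enriched/bad
        errors:
        archive: s3://my-archive-bucket/enriched
        stream: s3://my-s3-loader-bucket   # the bucket your S3 Loader writes Kinesis-enriched TSVs to

The run command itself stays the same as in batch mode, something like ./snowplow-emr-etl-runner run -c config.yml -r iglu_resolver.json.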


@ihor Thanks a lot! And what else might be necessary? As for “enriched:good”, “enriched:bad”, “shredded:good” and “shredded:bad”: do I need to create empty buckets for each before running EmrEtlRunner?

And may I please know if you vote for or against running the jobs on Fargate?

@dadasami, I think all the buckets are mandatory in the configuration file, as per the configuration file for Stream Enrich mode (its inline comments advise when a value can be omitted). While some values can be left blank, the property itself has to be present to pass validation.
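To make that concrete, here is how I would sketch the buckets block for your single-enriched-bucket case. All bucket names are hypothetical, and whether the raw section can really stay empty in Stream Enrich mode is worth checking against the sample configuration file:

s3:
    region: eu-west-1                  # assumption: your AWS region
    buckets:
      assets: s3://snowplow-hosted-assets
      jsonpath_assets:
      log: s3://my-snowplow-bucket/logs
      encrypted: false
      raw:
        in: []                         # not read in Stream Enrich mode
        processing:
        archive:
      enriched:
        good: s3://my-snowplow-bucket/enriched/good
        bad: s3://my-snowplow-bucket/enriched/bad
        errors:
        archive: s3://my-snowplow-bucket/enriched/archive
        stream: s3://my-s3-loader-bucket   # your existing bucket of Kinesis-enriched TSVs
      shredded:
        good: s3://my-snowplow-bucket/shredded/good
        bad: s3://my-snowplow-bucket/shredded/bad
        errors:
        archive: s3://my-snowplow-bucket/shredded/archive
    consolidate_shredded_output: true

The good/bad/archive paths can all live under one bucket as prefixes. As far as I know, EmrEtlRunner does not create buckets for you, so the buckets themselves should exist before the first run.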

And may I please know if you vote for or against running the jobs on Fargate?

EmrEtlRunner orchestrates job execution for you. It will spin up an EMR cluster with the predefined specs from your configuration file. You therefore need to make sure the cluster is big enough for the task and has an appropriate Spark configuration. A cluster spun up this way does not autoscale.
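For reference, the cluster specs and Spark settings live in the emr section of the same configuration file. A rough sketch; the instance types and counts below are assumptions to size against your own volumes, not recommendations:

emr:
    ami_version: 5.9.0
    region: eu-west-1
    jobflow_role: EMR_EC2_DefaultRole
    service_role: EMR_DefaultRole
    placement:
    ec2_subnet_id: subnet-XXXXXXXX     # hypothetical subnet
    ec2_key_name: my-key-pair
    jobflow:
      job_name: Snowplow ETL
      master_instance_type: m4.large
      core_instance_count: 2           # size to your event volume
      core_instance_type: m4.large
      task_instance_count: 0
      task_instance_type: m4.large
      task_instance_bid:
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximize_resource_allocation: "true"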

I haven’t heard of the shredding job running on Fargate. If it is possible, it will be without EmrEtlRunner involvement.
