Reading raw events from kinesis-s3 log

Hi there.
I’d like to use Snowplow in the following way:


apps sending events -> Scala collector -> Kinesis -> Kinesis S3 sink -> S3


Raw events will be collected and stored on S3 for some time until the analytics processing module is ready.

For now I simply want to make sure those events can be read with Java and that they contain all the data needed.
My unarchived log file consists of the following sections:

block_containing_collector_ip
new_line_symbol
event_that_needs_to_be_decoded

block_containing_collector_ip
new_line_symbol
event_that_needs_to_be_decoded

Questions:

  • Do I understand correctly that this file contains a list of events in Thrift format wrapped with Elephant Bird?
  • Is there any code (Java/Scala) available for reading events from S3 after they have been written by kinesis-s3?
  • What is the recommended way to process S3 data written with kinesis-s3?

I’m not a Scala guy, so reading the kinesis-s3 sources hasn’t helped me solve this puzzle :slight_smile:

Hi @vshulga,

You are right in your understanding of the event format for the files produced by Kinesis S3. More details can be found here: https://github.com/snowplow/snowplow/wiki/Collector-logging-formats#the-snowplow-thrift-raw-event-format
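In case it helps for sanity-checking from the JVM, below is a minimal sketch of deserializing one raw record once the LZO/Elephant Bird block framing has been stripped away (for example via Elephant Bird’s Hadoop input formats). It assumes you have a CollectorPayload class generated from Snowplow’s collector-payload Thrift schema on your classpath; the package name shown is illustrative and may differ in your build:

```scala
import org.apache.thrift.TDeserializer
import org.apache.thrift.protocol.TBinaryProtocol

// Assumption: this is the class generated from Snowplow's collector-payload
// Thrift schema; the package/class name may differ in your build
import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload

object RawEventReader {

  // Turn the bytes of one framed record back into a CollectorPayload
  def deserialize(recordBytes: Array[Byte]): CollectorPayload = {
    // Snowplow raw events are serialized with Thrift's binary protocol
    val deserializer = new TDeserializer(new TBinaryProtocol.Factory())
    val payload = new CollectorPayload()
    deserializer.deserialize(payload, recordBytes)
    payload
  }
}
```

Once deserialized, the payload exposes the collector fields (IP address, querystring, body, headers and so on) through the usual Thrift getters.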

As for the event processing, you would probably want to read the enriched data, not the raw data. Our Analytics SDKs are designed to do just that (see the sketch at the end of this post).

Thus, you would probably want to have the data enriched first before applying analytics.

You might be interested in implementing (part of) the Lambda architecture as described in How to setup a Lambda architecture for Snowplow. You could deploy EmrEtlRunner and run it with the --skip shred option to produce just the enriched events.
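Once the enriched files are in S3, reading them with, for example, the Scala Analytics SDK looks roughly like this. Treat it as a sketch only: the EventTransformer package and its return type vary between SDK releases, so check the docs for the version you use.

```scala
// Assumption: Scala Analytics SDK on the classpath; the package and the
// return type of EventTransformer.transform differ between SDK releases
import com.snowplowanalytics.snowplow.analytics.scalasdk.json.EventTransformer

object EnrichedEventReader {
  def main(args: Array[String]): Unit = {
    // Each line of an enriched file is one event as a tab-separated record
    val source = scala.io.Source.fromFile(args(0))
    try {
      for (line <- source.getLines()) {
        // Transforms the enriched TSV line into a JSON representation,
        // wrapped in a success/failure container
        val result = EventTransformer.transform(line)
        println(result)
      }
    } finally {
      source.close()
    }
  }
}
```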


Hi @vshulga,

I have been reading raw data from S3 as part of S3 failure recovery; the code may be found here.

Hi @grzegorzewald. It looks like your code works with base64-encoded records, and the Thrift handling in recovery.py is unused.
I’m working on decoding successful records, not failed ones, so the format is different. Thank you for the code anyway.

Hi @ihor,
Thank you for the previous reply. Snowplow has rich functionality for event enrichment and storage, and that is the reason we chose it. At this stage of the project we don’t have enough time to put effort into event enrichment and storage. That is why we decided to go with raw events and the Kinesis S3 sink.

Could you please take a look at the format of the file that I downloaded from S3?
It doesn’t look like it contains Thrift records, but rather UTF-8 strings with some byte delimiters.

I’ve tried to read it with Elephant Bird in this repo, without success. Could you please take a look at that code too?

Hi @vshulga - trying to parse a Snowplow raw event is a solved problem - this is precisely what our Scala Common Enrich library does, and this library is embedded into both our Hadoop Enrich and Stream Enrich applications. Given that you have your raw events in S3 already, I would recommend going with Hadoop Enrich.