Issue reading enriched data thrift


#1

Hi all,

We are building a real-time pipeline with Kinesis, and at the moment we want to work with the enriched data on the stream.
We were hoping to use a simple Ruby script, in an AWS Lambda, to transform the base64-encoded enriched data coming from the Kinesis stream into JSON, as @grzegorzewald has mentioned. But to do that we need to deserialize the Thrift data, which requires something like an enriched_payload.thrift, and we haven’t found anything like that.

There is the collector-payload.thrift at: https://github.com/snowplow/snowplow/blob/master/2-collectors/thrift-schemas/collector-payload-1/src/main/thrift/collector-payload.thrift
but isn’t it just for the collector output?

And there’s one more issue. In our tests we have deserialized some data locally using collector-payload.thrift, with something like the approach shown in Decoding real-time bad records (Thrift) [tutorial], but in Ruby. It looks like this:

require "json"
require "thrift-base64"
require_relative "gen-rb/collector_types" # generated from collector-payload.thrift

unless ARGV.empty?
  # The first argument is a Lambda-style Kinesis event as JSON.
  message = JSON.parse(ARGV.first)
  message["Records"].each do |record|
    # Each record's payload is base64-encoded Thrift.
    encoded_tracking = record["kinesis"]["data"]
    struct = CollectorPayload.new

    # thrift-base64 combines base64 decoding with Thrift deserialization.
    binary_base64_deserializer =
      Thrift::Base64Deserializer.new(Thrift::BinaryProtocolFactory.new)

    track = binary_base64_deserializer.deserialize(struct, encoded_tracking)

    puts track.inspect
  end
end

which works! But it depends on the thrift-base64 gem, which in turn depends on the thrift gem, which has a native (system) dependency — and because of that we didn’t manage to make it work on AWS Lambda :disappointed:

So… is there an enrich.thrift to use? Has anyone made a Ruby Lambda that reads the stream? Is it a bad idea to work with the Kinesis enriched stream without Lambda (e.g. a container reading the stream)?

PS: I know there is the Python Analytics SDK and that I could use an Elasticsearch sink; I’m looking for alternatives.

Many Thanks,
Caio


#2

Another question.

What do you think about using Kinesis Analytics to read the enriched stream?

Thanks


#3

The good news is that if you want to read off the enriched stream (in Kinesis) it will be a base64-encoded, tab-separated record rather than a Thrift record. The raw stream that the collector outputs is Thrift-serialized, but the enricher that consumes it will output TSV (in which some columns are JSON).
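To make that concrete, here is a minimal sketch in Ruby (the payload string is made up for illustration): reading an enriched record is just a base64 decode followed by a tab split.

```ruby
require "base64"

# Hypothetical payload as it would arrive from the enriched Kinesis
# stream: a base64-encoded, tab-separated string (values made up).
encoded = Base64.strict_encode64("ID_test\tsrv\t2017-08-11 18:22:40.369")

# Decode, then split on tabs; the -1 limit keeps empty trailing fields.
row = Base64.decode64(encoded)
fields = row.split("\t", -1)

puts fields.inspect  # => ["ID_test", "srv", "2017-08-11 18:22:40.369"]
```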

Both the Scala and Python Analytics SDKs will let you transform and convert this enriched format, which can make things significantly easier — so if it’s possible for you to use them, I’d recommend that.

If that isn’t an option you can also apply your own custom logic in a Lambda function with a Kinesis trigger (this will take care of some of the semantics of reading/batching records for you) or you can also read off the Kinesis enriched stream using any other client. If you do decide to read off the stream directly I’d strongly encourage using the Ruby (or other language) KCL library that AWS provides as this significantly simplifies some of the logic you’d otherwise have to implement around record processing and checkpointing.
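For the Lambda-with-Kinesis-trigger route, a rough sketch of the handler logic, assuming the standard Kinesis trigger event shape ({"Records" => [{"kinesis" => {"data" => ...}}]}); since Ruby isn’t a native Lambda runtime, treat this as pseudo-handler logic you would port to Python/JS:

```ruby
require "base64"

# Sketch of a handler for a Kinesis-triggered Lambda event. Each record's
# data is base64-encoded; for the enriched stream the decoded payload is
# a TSV row, so we split it into fields (-1 keeps empty trailing fields).
def handle(event)
  event["Records"].map do |record|
    Base64.decode64(record["kinesis"]["data"]).split("\t", -1)
  end
end

# Made-up sample event in the Kinesis trigger shape, for illustration.
sample_event = {
  "Records" => [
    { "kinesis" => { "data" => Base64.strict_encode64("app\tsrv\tpage_view") } }
  ]
}

handle(sample_event).each { |fields| puts fields.inspect }
```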


#4

Thanks for the answer @mike!

I’m trying to work with the TSV, but I’m having some troubles:

  • It seems there are lots of “empty columns” (\t\t)
  • How can I know which data is which? What are the column names?
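On the first point, consecutive tabs are simply unset fields. A small sketch (row content made up): splitting with a -1 limit preserves the empty and trailing fields so the column positions stay aligned.

```ruby
# A row with unset columns shows up as consecutive tabs; split with a
# limit of -1 so empty/trailing fields are kept rather than dropped.
row = "ID_test\tsrv\t\t\tpage_view\t"
fields = row.split("\t", -1)

puts fields.length   # => 6
puts fields.inspect  # => ["ID_test", "srv", "", "", "page_view", ""]
```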

Here’s an example of a row; it’s a page view for www.pv-test.com generated via the Ruby tracker, locally:

data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data data
ID_test srv 2017-08-11 18:22:40.369 2017-08-11 18:20:12.495 2017-08-11 18:20:12.010 page_view ea7ef03e-a75f-4250-a28b-918d3bdb149e namespace rb-0.6.0 ssc-0.9.0-kinesis kinesis-0.10.0-common-0.24.0 172.18.0.1 60e6512b-cab2-4b11-b107-40bbb15f24bf http://www.pv-test.com http www.pv-test.com 80 Ruby Unknown Unknown unknown OTHER Unknown Unknown Other Unknown 0 2017-08-11 18:20:12.493 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0","data":{"useragentFamily":"Other","useragentMajor":null,"useragentMinor":null,"useragentPatch":null,"useragentVersion":"Other","osFamily":"Other","osMajor":null,"osMinor":null,"osPatch":null,"osPatchMinor":null,"osVersion":"Other","deviceFamily":"Other"}}]} 2017-08-11 18:20:12.012 com.snowplowanalytics.snowplow page_view jsonschema 1-0-0

raw:

ID_test srv 2017-08-11 18:22:40.369 2017-08-11 18:20:12.495 2017-08-11 18:20:12.010 page_view ea7ef03e-a75f-4250-a28b-918d3bdb149e namespace rb-0.6.0 ssc-0.9.0-kinesis kinesis-0.10.0-common-0.24.0 172.18.0.1 60e6512b-cab2-4b11-b107-40bbb15f24bf http://www.pv-test.com http www.pv-test.com 80 Ruby Unknown Unknown unknown OTHER Unknown Unknown Other Unknown 0 2017-08-11 18:20:12.493 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0","data":{"useragentFamily":"Other","useragentMajor":null,"useragentMinor":null,"useragentPatch":null,"useragentVersion":"Other","osFamily":"Other","osMajor":null,"osMinor":null,"osPatch":null,"osPatchMinor":null,"osVersion":"Other","deviceFamily":"Other"}}]} 2017-08-11 18:20:12.012 com.snowplowanalytics.snowplow page_view jsonschema 1-0-0 3baf6fb871880d77b2ebcd3866bbc05b 

How can I know what to put in the place of each “data”, i.e. what each column is?

Many Thanks,
Caio


#5

Hi @caio - the code in the Python Analytics SDK is probably the best place to go to understand the data format:

The human-readable definition of the enriched event TSV is in this wiki page:

Unfortunately we don’t have any plans to write a Ruby Analytics SDK at this time.


#6

Hi @caio

Why do you want to use Ruby on AWS Lambda? It is not supported natively, so you would be better off going with Python/JS.

If you want to analyse enriched data, go here: https://github.com/snowplow/snowplow/blob/master/4-storage/kinesis-elasticsearch-sink/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/elasticsearch/SnowplowElasticsearchTransformer.scala and inspect the fields list. It contains the list of actually used columns plus descriptions of their types. Some time ago I used this to build a record hash (field => value) in Ruby, plus a kind of decoder based on value type.
If you want to analyse enriched data, go here: https://github.com/snowplow/snowplow/blob/master/4-storage/kinesis-elasticsearch-sink/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/elasticsearch/SnowplowElasticsearchTransformer.scala and inspect fields list. It contains list of actually used columns + descriptions of types. Some time ago i used this to build record hash (field => value) in ruby + a kind of decoder based on value type.