We are building a real-time pipeline with Kinesis, and at the moment we want to work with the enriched data on the stream.
We were hoping to use a simple Ruby script, in an AWS Lambda, to transform the Base64-encoded enriched data that comes from the Kinesis stream into JSON, as @grzegorzewald has mentioned. But in order to do that we need to deserialize the Thrift data, which needs something like an enriched_payload.thrift, and we haven’t found anything like that.
There is the collector-payload.thrift at https://github.com/snowplow/snowplow/blob/master/2-collectors/thrift-schemas/collector-payload-1/src/main/thrift/collector-payload.thrift, but isn’t that just for the collector output?
And there’s one more issue. In our tests we deserialized some data locally with collector-payload.thrift, following an approach like the one in (Decoding real-time bad records (Thrift) [tutorial]) but in Ruby, and it looks like this:
    require "json"
    require "thrift-base64"
    require_relative "gen-rb/collector_types"

    unless ARGV.empty?
      message = JSON.parse(ARGV.first)
      message["Records"].each do |record|
        encoded_tracking = record["kinesis"]["data"]
        struct = CollectorPayload.new
        binary_base64_deserializer = Thrift::Base64Deserializer.new(Thrift::BinaryProtocolFactory.new)
        track = binary_base64_deserializer.deserialize(struct, encoded_tracking)
        puts track.inspect
      end
    end
which works! But it depends on the gem thrift-base64, which depends on the gem thrift, which has a system dependency, and because of that we didn’t manage to make it work on AWS Lambda.
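For what it's worth, the Base64 step on its own needs no gem at all. Below is a minimal sketch, assuming the same event shape as in the script above (Records, then kinesis, then data). The names decode_kinesis_records and handler are made up for illustration, and this only recovers the raw bytes of each record; it does not deserialize Thrift.

```ruby
# Hypothetical helper: Base64-decodes the payload of every Kinesis record
# in a Lambda event and returns an array of raw byte strings.
# String#unpack1("m0") is core Ruby (strict Base64), so nothing is required.
def decode_kinesis_records(event)
  event.fetch("Records", []).map do |record|
    record.fetch("kinesis").fetch("data").unpack1("m0")
  end
end

# Hypothetical Lambda entry point; the handler name is an assumption.
def handler(event:, context:)
  payloads = decode_kinesis_records(event)
  # Log the size of each decoded payload; real processing would go here.
  payloads.each { |bytes| puts "decoded #{bytes.bytesize} bytes" }
  { "decoded" => payloads.size }
end
```

What to do with the decoded bytes is still the open question below.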
So… is there an enrich.thrift to use? Has anyone built a Ruby Lambda that reads the stream? Is it a bad idea to work with the enriched Kinesis stream without Lambda (for example, with a container reading the stream)?
PS: I know that there is the Python-Analytics-SDK and that I could use an Elasticsearch sink; I’m looking for alternatives.