Optionally support Avro and the Confluent schema registry for Kafka


#1

Currently, the data written to Kafka is in the Thrift format, which is not the accepted standard in Kafka. We should provide the option to leverage the Schema Registry https://github.com/confluentinc/schema-registry and write the data in Avro format.

This will make Snowplow Kafka options AWESOME


#2

I’m not too familiar with Kafka - is there a particular reason that Avro is the standard over Thrift? Is it just convention to typically serialise with Avro over Thrift or does it provide additional functionality/advantages?


#3

The whole Kafka Connect platform is built around Avro, which has become the de-facto standard.
Confluent (founded by the creators of Kafka) created the schema registry, which lets you store Avro data in Kafka while the schema itself is kept externally in the registry, cf http://docs.confluent.io/3.2.0/schema-registry/docs/. The payload is therefore much smaller, because each message carries only a reference to its schema rather than the schema itself.
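A minimal sketch of why the payload shrinks, assuming Confluent's documented wire format: each Avro-encoded value is prefixed with a zero magic byte and a 4-byte big-endian schema ID, and the schema itself lives only in the registry. The code frames and unframes a message with the standard library only; it does not perform the Avro encoding, so the payload bytes are placeholders.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0                 # first byte of every Confluent-framed message
HEADER = struct.Struct(">BI")  # big-endian: 1-byte magic + 4-byte schema ID = 5 bytes

def frame(schema_id: int, avro_body: bytes) -> bytes:
    """Prefix an already Avro-encoded body with the 5-byte Confluent header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + avro_body

def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message back into (schema ID, Avro body)."""
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, message[HEADER.size:]

# The header costs 5 bytes no matter how large the registered schema is.
message = frame(42, b"placeholder-avro-bytes")
schema_id, body = unframe(message)
```

A consumer uses the schema ID to fetch (and cache) the writer schema from the registry before decoding the body.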

The added benefit is that we can strongly enforce schema migrations and compatibility guarantees (backward, forward, full, etc.). Once this is implemented, people can start using Kafka Connect connectors (https://www.confluent.io/product/connectors/) to export the Snowplow data to Elasticsearch, S3, etc., basically for free.
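To illustrate what a backward-compatibility check enforces, here is a deliberately simplified sketch over Avro record schemas written as Python dicts: a new (reader) schema can read data written with the old (writer) schema if every field it adds has a default and no shared field changes type. The real registry applies Avro's full schema-resolution rules (type promotion, unions, nested records, aliases), which this sketch ignores.

```python
from typing import Any, Dict, List

AvroRecord = Dict[str, Any]

def read_incompatibilities(writer: AvroRecord, reader: AvroRecord) -> List[str]:
    """List reasons why `reader` cannot decode data written with `writer`.

    An empty list means compatible under these simplified rules:
    top-level fields only, and shared fields must keep exactly the same type.
    """
    problems = []
    writer_fields = {f["name"]: f for f in writer["fields"]}
    for field in reader["fields"]:
        name = field["name"]
        if name in writer_fields:
            if writer_fields[name]["type"] != field["type"]:
                problems.append(f"type of '{name}' changed")
        elif "default" not in field:
            # Old data has no value for this field and there is no default to fill in.
            problems.append(f"new field '{name}' has no default")
    # Fields dropped by the reader are fine: they are skipped when decoding.
    return problems

v1 = {"type": "record", "name": "PageView",
      "fields": [{"name": "url", "type": "string"}]}
v2 = {"type": "record", "name": "PageView",
      "fields": [{"name": "url", "type": "string"},
                 {"name": "referrer", "type": ["null", "string"], "default": None}]}
v3 = {"type": "record", "name": "PageView",
      "fields": [{"name": "url", "type": "string"},
                 {"name": "user_id", "type": "string"}]}

backward_ok = read_incompatibilities(writer=v1, reader=v2) == []  # new schema reads old data
```

Under these rules, backward compatibility is `read_incompatibilities(old, new)`, forward compatibility is the same call with the arguments swapped (the old schema reading new data), and full compatibility requires both to come back empty.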


#4

Thanks @simplesteph that makes a lot of sense.

At the moment the data in the collector stream is written in Thrift but the enriched stream is TSV - I’m guessing it would make sense to have both the collector and enriched stream/topic as Avro records?


#5

Absolutely.
Is the schema documented anywhere? How much effort would it take to translate it into an Avro schema? This also has deep implications for how Snowplow plans to evolve the schema (in a breaking or non-breaking way).


#6

I tried to figure that out as well, with no luck: the columns in the TSV I get after enrichment simply did not add up.
I’ve checked:


#7

I asked about replacing the Thrift collector payload a while ago while discussing the Clojure collector replacement: Replacing the Clojure Collector

If anything, I would expect Snowplow to leverage Iglu Central and JSON Schemas instead of Confluent’s Schema Registry and Avro. As far as I know JSON is fully supported by Kafka Connect too (see key.converter in https://kafka.apache.org/documentation/#connect_running).
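For reference, switching a Kafka Connect worker between the two formats is mostly a matter of converter settings. The sketch below builds both variants as Python dicts; the converter class names and property keys are the ones shipped with Apache Kafka and Confluent Platform, while the registry URL is a placeholder assumption (8081 is the registry's default port).

```python
from typing import Dict

JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"  # ships with Apache Kafka
AVRO_CONVERTER = "io.confluent.connect.avro.AvroConverter"      # ships with Confluent Platform

def converter_config(fmt: str, registry_url: str = "http://localhost:8081") -> Dict[str, str]:
    """Converter settings for both keys and values."""
    config: Dict[str, str] = {}
    for side in ("key", "value"):
        if fmt == "json":
            config[f"{side}.converter"] = JSON_CONVERTER
            # true = wrap every message in a {"schema": ..., "payload": ...} envelope
            config[f"{side}.converter.schemas.enable"] = "true"
        elif fmt == "avro":
            config[f"{side}.converter"] = AVRO_CONVERTER
            # Avro messages carry only a schema ID; the schema lives in the registry
            config[f"{side}.converter.schema.registry.url"] = registry_url
        else:
            raise ValueError(f"unsupported format: {fmt}")
    return config

def to_properties(config: Dict[str, str]) -> str:
    """Render the settings as a .properties file body."""
    return "\n".join(f"{key}={value}" for key, value in sorted(config.items()))

print(to_properties(converter_config("json")))
```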


#8

If you support JSON schemas, that’s perfect. And you’re then one step away from also supporting Avro :slight_smile:
If it supports JSON schemas using Iglu, then 90% of the work is already done, and all the Kafka connectors should work anyway. I’ll be happy to provide the Avro code then.

(The one downside of JSON schemas is that the schema has to be attached to every payload, which increases the message size. This could be mitigated by compressing the records, though.)
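A rough way to see both the overhead and the mitigation, assuming Kafka Connect's JsonConverter envelope with schemas enabled, where every message becomes {"schema": ..., "payload": ...}. The records and the schema below are made up, and gzip over a whole batch stands in for producer-side batch compression; actual ratios depend on the data and the codec.

```python
import gzip
import json

# Made-up schema in the JsonConverter envelope format.
SCHEMA = {
    "type": "struct",
    "name": "page_view",
    "optional": False,
    "fields": [
        {"field": "event_id", "type": "string", "optional": False},
        {"field": "page_url", "type": "string", "optional": True},
    ],
}

def with_envelope(payload: dict) -> bytes:
    """Serialize one record with its schema attached (schemas.enable=true style)."""
    return json.dumps({"schema": SCHEMA, "payload": payload}).encode("utf-8")

def without_envelope(payload: dict) -> bytes:
    """Serialize the bare record only."""
    return json.dumps(payload).encode("utf-8")

records = [{"event_id": f"evt-{i}", "page_url": f"https://example.com/page/{i}"}
           for i in range(500)]

bare_batch = b"\n".join(without_envelope(r) for r in records)
enveloped_batch = b"\n".join(with_envelope(r) for r in records)
compressed_batch = gzip.compress(enveloped_batch)  # the repeated schema compresses well

for label, data in [("bare", bare_batch), ("enveloped", enveloped_batch),
                    ("enveloped+gzip", compressed_batch)]:
    print(f"{label:>15}: {len(data)} bytes")
```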


#9

Hey @simplesteph, @mike, @magaton - lots of interesting thoughts here! I’ll discuss them in turn.

Last time I checked, Apache Kafka was schema technology agnostic :wink: But in all seriousness, I know what you mean - the Apache Kafka ecosystem is at the moment consolidating around the open-source and commercial technology provided by the VC-backed startup Confluent.

There are two independent questions here:

  1. Should we use Avro with Snowplow?
  2. Should we add support for Confluent schema registry alongside Iglu schema registry?

Should we use Avro with Snowplow?

In principle, the answer is yes - it’s a great schema technology. But there are a couple of qualifications to this:

1. The collector payloads

Currently these are represented in Thrift with this schema:

There are minimal benefits to moving this to Avro:

  • Thrift is already a very compact format
  • Avro has better schema evolution capabilities than Thrift, but the CollectorPayload schema evolves very slowly (it hasn’t changed since it was defined)
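One reason a switch would gain little in size: Thrift's compact protocol and Avro's binary encoding both store integers as zigzag-encoded variable-length integers, so small values take one or two bytes in either format. A minimal standard-library sketch of that encoding (the 64-bit zigzag used for Avro longs and Thrift i64):

```python
def zigzag64(n: int) -> int:
    """Interleave signed values (0, -1, 1, -2, ...) onto unsigned (0, 1, 2, 3, ...)."""
    if not -(1 << 63) <= n < (1 << 63):
        raise ValueError("value does not fit in 64 bits")
    return (n << 1) ^ (n >> 63)

def varint(u: int) -> bytes:
    """Unsigned varint: 7 bits per byte, high bit set on every byte except the last."""
    out = bytearray()
    while True:
        byte = u & 0x7F
        u >>= 7
        if u:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

def encode_long(n: int) -> bytes:
    """Zigzag + varint, the shared integer encoding of both formats."""
    return varint(zigzag64(n))
```

Compared with a fixed 8-byte long, a small counter or offset therefore shrinks to a byte or two in both formats.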

If we were starting Snowplow again today then I would use Avro for the collector payloads, but I’m not convinced there’s a strong reason to move. Happy to hear a counter-argument.

2. The enriched events

At the moment Snowplow enriched events are an… unusual… TSV+JSON format, comprising two components:

  1. A set of legacy properties represented as first-class columns in the TSV
  2. Single self-describing JSON records, or arrays of heterogeneous ones, stored in three columns
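To make the layout concrete, here is a minimal parsing sketch. The column list is a hypothetical, heavily truncated stand-in for the real enriched-event columns; the three JSON columns (contexts, unstruct_event, derived_contexts) hold self-describing JSON, i.e. {"schema": "iglu:...", "data": ...} wrappers, where the two context columns wrap arrays of entities and unstruct_event wraps a single one.

```python
import json
from typing import Any, Dict, List

# Hypothetical, truncated column layout; the real enriched event has far more columns.
COLUMNS = ["app_id", "event_id", "contexts", "unstruct_event", "derived_contexts"]
JSON_COLUMNS = {"contexts", "unstruct_event", "derived_contexts"}

def parse_enriched(line: str) -> Dict[str, Any]:
    """Split one TSV line; decode the self-describing JSON columns, keep the rest as strings."""
    values = line.rstrip("\n").split("\t")
    if len(values) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} columns, got {len(values)}")
    event: Dict[str, Any] = {}
    for name, raw in zip(COLUMNS, values):
        if name in JSON_COLUMNS:
            event[name] = json.loads(raw) if raw else None
        else:
            event[name] = raw or None
    return event

def entity_schemas(event: Dict[str, Any]) -> List[str]:
    """Collect the Iglu schema URI of every entity nested in the JSON columns."""
    uris: List[str] = []
    for name in ("contexts", "derived_contexts"):
        wrapper = event.get(name)
        if wrapper:
            uris.extend(entity["schema"] for entity in wrapper["data"])
    unstruct = event.get("unstruct_event")
    if unstruct:
        uris.append(unstruct["data"]["schema"])
    return uris
```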

This is a very non-standard format, and we plan on migrating away from it over time.

The first phase of this migration is straightforward: moving the legacy TSV properties into JSON, alongside all the other JSON records. Expect an RFC on this soon.
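One possible shape for that first phase, purely as an illustration ahead of the RFC: gather the legacy columns of a row into their own self-describing entity, so that every property is reached the same way as the existing JSON records. The schema URI and column names below are hypothetical.

```python
from typing import Any, Dict, Iterable, List, Optional

# Hypothetical Iglu URI for an entity holding the legacy first-class properties.
LEGACY_SCHEMA = "iglu:com.example/legacy_properties/jsonschema/1-0-0"

def legacy_entity(row: Dict[str, Optional[str]], legacy_columns: Iterable[str]) -> Dict[str, Any]:
    """Wrap the non-empty legacy columns of a row in a self-describing JSON entity."""
    data = {col: row[col] for col in legacy_columns if row.get(col) not in (None, "")}
    return {"schema": LEGACY_SCHEMA, "data": data}

def all_entities(row: Dict[str, Optional[str]],
                 legacy_columns: Iterable[str],
                 other_entities: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Legacy properties become just another entity alongside the existing JSON records."""
    return [legacy_entity(row, legacy_columns), *other_entities]
```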

Where we go next in the migration is an open question. There isn’t an existing schema technology which really fits the Snowplow enriched event, because it’s so heterogeneous: a given event can contain 10 or 20 discrete entities, all independently versioned. Complicating things further, we also want to move to a graph structure inside each Snowplow enriched event (think IP->Location+Time->Weather), which again is difficult to model in a first-class way using existing schema technology.
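To show why a flat record struggles here, a hypothetical sketch: the entities of a single event become graph nodes, each with its own independently versioned Iglu URI, and derivation becomes edges. The entity names and schema URIs are made up; the point is that the edges carry information a single Avro record or TSV row has no natural slot for.

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Dict, List, Set

# Hypothetical entities attached to one enriched event, each independently versioned.
ENTITIES: Dict[str, str] = {
    "ip": "iglu:com.acme/ip_address/jsonschema/1-0-0",
    "time": "iglu:com.acme/event_time/jsonschema/1-0-0",
    "location": "iglu:com.acme/geo_location/jsonschema/2-1-0",
    "weather": "iglu:com.acme/weather/jsonschema/1-0-3",
}

# Edges: entity -> entities it was derived from (IP -> Location, Location + Time -> Weather).
DERIVED_FROM: Dict[str, Set[str]] = {
    "location": {"ip"},
    "weather": {"location", "time"},
}

def derivation_order(entities: Dict[str, str], derived_from: Dict[str, Set[str]]) -> List[str]:
    """Order entities so each one comes after everything it was derived from."""
    graph = {name: derived_from.get(name, set()) for name in entities}
    return list(TopologicalSorter(graph).static_order())
```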

So it’s an open question. However, what is clear is that lots of users want to analyze Snowplow events at scale in a type-safe way using Spark, Athena, Flink, Impala, Presto, Hive etc. To support them, we are planning to add an equivalent of our Redshift load process, which will pre-process our enriched events into S3 in an analytics-friendly way, using Avro plus Parquet:

3. Enriched events in Avro/Parquet/S3 with the Hive metastore

Everything old is new again - it seems like the Hive metastore has “won” in terms of being the place where you register the various tables/folders of events that you are writing to your data lake, ready for subsequent querying.

We need to come up with a process like so:

enriched events -> Avro format -> Parquet + folder structure -> S3 / Azure Data Lake Store
                        ▲                         |
                        |                         |
                        ▼                         ▼
                Iglu schema registry        Hive metastore
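As one hypothetical take on the "Parquet + folder structure" step, events could be laid out in Hive-style key=value directories derived from each entity's Iglu URI, so that a breaking (MODEL) change lands in a separate partition. The bucket, the partition keys and their order are assumptions, not a decided design.

```python
from datetime import date

def partition_path(root: str, schema_uri: str, day: date) -> str:
    """Hive-style partition directory for one entity type on one day (hypothetical layout)."""
    prefix = "iglu:"
    if not schema_uri.startswith(prefix):
        raise ValueError(f"not an Iglu URI: {schema_uri}")
    vendor, name, _format, version = schema_uri[len(prefix):].split("/")
    model = version.split("-")[0]  # only MODEL goes in the path; REVISION/ADDITION share it
    return f"{root.rstrip('/')}/vendor={vendor}/name={name}/model={model}/dt={day.isoformat()}/"

print(partition_path("s3://example-bucket/enriched/",
                     "iglu:com.acme/link_click/jsonschema/1-0-2", date(2017, 6, 1)))
```

Directories in this key=value style can be registered in the Hive metastore as partitions (for example with `MSCK REPAIR TABLE`), which engines such as Athena, Presto and Spark SQL then query.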

There are plenty of questions still about how to structure all of this. Expect an RFC on this once the atomic.events migration RFC has settled down.

Should we add support for Confluent schema registry alongside Iglu schema registry?

Confluent schema registry is much narrower in scope than Iglu:

  • Depends on Kafka and Kafka clients
  • Limited to Avro
  • Doesn’t support schema federation (assumes that all schemas live in a single company-internal schema registry)
  • Doesn’t support MODEL version changes (i.e. breaking changes) to a schema, so you end up with adimp, adimp_new, adimp_newest
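For context, Iglu versions schemas with SchemaVer (MODEL-REVISION-ADDITION), so a breaking change keeps the schema's name and only bumps MODEL. A small sketch that classifies a version change under that scheme:

```python
from typing import NamedTuple, Optional

class SchemaVer(NamedTuple):
    model: int     # breaking: prevents interaction with any historical data
    revision: int  # may prevent interaction with some historical data
    addition: int  # compatible with all historical data

    @classmethod
    def parse(cls, text: str) -> "SchemaVer":
        """Parse a version string such as '1-0-2'."""
        model, revision, addition = (int(part) for part in text.split("-"))
        return cls(model, revision, addition)

def change_kind(old: SchemaVer, new: SchemaVer) -> Optional[str]:
    """Name the most significant component that changed, or None if identical."""
    for kind in ("model", "revision", "addition"):
        if getattr(old, kind) != getattr(new, kind):
            return kind
    return None
```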

However, it does have deep integration with the Confluent ecosystem that’s emerging around Kafka.

Where does that leave us? The interesting thing is that Confluent schema registry is effectively a subset of the functionality of Iglu schema registry, so I believe it would be relatively straightforward to maintain a clone/mirror of the Iglu schema registry in Confluent, in much the same way that we will support the Hive metastore.
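A minimal sketch of what the mirroring could look like on the Confluent side, assuming each Iglu schema has already been translated to Avro. It targets the registry's REST endpoint for registering a schema version (POST /subjects/{subject}/versions, with the Avro schema sent as a JSON string); the subject naming rule, one subject per Iglu vendor, name and MODEL, is a hypothetical choice. The request is only built here, not sent.

```python
import json
import urllib.request
from typing import Any, Dict

def confluent_subject(schema_uri: str) -> str:
    """Hypothetical naming rule: one Confluent subject per Iglu vendor/name/MODEL."""
    vendor, name, _format, version = schema_uri[len("iglu:"):].split("/")
    model = version.split("-")[0]
    return f"{vendor}.{name}.v{model}"

def registration_request(registry_url: str, schema_uri: str,
                         avro_schema: Dict[str, Any]) -> urllib.request.Request:
    """Build (but do not send) the request that registers one schema version."""
    subject = confluent_subject(schema_uri)
    body = json.dumps({"schema": json.dumps(avro_schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url.rstrip('/')}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )
```

Sending it with `urllib.request.urlopen(req)` should return a JSON body containing the ID the registry assigned to the schema.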

So potentially we would implement something like this:

enriched events -> Snowplow Avro format -> Confluent Avro format  -> Confluent ecosystem
                        ▲                         |
                        |                         |
                        ▼                         ▼
                Iglu schema registry ---->  Confluent schema registry

Anyway it needs further thought - certainly an interesting idea…


#10

Hey @Alex!

Happy to have found this! Do you have an update on how far you got with this, and an estimate of when it could be released?


#11

Hi @lehneres - no update on this currently. We now have a dedicated team focused on Iglu and the storage loaders, but they are currently focused on our BigQuery and Redshift loaders.

Hopefully we will revisit this later this year.