Example: Running Snowplow real-time pipeline on GCP with Kafka and Kubernetes

Hi,

Last week I gave a talk on how to run a real-time clickstream pipeline on GCP with Snowplow and Kafka. The byproduct of that is a GitHub example and a simple demo. Here are the details:

A note of warning: it’s not production-proof, but this template is very similar to what we ran in our production pipeline, and it performed quite well (no loss of events or unexpected surprises).

Would be happy to answer any questions!

Cheers,
Evaldas


Awesome - thanks for sharing this!

How are you handling shredding/custom schemas in BigQuery? In the BQ schema in the example, it looks like contexts and derived_contexts are just treated as strings.

Good question. I haven’t added any example there, but generally you would:

  1. parse the contexts from the message
  2. match based on schema name to record schema
  3. load them together with the main event into a single table

BigQuery supports nested data structures, so for custom contexts you would add a RECORD-type column for every custom context.
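For illustration, here is a minimal Python sketch of steps 1 and 2: it parses the contexts JSON from an enriched event and groups the entries by schema so each can go into its own column. The column-naming convention and the com.acme/checkout_step schema are made up for the example.

```python
import json

# Hypothetical naming convention: map an Iglu schema URI to a BigQuery
# column name, e.g. "iglu:com.acme/checkout_step/jsonschema/1-0-0"
# -> "context_com_acme_checkout_step_1". The URI format is standard
# Snowplow; the column naming here is just an illustration.
def column_name_for(schema_uri):
    _, path = schema_uri.split(":", 1)
    vendor, name, _format, version = path.split("/")
    major = version.split("-")[0]
    return "context_{}_{}_{}".format(vendor.replace(".", "_"), name, major)

def split_contexts(contexts_json):
    """Group the self-describing contexts of one event by schema."""
    grouped = {}
    for ctx in json.loads(contexts_json)["data"]:
        grouped.setdefault(column_name_for(ctx["schema"]), []).append(ctx["data"])
    return grouped

# contexts_json would come from the `contexts` field of an enriched event.
contexts_json = json.dumps({
    "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1",
    "data": [
        {"schema": "iglu:com.acme/checkout_step/jsonschema/1-0-0",
         "data": {"step": 2, "total": 42.0}},
    ],
})
print(split_contexts(contexts_json))
# {'context_com_acme_checkout_step_1': [{'step': 2, 'total': 42.0}]}
```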

Thanks - keen to get your thoughts on different ideas for how this data could/should be stored in BigQuery.

  • Are you recommending loading multiple contexts into a single contexts column or multiple columns (one per schema)?
  • Is having the data as records in atomic.events more performant than joining to a context per table?
  • Are shredded events more performant and does shredding events decrease cost by reducing total bytes scanned in a query?

Are you recommending loading multiple contexts into a single contexts column or multiple columns (one per schema)?

I would use one RECORD column per context.
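As a rough sketch of what that looks like with the BigQuery Python client (the table name, context name, and fields are all hypothetical), each custom context becomes a REPEATED RECORD column on the events table:

```python
from google.cloud import bigquery

schema = [
    bigquery.SchemaField("event_id", "STRING"),
    bigquery.SchemaField("collector_tstamp", "TIMESTAMP"),
    # One REPEATED RECORD column per custom context schema, so
    # 1-to-many contexts stay on the same event row.
    bigquery.SchemaField(
        "context_com_acme_checkout_step_1", "RECORD", mode="REPEATED",
        fields=[
            bigquery.SchemaField("step", "INTEGER"),
            bigquery.SchemaField("total", "FLOAT"),
        ],
    ),
]

client = bigquery.Client()
table = bigquery.Table("my-project.atomic.events", schema=schema)
table = client.create_table(table)  # creates the table with nested contexts
```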

Is having the data as records in atomic.events more performant than joining to a context per table?

It is more performant for sure: if events are loaded together with their contexts, you essentially don’t need to do any joins, as everything sits in one row which you ‘flatten’ during the SELECT. It also works for 1-to-many cases, as a RECORD can be repeated.
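For example, a query like the following (using the illustrative table and column names from above) flattens the repeated context with UNNEST, with no join involved:

```python
from google.cloud import bigquery

client = bigquery.Client()

# Each event row is expanded into one row per context item; the context
# lives in the same table, so no join is needed.
query = """
SELECT
  e.event_id,
  step.step,
  step.total
FROM `my-project.atomic.events` AS e,
  UNNEST(e.context_com_acme_checkout_step_1) AS step
"""
for row in client.query(query).result():
    print(row.event_id, row.step, row.total)
```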

Are shredded events more performant and does shredding events decrease cost by reducing total bytes scanned in a query?

Yes. Essentially you pay for how many columns your query touches (translated into bytes), so if you select only the contexts that are necessary, the cost will be smaller. But I think the bigger benefit is the convenience of structuring the schema this way; otherwise you end up with a long list of columns in a fat table, or separate tables as in the Redshift case. There is quite an informative talk on this nested data modeling here (the nested schema part starts at 45 min): https://www.youtube.com/watch?v=Vj6ksosHdhw
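One way to see this concretely (again with the hypothetical table from above) is a dry run with the BigQuery Python client, which reports how many bytes a query would scan without actually running it:

```python
from google.cloud import bigquery

client = bigquery.Client()

# A dry run returns the bytes the query would scan, which is what
# on-demand pricing is based on; selecting fewer columns scans fewer bytes.
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query(
    "SELECT event_id, context_com_acme_checkout_step_1 "
    "FROM `my-project.atomic.events`",
    job_config=job_config,
)
print("Bytes that would be scanned:", job.total_bytes_processed)
```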

  • It seems like, at the least, we’d need one column per context per schema model for any breaking schema changes?
  • Would revisions/additions to a schema result in a column patch or an additional column?

I also discovered today that BQ pricing isn’t entirely clear, particularly when it comes to pricing for records/structs and repeated fields.

  • For a single nested record you are charged for the data you select, not for all the fields in the record/struct (the documentation refers to this as “contained fields”).
  • For repeated structs/arrays you are charged for the data you select multiplied by the number of items in the array, even if you’re only selecting the first item in a repeated struct (as far as I can tell).

It seems like, at the least, we’d need one column per context per schema model for any breaking schema changes?

Not really sure what schema changes you have in mind, but generally adding a new column doesn’t require any transformations. If you want to change a type, of course, that would require some SQL transforms, though I find those much simpler to do in BigQuery than in Redshift, as the scaling is handled by Google under the hood.
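For instance, appending a new nullable or repeated column with the Python client is just a metadata update, with no data rewritten (the column and field names here are hypothetical):

```python
from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("my-project.atomic.events")

# Append a new REPEATED RECORD column for a new context schema version.
new_schema = list(table.schema)
new_schema.append(
    bigquery.SchemaField(
        "context_com_acme_checkout_step_2", "RECORD", mode="REPEATED",
        fields=[bigquery.SchemaField("step_name", "STRING")],
    )
)
table.schema = new_schema
client.update_table(table, ["schema"])  # schema-only update
```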

For a single nested record you are charged for the data you select, not for all the fields in the record/struct (the documentation refers to this as “contained fields”).

I never looked into this too much, as we don’t have enough data volume for it to impact costs, but I was curious and looked at some examples; see below:

In general, I think the cost is always per column touched, no matter whether the column is nested or top level. For repeated fields, BigQuery just flattens them out, so you get more rows. If you really want to be aggressive about cost reduction, using partitioned tables can help a lot when querying only recent data, e.g. “WHERE _PARTITIONTIME = TIMESTAMP('2017-06-01') LIMIT 1000”; each partition is something like a virtual table where all the rows loaded on that day live.
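As a small sketch of that partition filter with the Python client (same hypothetical table; only the 2017-06-01 partition is scanned and billed):

```python
from google.cloud import bigquery

client = bigquery.Client()

# Filtering on _PARTITIONTIME limits the scan to a single
# ingestion-time partition.
query = """
SELECT event_id, collector_tstamp
FROM `my-project.atomic.events`
WHERE _PARTITIONTIME = TIMESTAMP('2017-06-01')
LIMIT 1000
"""
for row in client.query(query).result():
    print(row.event_id, row.collector_tstamp)
```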