Streaming to BigQuery with google enhanced ecommerce events (unstructured events)


#1

Hi,

I’ve jumped into snowplow fresh and (for better or worse) decided we wanted to stream events straight to BigQuery. I’m using the JS tracker along with the Kinesis collector, Kinesis enrichment and the Kinesis S3 sink and things are working well.

From reading the documentation about Iglu and unstructured events, I knew a little about how events get shredded into separate tables joined by a foreign key, and I didn’t fancy having to re-implement all that logic, so I stuck to using just structured events; all that would be left to do was map the TSV enriched file to a BigQuery schema and load it in.

I then realised that we’re using Google Enhanced Ecommerce tags and they end up as unstructured events after enrichment so now I’m wondering what options I have.

At this point I’m close to giving up and using GTM to map the ecommerce events to the 5 structured event fields (https://github.com/snowplow/snowplow/wiki/2-Specific-event-tracking-with-the-Javascript-tracker#trackStructEvent) and run the risk of dropping some data that doesn’t quite fit into these fields (we’re planning to run GA in parallel with Snowplow anyway). Is that a sensible thing to do? Any other ideas?

Thanks,
Shin


#2

Hey @Shin, this is a really interesting topic. We’ve spent some time looking at BigQuery so hopefully we can help give you some guidance here.

We’ve thought about BigQuery a fair bit, and it’s unlikely that we would port our Redshift shredding code to BigQuery, for two reasons:

  1. We don’t believe BigQuery is as performant at mega-mega JOINs as Redshift
  2. BigQuery supports tables with deeply nested columns, unlike Redshift

For this reason, when we eventually add BigQuery support, we will likely dynamically mutate on a per-company basis the atomic.events table to contain (deeply nested) columns for each of the entities that a given company’s events can contain.

You are right - doing this in a real-time way that works for all users is a big engineering challenge (one which we haven’t prioritised yet). But you have an advantage, which is that you know up-front which entities you want to store in your atomic.events table: the Google Enhanced Ecommerce entities.

Therefore my suggestion is:

  • You take a look at our prior (out of date) BigQuery R&D, https://github.com/snowplow/snowplow/tree/feature/kinesis-bigquery-sink and https://github.com/snowplow/bigquery-loader-cli
  • You define a BigQuery table definition which extends atomic.events with the Google Enhanced Ecommerce entities
  • You create an updated Kinesis BigQuery Sink which reads the enriched events, looks for the Google Enhanced Ecommerce entities, and writes out each event to your custom atomic.events table
  • You open a PR back into Snowplow with your work - we won’t be able to merge it straight in, but it will help inform our thinking on how to generalize the solution for all users and entity types
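To make the second bullet concrete, here is a minimal sketch of what “extending atomic.events with entity columns” could look like. The class name, the abbreviated base column list, and the `"entity.field"` dotted notation are all illustrative assumptions, not the real atomic.events definition (which has far more columns) or a real BigQuery schema API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class AtomicEventsSchema {
    // Heavily abridged stand-in for the atomic.events columns
    static final List<String> BASE_COLUMNS =
        Arrays.asList("event_id", "collector_tstamp", "domain_userid");

    // Extend the flat base schema with one nested column per entity;
    // nested fields are written as "entity.field" for illustration only
    static List<String> extendWithEntities(List<String> base,
                                           Map<String, List<String>> entities) {
        List<String> out = new ArrayList<>(base);
        for (Map.Entry<String, List<String>> e : entities.entrySet()) {
            for (String field : e.getValue()) {
                out.add(e.getKey() + "." + field);
            }
        }
        return out;
    }
}
```

In a real sink these dotted names would become RECORD-typed columns in the BigQuery table definition; the point is only that the merge is deterministic once you know the entity set up-front.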

Does that make sense?


#3

Thanks Alex - I didn’t know about the BQ sink, and your point about deeply nested column support in BQ was a very good one that helped me reach a final solution.

Rather than going down the Kinesis sink route, I had already written an AWS Lambda to read enriched events from S3 and used the Scala SDK to transform the events. As you say, I’m only dealing with enhanced ecommerce events, so I know what the JSON keys are in the flattened JSON, which made writing the BQ schema ahead of time possible; I went a bit further and moved the values into fields that made a bit more sense:

import java.util.HashMap;
import java.util.Iterator;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public static String flattenGoogleEnhancedEcommerceRow(String json) {
    JsonParser jsonParser = new JsonParser();

    // Map each flattened context column name to the shorter field name it
    // should appear under in the nested BQ schema
    HashMap<String, String> ecommerceTypes = new HashMap<String, String>() {{
        put("contexts_com_google_analytics_enhanced_ecommerce_action_field_object_1", "action");
        put("contexts_com_google_analytics_enhanced_ecommerce_impression_field_object_1", "impression");
        put("contexts_com_google_analytics_enhanced_ecommerce_product_field_object_1", "product");
        put("contexts_com_google_analytics_enhanced_ecommerce_promo_field_object_1", "promo");
    }};

    Iterator<String> types = ecommerceTypes.keySet().iterator();

    JsonObject rootObject = jsonParser.parse(json).getAsJsonObject();

    // Move each of the ecommerce types from the contexts field under ecomm where it will appear nested in BQ
    while(types.hasNext()) {
        String type = types.next();
        String prefix = ecommerceTypes.get(type);

        if (rootObject.has(type)) {
            JsonElement typeJson = rootObject.get(type);

            getEcommNode(rootObject).add(prefix, typeJson);
            rootObject.remove(type);
        }
    }

    // Flatten the action too
    String action = "unstruct_event_com_google_analytics_enhanced_ecommerce_action_1";
    if (rootObject.has(action)) {
        getEcommNode(rootObject).add("action_name", rootObject.getAsJsonObject(action).get("action"));
        rootObject.remove(action);
    }

    return rootObject.toString();
}

private static JsonObject getEcommNode(JsonObject rootObject) {
    if (!rootObject.has("ecomm")) {
        rootObject.add("ecomm", new JsonObject());
    }

    return rootObject.getAsJsonObject("ecomm");
}

Then we get back a JSON that we can stream to BQ. We’re still in the testing phase, but this is probably what we’ll stick to.
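For illustration, given the column names in the code above, the transformation takes an abridged input row like this (all values invented):

```json
{
  "event_id": "c6ef3124-b53a-4b13-a466-6362a81315df",
  "contexts_com_google_analytics_enhanced_ecommerce_product_field_object_1": [
    { "id": "P12345", "price": 15.25 }
  ],
  "unstruct_event_com_google_analytics_enhanced_ecommerce_action_1": { "action": "purchase" }
}
```

and produces a row where the entities sit nested under a single `ecomm` record:

```json
{
  "event_id": "c6ef3124-b53a-4b13-a466-6362a81315df",
  "ecomm": {
    "product": [
      { "id": "P12345", "price": 15.25 }
    ],
    "action_name": "purchase"
  }
}
```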

Shin


#4

One thing you may need to be mindful of when using the S3 trigger with Lambda is that Lambda employs at-least-once semantics. This means it’s possible to insert events from the same S3 file more than once into BQ (unless you’re deduplicating somehow before this).
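To illustrate the kind of guard that’s needed, here is a minimal sketch of deduplicating at the S3-object level. The class and method names are hypothetical, and an in-memory set only works within one process; in production this check would need durable shared state (e.g. a conditional write to DynamoDB):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class S3ObjectDeduper {
    // In-memory stand-in for a durable store of already-processed objects
    private final Set<String> processed =
        Collections.synchronizedSet(new HashSet<>());

    /** Returns true only the first time a given S3 object is seen. */
    public boolean markIfNew(String bucket, String key) {
        return processed.add(bucket + "/" + key);
    }
}
```

The Lambda would call `markIfNew` on the triggering object and skip the load when it returns false.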


#5

Thanks @mike - that’s a very good point I didn’t consider.

BigQuery does allow you to supply an insert id that it remembers for at least one minute for deduplication.
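A sketch of how this could look, assuming we reuse Snowplow’s `event_id` (stable across Lambda retries of the same enriched event) as the `insertId`. The class and method names are hypothetical; the `{"insertId": …, "json": …}` row shape follows the tabledata.insertAll REST API:

```java
import java.util.HashMap;
import java.util.Map;

public class InsertIdSketch {
    // Build one streaming-insert row. Retried invocations produce the same
    // insertId, so BigQuery can discard duplicates within its (best-effort,
    // roughly one-minute) dedup window.
    public static Map<String, Object> rowFor(String eventId,
                                             Map<String, Object> payload) {
        Map<String, Object> row = new HashMap<>();
        row.put("insertId", eventId);
        row.put("json", payload);
        return row;
    }
}
```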

Shin


#6

Ah nice, that sounds perfect!


#7

Thanks @Shin, @mike, definitely an interesting discussion.

Right - given that you have a fixed event set, as you say a one-time BQ schema was possible.

But for the general use case, we have the challenge of mutating the atomic.events table dynamically to add in extra JSONs as they are observed within the event stream. It’s highly likely that we’ll need some kind of singleton process to do this.

In other words, I believe you have this:

Kinesis shard 1 -> Lambda instance ->
Kinesis shard 2 -> Lambda instance -> streaming API -> BigQuery table
Kinesis shard 3 -> Lambda instance ->

So it’s embarrassingly parallel.

For us I suspect instead we are going to have to do this:

  1. Check micro-batch for new JSONs
  2. Modify table to include new JSONs
  3. Load micro-batch into modified table
  --------------------------------------
  1. Check micro-batch for new JSONs
  2. Modify table to include new JSONs
  3. Load micro-batch into modified table
  --------------------------------------
  Repeat...

To put it another way, we can’t parallelize the queue of table modifications we will need to make…
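A minimal sketch of that constraint: funnel all table mutations through a single-threaded executor, so loads can fan out while ALTER-style changes are applied strictly in the order micro-batches discover new JSONs. All names here are hypothetical and the mutation itself is stubbed out:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TableMutationQueue {
    // The single thread plays the role of the "singleton process":
    // mutations are serialized even if many loaders submit them
    private final ExecutorService single = Executors.newSingleThreadExecutor();
    private final List<String> applied = new ArrayList<>();

    /** Queue a column addition; the Future completes once it is applied. */
    public Future<?> addColumn(String column) {
        return single.submit(() -> applied.add(column));
    }

    public List<String> applied() {
        return applied;
    }

    public void shutdown() {
        single.shutdown();
    }
}
```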

Any thoughts on this?


#8

What dynamic extra JSON do we have to add in at the moment? Does this refer to shredding, or something else entirely?


#9

Going over the BQ documentation again, it appears it might be possible to insert data without a complete schema.

https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll — ignoreUnknownValues (boolean, optional): “Accept rows that contain values that do not match the schema. The unknown values are ignored. Default is false, which treats unknown values as errors.”

Not tested but sounds promising.

Another possibility and one I almost ended up doing is inserting the whole json into a single field. BQ has some ability to extract/query json but it sucks for many reasons like performance, cost and complex queries: http://stackoverflow.com/questions/34890339/how-to-extract-all-the-keys-in-a-json-object-with-bigquery


#10

But @Shin the docs say:

The unknown values are ignored.

So that sounds like a showstopper to me. The exciting thing about BigQuery support in Snowplow will be when we dynamically adapt an atomic.events table on a per-installation basis, based on the specific entities (self-describing events, contexts) which are seen in the given event stream (@mike that’s what I mean by “dynamic extra JSON”)…


#11

Doh - I stayed up late watching the Olympics and clearly wasn’t thinking straight :stuck_out_tongue:

https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables could be another idea. In each batch, we generate a template, insert it (and wait) if it doesn’t already exist, then stream our data using the template with a suffix. Streamed rows end up in a table named after the base table plus the supplied suffix, so there shouldn’t be a problem with conflicting tables - you get a new one for every new JSON schema.
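For the suffix itself, one option (a sketch with hypothetical names, not something from the Snowplow codebase) is to derive it from the Iglu schema coordinates, sanitized to BigQuery’s allowed characters, so each new schema/model streams into its own template-derived table:

```java
public class TemplateSuffix {
    // Turn an Iglu vendor/name/model triple into a legal BigQuery table
    // suffix; dots and other characters become underscores
    public static String suffixFor(String vendor, String name, int model) {
        String raw = "_" + vendor + "_" + name + "_" + model;
        return raw.toLowerCase().replaceAll("[^a-z0-9_]", "_");
    }
}
```

The resulting suffix mirrors the flattened column naming already used above (e.g. `com_google_analytics_enhanced_ecommerce_action_1`).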

Another thing I’m thinking is that nothing in the event stream should be unexpected since everything has to be validated by Iglu further upstream. So we could generate everything upfront according to what’s in Iglu as part of a build/deployment. Admittedly not exciting and not very dynamic.


#12

Hey @Shin,

Interesting idea, need to take a look at template tables!

Pre-generating everything up-front wouldn’t normally work, though - an event’s constituent entities can’t be guaranteed from a snapshot of a single Iglu repository at a point in time. All it takes is for a company to start collecting SendGrid webhooks (say) after the events table has been pre-defined, and the table will no longer be sufficient…