A new bad row format


#1

As part of our drive to make the Snowplow community more collaborative and widen our network of open source contributors, we will be regularly posting our proposals for new features, apps and libraries under the Request for Comments section of our forum. You are welcome to post your own proposals too!


Introduction

Data quality has always been a huge focus for Snowplow. Being a non-lossy data pipeline is one of the central pieces of this data quality puzzle. Indeed, other pieces, such as data validity (through our schema validation technology) and data richness (through our enrichments), connect directly with non-lossiness.

In practice, for a Snowplow pipeline, non-lossiness means that when something goes wrong anywhere in the pipeline, the impacted data is not discarded but parked as “bad” for later inspection, fixing and reprocessing. In Snowplow jargon, this bad data is called “bad rows”.

However, over the years, a certain tension has developed between how great it is to have this feature and how annoying it can be to deal with. In particular, inspecting, fixing and reprocessing this bad data is currently harder than it should be. We believe tackling these issues starts with a new format for this bad data.

It seems that the community shares our views and enthusiasm on the subject: “Better bad row format” received the most votes in answer to the question “If you were Snowplow, how would you prioritize the following roadmap ideas?” in our recent 2018 open source community survey:

We have been aware of this major pain point for a while, and our journey towards a more intelligent bad row format actually started last November, during a hackweek where we got together to think, as a group, about data quality in a Snowplow pipeline. Unsurprisingly, one of the central themes that came up was the bad row format, and a team started work on a proof of concept for a new format.

Since then, data quality has been the focus of the pipeline team, which has translated into several efforts:

Furthermore, it will remain our focus going into 2019 as we start working on the subject of this RFC.

Because of all these converging factors, we are eager to share our plans regarding a new bad row format!

Current state of affairs

At the moment, a bad row will look something like:

{
  "errors": [
    {
      "level": "error",
      "message": "a message"
    }
  ],
  "line": "a collector payload",
  "failure_tstamp": "2018-12-17T15:54:44.493Z"
}

The current format comes up short in quite a few places.

First, if we look at the errors field, all errors are stored in the message field as flat strings, which are not machine-readable and are sometimes hard even for a human to understand.
Moreover, the level field of each error always has the "error" value, making it superfluous.

Next, if we look at the line field, its content is dependent on the collector you’re using:

  • if you’re using the Clojure Collector, you will get a tab-separated collector payload
  • if you’re using the Scala Stream Collector, you will get a base64-encoded, Thrift-serialized collector payload, which makes it virtually impossible to know what the payload actually looks like

In both cases, the lack of structure makes it difficult to know what actually happened when looking at a specific bad row.

Furthermore, if we look at the format holistically, it also becomes apparent that this data cannot be partitioned in any inherent way (except by date): all bad rows are mingled together, and there is no way to tell different types of bad rows apart from their structure alone. For example, a bad row resulting from a schema validation failure and another coming from an enrichment failure will have exactly the same structure despite having completely different semantics.

This makes it hard to investigate specific issues: you automatically get the full bad row firehose when you might only be interested in a single issue.

Finally, there is a set of more esoteric issues with the current format, such as what we internally refer to as “POST bombs”.
This issue arises when a batch of events is sent as a single POST request. If one or more of these events result in a bad row, a bad row is generated for each failing event, and each of these bad rows contains the entire original payload (the whole batch of events). This can lead to a huge multiplication factor, hence the term “bomb”.
In addition, POST bombs are a source of duplicates.
For example, if you were to send a POST request containing a batch of 100 events and two of those events resulted in bad rows, you would get two bad rows, each containing all 100 events. When reprocessing, you would reprocess those 100 events twice: besides wasting processing power, each of the 98 events that had already been processed successfully would come out twice more, and each of the 2 recovered events would come out twice, for 198 duplicates in total.
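To make the duplicate arithmetic explicit, here is a small Python sketch. It assumes that the events which passed validation were already written to the good stream, that each failing event produces one bad row carrying the whole batch, and that recovery successfully reprocesses every event of every bad row; `post_bomb_duplicates` is a hypothetical helper, not part of any Snowplow library.

```python
def post_bomb_duplicates(batch_size: int, failed: int) -> dict:
    """Count the fallout of recovering a POST batch from per-event bad rows.

    Illustrative assumptions, not Snowplow's actual recovery logic:
    - the (batch_size - failed) valid events already reached the good stream;
    - one bad row is emitted per failing event, each holding the full batch;
    - recovery reprocesses every event of every bad row successfully.
    """
    if not 0 <= failed <= batch_size:
        raise ValueError("failed must be between 0 and batch_size")
    bad_rows = failed                    # one bad row per failing event
    already_good = batch_size - failed   # processed on the first pass
    reprocessed = bad_rows * batch_size  # each bad row replays the whole batch
    total_output = already_good + reprocessed
    # Only batch_size distinct events exist; every extra copy is a duplicate.
    duplicates = total_output - batch_size
    return {
        "bad_rows": bad_rows,
        "events_stored_in_bad": reprocessed,
        "duplicates": duplicates,
    }
```

Under these assumptions, `post_bomb_duplicates(100, 2)` reports 2 bad rows holding 200 events and 198 duplicates; with a single failing event, each of the 99 already-processed events is duplicated once.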

In this RFC, we set out to address these shortcomings.

A new format

As it turns out, the bad row format can be made much richer than it currently is, in a way that addresses these issues.

This becomes a lot clearer if we dive into the different points where bad rows are generated. In the following diagram, we can see a payload move through the different stages of a Snowplow pipeline, along with the places where something can go wrong and a bad row can be generated.

For each step, we can see the coordinates of the corresponding bad row schema, which a bad row will have to conform to if that macro pipeline step fails.

For example, let’s say you are running an AWS real-time Snowplow pipeline and a payload exceeding 1 MB hits your collector (1 MB is the maximum size of a record in Kinesis). This payload will fail size validation, and a bad row conforming to iglu:com.snowplowanalytics.snowplow.badrows/size_violation/jsonschema/1-0-0 will be generated.
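Because every bad row type has its own schema, the type can be read straight off the schema URI, which gives the natural partitioning the current format lacks. The Python sketch below parses the Iglu URI shape `iglu:vendor/name/format/version` used throughout this post; `parse_schema_key` and `bad_row_type` are illustrative names, not an existing Iglu client API.

```python
import re
from typing import NamedTuple, Tuple

# Iglu schema URIs look like iglu:<vendor>/<name>/<format>/<MODEL>-<REVISION>-<ADDITION>
IGLU_URI = re.compile(
    r"^iglu:(?P<vendor>[^/]+)/(?P<name>[^/]+)/(?P<format>[^/]+)/"
    r"(?P<model>\d+)-(?P<revision>\d+)-(?P<addition>\d+)$"
)


class SchemaKey(NamedTuple):
    vendor: str
    name: str
    format: str
    version: Tuple[int, int, int]


def parse_schema_key(uri: str) -> SchemaKey:
    """Split an Iglu URI into its coordinates; raise ValueError if malformed."""
    match = IGLU_URI.match(uri)
    if match is None:
        raise ValueError(f"not an Iglu schema URI: {uri!r}")
    version = (int(match["model"]), int(match["revision"]), int(match["addition"]))
    return SchemaKey(match["vendor"], match["name"], match["format"], version)


def bad_row_type(bad_row: dict) -> str:
    """The schema name (e.g. 'size_violation') tells bad row types apart."""
    return parse_schema_key(bad_row["schema"]).name
```

Partitioning a stream of bad rows then amounts to grouping on `bad_row_type(row)`, for example to write each type to its own storage prefix.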

Now that we have a better grasp of where and why different bad rows are generated, we can dive into each type in turn.

Size validation

As we’ve just seen, a payload can exceed the maximum size allowed by the data streaming technology in use. This means that the payload can’t be stored in its entirety, most likely because its body is too big. For example, allowing images to be inserted in a form can lead to this type of issue.

In this case, a bad row conforming to iglu:com.snowplowanalytics.snowplow.badrows/size_violation/jsonschema/1-0-0 will be generated, for example:

{
  "schema": "iglu:com.snowplowanalytics.snowplow.badrows/size_violation/jsonschema/1-0-0",
  "data": {
    "failure": {
      "timestamp": "2018-12-23T13:37:57.056Z",
      "maximumAllowedSizeBytes": 1048576,
      "actualSizeBytes": 1073741824
    },
    "payload": {
      "timestamp": "2018-12-24T13:37:57.056Z",
      "headers": {},
      "path": "/com.snowplowanalytics.snowplow/tp2",
      "truncatedQueryString": "a=12",
      "truncatedBody": "{}"
    },
    "processor": {
      "artifact": "scala-stream-collector",
      "platform": "kinesis",
      "version": "0.14.0"
    }
  }
}

If this check is successful, we know we can safely store the whole payload in our real-time data storage technology of choice.

Keep in mind that this case is essentially impossible to recover from, as truncation leads to data loss.
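A minimal Python sketch of this check, assuming that the serialized record size is what gets compared against the 1 MiB (1,048,576 bytes) Kinesis limit used in the example, and assuming a hypothetical truncation to 1,000 characters of the query string and body (this RFC does not specify the truncation length). Field names follow the size_violation example above; `check_size` is hypothetical.

```python
from datetime import datetime, timezone
from typing import Optional

SIZE_VIOLATION = "iglu:com.snowplowanalytics.snowplow.badrows/size_violation/jsonschema/1-0-0"
KINESIS_MAX_RECORD_BYTES = 1024 * 1024  # 1 MiB per Kinesis record


def check_size(serialized: bytes, payload: dict, processor: dict,
               max_bytes: int = KINESIS_MAX_RECORD_BYTES,
               keep_chars: int = 1000,
               now: Optional[datetime] = None) -> Optional[dict]:
    """Return None if the payload fits, otherwise a size_violation bad row.

    keep_chars is a hypothetical truncation length: whatever is cut off is
    lost for good, which is why this kind of bad row cannot really be recovered.
    """
    actual = len(serialized)
    if actual <= max_bytes:
        return None
    if now is None:
        now = datetime.now(timezone.utc)
    return {
        "schema": SIZE_VIOLATION,
        "data": {
            "failure": {
                "timestamp": now.isoformat(timespec="milliseconds").replace("+00:00", "Z"),
                "maximumAllowedSizeBytes": max_bytes,
                "actualSizeBytes": actual,
            },
            "payload": {
                "timestamp": payload["timestamp"],
                "headers": payload.get("headers", {}),
                "path": payload["path"],
                # Only a prefix survives; the rest of the data is dropped.
                "truncatedQueryString": (payload.get("queryString") or "")[:keep_chars],
                "truncatedBody": (payload.get("body") or "")[:keep_chars],
            },
            "processor": processor,
        },
    }
```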

Collector payload validation

Since collectors are web-facing, they can be reached by entities that are neither Snowplow trackers nor part of the webhooks we currently support. More often than not, those entities are robots probing for some sort of vulnerability.

For such cases, we will generate bad rows conforming to the iglu:com.snowplowanalytics.snowplow.badrows/format_violation/jsonschema/1-0-0 schema:

{
  "schema" : "iglu:com.snowplowanalytics.snowplow.badrows/format_violation/jsonschema/1-0-0",
  "data" : {
    "failure": {
      "timestamp": "2018-12-23T13:37:57.056Z",
      "message" : "Unrecognized collector format"
    },
    "payload": {
      "timestamp": "2018-12-24T13:37:57.056Z",
      "headers": {},
      "path": "/test",
      "queryString": "a=12",
      "body": "{}"
    },
    "processor": {
      "artifact": "stream-enrich",
      "platform": "kafka",
      "version": "0.19.1"
    }
  }
}

From bad rows of this type, we can aggregate along different dimensions to find out which robots are the most common.
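A Python sketch of such an aggregation, ranking request paths and User-Agent values across format_violation bad rows. It assumes that `headers` is an object mapping header names to values (the examples only show an empty object), and `top_offenders` is a hypothetical helper.

```python
from collections import Counter
from typing import Iterable


def top_offenders(bad_rows: Iterable[dict], n: int = 3) -> dict:
    """Rank request paths and User-Agent values across format_violation bad rows."""
    paths: Counter = Counter()
    agents: Counter = Counter()
    for row in bad_rows:
        if "/format_violation/" not in row["schema"]:
            continue  # other bad row types are irrelevant here
        payload = row["data"]["payload"]
        paths[payload["path"]] += 1
        # Assumption: headers maps header names to values; match names case-insensitively.
        headers = {k.lower(): v for k, v in payload.get("headers", {}).items()}
        agents[headers.get("user-agent", "<none>")] += 1
    return {"paths": paths.most_common(n), "userAgents": agents.most_common(n)}
```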

If the payload manages to pass this step, we know it is Snowplow-related.

Tracker protocol validation

Moving along our Snowplow pipeline, we can check that this Snowplow-related payload conforms to our tracker protocol.
Failures at this step could include event IDs that are not UUIDs or contexts that are not valid JSON; such payloads would have been generated by something Snowplow-aware, i.e. a faulty SDK (tracker or webhook integration).

If tracker protocol validation fails, we’ll get an iglu:com.snowplowanalytics.snowplow.badrows/tracker_protocol_violations/jsonschema/1-0-0 bad row:

{
  "schema" : "iglu:com.snowplowanalytics.snowplow.badrows/tracker_protocol_violations/jsonschema/1-0-0",
  "data" : {
    "failure": {
      "timestamp": "2018-12-23T13:37:57.056Z",
      "messages": [
        {"key": "eid", "value": "INVALID_UUID"},
        {"key": "cx", "value": "INVALID_PAYLOAD"}
      ]
    },
    "payload": {
      "timestamp": "2018-12-24T13:37:57.056Z",
      "headers": {},
      "path": "/test",
      "queryString": "a=12",
      "body": "{}"
    },
    "processor": {
      "artifact": "spark-enrich",
      "platform": "emr",
      "version": "1.16.0"
    }
  }
}

Successfully passing this step means that we know every part of the payload’s tracker protocol fields and, for example, how many self-describing entities it contains.
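The two failures used in the example above can be sketched in Python. This assumes, as we understand the tracker protocol, that `eid` is the event ID and `cx` carries base64url-encoded contexts (possibly without padding); `validate_tracker_params` is hypothetical and covers only these two fields, whereas real validation covers the whole protocol. The key/value message shape mirrors the tracker_protocol_violations example.

```python
import base64
import json
import uuid
from typing import Dict, List


def validate_tracker_params(params: Dict[str, str]) -> List[Dict[str, str]]:
    """Check 'eid' and 'cx', returning one {"key", "value"} message per failure."""
    messages = []
    if "eid" in params:
        try:
            uuid.UUID(params["eid"])
        except ValueError:
            messages.append({"key": "eid", "value": "INVALID_UUID"})
    if "cx" in params:
        encoded = params["cx"]
        try:
            # Restore any stripped '=' padding before decoding the base64url value.
            raw = base64.urlsafe_b64decode(encoded + "=" * (-len(encoded) % 4))
            json.loads(raw.decode("utf-8"))
        except ValueError:  # covers binascii.Error, UnicodeDecodeError and JSONDecodeError
            messages.append({"key": "cx", "value": "INVALID_PAYLOAD"})
    return messages
```

Collecting every failure instead of stopping at the first is what lets a single bad row list both the `eid` and the `cx` problem.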

Schema validation

Next come schema validation failures, which readers will probably be the most familiar with. We go through every self-describing entity in the payload and check that it conforms to its schema.

There can be one or more schema validation failures per self-describing entity. These bad rows, produced by Iglu, will follow the iglu:com.snowplowanalytics.snowplow.badrows/schema_violations/jsonschema/1-0-0 schema:

{
  "schema" : "iglu:com.snowplowanalytics.snowplow.badrows/schema_violations/jsonschema/1-0-0",
  "data" : {
    "failure": {
      "timestamp": "2018-12-23T13:37:57.056Z",
      "messages": [
        {
          "schemaKey": "iglu:com.acme/signup/jsonschema/1-0-2",
          "instance": "unstructured_event",
          "pointer": "/personal/1/name",
          "property": "minLength",
          "message": "property name has length 2; minimum allowed 3",
          "igluRepository": "Iglu Central"
        },
        {
          "schemaKey": "iglu:com.acme/healthcheck/jsonschema/1-0-0",
          "instance": "contexts/2",
          "message": "schema not found",
          "igluRepository": "com.acme Iglu"
        }
      ]
    },
    "payload": {
      "timestamp": "2018-12-24T13:37:57.056Z",
      "headers": {},
      "path": "/test",
      "queryString": "a=12",
      "body": "{}"
    },
    "processor": {
      "artifact": "beam-enrich",
      "platform": "pubsub",
      "version": "0.1.0"
    }
  }
}

Success here means that we know every self-describing entity is valid.
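Because each message carries structured fields rather than a flat string, schema violations can be rolled up without parsing free text. A Python sketch, assuming the message fields shown in the example above (`schemaKey`, `pointer`, `property`, `message`); `rollup_schema_violations` is hypothetical. It groups on `property` rather than `message` because the message text embeds instance-specific values such as the actual length.

```python
from collections import Counter
from typing import Iterable


def rollup_schema_violations(bad_rows: Iterable[dict]) -> Counter:
    """Count schema_violations messages by (schemaKey, pointer, property-or-message)."""
    counts: Counter = Counter()
    for row in bad_rows:
        if "/schema_violations/" not in row["schema"]:
            continue
        for msg in row["data"]["failure"]["messages"]:
            # 'property' names the failed JSON Schema keyword; fall back to the
            # message for failures such as "schema not found" that have none.
            reason = msg.get("property", msg["message"])
            counts[(msg["schemaKey"], msg.get("pointer", ""), reason)] += 1
    return counts
```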

Enrichments

Moving on to the enrichment phase, a failure here means that one or more enrichments have failed, and the resulting bad rows will conform to iglu:com.snowplowanalytics.snowplow.badrows/enrichment_failures/jsonschema/1-0-0:

{
  "schema" : "iglu:com.snowplowanalytics.snowplow.badrows/enrichment_failures/jsonschema/1-0-0",
  "data" : {
    "failure": {
      "timestamp": "2018-12-23T13:37:57.056Z",
      "messages": [
        {
          "enrichment": "iglu:com.snowplowanalytics.snowplow.enrichments/weather_enrichment/jsonschema/1-0-0",
          "message": "Timeout error"
        },
        {
          "enrichment": "iglu:com.snowplowanalytics.snowplow.enrichments/pii_enrichment/jsonschema/2-0-0",
          "message": "No $.users.id property found"
        }
      ]
    },
    "payload": {
      "timestamp": "2018-12-24T13:37:57.056Z",
      "headers": {},
      "path": "/test",
      "queryString": "a=12",
      "body": "{}"
    },
    "processor": {
      "artifact": "mini",
      "platform": "nsq",
      "version": "0.6.0"
    }
  }
}

After a successful enrichment phase, we will be in possession of a canonical event ready for loading.
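Since one or more enrichments can fail for the same event, it helps to collect every failure rather than stop at the first. A Python sketch of that accumulation, where each enrichment is modelled, purely for illustration, as an (Iglu URI, function) pair that returns new fields or raises; `run_enrichments` is hypothetical and does not reflect Scala Common Enrich’s actual internals.

```python
from typing import Callable, List, Optional, Tuple

# Hypothetical model: (Iglu URI of the enrichment configuration, enrichment function)
Enrichment = Tuple[str, Callable[[dict], dict]]


def run_enrichments(event: dict, enrichments: List[Enrichment]
                    ) -> Tuple[Optional[dict], Optional[List[dict]]]:
    """Run every enrichment, collecting all failures instead of stopping at the first.

    Returns (enriched_event, None) on success or (None, messages) on failure,
    with messages shaped like the enrichment_failures example.
    """
    enriched = dict(event)
    messages = []
    for uri, enrich in enrichments:
        try:
            enriched.update(enrich(enriched))
        except Exception as exc:  # a real pipeline would catch narrower errors
            messages.append({"enrichment": uri, "message": str(exc)})
    return (None, messages) if messages else (enriched, None)
```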

Storage

Finally, we arrive at storage loading, where failures can be semantically very different, ranging from connection issues with the target database to database user authorization issues.

Because of this heterogeneity, failures generated by storage components are nested, with the failure itself being a self-describing JSON:

{
  "schema" : "iglu:com.snowplowanalytics.snowplow.badrows/storage_loader_failure/jsonschema/1-0-0",
  "data" : {
    "failure": {
      "schema": "iglu:com.snowplowanalytics.snowplow.badrows/bigquery_bad_row/jsonschema/1-0-0",
      "data": {
        "timestamp": "2018-12-23T13:37:57.056Z",
        "column": "contexts_com_acme_event_1",
        "message": "Unrecognized type integer"
      }
    },
    "payload": {
      "enrichedEvent": "TSV enriched event",
      "id": "deadbeef-dead-beef-dead-beefdeadbeef"
    },
    "processor": {
      "artifact": "bigquery-loader",
      "version": "0.2.0"
    }
  }
}
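Since the failure is itself self-describing, a consumer can dispatch on the inner schema. A Python sketch, assuming only the bigquery_bad_row fields shown above; `describe_storage_failure` is hypothetical, and other loaders’ inner schemas are not specified in this RFC, hence the generic fallback.

```python
def describe_storage_failure(bad_row: dict) -> str:
    """Summarise a storage_loader_failure bad row by unwrapping its nested failure."""
    data = bad_row["data"]
    failure = data["failure"]
    # The second path segment of the inner Iglu URI names the loader-specific failure.
    inner_type = failure["schema"].split("/")[1]
    details = failure["data"]
    event_id = data["payload"].get("id", "unknown event")
    if inner_type == "bigquery_bad_row":
        return f"{event_id}: BigQuery column {details['column']}: {details['message']}"
    # Fallback for loader-specific schemas this sketch does not know about.
    return f"{event_id}: {inner_type}: {details.get('message', 'no message')}"
```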

This final storage step ends our bad row journey!

A common collector payload format

By now, you will have noticed that the payloads contained in these bad rows are completely collector-independent and are not obscured by serialization or unusual formatting. This should help greatly during investigation and recovery, as serialization and formatting no longer get in the way.
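To illustrate, the payload fields shared by the collector-side examples above (`timestamp`, `headers`, `path`, `queryString`, `body`) can be read directly, without any Thrift or base64 decoding. A Python sketch; the `CollectorPayload` class is a hypothetical model of the proposed common format, not its final schema, which has yet to be defined.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from urllib.parse import parse_qs


@dataclass
class CollectorPayload:
    """Collector-independent payload, mirroring the fields of the bad row examples."""
    timestamp: str
    path: str
    headers: Dict[str, str] = field(default_factory=dict)
    query_string: Optional[str] = None
    body: Optional[str] = None

    @classmethod
    def from_bad_row(cls, bad_row: dict) -> "CollectorPayload":
        p = bad_row["data"]["payload"]
        return cls(
            timestamp=p["timestamp"],
            path=p["path"],
            headers=p.get("headers", {}),
            query_string=p.get("queryString"),
            body=p.get("body"),
        )

    def query_params(self) -> Dict[str, List[str]]:
        """The query string is plain text, so standard URL parsing is enough."""
        return parse_qs(self.query_string or "")
```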

Phasing

Our current plan of action will start, as do many other projects at Snowplow, with defining schemas for the different new types of bad rows, as well as a new collector payload format unifying the different collector formats (Scala Stream Collector, Clojure Collector, etc.).

Work will continue on the Iglu client, focusing on improving the structure of its processing messages. This will in turn give the bad row format clearer, immutable error messages, making rollups easier when analyzing bad data.

With those foundations laid, we will then work towards integrating them into Scala Common Enrich, the library shared by all our enrichment platforms. We will also take this opportunity to change our JSON and functional programming libraries (moving from json4s to circe and from scalaz to cats, respectively).

From there, we will be able to integrate this work into the different enrichment platforms (Stream Enrich, Beam Enrich and Spark Enrich) so that this change can reach all users (AWS real-time, GCP real-time and batch, respectively).

Moving further downstream, we will need to adjust the components that let you store bad rows in durable storage: the S3, Google Cloud Storage and Elasticsearch loaders.

Finally, the new bad row format workstream will end with a new recovery process that leverages the new bad row structure, allowing you, for example, to recover certain types of bad rows while ignoring others.
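Selective recovery of this kind can be sketched in Python by partitioning bad rows by type and keeping only the types one chooses to act on. The `RECOVERABLE` set is a hypothetical policy (size violations are left out because truncation has already lost data), and `select_for_recovery` is not part of the planned recovery tooling.

```python
from typing import Dict, Iterable, List, Set

# Hypothetical policy: bad row types worth attempting to recover.
# size_violation is excluded because truncation has already lost data.
RECOVERABLE: Set[str] = {"schema_violations", "enrichment_failures"}


def select_for_recovery(bad_rows: Iterable[dict],
                        wanted: Set[str] = RECOVERABLE) -> Dict[str, List[dict]]:
    """Partition bad rows by type (the Iglu schema name), keeping only wanted types."""
    selected: Dict[str, List[dict]] = {}
    for row in bad_rows:
        # iglu:<vendor>/<name>/<format>/<version>: the name is the bad row type.
        row_type = row["schema"].split("/")[1]
        if row_type in wanted:
            selected.setdefault(row_type, []).append(row)
    return selected
```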

Out of scope

One item that is currently out of scope, but that we want to work on further down the road, is recovery traceability. In essence, this would add metadata recording the kinds of recoveries that have been run on a particular event, allowing people to inspect an event and know how many times, and for what purpose, it has been through recovery.

A process such as this one would unlock smarter and real-time recoveries.

Your feedback

With this approach, which combines much more fine-grained structure (through schemas) with discrimination between bad rows (through the different types established above), we aim to make bad data easier to investigate, to load into SQL-compatible systems, and to fix.

We’d love to hear what you think about the above proposal. Don’t hesitate to share ideas and discuss the proposed format.


#2

This looks really good. It will certainly make it easier to debug and reprocess rows that end up in bad.

There are a few things I’d add to all schemas which I think are currently missing:

  • Add userAgent
  • Timestamp of the failure (vs timestamp of the payload itself)
  • This is a trickier one and more of an edge case, but it’d be nice to include the Iglu repository associated with the schemaKey

#3

Add userAgent

This will be part of the headers; do you think it deserves its own top-level spot in the payload?

Timestamp of the failure (vs timestamp of the payload itself)

Yup, great point, will edit the RFC.

This is a trickier one and more of an edge case, but it’d be nice to include the Iglu repository associated with the schemaKey

Do you mean for the schema validation phase? Yes, definitely.


#4

If it’s in the headers part of the event, that’s probably enough. I’m not sure whether it should have its own column, though I’d lean towards yes, only because userAgent is top-level in the Thrift schema as well as having its own column in the Beanstalk logs.

Yes - just the schema validation phase.


#5

Thanks for the feedback @mike, we’ve started implementing schemas in https://github.com/snowplow/iglu-central/pull/882