Observability for the Snowplow streaming pipeline


#1

As part of our drive to make the Snowplow community more collaborative and widen our network of open source contributors, we will be regularly posting our proposals for new features, apps and libraries under the Request for Comments section of our forum. You are welcome to post your own proposals too!

Observability in the streaming pipeline

This request for comments is about increasing observability in the Snowplow streaming pipeline.

Background on observability

As observability becomes more and more important in today’s technology landscape, we would like to build out our observability capabilities in the Snowplow pipeline.

So what is observability? For us, it’s about providing key insights into a running system, be it about performance, throughput, usage or interactions with other running systems.

Those key insights can then be sampled and analyzed to feed visualizations, trigger alerts or make automated decisions, for example.

For more background on observability, we invite you to check out the existing art through the following blog posts:

Our previous efforts on batch pipeline observability

In the Snowplow batch pipeline, our RDB Loader and EmrEtlRunner components emit Snowplow events which can be ingested into a secondary or “meta” Snowplow pipeline.

Snowplow RDB Loader emits the following events:

EmrEtlRunner emits the following events:

These events are emitted along with a context describing the application:

iglu:com.snowplowanalytics.monitoring.batch/application_context/jsonschema/1-0-0

We experimented with additional observability for the batch pipeline as a great hackathon project last November in Prague. As a proof of concept, we tracked whenever a batch job (be it Spark Enrich, Spark Shred or RDB Loader) started and ended. The events we emitted bundled additional information such as the number of raw events, or the number of events once successfully enriched.

With this RFC, we’re taking our hackathon concept and taking it further with the real-time pipeline.

Our plan for streaming pipeline observability

We want to first focus on the real-time pipeline components. Accordingly, we have compiled a list of potential metrics that would provide key observability insights for the Scala Stream Collector and the real-time enrichment process.

Scala Stream Collector

The metrics we would like to cover for the Scala Stream Collector are as follows:

  • Windowed GET requests count
  • Windowed POST requests count
  • Windowed bad / malformed requests count
  • Windowed average Content-Length for POST requests
  • Windowed average query string size for GET requests
  • Windowed number of retries while pushing out to the output stream
  • Mean time to HTTP response

The first three are general usage indicators into how the collector is “hit” by our trackers.

The fourth and fifth ones, which measure the average size of the payloads hitting the collector, could influence how downstream components are sized, e.g. increase the number of Kinesis shards.

The penultimate indicator, which measures the number of retries performed before a successful push to the output stream, would provide insights into how the collector and its output stream interact.

Finally, mean time to HTTP response is general health indicator for a web server which measures, on average, how long it takes for the server to respond to a request.

Real-time enrichment process

The proposed metrics for the real-time enrichment process:

  • Windowed event vendor counts (e.g. com.snowplowanalytics.snowplow, com.google.analytics, etc.).
  • Windowed counts of which tracker sent the events (e.g. JavaScript Tracker v2.9.0)
  • Windowed average event size after enrichment
  • Windowed counts of bad rows
  • Mean time to enrich

The first two monitor typical usage of the pipeline, and try to identify to which vendors the events belong, and which trackers were used to send those events.

As with the Scala Stream Collector’s raw payloads, we want to monitor the size of the events after enrichment, to be able to take actions into how the pipeline is sized.

Again we want to track the number of retries performed before a successful push to the output stream.

The count of bad rows will give some steer into how to debug a running pipeline which is perhaps seeing elevated failure rates.

Finally, mean time to enrich would measure how long it takes for an event to be enriched. It would help us to understand how adding or removing specific enrichments can influence the performance of the enrichment process.

Pitfalls

One thing we need to be wary about is having too many metrics that are never looked at or not actionable enough as they can have a non-negligible performance impact on the running application.

We have tried to provide a tight set of metrics which can evolve in time; let us know if you think there are other metrics which we should include, or if some of the above are non-essential. We can of course evolve the metric set over time.

Phasing

We have started working on this initiative for the upcoming Beam Enrich milestone - the specific ticket is:

Beam Enrich will be our new enrichment platform for Google Cloud Platform’s Dataflow. It is built on top of Apache Beam and SCIO. For more information on Beam Enrich, see our Google Cloud Platform RFC (where it’s referred to as “Dataflow Enrich”):

After this roll out, we will tackle the Scala Stream Collector and Stream Enrich in future milestones.

Implementation

The collector metrics can be emitted as Snowplow events to a secondary or “meta” Snowplow pipeline with the help of the Scala tracker. A Snowplow Mini could serve as this secondary pipeline.

From this pipeline, these metrics can be distributed to downstream components to perform time-based aggregation for further investigation / dashboarding, for example.

Unfortunately, the Beam Enrich metrics cannot be forwarded directly to a Snowplow collector from inside the job running in Cloud Dataflow. However, they are available through the UI:

and through a REST API: https://cloud.google.com/dataflow/docs/reference/rest/ which contains additional metrics provided out of the box such as:

  • Virtual CPU count
  • Total / current memory usage
  • Mean byte count at each step
  • Element count at each step (e.g. number of bad rows)

Out of scope

We have purposefully restrained the number of metrics around bad rows. This is due to the fact that, in its current form, the bad row format is very poor at providing any kind of metrics that could be useful for analysis.

We plan on publishing an RFC around a new bad row format soon, to address this type of issue. After this, we can revisit emitting more granular observability metrics around bad rows.

Your feedback

We’d love to hear what you our community thinks about the current proposal. Don’t hesitate to share ideas on what metrics you would like to see surfaced or removed.

Finally, it’s important to keep in mind that observability is a company-wide initiative and as such will be extended, in time, to other components of the Snowplow stacks.


#5

#6

I like the initiative to make the snowplow pipeline more observable. But i think it’s the wrong approach to use the snowplow collector to receive metric events from a different collector. There are great instrumentalization/monitoring tools out there, even open source.

I highly recommend to expose metrics to a prometheus server. About a year ago, i opened an issue to start the process. https://github.com/snowplow/snowplow/issues/3421

For any part of the pipeline, that does not respond to an http call, it’s also possible to write a file to be ingesting into prometheus.

Again, i highly recommend not using the snowplow pipeline to observe the snowplow pipeline, even though i like the “eat your own dogfood” mentality.

Just my 2c
Chris


#7

Any thoughts here @alex or @BenFradet?


#8

Hey @christoph-buente - we appreciate the point: it’s very easy for us to spin up another Snowplow pipeline that monitors the first Snowplow pipeline under the above regime, but for a user operating just a single Snowplow pipeline, there could be a quis custodiet ipsos custodes problem to solve.

We don’t use Prometheus internally, so we would need some steer on how we could offer support for that alongside the above approach. Would you be able to set out a vision for how this could work?


#9

Hi Alex,

sure i can give a little bit of guidance on how to integrate with prometheus. For the collector it’s quite easy, as you expose all metrics under the /metrics with a given text format. With the enricher and sinks it might be different, as they do not expose an HTTP interface. However, one could leverage statsd or collectd locally to push metrics there and then have prometheus import it. I think the most difficult question is, what metrics to expose for each stage of the pipeline. Can you point me to a documentation, which metrics are currently send to the second collector? I could rework those, to make sure they are aligned with prometheus’ naming convention.


#10

Hey @christoph-buente - sure thing - the metrics for Stream Enrich should be similar to the ones we are planning for Beam Enrich:


#11

These metrics look ok, however i think tracker version and event vendor should be a dimension of all enricher metrics. Something like this:

the total number of events enriched, for a unit-less accumulating count

  • enrich_events_total

the total number of events failed to be enriched, for a unit-less accumulating count

  • enrich_events_failure_total

the count of events that have been observed

  • enrich_events_duration_seconds_count

the total sum of all observed values

  • enrich_events_duration_seconds_sum

cumulative counters for the observation buckets

  • enrich_events_duration_seconds_bucket{le: "<upper inclusive bound>"}

the count of events that have been observed

  • enrich_events_size_bytes_count

the total sum of all observed values

  • enrich_events_size_bytes_sum

cumulative counters for the observation buckets

  • enrich_events_size_bytes_bucket{le: "<upper inclusive bound>"}

I’m also wondering, if we need metrics regarding the incoming and outgoing streams. Like how many seconds are we behind consuming a stream. Type of stream, How many bytes have been read from stream, how many bytes have been written to the stream. Also the shard/partition id might be interesting, however i think this can lead to a very high dimensionality.

Was that helpful?


#12

@Alex does your :heart: mean, you like the approach and we can move that approach forward?