Real-time pipeline reprocessing

Hi there:

How do I push the raw LZO files from the S3 bucket back into the Kinesis stream so the real-time pipeline can reprocess them?

What’s the issue you are trying to solve where you need to do this?

Typically the raw LZO files are picked up by the batch enrichment process rather than re-fed into the raw Kinesis stream (to be picked up by Stream Enrich). Although it would be possible to extract the events and push them into the raw Kinesis stream again, this would probably cause more trouble than it’s worth (duplicate events, “stale” data in your real-time Kinesis stream).

Hey @mike: we had issues with the pipeline where the ES Sink was not able to bulk-add the enriched records to AWS ES.

That data has already made its way to Redshift. We have both batch and real-time pipelines.

Hey @cmartins - your best bet is probably to:

  1. Run the batch pipeline on the raw LZO files
  2. Write a Spark job or similar to relay the enriched events in S3 into a Kinesis stream (see the sketch below)
  3. Point a Snowplow ES Loader at that Kinesis stream

Going forwards, you might want to set up an S3 Loader for the enriched events in Kinesis, so you could skip step 1 above and rely on a true “Kappa architecture” if this recurs.
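
For step 2, if you’d rather not spin up Spark, a plain boto3 script along these lines would do. The bucket, prefix and stream names below are placeholders for your own setup, and it skips retry handling:

```python
# Minimal relay sketch: read the batch pipeline's enriched TSV output from S3
# and write each line as its own record into the Kinesis stream the ES Loader
# consumes. Bucket, prefix and stream names are placeholders; no retry logic.
import gzip

import boto3

S3_BUCKET = "my-snowplow-archive"            # placeholder: enriched/good archive bucket
S3_PREFIX = "enriched/good/run=2017-01-01/"  # placeholder: one batch run folder
STREAM_NAME = "KinesisStreamEnriched"        # the stream your ES Loader reads from

s3 = boto3.client("s3")
kinesis = boto3.client("kinesis")


def put_lines(lines):
    """Write each enriched event line as one Kinesis record, 500 per API call."""
    records = [
        {"Data": line.encode("utf-8"), "PartitionKey": str(i)}
        for i, line in enumerate(lines)
        if line.strip()
    ]
    for start in range(0, len(records), 500):
        kinesis.put_records(StreamName=STREAM_NAME, Records=records[start:start + 500])


paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=S3_PREFIX):
    for obj in page.get("Contents", []):
        body = s3.get_object(Bucket=S3_BUCKET, Key=obj["Key"])["Body"].read()
        if obj["Key"].endswith(".gz"):
            body = gzip.decompress(body)
        put_lines(body.decode("utf-8").splitlines())
```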

Hi @alex: thanks for the reply.

  1. Already done.
  2. Not sure what you mean here - the batch already ran, creating the enriched and shredded files and placing them on S3. The real-time pipeline goes Scala collector -> Kinesis -> (to both) Scala enricher and the raw S3 bucket.

The good output from the Scala enricher goes to a Kinesis stream, which is consumed by the ES Loader. The data that was already in this stream has been loaded.

  3. Your point here is: store the enriched files (post Scala enricher) in an S3 bucket, to later consume from it and bulk-load into Elasticsearch?

Ah sorry, I missed the part that you already have the enriched events in S3 from the batch pipeline.

So you want to find a way to get your enriched events in S3 into Elasticsearch, is that correct?

@alex:

That’s very correct, sir.

Right, so I would:

  1. Write a Spark job or similar to relay the enriched events in S3 into a Kinesis stream
  2. Point a Snowplow ES Loader at that Kinesis stream

@alex: thanks.

So the idea here is to parse the CSV file: each column that is not part of the context JSON becomes one attribute of an ES document, correct? And the contexts become subdocuments, correct?

What would be the correct format to send to the KinesisStreamEnriched stream that the ES Sink consumes from? CSV, JSON, or even LZO?

Just write each enriched event line into a new Kinesis record. No transformation needed, just “replay”.

@alex: thanks.

I put the TSV record into the KinesisEnriched stream manually (via the AWS CLI). I tested both ways, plain TSV and base64 of the TSV record. Neither worked.

Any hunch?

The ES Loader is definitely caught up, as I have events arriving with timestamps always within the last 30 seconds.

Have a look in the bad stream output of the ES Loader…

@alex
I got this error below:

"errors":[{"level":"error","message":"Expected 131 fields, received 1 fields. This may be caused by attempting to use this SDK version on an older (pre-R73) or newer version of Snowplow enriched events."}]

I checked the base64 - it does include the tabs.

After decoding, I matched it against the original TSV record, position by position: it matches, 100% identical.
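
For reference, the check amounts to something like this (the file name is just a placeholder for wherever the payload was saved):

```python
# Decode the base64 payload and count the tab-separated fields; the ES Loader's
# SDK expects 131 for an enriched event. "record.b64" is a placeholder file name.
import base64

with open("record.b64", "rb") as f:
    decoded = base64.b64decode(f.read()).decode("utf-8")

print(len(decoded.split("\t")))  # should print 131 if the tabs are intact
```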

Hi,

Some time ago I wrote a Python script to replay/fix events: https://github.com/grzegorzewald/SnowplowRecovery. It reads raw events from backups and writes them either to a Kinesis stream or to standard output. I use it in case of any data issues, including errors in context/unstructured events.


@grzegorzewald Hi!

I tested it - but I’m assuming you need the original raw file in gzip format, right?

My raw files are in LZO format, and if I push them through the pipeline again, they would also land in the batch process.

@alex: any idea on the error message I shared yesterday?

Hey @cmartins - I suspect something is going wrong with the tabs on the round-trip.
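
One way to take the CLI and shell quoting out of the picture is to put the raw TSV bytes with boto3 and read the same record straight back; the stream name and source file below are placeholders, and note the test record will also be picked up by your ES Loader:

```python
# Sketch: put one enriched TSV event into Kinesis as raw bytes (no shell,
# no manual base64) and read that exact record back to confirm the tabs survive.
import boto3

STREAM_NAME = "KinesisStreamEnriched"  # placeholder: the stream the ES Loader reads

kinesis = boto3.client("kinesis")

with open("enriched-event.tsv", "rb") as f:  # placeholder: one enriched event, tabs intact
    payload = f.read().rstrip(b"\n")

resp = kinesis.put_record(StreamName=STREAM_NAME, Data=payload, PartitionKey="replay-test")

# Fetch the record back from its shard at its sequence number.
iterator = kinesis.get_shard_iterator(
    StreamName=STREAM_NAME,
    ShardId=resp["ShardId"],
    ShardIteratorType="AT_SEQUENCE_NUMBER",
    StartingSequenceNumber=resp["SequenceNumber"],
)["ShardIterator"]

records = kinesis.get_records(ShardIterator=iterator)["Records"]  # retry if empty
print(len(records[0]["Data"].split(b"\t")))  # should print 131 if nothing mangled the tabs
```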