Replay collector data from S3 Firehose files to enrich

Hey guys,

I had an outage in my Stream Enrich setup over the long weekend, and as a result some data that was sent to my collector Kinesis stream fell off (1-day retention). I have a Firehose-to-S3 delivery stream set up for backup purposes, but it’s not clear to me how I can get that data reprocessed by my enrich setup.

It seems the simplest solution would be to send the S3 data back into a Kinesis stream that my enrich servers can pull from, but I haven’t been able to make that work. I tried using the aws-kinesis-agent, but it didn’t seem to be able to parse the files: Writing to Amazon Kinesis Data Streams Using Kinesis Agent - Amazon Kinesis Data Streams

Has anyone dealt with this in the past? Any suggestions?

So this might be a tricky one, depending on what Firehose is doing with the data.

I know others have run into issues with enriched data using Firehose, where the issue was essentially down to Firehose not adding a newline between events (I think this was one such relevant thread).

It does seem odd that Firehose output wouldn’t be readable by the aws-kinesis-agent you linked, but it’s an area that I’m not terribly familiar with, so I’m not certain.

Assuming it is some similar issue, I reckon one way of going about it is to download one of the files, try to identify a single collector payload, see if you can send just that one into the stream successfully, and work out from there how to write a script that does the same for the rest of the data. Collector payloads are thrift-encoded according to this Schema, in case that helps.
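For example, you could push a file’s raw bytes through untouched and see whether enrich accepts it. A minimal sketch, untested - it assumes the file you pick happens to contain just one payload, and 'your-raw-stream' and the filename are placeholders:

require 'aws-sdk-kinesis'

kinesis = Aws::Kinesis::Client.new(region: 'us-east-1')

# Read the file as binary - collector payloads are thrift, not text
bytes = File.binread('one-firehose-file') # placeholder filename

kinesis.put_record(
  stream_name: 'your-raw-stream', # placeholder
  data: bytes,
  partition_key: 'key',
)

If a single payload goes through and gets enriched, the remaining work is figuring out where each payload starts and ends within the bigger files.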

I know this does sound like a lot of work… unfortunately Firehose isn’t part of the supported stack, so I’m not sure of a less manual way. Ultimately the task is to figure out what’s going wrong with parsing - my bet is that it’s either the thrift encoding or the newline problem I mentioned.

For future reference: if you use our S3 loader to dump the data into a bucket with LZO compression, then in a case like this we could run the (now deprecated) batch pipeline over that data to get it into enriched-good format in S3, ready for the shredder/loader.

I hope that’s helpful! Let us know if we can be of use in figuring things out.

Thanks Colm. I think it comes down to needing to be able to replay the Firehose data into a new Kinesis stream. I’ve got some questions posted on the AWS forums, the Kinesis Agent GitHub repo, and Stack Overflow. I’ll also keep trying to get it working myself, and if I do I’ll post my solution back here.

I’ve made at least some progress here. I’m now using the aws-sdk to write a Ruby script that loops over the files and puts each line into a new Kinesis stream. The only issue is that when I run my enrich service pointing at that stream, it doesn’t pull any records. There aren’t any error messages or anything; it just doesn’t pull anything. I suspect it’s because I’m splitting records on the newline character, and that’s not right for this data.
I think I’ll have to figure out a way to parse individual thrift records out of the Firehose files and put those onto the Kinesis stream.
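If I can generate a Ruby class from the CollectorPayload schema (e.g. with thrift --gen rb), something like this might peel consecutive structs off a file. Just a sketch - collector_payload is the hypothetical generated file, and I haven’t verified that Firehose writes the thrift records back-to-back with nothing in between:

require 'thrift'
require_relative 'collector_payload' # hypothetical generated class

bytes = File.binread('one-firehose-file') # placeholder filename
transport = Thrift::MemoryBufferTransport.new(bytes)
protocol = Thrift::BinaryProtocol.new(transport)

# Each read consumes exactly one struct from the buffer, so looping
# until the buffer is exhausted should yield the payloads one by one
payloads = []
while transport.available > 0
  payload = CollectorPayload.new
  payload.read(protocol)
  payloads << payload
end
puts "parsed #{payloads.size} payload(s)"

Each payload could then be re-serialized with Thrift::Serializer and put onto the stream individually.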

For reference, here’s the Ruby script I’m currently using:

require 'aws-sdk-kinesis'

kinesis = Aws::Kinesis::Client.new(
  region: 'us-east-1',
)

Dir.glob('kinesis/**/*').each do |filename|
  next unless File.file?(filename) # the glob also matches directories

  puts "\nstarting #{filename}"
  # NB: puts one record per line -- probably the weak spot, since
  # thrift-encoded payloads aren't newline-delimited
  File.foreach(filename) do |data|
    kinesis.put_record({
      stream_name: "snowplow-production-collector-replay",
      data: data,
      partition_key: "key",
    })
    print '.'
  end
end

And here are the logs from my enricher:

2021-06-03 10:59:41[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - No activities assigned
2021-06-03 10:59:41[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Sleeping ...
2021-06-03 10:58:51[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization complete. Starting worker loop.
2021-06-03 10:58:41[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Starting LeaseCoordinator
2021-06-03 10:58:41[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Syncing Kinesis shard info
2021-06-03 10:58:40[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization attempt 1
2021-06-03 10:58:40[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initializing LeaseCoordinator
2021-06-03 10:58:40[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Shard sync strategy determined as SHARD_END.
2021-06-03 10:58:40[main] INFO com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator - With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
2021-06-03 10:58:40[main] WARN com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Received configuration for region as us-east-1.
2021-06-03 10:58:39[main] WARN com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Received configuration for endpoint as https://dynamodb.us-east-1.amazonaws.com, and region as us-east-1.
2021-06-03 10:58:39[main] WARN com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Received configuration for endpoint as https://kinesis.us-east-1.amazonaws.com, and region as us-east-1.
2021-06-03 10:58:39[main] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Running: snowplow-enrich-production.
2021-06-03 10:58:39[main] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Processing raw input stream: snowplow-production-collector-replay
2021-06-03 10:58:39[main] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Using workerId: 10.0.0.204:3e1f6be5-3fb3-4883-9d3f-b214642301cf
2021-06-03 10:58:35[main] INFO nl.basjes.parse.useragent.AbstractUserAgentAnalyzerDirect - Building all matchers for all possible fields.

OK so heads up - the expected behaviour of enrich is that it pulls everything from the raw stream, and if there’s a problem with a record it rejects it by pushing it into the bad stream. Enrich will read any format of data from the stream.

So I’d double check that the data is actually arriving in the stream, that enrich is reading from the correct stream, and so on - since enrich shouldn’t ever just do nothing, it’s worth double checking the setup for any gotchas. :slight_smile:
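For instance, here’s a rough way to eyeball the replay stream with the same aws-sdk-kinesis gem you’re already using. A sketch only - it assumes a single shard and reads from the start of the retention window, so loop over all shards if you have more than one:

require 'aws-sdk-kinesis'

kinesis = Aws::Kinesis::Client.new(region: 'us-east-1')
stream = 'snowplow-production-collector-replay'

# Take the first shard and read from the oldest available record
shard_id = kinesis.describe_stream(stream_name: stream)
                  .stream_description.shards.first.shard_id

iterator = kinesis.get_shard_iterator(
  stream_name: stream,
  shard_id: shard_id,
  shard_iterator_type: 'TRIM_HORIZON',
).shard_iterator

resp = kinesis.get_records(shard_iterator: iterator, limit: 10)
puts "fetched #{resp.records.size} record(s)"
resp.records.each { |r| puts r.data.bytesize }

If that prints zero records, the put side is the problem; if records come back, the enrich configuration is the place to look.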