Correct/change data in thrift LZO files for reprocessing

Hi @dkeidel,

We are using Kinesis S3 storage on the raw Kinesis stream to back up raw data “just in case”. The backup is compressed with GZIP (the vanilla mechanism).

As it turned out, we need to make some important fixes to the data. I have an ugly Python script that:

  • Lists the backup files in the backup S3 bucket
  • Iterates over them, downloading one after another
  • Processes every file as follows:
  • Picks up a record from (4 bytes)d to the next similar sequence
  • searches by regexp for r"({\"schema\":\"iglu:com\.snowplowanalytics\.snowplow\/payload_data\/jsonschema\/1-0-3\",\"data\":\[.*\]})"
  • decodes either the context or the unstructured event data (ue_px or cx) and makes the required fix
  • encodes it back to Base64 and puts it back into the original event data
  • (now the tricky part: mangle Thrift like a pirate) calculates the original event data length, finds its integer representation as 4 bytes, and converts those to characters; does the same for the fixed event and swaps one for the other
  • prints the fixed event to stdout
  • pipes stdout to an enrichment that has stdin as input and the enriched Kinesis stream as output
  • world is fixed :wink:
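
For anyone following along, the decode / fix / encode step from the list above could look roughly like this. It is only a sketch, not my actual script: the helper names (b64_decode_json, b64_encode_json, fix_payload) are made up for illustration, the regexp is adapted from the one above with the braces escaped, and it assumes ue_px and cx hold URL-safe Base64 JSON whose padding may have been stripped.

```python
import base64
import json
import re

# Adapted from the regexp in the post: the payload_data JSON inside a raw record.
PAYLOAD_RE = re.compile(
    r'(\{"schema":"iglu:com\.snowplowanalytics\.snowplow/payload_data'
    r'/jsonschema/1-0-3","data":\[.*\]\})'
)


def b64_decode_json(value):
    """Decode URL-safe Base64 JSON, restoring padding if it was stripped."""
    padded = value + "=" * (-len(value) % 4)
    return json.loads(base64.urlsafe_b64decode(padded).decode("utf-8"))


def b64_encode_json(obj):
    """Serialize to compact JSON and encode as URL-safe Base64."""
    raw = json.dumps(obj, separators=(",", ":")).encode("utf-8")
    return base64.urlsafe_b64encode(raw).decode("ascii")


def fix_payload(payload_json, fix):
    """Apply fix() to every decoded ue_px / cx value in a payload_data JSON string."""
    payload = json.loads(payload_json)
    for event in payload["data"]:
        for field in ("ue_px", "cx"):
            if field in event:
                event[field] = b64_encode_json(fix(b64_decode_json(event[field])))
    return json.dumps(payload, separators=(",", ":"))
```

Usage would be something like fixed = fix_payload(PAYLOAD_RE.search(text).group(1), my_fix), where my_fix takes the decoded JSON and returns the corrected one. Note that re-serializing the JSON will usually change the payload length, which is exactly why the Thrift length bytes have to be fixed afterwards.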

I know it is not the best approach, but it is the fastest I could come up with. The code is extremely awful, but it works. It uses only native Python, no external libraries (just boto3 for S3), which is a huge advantage in my application.

The Thrift data fix is as follows:
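    import re

    # Old length as 4 big-endian bytes, rendered as characters
    old_l_hex = re.findall('..', '{:08x}'.format(len(event_data)))
    old_l = "".join(chr(int(h, 16)) for h in old_l_hex)
    # Same for the fixed event data
    new_l_hex = re.findall('..', '{:08x}'.format(len(fixed_data)))
    new_l = "".join(chr(int(h, 16)) for h in new_l_hex)
    # Swap the old length bytes for the new ones
    event_data_raw_fixed = event_data_raw_fixed.replace(old_l, new_l)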
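
A slightly safer variant of the same idea, as a sketch only: it works on Python 3 bytes, builds the 4-byte big-endian length with struct.pack, and replaces the length prefix together with the data that follows it, so an identical 4-byte sequence elsewhere in the record is left alone. It assumes the event data sits in a Thrift binary-protocol string field (a 4-byte big-endian length followed by the bytes); replace_length_prefixed is a hypothetical helper name, not part of any library.

```python
import struct


def replace_length_prefixed(record, old_data, new_data):
    """Swap a length-prefixed field in a raw record (bytes) for a new one."""
    # 4-byte big-endian signed length, followed by the field bytes.
    old_field = struct.pack(">i", len(old_data)) + old_data
    new_field = struct.pack(">i", len(new_data)) + new_data
    # Refuse to guess if the field is missing or appears more than once.
    if record.count(old_field) != 1:
        raise ValueError("expected exactly one occurrence of the field")
    return record.replace(old_field, new_field)
```

For example, replace_length_prefixed(raw_record, event_data, fixed_data) returns the record with both the payload and its length updated in one step.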