Parquet - how to get it from Enriched Stream

I’d like to sink the enriched Kinesis data stream directly to S3 in Parquet format, for straightforward consumption by Redshift Spectrum. The enriched format makes this awkward: if I use Kinesis Firehose naively, I get a tab-separated file with nested JSON and variable-length runs of tabs as delimiters, so I can’t use Firehose’s built-in conversion to Parquet.

Are there plans to make this format more user-friendly, such as JSON?

Has anyone written or seen a Lambda function that Kinesis Firehose can use to convert from the enriched format to JSON?


It’s a topic that has come up before. In the long term, a more amenable format is definitely something we’re interested in implementing, but there’s a lot on the immediate development agenda, so I don’t think we can promise anything at the moment.

To convert the TSV enriched event into JSON format, you can use one of our Analytics SDKs.

People have used Firehose in the past, but we haven’t built for compatibility. AFAIK Firehose doesn’t add a separator between events, so your function might need to append a newline to each record, or something along those lines.
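As a rough illustration of the newline point, here is a minimal sketch that uses only the JDK. The object and method names (NewlineDelimited, toDelimitedBytes) are made up for this example and are not part of any Snowplow or AWS library. It assumes each transformed record is a single JSON string that should end with exactly one newline:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

object NewlineDelimited {
  // Hypothetical helper: encode one JSON document as UTF-8 and make sure it ends
  // with "\n", so that concatenated records read as newline-delimited JSON.
  def toDelimitedBytes(json: String): ByteBuffer = {
    val line = if (json.endsWith("\n")) json else json + "\n"
    ByteBuffer.wrap(line.getBytes(UTF_8))
  }
}
```

The resulting buffer could then be used as the data of the transformed record, so that consecutive events land on separate lines in the S3 object.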

You might also be interested in this guide to querying the data in Athena, which might help you achieve your goal without a custom function (using the Snowplow S3 loader instead of Firehose).

Hope that helps.


So the very last piece of code in the document might be exactly what I need, right? It’s Lambda code that does the transformation I was looking for. Will test it out.

In case anyone needs this in the future, here is my Lambda code in Scala:

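package lambda

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.JavaConversions._

import com.amazonaws.services.lambda.runtime.events.{KinesisAnalyticsInputPreprocessingResponse, KinesisFirehoseEvent}
import com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse.{Record, Result}
import com.snowplowanalytics.snowplow.analytics.scala.sdk.Event

object HmHandler {

  // Firehose transformation handler: converts each enriched TSV record to JSON.
  def recordHandler(event: KinesisFirehoseEvent): KinesisAnalyticsInputPreprocessingResponse = {

    val events = for {
      rec <- event.getRecords
      line = new String(rec.getData.array(), UTF_8)
      parsed = Event.parse(line)
      // Valid events become JSON (lossy = true); unparseable ones are returned as ProcessingFailed.
      recc = if (parsed.isValid) new Record(rec.getRecordId, Result.Ok, ByteBuffer.wrap(parsed.toOption.get.toJson(true).noSpaces.getBytes(UTF_8))) else new Record(rec.getRecordId, Result.ProcessingFailed, rec.getData)
    } yield recc
    val response = new KinesisAnalyticsInputPreprocessingResponse()
    // JavaConversions implicitly converts the Scala buffer to a java.util.List.
    response.setRecords(events)
    response
  }
}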


Nice! Thanks for sharing!