Loading stream enriched data into Snowflake


Hi all,

I have the following pipeline set up:

Scala-stream-collector > Stream Enrich > S3 Loader

I’m wondering if I can use the Snowflake loader to load this enriched data from S3 to Snowflake.
I know it’s not possible to do that with Redshift because it’s missing the shredding step but it’s not clear from the doc if it’s possible to do that using the Snowflake loader.



Hi @abrenaut,

Technically it is possible, but up to same level where it is also possible with Redshift, e.g. it wouldn’t be a real-time pipeline, but instead something like “micro-batch”. To implement this kind of pipeline after S3 Loader you’re missing three core components:

  1. Persistent EMR cluster
  2. Dataflow runner playbook that stages data enriched by Kinesis to Transformer’s input directory, then launches Transformer and Loader on your persistent cluster
  3. Some scheduler (cron) that submits this playbook

This is basically a very manual way to run steps that EmrEtlRunner launches for batch pipeline. This is also quite fragile way to do this (don’t forget about locks!), so up to you to decide if it worth to implement, but no doubt this is possible.


Thanks for the reply Anton,

The main advantage I see of this approach is that, by using stream enrich, I can load data in real time into elasticsearch and into s3 and then, once every x hours, I can run a batch to load data into Snowflake.

I managed to get data transformed for Snowflake using the following playbook

            "type": "CUSTOM_JAR",
            "name": "Enriched Events Archiving",
            "actionOnFailure": "CANCEL_AND_WAIT",
            "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
            "arguments": [
                "--src", "s3://snowplow-bucket/enriched/",
                "--dest", "s3://snowplow-bucket/archived/{{nowWithFormat .timeFormat}}",
            "name":"Snowflake Transformer",
      "tags":[ ]

When you say

Are you talking about the fact that two loader playbook couldn’t run concurrently ?



Partly. Basically concurrency shouldn’t be a big problem as EMR will only execute one job at a time (if and only if you go with persistent cluster, obviously), but if your Loader fails - you don’t really want Transformer to start another batch until Loader is done with current folder. Situations like this can lead to all kinds of inconsistent state.

Sure, this approach has many advantages, but as I said earlier also requires lot of manual labor and in our experience little bit more fragile.

Yep, this is quite similar to what we’ve used. Loader can be added in the same manner.


Thanks a lot @anton,

Based on that post I thought that Dataflow Runner was the recommended way to load data into Snowflake.
I’ll give it a shot and if it’s too fragile switch to EmrEtlRunner.

One more question though. when you say

Do you mean Dataflow-runner won’t accept concurrent job submission on the same EMR cluster, hence you’ll never get two concurrent job running with a persistent cluster ?



Hey @abrenaut,

Very sorry for confusion! Dataflow Runner is indeed a recommended way to load data to Snowflake. What I meant is that EmrEtlRunner submits similar jobs to EMR, but unfortunately only RDB Shredder/Loader are supported by EmrEtlRunner.

“Micro-batch” (S3 Loader + Dataflow Runner) approach is fragile compared to usual batch when EMR works with big enough loads and within long enough period of time (using Spark Enrich). When your micro-batch pipeline fails - you need to act very quickly, otherwise enriched backlog will outgrow capabilities of your cluster - you’ll have to abandon persistent cluster and bootstrap new one, which also can fail. Also you can loose track of what data has been processed and loaded, and it makes whole load process very messy. And it happens relatively often, so that’s what I meant by fragile.

Yes. Though, EMR will accept jobs, but they will always run in specific order and race will never happen between them.


Ho I think I got it.

The problem is with “Micro batch” i.e running Dataflow-runner every 30 minutes because you may have jobs running concurrently or you may run behind if the backlog outgrows capabilities of your cluster.

Now if I just run this job say every day with an EMR transient cluster it should be fine, is that it ?