Why does the RDB Transformer have potential race conditions?

Hey guys,
we really like the RDB Loader, thanks a lot for it! We are reading your documentation, where you mention that the Stream RDB Transformer can cause race conditions in a multi-node setup. I am asking myself why this is the case, since the DynamoDB table takes care of the shard distribution and would also be capable of managing multiple transformers. Which fact am I missing that makes multiple transformers prone to race conditions?

Thank you guys! :slight_smile:

Cheers, Christoph

PS (off-topic): I just read that you are planning to add Kubernetes Helm charts for your services. We have already done this, so if you already have a repository for that, we would be happy to contribute and share our charts there.


Hi @capchriscap,

Thanks for the kind words about RDB Loader!

When the streaming transformer runs, it periodically sends the loader a shredding_complete message via SQS. This message effectively tells the loader: "now you can load the batch of data in the S3 folder named run=2022-04-18-17-50-25".
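For context, this shredding_complete message is a self-describing JSON whose payload points the loader at the folder to load. A heavily simplified, illustrative sketch (not the exact schema; the field names and version placeholder below are assumptions) might look like:

{
  "schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/<version>",
  "data": {
    "base": "s3://<transformer-output-bucket>/transformed/run=2022-04-18-17-50-25/"
  }
}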

But imagine there are multiple nodes running the streaming transformer in parallel. Then every node would send a message telling the loader to load the same folder. Only the first load would succeed, while the others would be rejected by the loader as duplicates. As a result, if the first node sends its message 10 seconds before the second node has finished writing, the loader could miss the data from those 10 seconds.

That is what we mean by the race condition. Now you might think up ways to configure the stream transformer to avoid this problem at run time, e.g. have each node write to a different S3 path. This workaround might work for you, but it can cause problems with configuring the loader’s folder monitoring (if you use that feature). And for a high volume pipeline it could flood the loader with a huge number of messages.

We have an idea for a neat solution that even scales up to high-volume pipelines: it involves using DynamoDB to coordinate the nodes, so that they send a single SQS message for each batch of events. This new DynamoDB table would be separate from the existing KCL table you mentioned. We haven’t started working on this feature yet, but we hope to start very soon.


Ah, now I understand, thanks a lot for the detailed explanation and your great support for the community!
We will then definitely use the workaround, as we don’t currently use folder monitoring and our main concern at the moment is system resilience.


Hi @capchriscap, please could you share the details here if you manage to get this working in a multi-node setup? E.g. how you configure the nodes to write to different directories. It might be helpful for other Snowplow users looking to scale up the transformer.

This is our first PoC of how we will run multiple transformers:

Transformer Config:

{
  "output": {
    "region": "eu-central-1",
    "path": "s3://my-bucket-rdb-loader-test/transformed/node=[NODE]/"
  }
}

Transformer Deployment (K8s-Deployment):

    spec:
      containers:
      - args:
        - -c
        - export CONFIG=$(echo $CONFIG_BASE64 | base64 -d | sed "s/\[NODE\]/$POD_NAME/g"
          | base64 --wrap=0) && /opt/snowplow/bin/snowplow-transformer-kinesis --config=$CONFIG  --iglu-config=$RESOLVERS_BASE64
        command:
        - bash
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: CONFIG_BASE64
          valueFrom:
            configMapKeyRef:
              key: config.json
              name: rdb-stream-transformer-config

In the end, the container entrypoint decodes the base64-encoded config, replaces the [NODE] placeholder with the pod name via sed, re-encodes it, and starts the transformer, so each pod writes to its own S3 prefix.
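To illustrate (the pod name suffixes here are made up), the resulting S3 layout looks roughly like this:

    s3://my-bucket-rdb-loader-test/transformed/node=rdb-stream-transformer-abc12/run=2022-04-18-17-50-25/
    s3://my-bucket-rdb-loader-test/transformed/node=rdb-stream-transformer-def34/run=2022-04-18-17-50-25/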

Hope this also helps others :slight_smile: The first loading attempts look good.


Update: after testing, it turned out that this PoC does not work with the current Snowflake Loader.

Reason: the files are properly written to S3 and the manifest table is updated correctly. However, the loading step only takes the last folder (the run= folder) into account and ignores the node= folder.

Example:

  • S3: s3://my-bucket/transformed/node=xyz/run=2022-04-08-12-10-00
  • Expected SQL statement: COPY INTO events FROM @ATOMIC_TEST.SANDBOX_DEV/node=xyz/run=2022-04-08-12-10-00/output=good/
  • Current SQL statement: COPY INTO events FROM @ATOMIC_TEST.SANDBOX_DEV/run=2022-04-08-12-10-00/output=good/

Due to the folderName implementation in snowplow-rdb-loader/S3.scala at master · snowplow/snowplow-rdb-loader (github.com), additional folders (apart from the run= folder) are ignored. This unfortunately breaks the whole approach :frowning:

So for now we will also wait for the new horizontal scaling feature.

Do you know approximately when you will get started on this feature? This would help us decide whether it makes sense to work on our own workaround in the meantime :slight_smile:

PS: a potential workaround would be to build our own Docker image with

    // keep the last two path segments, e.g. "node=xyz/run=2022-04-08-12-10-00"
    def folderName: String =
      f.split("/").drop(f.split("/").length-2).mkString("/")

instead of

    // current behaviour: keep only the last path segment, e.g. "run=2022-04-08-12-10-00"
    def folderName: String =
      f.split("/").last

snowplow-rdb-loader/S3.scala at master · snowplow/snowplow-rdb-loader (github.com)

Indeed that would work!

It’s hard to give you an ETA, but we’ve just started working on it, and we aim to release it before the end of the quarter.


FYI @capchriscap, we’ve created this GitHub milestone for the production-ready streaming transformer.


Hi @BenB ,
thanks a lot for the hint, the roadmap looks pretty good :heart: Here are two things we also noticed while testing the RDB Loader:

  1. RAM usage explodes when the RDB Transformer needs to catch up and process a lot of events. The transformer is highly performant (which is good), but the more events it processes at a time, the more we need to overprovision just for occasional peaks. It is currently a bit unclear to me how to throttle the RDB Transformer to avoid this behaviour.

Example: 1000 events/sec

Under normal load, usage is minimal (1-2 vCPU and ~6 GB RAM) and could easily be handled by a t3.large instance.

However, if the transformer needs to catch up, e.g. one hour of data because it was deactivated, the load explodes (sometimes >20 GB of RAM); in our test it hit the memory limit pretty quickly.

Is there a way planned to throttle the RDB Transformer, or should we rather use CPU throttling to make it process events more slowly?

  2. The RDB Snowflake Loader currently runs all the time and listens for new messages on the SQS queue. To save costs (as Snowflake processing is expensive), we would like to run the loader only once per hour so that the warehouse is not kept running the whole time. Is this planned? If not, we would need another SQS queue that holds a one-hour batch (filled by a Lambda or cron job).

    However, we would like to avoid additional services as far as possible to reduce complexity, hence my question :slight_smile:

Thanks a lot! :slight_smile:

Hi @capchriscap

You could try setting the windowing to 60 minutes in the stream transformer config file. This would mean you get 1 SQS message per hour, and therefore 1 load into Snowflake per hour.
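A minimal sketch of that change, assuming the setting is the windowing key from the stream transformer example config:

"windowing": "60 minutes"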

I think that’s the best option, but I’ll suggest one other possible solution in case you don’t like the idea of having such large windows:

You might be able to use the schedules.noOperation feature in the loader to make the loads happen once per hour. It uses cron schedule syntax to create periods of time when loading is disallowed (originally intended for warehouse maintenance windows). See this example config file for a few more details.

Try adding something like this to your loader config file:

"schedules": {
  "noOperation": [
    {
      "name": "Once per hour loading"
      "when": "0 0 * * * ?"
      "duration": "55 minutes"
    }
  ]
}

With the example above, loading would be paused for the first 55 minutes of every hour, leaving roughly a five-minute window for the loader to drain the queue. You might need some trial and error with duration to make sure the loader is active for long enough to load all SQS messages. And for that reason I’m afraid it’s not a perfect solution.

You raise a great point though – there may be many other Snowplow users who want to load infrequently to keep costs down. I will have a think about how we can better support that requirement as we work on the streaming transformer in the next few months.


Hey, just came across this thread and I’m curious whether the delivery delay feature of SQS could serve as a short-term workaround for this race condition. If S3 folders are being created at the beginning of the time window (they seem to be rounded down to the nearest windowing interval based on the source code), would we be able to set the delivery delay to the value of windowing to guarantee that all nodes have finished their same-folder batch before the loader sees the message?

For example:

  • windowing set to 5 minutes
  • node1 starts a batch at 2022-10-14T10:00:01, uses folder 2022-10-14-10-00
  • node2 starts a batch at 2022-10-14T10:04:59, uses same folder
  • node1 finishes at 2022-10-14T10:05:01, sends message to SQS
  • node2 finishes at 2022-10-14T10:09:59, sends identical message to SQS
  • node1 message is readable 5 minutes later at 2022-10-14T10:10:01, after node2 finishes

Please let me know if I’m misunderstanding anything as I’ve only begun setting up a pipeline and don’t have a great grasp of how snowplow works yet.

edit: just saw that this was already addressed in 4.1.0 by appending a UUID. Please ignore :sweat_smile:

Hi @keemax, thanks for sharing the idea. As you spotted (in your edit) we fixed this in version 4.1.0 using a different approach. The fix seems to be working quite well, and now there should be no problem scaling to multiple parallel transformers.