Snowplow Kafka enricher is lagging way behind the collector

Hi Team,

We are using the Snowplow Kafka enricher (Stream Enrich) in our Snowplow pipeline. The enricher is not keeping up and is lagging behind the collector offsets by a huge margin.

Below is the enricher config we are using.

    # Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved.
    #
    # This program is licensed to you under the Apache License Version 2.0, and
    # you may not use this file except in compliance with the Apache License
    # Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
    # http://www.apache.org/licenses/LICENSE-2.0.
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the Apache License Version 2.0 is distributed on an "AS
    # IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
    # implied.  See the Apache License Version 2.0 for the specific language
    # governing permissions and limitations there under.

    # This file (application.conf.example) contains a template with
    # configuration options for Stream Enrich.

    enrich {

      streams {

        in {
          # Stream/topic where the raw events to be enriched are located
          raw = "collector-good"
        }

        out {
          enriched = "enrich-good"
          bad = "enrich-bad"
          pii = "enrich-pii"
          partitionKey = ""
        }

        # Configuration shown is for Kafka, to use another uncomment the appropriate configuration
        # and comment out the other
        # To use stdin, comment or remove everything in the "enrich.streams.sourceSink" section except
        # "enabled" which should be set to "stdin".
        sourceSink {
          # Sources / sinks currently supported are:
          # 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a
          # Kinesis stream
          # 'kafka' for reading / writing to a Kafka topic
          # 'nsq' for reading / writing to a Nsq topic
          # 'stdin' for reading from stdin and writing to stdout and stderr
          enabled =  "kafka"

          # LATEST: most recent data.
          # TRIM_HORIZON: oldest available data.
          # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
          # Note: This only affects the first run of this application on a stream.
          # (pertinent to kinesis source type)
          # initialPosition = TRIM_HORIZON
          # initialPosition = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_POSITION}

          # Needs to be specified when initial-position is "AT_TIMESTAMP".
          # The timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
          # Ex: "2017-05-17T10:00:00Z"
          # Note: The time needs to be specified in UTC.
          initialTimestamp = "{{initialTimestamp}}"
          initialTimestamp = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_TIMESTAMP}

          # Minimum and maximum backoff periods, in milliseconds
          backoffPolicy {
            minBackoff = 1000
            maxBackoff = 10000
          }

          # Or Kafka (Comment out for other types)
          #brokers = "localhost:9092"
          brokers = ""
          # Number of retries to perform before giving up on sending a record
          retries = 0
          # The kafka producer has a variety of possible configuration options defined at
          # https://kafka.apache.org/documentation/#producerconfigs
          # Some values are set to other values from this config by default:
          # "bootstrap.servers" -> brokers
          # retries             -> retries
          # "buffer.memory"     -> buffer.byteLimit
          # "linger.ms"         -> buffer.timeLimit
          producerConf {
            acks = all
            "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
            "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
            #"batch.size"="100000"
            "buffer.memory"="18485760"
          }
          # The kafka consumer has a variety of possible configuration options defined at
          # https://kafka.apache.org/documentation/#consumerconfigs
          # Some values are set to other values from this config by default:
          # "bootstrap.servers" -> brokers
          # "group.id"          -> appName
          consumerConf {
          #  "enable.auto.commit" = true
          #  "auto.commit.interval.ms" = 1000
          #  "auto.offset.reset"  = earliest
          #  "session.timeout.ms" = 30000
          #  "key.deserializer"   = "org.apache.kafka.common.serialization.StringDeserializer"
           # "value.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer"
           #"fetch.min.bytes"="1042880"
           #"fetch.max.bytes"="12428800"
           #"fetch.max.wait.ms"="30000"
           "request.timeout.ms"="60000"
           #"auto.commit.interval.ms"="5000"
           #"max.partition.fetch.bytes"="2042880"
           #"max.poll.records"="10000"
           #"session.timeout.ms"="60000"
          }
        }

        # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
        # Note: Buffering is not supported by NSQ.
        # The buffer is emptied whenever:
        # - the number of stored records reaches recordLimit or
        # - the combined size of the stored records reaches byteLimit or
        # - the time in milliseconds since it was last emptied exceeds timeLimit when
        #   a new event enters the buffer
        buffer {
          byteLimit = 12428800
          recordLimit = 120000
          timeLimit = 1000
        }

        # Used for a DynamoDB table to maintain stream state.
        # Used as the Kafka consumer group ID.
        # Used as the Google PubSub subscription name.
        appName = "enricher-test"
        
      }
    }

The collector-good topic has 15 partitions and the enrich-good topic also has 15 partitions; both have a replication factor of 2.
The Kafka cluster is a 3-node cluster with 4 CPU and 10 GB RAM.

Also, we are receiving approximately 10,000 events/second, and the enricher is autoscaling between 5 (min) and 30 (max) instances.

If someone can help me with this, it would be really appreciated, as all our pipelines depend heavily on Snowplow.

Thanks
Karan

Thanks for sharing your experience. We never got to scaling a Kafka cluster to that many events; I think the most we saw was around 1k/s at peak, so I'm not sure I can give many tips, but here are some things we tried:

  • By default we produce enriched messages with “acks = all” in the producer config. When lag starts increasing, we can temporarily change this to “acks = 1”, so the producer only waits for the partition leader's acknowledgement rather than all in-sync replicas (trading some durability for throughput). To change this configuration we edit the enrichment ConfigMap:

producerConf {
  acks = 1
  "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
  "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
}
  • Adding more enrichment instances than partitions, but as you have probably seen yourself, that has only an incremental effect and doesn't necessarily scale linearly, since each partition is consumed by at most one instance in the consumer group. There are also a few consumer-side settings worth trying; see the sketch below.
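
On the consumer side, the fetch/poll settings that are commented out in your config could also be worth experimenting with. As a rough sketch (these are untested placeholder values on my side, not measured recommendations), the block might look something like:

consumerConf {
  # Let the broker accumulate more data per fetch instead of answering immediately;
  # larger values mean fewer, bigger fetch requests.
  "fetch.min.bytes"           = "1048576"   # placeholder: 1 MB
  "fetch.max.wait.ms"         = "500"       # placeholder
  # Upper bounds per partition and per poll; raising these increases memory
  # usage per consumer, so adjust carefully.
  "max.partition.fetch.bytes" = "2097152"   # placeholder: 2 MB
  "max.poll.records"          = "2000"      # placeholder
  "request.timeout.ms"        = "60000"     # keeping your existing override
}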

Having said that, the thing I don't like about partitions in Kafka is that they are inelastic, and higher partition counts can require more overhead in memory and other resources, so scaling the Kafka cluster is something I keep in the back of my mind as something I may have to deal with in the future as well.

Another bit of food for thought, not sure if it applies to you: if you are running a multi-tenant environment, physical separation (maybe multiple clusters) could be an option, or swapping Kafka for Pulsar, which handles resource separation out of the box. Though, take this with a grain of salt, as I haven't tried it myself and am not sure it wouldn't create more challenges when scaling.

So when the lag starts building up, do you see the enrichment nodes max out on CPU usage, or is it just that they don't process messages fast enough?


Thanks for the reply @evaldas.

That is thoughtful; we will try the enricher with this config as well and share the performance difference.

Yes, we have autoscaled the enricher up to 50 pods with 2 CPUs each, and it went to approximately 40 at peak time, but that has had a minimal impact on how fast the enricher processes records.

We are not sure this is an option, as I am not sure how Snowplow would behave with a Pulsar setup, but yes, we can look at that in the future.

To handle CPU usage we have applied a threshold for autoscaling, so as soon as the load on a pod increases, autoscaling kicks in and the enricher does not get overloaded. On the other hand, the enricher is still not able to process messages as fast as they arrive in the collector topic.

Also, to add on, we have scaled up our Kafka cluster and increased the partitions on the collector-good and enrich-good topics to 30 each. This particular setup is still under test; I will keep you posted with the results. In the meantime, if you find any solution around this, please let me know.
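
One more thing still on our list, which we have not tested yet, is re-enabling the producer batch.size setting that is commented out in our config, and possibly adding compression (compression.type is not in our current config, so treat it as just an idea). Roughly:

producerConf {
  acks = all
  "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
  "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
  # Larger batches mean fewer, bigger produce requests; placeholder value.
  "batch.size"         = "100000"
  "buffer.memory"      = "18485760"
  # Not in our current config; compresses batches, trading some CPU for
  # less network and broker I/O. lz4 is only an example codec.
  "compression.type"   = "lz4"
}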


@karan, good to hear your insights:

Yes, we have autoscaled the enricher up to 50 pods with 2 CPUs each, and it went to approximately 40 at peak time, but that has had a minimal impact on how fast the enricher processes records.

  • Oh interesting, 2 CPUs per pod sounds quite generous. Did you see any difference between 1 vs 2 CPUs in terms of throughput once enricher_count >= partition_count?

Re. Pulsar, they do provide a Kafka adaptor, but as you say, this is something to be tested, to see whether it would work and solve the performance issue in the first place:
https://pulsar.apache.org/docs/v2.0.0-rc1-incubating/adaptors/KafkaWrapper/

I did find a counter-argument against it, but as it comes from Confluent, it should probably be taken with a grain of salt.

Right, I think you can get away with 100 partitions or even more (based on claimed Kafka limits) without serious downsides, but again, probably something to test. And I guess just adding more Kafka nodes to the cluster wouldn't improve things by itself (though in theory I would think it could allow adding more partitions without increasing the load on each node).

By the way, as you are processing so many events, don't you have any bottleneck pushing the enrich-good events out to whichever sink you use? For example, in our case we do some transforms on those messages so they can be read with Spark Streaming and dumped into Parquet files, and that seems to take quite a bit of resources as well, I would say on a similar level to the enricher.

Sure, I will post here if we get to work on these scaling issues again, though I don't expect we will for some time, but it's good to know someone else is working in this area as well :slight_smile: