I can’t seem to understand how the Kafka Enricher scales. I’m running version 2.0.5 of stream-enrich-kafka in docker-compose, on a Linux c5n.4xlarge running Amazon Linux 2. Docker version: 20.10.13, build a224086; docker-compose version: 1.29.2, build unknown.
Scenario:
- We had a giant backlog of messages on our collector good stream (the stream the enricher processes off of). We only had 4 partitions (with two replicas), and CPU was low on the enricher instance (< 10%). CPU was also low on the Kafka cluster.
- Using AWS MSK (managed kafka) metrics, we saw that the enricher was processing around 425 messages a second.
- We increased the instance from 2 to 16 vCPUs to see what effect that would have.
- No increase in messages processed. We expected this given the CPU metrics, but it also told us that the EC2 instance was likely not network-bound either.
- We figured we didn’t have enough partitions to process the messages fast enough, so we created a new topic with 40 partitions and ran the same 16 vCPU enricher instance against it.
- It is still processing basically the same number of messages (~480 per second, or ~160 per broker per second). This was shocking. The CPU on the instance is only a few percentage points higher (I assume from iterating through the extra partitions). The only good thing that happened is that message processing is now spread more evenly across the three brokers (likely because going from 4 to 40 partitions eliminated hot partitions on certain brokers, since 4 isn’t divisible by 3).
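To sanity-check the "4 isn’t divisible by 3" theory, here’s a toy sketch of how partition leaders land on 3 brokers. It assumes a simplified round-robin leader layout (real Kafka also rotates the starting broker and places replicas, but the skew pattern is the same):

```python
from collections import Counter

def leaders_per_broker(num_partitions: int, num_brokers: int) -> Counter:
    # Simplified round-robin leader layout: partition p leads on broker p % n.
    return Counter(p % num_brokers for p in range(num_partitions))

print(leaders_per_broker(4, 3))   # one broker leads 2 partitions, the others 1 each
print(leaders_per_broker(40, 3))  # 14/13/13 -- much closer to even
```

With 4 partitions, one broker leads twice as many partitions as the others; with 40 the split is 14/13/13, which matches the more even broker load we observed.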
Questions:
- Does the Snowplow Enricher use threads? I was under the impression it spun up a thread per CPU or some such thing, in which case I’d expect a 16 vCPU enricher’s throughput to increase dramatically when processing 40 partitions vs. 4.
- Any other ideas about what might be limiting throughput? I tried increasing the instance size to get more network bandwidth, but hit the same limits. I have not tried increasing the Kafka node size to see if more network there would help, but I’d honestly be shocked if that were the issue.
- I know I can set acks = 1 rather than acks = all to possibly speed things up, but I’d like to understand this behavior first. I’ve used Kafka before, and this behavior doesn’t make sense: it should scale close to linearly as you increase the number of partitions, and this isn’t even close (10x the partitions led to a 13% increase in message processing).
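For reference, if I do end up experimenting with acks, my understanding is it would be an override in the commented-out producerConf block of my config, something like this (untested on my side):

```hocon
# Hypothetical override -- trades some durability for producer latency.
producerConf {
  acks = 1
  "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
  "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
}
```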
- I also know I should be spreading the enricher over many containers for redundancy, but again, I want to understand the scaling of a single instance before spreading out across many.
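As an aside, since appName is used as the Kafka consumer group.id, my understanding is that when I do spread out, extra containers started from the same compose file would simply join the group and split the partitions between them (untested on my side):

```shell
# Hypothetical: start 4 replicas of the existing enricher service.
# All replicas share group.id "snowplow2", so Kafka would rebalance the
# 40 partitions across them (~10 each).
docker-compose up -d --scale snowplow-stream-enrich=4
```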
SETTINGS
docker-compose.yml:
snowplow-stream-enrich:
  image: snowplow/stream-enrich-kafka:2.0.5
  command: [
    "--config", "/snowplow/enricher.hocon",
    "--resolver", "file:/snowplow/resolver.json",
    "--enrichments", "file:/snowplow/enrichments/"
  ]
  volumes:
    - ./snowplow:/snowplow
  environment:
    - "SP_JAVA_OPTS=-Xms512m -Xmx512m"
enricher.hocon
enrich {
  streams {
    in {
      # Stream/topic where the raw events to be enriched are located
      raw = sp_collector_good_events2
      raw = ${?ENRICH_STREAMS_IN_RAW}
    }
    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = sp_enricher_good_events2
      enriched = ${?ENRICH_STREAMS_OUT_ENRICHED}
      # Stream/topic where the events that failed enrichment will be stored
      bad = sp_enricher_bad_events2
      bad = ${?ENRICH_STREAMS_OUT_BAD}
      # Stream/topic where the pii transformation events will end up
      pii = ${?ENRICH_STREAMS_OUT_PII}
      # How the output stream/topic will be partitioned.
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint.
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      partitionKey = domain_userid
      partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
    }
    sourceSink {
      # Sources / sinks currently supported are:
      # 'kafka' for reading / writing to a Kafka topic
      enabled = kafka
      enabled = ${?ENRICH_STREAMS_SOURCE_SINK_ENABLED}
      # Or Kafka (Comment out for other types)
      brokers = "<REDACTED: list of 3 brokers with 9092 ports>"
      # Number of retries to perform before giving up on sending a record
      retries = 0
      # The kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -> brokers
      # retries             -> retries
      # "buffer.memory"     -> buffer.byteLimit
      # "linger.ms"         -> buffer.timeLimit
      #producerConf {
      #  acks = all
      #  "key.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
      #  "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
      #}
      # The kafka consumer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#consumerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -> brokers
      # "group.id"          -> appName
      #consumerConf {
      #  "enable.auto.commit"      = true
      #  "auto.commit.interval.ms" = 1000
      #  "auto.offset.reset"       = earliest
      #  "session.timeout.ms"      = 30000
      #  "key.deserializer"   = "org.apache.kafka.common.serialization.StringDeserializer"
      #  "value.deserializer" = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      #}
    }
    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    buffer {
      byteLimit = 1000000
      byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 0 # Not supported by Kafka; will be ignored
      recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 30000
      timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
    }
    # Used as the Kafka consumer group ID.
    appName = "snowplow2"
    appName = ${?ENRICH_STREAMS_APP_NAME}
  }
}
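For what it’s worth, here’s the back-of-envelope arithmetic I used to rule out the buffer settings above as the bottleneck. The ~1 KB average event size is an assumption on my part:

```python
# Which buffer limit fires first at the observed throughput?
byte_limit = 1_000_000   # buffer.byteLimit from enricher.hocon
time_limit_s = 30        # buffer.timeLimit (30000 ms)
rate_per_s = 480         # observed enriched events per second
event_bytes = 1_000      # assumed average serialized event size (a guess)

seconds_to_fill = byte_limit / (rate_per_s * event_bytes)
print(f"byteLimit reached after ~{seconds_to_fill:.1f}s")
print("byteLimit fires first" if seconds_to_fill < time_limit_s else "timeLimit fires first")
```

So at the observed rate the buffer should be flushing every ~2 seconds on byteLimit, well before the 30 s timeLimit, which is why I don’t think the buffer explains the plateau.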
Thanks!
Patrick