Speeding up Stream Enricher


#1

Hello there,

I am running Stream Enrich on a t2.medium instance.

There are 2 shards in my raw data Kinesis stream and 3 shards in my enriched Kinesis stream.
The enrichments I am running are:

  • ip lookup
  • campaign_attribution
  • ua_parser_config
  • user_agent_utils_config
  • Validation against two iglu schemas

CPU usage on the box is low and network latency is fine, but the enrich process is very slow for a stream: sometimes 500 events take a minute to process.

Does anyone have any experience that could help boost performance? Or is it just about load balancing the enrichers properly so that they can lease different shards?


#2

Hi @brucey31 - could you please paste your Stream Enrich configuration file here so we can look at the settings and see why it might be running so slowly?


#3

enrich {
  # Sources currently supported are:
  # 'kinesis' for reading Thrift-serialized records from a Kinesis stream
  # 'stdin' for reading Base64-encoded Thrift-serialized records from stdin
  source = "kinesis"

  # Sinks currently supported are:
  # 'kinesis' for writing enriched events to one Kinesis stream and invalid events to another.
  # 'stdouterr' for writing enriched events to stdout and invalid events to stderr.
  #    Using "sbt assembly" and "java -jar" is recommended to disable sbt
  #    logging.
  sink = "kinesis"

  # AWS credentials
  #
  # If both are set to 'default', use the default AWS credentials provider chain.
  #
  # If both are set to 'iam', use AWS IAM Roles to provision credentials.
  #
  # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  aws {
    access-key: "{aws_key}"
    secret-key: "{aws_secret}"
  }

  streams {
    in: {
      raw: "{raw collector stream}"

      # After enrichment, events are accumulated in a buffer before being sent to Kinesis.
      # The buffer is emptied whenever:
      # - the number of stored records reaches record-limit or
      # - the combined size of the stored records reaches byte-limit or
      # - the time in milliseconds since it was last emptied exceeds time-limit when
      #   a new event enters the buffer
      buffer: {
        byte-limit: 10
        record-limit: 5
        time-limit: 1
      }
    }

    out: {
      enriched: "{Enriched Stream}"
      enriched_shards: 3
      bad: "{Validation Rejection Stream}"
      bad_shards: 1

      # Minimum and maximum backoff periods
      # - Units: Milliseconds
      backoffPolicy: {
        minBackoff: 3000
        maxBackoff: 3000
      }
    }

    # "app-name" is used for a DynamoDB table to maintain stream state.
    # You can set it automatically using: "SnowplowKinesisEnrich-${enrich.streams.in.raw}"
    app-name: "{dynamo db table}"

    # LATEST: most recent data.
    # TRIM_HORIZON: oldest available data.
    # Note: This only affects the first run of this application
    # on a stream.
    initial-position = "TRIM_HORIZON"

    region: "{AWS Region}"
  }
}

Thanks for your help


#4

Hi @brucey31 what version of Stream Enrich are you using here?

The issue seems to stem from the very small buffer settings. What you are essentially saying is: flush the buffer every 1 millisecond, or every 10 bytes, or every 5 records, whichever comes first. This causes a heap of overhead, as you will end up writing only 1 event to the next stream at a time.

Could you try changing those buffer settings to:

  • byte-limit: 4500000
  • record-limit: 500
  • time-limit: 250

This should mean that your latency is around 250 ms, but it will be much more performant.
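
To make the effect of those three limits concrete, here is a rough Python sketch of the flush rules as the config comments describe them. It is a toy model, not Stream Enrich's actual code: the `Buffer` class, the `count_flushes` helper, the 4,000-byte event size and the 1 ms arrival gap are all assumptions made up for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Buffer:
    """Toy model of the three flush conditions described in the config comments."""
    byte_limit: int
    record_limit: int
    time_limit_ms: int
    records: list = field(default_factory=list)
    stored_bytes: int = 0
    last_flush_ms: int = 0
    flushes: int = 0

    def add(self, record: bytes, now_ms: int) -> bool:
        """Store one record, then flush if any of the three limits is hit."""
        self.records.append(record)
        self.stored_bytes += len(record)
        should_flush = (
            len(self.records) >= self.record_limit
            or self.stored_bytes >= self.byte_limit
            or now_ms - self.last_flush_ms > self.time_limit_ms
        )
        if should_flush:
            # A real sink would write self.records downstream here.
            self.records.clear()
            self.stored_bytes = 0
            self.last_flush_ms = now_ms
            self.flushes += 1
        return should_flush


def count_flushes(buffer: Buffer, n_events: int, event_size: int, gap_ms: int) -> int:
    """Feed n_events of a fixed size, gap_ms apart, and count the flushes."""
    for i in range(n_events):
        buffer.add(b"x" * event_size, now_ms=(i + 1) * gap_ms)
    return buffer.flushes


# Hypothetical workload: 500 events of 4,000 bytes arriving 1 ms apart.
small = count_flushes(Buffer(10, 5, 1), 500, 4000, 1)
large = count_flushes(Buffer(4_500_000, 500, 250), 500, 4000, 1)
print("flushes with original limits:", small)
print("flushes with suggested limits:", large)
```

With the original limits, any single event is already bigger than the 10-byte limit, so the model flushes once per event; with the suggested limits it flushes far less often for the same workload.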


#5

Thank you for your help with this. It has made things quicker in terms of how often a batch of events is sent through to the enriched stream.

However, the box is still massively underused: this is a t2.medium, CPU is below 10%, memory is 14% used and the network has lots of bandwidth free, yet I still have a backlog of data building up. I have also tried running it without the enrichments, with no speed increase. My Iglu repo is in S3 and the IP lookup databases are local.

Is there anything else I can do to whip this worker into gear?


#6

Hey @brucey31 - the other option is maxRecords.

This determines how many records to grab from Kinesis at a time, so setting it to the max of 10000 should be the most performant.
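
As a back-of-the-envelope illustration of why a larger maxRecords helps, the sketch below counts how many fetches it takes to drain a backlog at different settings. The `fetches_needed` helper and the backlog size are hypothetical, and the model deliberately ignores Kinesis's other per-call and per-shard limits.

```python
import math


def fetches_needed(backlog_records: int, max_records: int) -> int:
    """Number of fetches to drain a backlog if every fetch returns a full batch."""
    return math.ceil(backlog_records / max_records)


backlog = 1_000_000  # hypothetical backlog size, not a figure from this thread
for max_records in (100, 1_000, 10_000):
    print(max_records, "->", fetches_needed(backlog, max_records), "fetches")
```

Each fetch carries a fixed round-trip cost, so fewer, larger fetches should reduce the time spent waiting on the network for the same number of records.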

What do the logs look like when it is running?


#7

This is what is going on in my logs below: it writes to the enriched stream, starts to process more records, and then just hangs for ages before sending the other records. The processing power needed to validate and enrich surely cannot be enough to make it hang like this. I have changed byte-limit, record-limit and time-limit to your suggested thresholds and that has helped, but won't adding more records to each batch make it take even longer?

Thanks for your help

[pool-2-thread-3] INFO com.snowplowanalytics.snowplow.enrich.kinesis.sinks.KinesisSink - Writing 209 records to Kinesis stream {enriched_stream}
[pool-2-thread-3] INFO com.snowplowanalytics.snowplow.enrich.kinesis.sinks.KinesisSink - Successfully wrote 209 out of 209 records
[pool-2-thread-3] INFO com.snowplowanalytics.snowplow.enrich.kinesis.sources.KinesisSource - Checkpointing shard shardId-x
[pool-2-thread-2] INFO com.snowplowanalytics.snowplow.enrich.kinesis.sources.KinesisSource - Processing 1 records from shardId-x
[pool-2-thread-2] INFO com.snowplowanalytics.snowplow.enrich.kinesis.sinks.KinesisSink - Writing 1 records to Kinesis stream {enriched_stream}
[pool-2-thread-2] INFO com.snowplowanalytics.snowplow.enrich.kinesis.sinks.KinesisSink - Successfully wrote 1 out of 1 records
[pool-2-thread-2] INFO com.snowplowanalytics.snowplow.enrich.kinesis.sources.KinesisSource - Checkpointing shard shardId-x
[pool-2-thread-2] INFO com.snowplowanalytics.snowplow.enrich.kinesis.sources.KinesisSource - Processing 578 records from shardId-x
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 18 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Current stream shard assignments: shardId-x, shardId-0x shardId-y
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Sleeping ...
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Current stream shard assignments: shardId-x, shardId-0x shardId-y
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Sleeping ...
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Current stream shard assignments: shardId-x, shardId-0x shardId-y
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Sleeping ...
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 11 datums to CloudWatch
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Current stream shard assignments: shardId-x, shardId-0x shardId-y
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Sleeping ...
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Current stream shard assignments: shardId-x, shardId-0x shardId-y
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Sleeping ...
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Current stream shard assignments: shardId-x, shardId-0x shardId-y
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Sleeping ...
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 8 datums to CloudWatch
[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 13 datums to CloudWatch
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Current stream shard assignments: shardId-x, shardId-0x shardId-y
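
A quick way to see the stall in a log like this is to tally how many records were picked up against how many were written out. The sketch below is one possible approach: the `tally` function is hypothetical, and the regular expressions simply assume the message formats shown in the excerpt above.

```python
import re

# Patterns assume the exact message formats seen in the log excerpt.
PROCESSING = re.compile(r"KinesisSource - Processing (\d+) records")
WRITTEN = re.compile(r"KinesisSink - Successfully wrote (\d+) out of (\d+) records")


def tally(log_lines):
    """Return (records picked up, records written, CloudWatch warnings)."""
    picked_up = written = cw_warnings = 0
    for line in log_lines:
        if (m := PROCESSING.search(line)):
            picked_up += int(m.group(1))
        elif (m := WRITTEN.search(line)):
            written += int(m.group(1))
        elif "Could not publish" in line:
            cw_warnings += 1
    return picked_up, written, cw_warnings


sample = [
    "[pool-2-thread-2] INFO com.snowplowanalytics.snowplow.enrich.kinesis.sources.KinesisSource - Processing 1 records from shardId-x",
    "[pool-2-thread-2] INFO com.snowplowanalytics.snowplow.enrich.kinesis.sinks.KinesisSink - Successfully wrote 1 out of 1 records",
    "[pool-2-thread-2] INFO com.snowplowanalytics.snowplow.enrich.kinesis.sources.KinesisSource - Processing 578 records from shardId-x",
    "[cw-metrics-publisher] WARN com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Could not publish 18 datums to CloudWatch",
]
print(tally(sample))
```

A large gap between the first two numbers means a batch has been picked up but not yet written, which matches the hang after the 578-record batch.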

#8

Does your IAM role allow the cloudwatch:PutMetricData action? It looks like it is not allowed…


#9

Thank you both for those suggestions,

I have started logging everything to CloudWatch and changed the following settings:

maxRecords: 10000

buffer: {
  byte-limit: 4500000
  record-limit: 500
  time-limit: 60000
}

backoffPolicy: {
  minBackoff: 3000
  maxBackoff: 600000
}

What CloudWatch is showing me is that, for my four shards, with 4 workers working on them and placing the events into 3 enriched shards:

  • each worker/shard pair is only processing 100 or so events a minute
  • each of these events is around 4 KB
  • this falls far short of the thousands of events coming into the raw stream, resulting in a huge backlog.
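
To put those figures in perspective, here is a small worked calculation comparing the observed rate with what the enriched stream could accept. The per-shard write limits (1 MB/s and 1,000 records/s) are my assumption based on the standard Kinesis limits; the other numbers come from the bullets above, and the `observed_rate` helper is hypothetical.

```python
# Figures reported in the post above.
WORKER_SHARD_PAIRS = 4
EVENTS_PER_PAIR_PER_MIN = 100
EVENT_SIZE_KB = 4
ENRICHED_SHARDS = 3

# Assumption: standard Kinesis per-shard write limits (1 MB/s, 1,000 records/s).
SHARD_WRITE_KB_PER_SEC = 1024
SHARD_WRITE_RECORDS_PER_SEC = 1000


def observed_rate(pairs, events_per_pair_per_min, event_size_kb):
    """Return (events per second, KB per second) summed over all workers."""
    events_per_sec = pairs * events_per_pair_per_min / 60
    return events_per_sec, events_per_sec * event_size_kb


events_per_sec, kb_per_sec = observed_rate(
    WORKER_SHARD_PAIRS, EVENTS_PER_PAIR_PER_MIN, EVENT_SIZE_KB
)
byte_share = kb_per_sec / (ENRICHED_SHARDS * SHARD_WRITE_KB_PER_SEC)
record_share = events_per_sec / (ENRICHED_SHARDS * SHARD_WRITE_RECORDS_PER_SEC)

print(f"{events_per_sec:.1f} events/s, {kb_per_sec:.1f} KB/s")
print(f"{byte_share:.2%} of byte capacity, {record_share:.2%} of record capacity")
```

On these numbers the workers use well under 1% of what the enriched stream could accept, which suggests the bottleneck is somewhere other than Kinesis write capacity.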

#10

I have sorted this issue. After much playing with the buffer and backoff settings of the enrich and validation process, I noticed some huge traffic to S3.

It turns out each enrichment batch was calling S3 to fetch all of the schemas for validation, from both our own Iglu repo and the Iglu Central repo (hosted on S3).

Once we moved all schemas onto an Apache web server on the box itself, the whole system sped up 20-30x.
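
For anyone wondering why repeated schema fetches hurt so much, here is a minimal sketch of what a schema cache buys you. It is not the Iglu client's implementation: `CachingResolver`, `fake_fetch` and the `com.example` schema keys are all hypothetical stand-ins, and the cache is a plain LRU.

```python
from collections import OrderedDict
from typing import Callable, List


class CachingResolver:
    """Toy schema resolver with a small LRU cache in front of a slow fetch."""

    def __init__(self, fetch: Callable[[str], str], cache_size: int):
        self._fetch = fetch
        self._cache_size = cache_size
        self._cache: "OrderedDict[str, str]" = OrderedDict()
        self.remote_calls = 0

    def lookup(self, schema_key: str) -> str:
        if schema_key in self._cache:
            # Cache hit: mark as recently used and skip the remote call.
            self._cache.move_to_end(schema_key)
            return self._cache[schema_key]
        self.remote_calls += 1
        schema = self._fetch(schema_key)
        if self._cache_size > 0:
            self._cache[schema_key] = schema
            if len(self._cache) > self._cache_size:
                self._cache.popitem(last=False)  # evict least recently used
        return schema


def fake_fetch(schema_key: str) -> str:
    """Stand-in for an HTTP GET against a schema repository."""
    return '{"self": "%s"}' % schema_key


def remote_calls_for(cache_size: int, batches: int, keys: List[str]) -> int:
    """Look up every key once per batch and count the remote fetches."""
    resolver = CachingResolver(fake_fetch, cache_size)
    for _ in range(batches):
        for key in keys:
            resolver.lookup(key)
    return resolver.remote_calls


keys = [
    "iglu:com.example/page_view/jsonschema/1-0-0",
    "iglu:com.example/checkout/jsonschema/1-0-0",
]
print("remote calls without a cache:", remote_calls_for(0, 100, keys))
print("remote calls with a cache:", remote_calls_for(500, 100, keys))
```

Without a cache, the number of remote calls grows with every batch; with one, it stays at the number of distinct schemas, which is the difference between paying a network round trip per batch and paying it once.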

Thank you all for your help in getting to the solution.


#11

While it’s possible to maintain and use a local mirror of Iglu Central, you really shouldn’t have to - it’s statically hosted via CloudFront, and aggressively cached inside Stream Enrich, so it should be very performant. Surprising that you had to do this.


#12

Hi! We were having the same problem. We are testing the system on a single machine at the moment, and we are serving the Iglu schemas with a Node static server that reads the JSON files from a folder.
I can see in the Node server’s log that it is getting requests for each schema used by the enrichment process, which makes me think that it is not caching them.
We are also doing a migration, which means that a single record coming from the collector could contain up to 100 Snowplow records; I don’t know if that makes a difference.
What could be wrong?


#13

Never mind! I found how to set up caching here: https://github.com/snowplow/iglu/wiki/Iglu-client-configuration