~350k rpm of throughput with Stream Collector/Kinesis


#1

Hi,

I’m struggling to handle a peak of ~350k rpm of throughput during a soccer match broadcast with the Stream Collector/Kinesis. I don’t have much knowledge of Akka and actors, but I tried increasing the parameters related to the level of parallelism and switched to a c4.large EC2 instance type, which gave more capacity. Another variable is the collector buffer size: does it affect only the instance -> Kinesis flow, or also the internet -> collector flow?

Today I observed two behaviors:

1st Scenario

  • Load Balancer -> Collector sending to Enrichment process on same instance -> Kinesis with enough shards

2nd Scenario

  • Load Balancer -> Collector sending to Enrichment process on same instance -> stdout

Results:

  • The 2nd scenario handled 350k rpm with 6 instances behind the Load Balancer.

  • The 1st scenario was a disaster (hundreds of thousands of 504s from the Load Balancer); I couldn’t even estimate how many instances I would need to avoid 5xx responses entirely. It’s important to highlight that I did not get any error responses from Kinesis (e.g. max throughput rate or max records per second exceeded).

How far is it possible to go with the Stream Collector, and in your experience what is the maximum RPS or RPM a single instance can reach?


#2

Hi @aikeda,

As far as performant settings for the Kinesis Collector go, you will first need to look at the Kinesis Stream limitations. From the documentation:

Each shard can support up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys). This write limit applies to operations such as PutRecord and PutRecords.

As such, you should keep your byte and record limits at or below these limits so that the Collector can function properly. Setting them to those maximums should yield the best performance, as the Collector sends fewer, larger requests and incurs less NIC I/O. The buffer affects only the instance -> Kinesis flow, not the internet -> collector flow.
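To make the effect of the byte and record limits concrete, here is a minimal Python sketch (not the Collector's actual code) that greedily packs events into batches respecting both limits. The function name `batch_events` and the sample sizes are hypothetical; the point is that larger limits mean far fewer requests to Kinesis for the same traffic.

```python
from typing import List


def batch_events(events: List[bytes], record_limit: int, byte_limit: int) -> List[List[bytes]]:
    """Greedily pack events into batches that respect both the record and byte limits."""
    batches: List[List[bytes]] = []
    current: List[bytes] = []
    current_bytes = 0
    for event in events:
        size = len(event)
        # Start a new batch if adding this event would break either limit.
        if current and (len(current) >= record_limit or current_bytes + size > byte_limit):
            batches.append(current)
            current, current_bytes = [], 0
        # An event larger than byte_limit still ends up alone in its own batch.
        current.append(event)
        current_bytes += size
    if current:
        batches.append(current)
    return batches


if __name__ == "__main__":
    events = [b"x" * 1_000] * 10_000
    print(len(batch_events(events, record_limit=500, byte_limit=4_000_000)))
```

For example, 10,000 events of 1,000 bytes with `record_limit=500` and `byte_limit=4_000_000` pack into 20 requests instead of 10,000 single-record requests.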

The other option to play with here is thread-pool-size, which determines the number of threads that can be used to send events to the Kinesis Stream. Depending on the instance you provision, you can easily increase this above the default of 10 threads.
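The sketch below shows why a larger thread pool can help: batches are sent concurrently, so with a per-request latency of L seconds, N threads can complete at most about N / L requests per second. `send_batch` is a hypothetical stand-in for a PutRecords call and the 20 ms sleep only simulates network latency; this is an illustration, not how the Collector is implemented internally.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Sequence

THREAD_POOL_SIZE = 10  # same as the default thread-pool-size mentioned above


def send_batch(batch: List[bytes]) -> int:
    """Hypothetical stand-in for one PutRecords call; returns records sent."""
    time.sleep(0.02)  # simulated round-trip latency to Kinesis
    return len(batch)


def send_all(batches: Sequence[List[bytes]], pool_size: int = THREAD_POOL_SIZE) -> int:
    """Send all batches using a fixed-size thread pool; returns total records sent."""
    # Up to pool_size requests are in flight at once, so throughput is
    # bounded by roughly pool_size / latency requests per second.
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        return sum(pool.map(send_batch, batches))


if __name__ == "__main__":
    batches = [[b"event"] * 500 for _ in range(40)]
    for size in (1, 10):
        start = time.perf_counter()
        sent = send_all(batches, pool_size=size)
        print(f"pool_size={size}: sent {sent} records in {time.perf_counter() - start:.2f}s")
```

The gain only holds while the instance has spare CPU and network capacity, which is why the right pool size depends on the instance type.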

Both of your scenarios are inherently dangerous because you are depending on Enrichment never working slower than the Collector. That is often not the case: enrichments such as geo_location, or anything remotely intensive, will make it slower than the Collector, not to mention that it validates every event. This creates a bottleneck that can prevent the Collector from accepting any more events, because it cannot write them out fast enough.
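A simplified discrete-time model of the coupled setup may help: the Collector hands events to an in-process buffer that Enrichment drains at a fixed rate, and once the buffer is full new events are rejected, which a load balancer would surface as timeouts or 5xx responses. All names, rates and the buffer capacity here are hypothetical.

```python
import queue
from typing import Tuple


def simulate(ticks: int, arrivals_per_tick: int, enrich_per_tick: int,
             buffer_capacity: int) -> Tuple[int, int]:
    """Return (accepted, rejected) events for a collector feeding in-process enrichment."""
    buf = queue.Queue(maxsize=buffer_capacity)
    accepted = rejected = 0
    for _ in range(ticks):
        # Collector side: try to hand each incoming event to enrichment.
        for event_id in range(arrivals_per_tick):
            try:
                buf.put_nowait(event_id)
                accepted += 1
            except queue.Full:
                rejected += 1  # would surface to clients as a timeout or 5xx
        # Enrichment side: process at most enrich_per_tick events this tick.
        for _ in range(min(enrich_per_tick, buf.qsize())):
            buf.get_nowait()
    return accepted, rejected
```

With 100 arrivals per tick, enrichment of 80 per tick and a capacity of 100, `simulate(10, 100, 80, 100)` accepts 820 events and rejects 180. A larger buffer only delays the rejections; it cannot fix a sustained rate mismatch.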

This is why we recommend the following flow:

Load Balancer -> Stream Collector ASG -> Kinesis Stream -> Kinesis Enrichment ASG -> Kinesis Stream

This allows you to scale each application according to how intensive its work is, or even to use entirely different instance types.

As for your 1st scenario, I suspect the added latency of Enrichment plus writing to Kinesis was creating too much back pressure for the Collector. Under high load you will either need to wildly over-provision or separate the processes as shown in the flow above.

As for the maximum throughput per instance, that really depends on the instance type. I was doing some load testing today with an m4.xlarge, which was happily accepting and sending upwards of 100,000 RPM; it was still under 50% CPU usage, so it could potentially have gone quite a bit higher. Lower-tier boxes will obviously yield lower performance, due not only to the CPU and memory available but also to the network connectivity of the box.
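For a rough capacity plan you can turn a measured per-instance rate into an instance count. This minimal sketch assumes throughput scales linearly with the number of instances and plugs in the 100,000 RPM observed on an m4.xlarge above; the linear-scaling assumption and the 20% headroom factor are hypothetical, and the figure does not cover running Enrichment on the same box.

```python
import math


def rpm_to_rps(rpm: float) -> float:
    """Convert requests per minute to requests per second."""
    return rpm / 60


def instances_needed(peak_rpm: float, per_instance_rpm: float, headroom: float = 1.2) -> int:
    """Instances required if each one sustains per_instance_rpm (assumes linear scaling)."""
    return math.ceil(peak_rpm * headroom / per_instance_rpm)


if __name__ == "__main__":
    peak = 350_000
    print(f"{peak} RPM is about {rpm_to_rps(peak):.0f} RPS")
    print(instances_needed(peak, 100_000))
```

Treat the result as a starting point for load testing rather than a guarantee, since real throughput also depends on network limits and payload size.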

Hope that helps!


#3

@josh, would you be able to share your config settings for getting 100,000 RPM on an m4.xlarge? I recently upgraded to an m4.2xlarge to handle increased traffic for the stream-enrich-0.7.0 Scala app, but I’m still hitting ProvisionedThroughputExceeded errors.

Also, how many Kinesis shards were you running? Nice work!
-D


#4

Hi @13scoobie, for the Collector the buffer settings are:

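buffer {
  byte-limit: 4000000   # flush once buffered events reach ~4 MB
  record-limit: 500     # ...or 500 events (the PutRecords per-call maximum)
  time-limit: 5000      # ...or 5000 ms since the last flush
}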
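The three buffer settings act as independent flush triggers: the buffer is emptied when it holds record-limit events, reaches byte-limit bytes, or time-limit milliseconds have passed since the last flush. The Python class below is a minimal sketch of that behaviour, not the Collector's Scala implementation; the class name and the injectable clock are hypothetical.

```python
import time
from typing import Callable, List, Optional


class EventBuffer:
    """Flushes when any of byte_limit, record_limit or time_limit_ms is reached."""

    def __init__(self, byte_limit: int = 4_000_000, record_limit: int = 500,
                 time_limit_ms: int = 5_000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.byte_limit = byte_limit
        self.record_limit = record_limit
        self.time_limit_s = time_limit_ms / 1000
        self.clock = clock
        self.events: List[bytes] = []
        self.size = 0
        self.last_flush = clock()

    def add(self, event: bytes) -> Optional[List[bytes]]:
        """Buffer one event; return a batch to send if a limit was hit."""
        self.events.append(event)
        self.size += len(event)
        return self.maybe_flush()

    def maybe_flush(self) -> Optional[List[bytes]]:
        """Return the buffered batch if any trigger fired, else None.

        A real sink would also call this from a timer so an idle buffer empties.
        """
        if not self.events:
            return None
        elapsed = self.clock() - self.last_flush
        if (len(self.events) >= self.record_limit
                or self.size >= self.byte_limit
                or elapsed >= self.time_limit_s):
            batch, self.events, self.size = self.events, [], 0
            self.last_flush = self.clock()
            return batch
        return None
```

Under steady high traffic the record limit usually fires first; the time limit mainly matters for quiet periods, bounding how long an event can wait before being sent.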

Please note that to achieve this level of throughput we do not use a single instance for any part of the processing. We have autoscaling rules in place that scale the number of Stream Enrich, LZO Sink and Elasticsearch Sink instances in use to keep latency low across the pipeline. We also scale the number of shards in the stream to handle increases in traffic.
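When scaling shards, the per-shard write limits quoted earlier in the thread (1,000 records and 1 MB per second) give a lower bound on the shard count. This minimal sketch computes it; the function name and the 1,500-byte average event size are hypothetical, and in practice you would add headroom on top of the result.

```python
import math

# Per-shard write limits from the Kinesis documentation quoted in this thread.
MAX_RECORDS_PER_SHARD_PER_SEC = 1_000
MAX_BYTES_PER_SHARD_PER_SEC = 1_000_000  # "1 MB"; conservative if AWS means 1 MiB


def shards_needed(events_per_minute: float, avg_event_bytes: float) -> int:
    """Minimum shard count satisfying both per-shard write limits."""
    records_per_sec = events_per_minute / 60
    bytes_per_sec = records_per_sec * avg_event_bytes
    by_records = math.ceil(records_per_sec / MAX_RECORDS_PER_SHARD_PER_SEC)
    by_bytes = math.ceil(bytes_per_sec / MAX_BYTES_PER_SHARD_PER_SEC)
    return max(1, by_records, by_bytes)


if __name__ == "__main__":
    print(shards_needed(350_000, 1_500))
```

For the ~350k RPM peak discussed here and a hypothetical 1,500-byte average event, `shards_needed(350_000, 1_500)` returns 9: the byte limit, not the record limit, is the binding constraint.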

If you are getting ProvisionedThroughputExceededException, you will likely need to increase the number of shards in your stream so it can keep up with the amount of data you are pushing into it. More shards also let you run more than one KCL consumer on the stream, since each shard is processed by one consumer (1 shard == 1 consumer). From the docs on the Kinesis API:

The size of the data returned by GetRecords varies depending on the utilization of the shard. The maximum size of data that GetRecords can return is 10 MB. If a call returns this amount of data, subsequent calls made within the next 5 seconds throw ProvisionedThroughputExceededException. If there is insufficient provisioned throughput on the shard, subsequent calls made within the next 1 second throw ProvisionedThroughputExceededException.
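The quoted rules can be read as a per-shard pacing policy for a consumer. The sketch below encodes them, plus the documented limit of 5 GetRecords calls per second per shard, as a hypothetical helper. Consumers built on the KCL do their own pacing, so treat this only as an illustration of why one shard caps how fast a single consumer can read, and why adding shards helps.

```python
MAX_GET_RECORDS_BYTES = 10 * 1024 * 1024  # 10 MB cap per call (documented as 10 MiB)
MIN_CALL_INTERVAL_S = 0.2  # at most 5 GetRecords calls per second per shard


def next_call_delay(bytes_returned: int, throttled: bool) -> float:
    """Seconds to wait before the next GetRecords call on the same shard."""
    if bytes_returned >= MAX_GET_RECORDS_BYTES:
        # A maximum-size response makes calls in the next 5 seconds throw
        # ProvisionedThroughputExceededException.
        return 5.0
    if throttled:
        # Insufficient provisioned throughput: calls in the next 1 second throw.
        return 1.0
    return MIN_CALL_INTERVAL_S
```

Reading a full 10 MB and then waiting 5 seconds works out to about 2 MB per second per shard, so a consumer that needs to read faster than that has to spread its work over more shards.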

What is your current instance and stream configuration?