How to Improve Response Time of Scala Stream Collector

I have set up the Scala Stream Collector with Kafka as the sink, running on an AWS EC2 t2.xlarge instance.

The JavaScript tracker that sends data to the collector is taking more than 200 ms for a single request. What should I do to reduce the response time? I am expecting a response time of less than 50 ms.

Any suggestions are welcome.


Hi @shyamp099, would you mind sharing some more details on your setup? Namely:

  1. Collector configuration HOCON file
  2. Are you running behind a Load Balancer? Application or Classic?
  3. How are you running the Collector? Directly via the JAR? Docker container on the EC2 node?
  4. How have you configured the JVM process? How much memory has been allocated to it?
  5. Are you running anything else on this server apart from the Collector?
  6. What amount of traffic are you sending when you see this response time?

Hi @shyamp099,

I have requests below 200 ms but well over 50 ms. Then again, I am quite far away from my collector :wink: On the other hand, this should have no influence on performance as long as your requests are non-blocking, asynchronous ones.

I run my collectors on a dedicated M5-backed Docker cluster, behind an ALB and CloudFront (so I believe a few ms could be shaved off by not using CloudFront, but I need it for other reasons).

How are your instance's CPU credits? To be honest, I would not risk using a burstable instance type for a production collector…


Hi @josh, please see my comments below:

  1. Collector configuration HOCON file
collector {
  interface = "0.0.0.0"
  interface = ${?COLLECTOR_INTERFACE}
  port = 80
  port = ${?COLLECTOR_PORT}
  # The collector responds with a cookie to requests with a path that matches the
  # 'vendor/version' protocol.
  
  paths {
    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    # "/com.acme/redirect" = "/r/tp2"
    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
  }
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  crossDomain {
    enabled = false
    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
    domains = [ "*" ]
    domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
    secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
  }

  cookie {
    enabled = true
    #enabled = ${?COLLECTOR_COOKIE_ENABLED}
    expiration = 365d # e.g. "365 days"
    #expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
    # Network cookie name
    name = l5_sp
    #name = ${?COLLECTOR_COOKIE_NAME}
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out this line to tie cookies to
    # the collector's full domain
    #domain = ".layerfive.com"
    #domain = ${?COLLECTOR_COOKIE_DOMAIN}
    secure = false
    secure = ${?COLLECTOR_COOKIE_SECURE}
    httpOnly = false
    httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
  }

   doNotTrackCookie {
    enabled = false
    enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
    name = lfcookiename
    name = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_NAME}
    value = lfcookievalue
    value = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_VALUE}
  }

  cookieBounce {
    enabled = false
    enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
    forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
  }

  redirectMacro {
    enabled = false
    enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
    placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
  }

  rootResponse {
    enabled = false
    enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
    statusCode = 302
    statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
    # Optional, defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",
      Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
    body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
  }

  # Configuration related to CORS preflight requests
  cors {
    # The Access-Control-Max-Age response header indicates how long the results of a preflight
    # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
    accessControlMaxAge = 5 seconds
    accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
  }

  # Configuration of prometheus http metrics
  prometheusMetrics {
    # If metrics are enabled then all requests will be logged as prometheus metrics
    # and '/metrics' endpoint will return the report about the requests
    enabled = false
    enabled = ${?COLLECTOR_PROMETHEUS_METRICS_ENABLED}
    # Custom buckets for http_request_duration_seconds_bucket duration metric
    #durationBucketsInSeconds = [0.1, 3, 10]
    #durationbucketsInSeconds = ${?COLLECTOR_PROMETHEUS_METRICS_DURATION_BUCKETS_IN_SECONDS}
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = mytopic_good
    good = ${?COLLECTOR_STREAMS_GOOD}

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = mytopic_bad
    bad = ${?COLLECTOR_STREAMS_BAD}

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false
    useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, googlepubsub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = kafka
      # enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}

      # Region where the streams are located
      # region = {{kinesisRegion}}
      # region = ${?COLLECTOR_STREAMS_SINK_REGION}

      ## Optional endpoint url configuration to override aws kinesis endpoints,
      ## this can be used to specify local endpoints when using localstack
      # customEndpoint = {{kinesisEndpoint}}
      # customEndpoint = ${?COLLECTOR_STREAMS_SINK_CUSTOM_ENDPOINT}

      # Thread pool size for Kinesis API requests
      # threadPoolSize = 10
      # threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}

      # The following are used to authenticate for the Amazon Kinesis sink.
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      # aws {
      #   accessKey = iam
      #   accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
      #   secretKey = iam
      #   secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
      # }

      # # Minimum and maximum backoff periods, in milliseconds
      # backoffPolicy {
      #   minBackoff = {{minBackoffMillis}}
      #   minBackoff = ${?COLLECTOR_STREAMS_SINK_MIN_BACKOFF}
      #   maxBackoff = {{maxBackoffMillis}}
      #   maxBackoff = ${?COLLECTOR_STREAMS_SINK_MAX_BACKOFF}
      # }

      # Or Google Pubsub
      #googleProjectId = ID
      ## Minimum, maximum and total backoff periods, in milliseconds
      ## and multiplier between two backoff
      #backoffPolicy {
      #  minBackoff = {{minBackoffMillis}}
      #  maxBackoff = {{maxBackoffMillis}}
      #  totalBackoff = {{totalBackoffMillis}} # must be >= 10000
      #  multiplier = {{backoffMultiplier}}
      #}

      # Or Kafka
      brokers = "10.175.17.158:9092"
      ## Number of retries to perform before giving up on sending a record
      retries = 0
      # The kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -brokers
      # retries             -retries
      # "buffer.memory"     -buffer.byteLimit
      # "linger.ms"         -buffer.timeLimit
      producerConf {
       acks = all
      #  "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
      #  "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
      }

      # Or NSQ
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit = 4500000
      byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 500 # Not supported by Kafka; will be ignored
      recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 60000
      timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
    }
  }

}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
  loglevel = OFF # 'OFF' for no logging, 'DEBUG' for all logging.
  loglevel = ${?AKKA_LOGLEVEL}
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = [${?AKKA_LOGGERS}]

  # akka-http is the server the Stream collector uses and has configurable options defined at
  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on
    remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}

    raw-request-uri-header = on
    raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
      uri-parsing-mode = relaxed
      uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
    }
  }
}
  2. Are you running behind a Load Balancer? Application or Classic?
    No, I am not using any load balancer; I am serving directly from the EC2 instance.
  3. How are you running the Collector? Directly via the JAR? Docker container on the EC2 node?
    I am running it as a Docker container on the EC2 instance, on which Kafka is also running as a container.
  4. How have you configured the JVM process? How much memory has been allocated to it?
    No specific memory has been assigned; it is using the defaults.
  5. Are you running anything else on this server apart from the Collector?
    Yes, Kafka and ZooKeeper as Docker containers.
  6. What amount of traffic are you sending when you see this response time?
    When I send a single request it takes around 200-250 ms. When I tested with 200 users, the RPM was 193 and the average response time was 700 ms, which is too high.

Apart from this, when I used the CloudFront collector it took only 60 ms, which is what I am expecting, but the CloudFront collector does not suit my requirements and I want to use the Scala Stream Collector.

You’ll definitely want to run your EC2 instances behind a load balancer - this will help improve response time as well as other factors such as scalability of the pipeline.

Docker on EC2 is fine, but it’s not recommended to run additional services (such as Kafka) on the same machine; keeping each service on its own node (as in a microservice-style infrastructure) avoids resource contention between services.

You’ll likely want to tune the JVM memory allocation according to your EC2 instance type and any other services running on the machine.

That’s very slow, but that latency could come from a wide variety of sources, each of which can be debugged individually:

  • Network (including DNS, SSL, the HTTP(S) request, etc.) between the client and the collector
  • Latency in the collector processing the event (this should not be high unless there’s some resource contention)
  • Latency in writing to the Kafka topic (quite likely if Kafka is running on the same machine within a Docker container; see the config sketch after this list)
  • Latency in returning a response to the client (e.g., TTFB)
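
If you want to rule the Kafka sink in or out, the producer settings and the collector buffer are the main knobs exposed in your configuration. The snippet below is only an illustrative sketch, not a recommendation: it reuses the broker address from your config, and the specific values (acks = "1", a 1-second timeLimit) are placeholders to experiment with while you measure.

# Illustrative sink/buffer tuning sketch (hypothetical values; adjust and re-measure for your setup)
collector {
  streams {
    sink {
      enabled = kafka
      brokers = "10.175.17.158:9092"
      retries = 0
      producerConf {
        # "all" waits for every in-sync replica to acknowledge each record;
        # "1" waits only for the partition leader, which usually lowers produce
        # latency at the cost of durability.
        acks = "1"
      }
    }
    buffer {
      # Per the comments in the sink section, byteLimit maps to the producer's
      # "buffer.memory" and timeLimit to "linger.ms", so a smaller timeLimit
      # flushes records to Kafka sooner.
      byteLimit = 1000000
      recordLimit = 500 # ignored by the Kafka sink
      timeLimit = 1000
    }
  }
}

A simple way to separate collector latency from sink latency is to temporarily switch the sink to stdout (or point the collector at a Kafka broker on a separate host) and compare response times under the same load.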