Scala Stream Collector - java.net.SocketException: Broken pipe (Write failed)

We are having a hard time configuring the Scala Stream Collector for our custom event pipeline. The configuration currently deployed to the Elastic Beanstalk application successfully writes events to a Kinesis stream in development.

Is there an obvious answer for this? There are no errors in the Kinesis monitoring, and no events are being saved to the “bad” Kinesis stream.

The errors are below; any guidance is greatly appreciated.

 -------------------------------------
/var/log/web-1.error.log
-------------------------------------
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Broken pipe (Write failed)
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
	at sun.security.ssl.OutputRecord.writeBuffer(OutputRecord.java:431)
	at sun.security.ssl.OutputRecord.write(OutputRecord.java:417)
	at sun.security.ssl.SSLSocketImpl.writeRecordInternal(SSLSocketImpl.java:879)
	at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:850)
	at sun.security.ssl.AppOutputStream.write(AppOutputStream.java:123)
	at org.apache.http.impl.io.SessionOutputBufferImpl.streamWrite(SessionOutputBufferImpl.java:124)
	at org.apache.http.impl.io.SessionOutputBufferImpl.flushBuffer(SessionOutputBufferImpl.java:136)
	at org.apache.http.impl.io.SessionOutputBufferImpl.write(SessionOutputBufferImpl.java:167)
	at org.apache.http.impl.io.ContentLengthOutputStream.write(ContentLengthOutputStream.java:113)
	at org.apache.http.entity.InputStreamEntity.writeTo(InputStreamEntity.java:144)
	at com.amazonaws.http.RepeatableInputStreamRequestEntity.writeTo(RepeatableInputStreamRequestEntity.java:160)
	at org.apache.http.impl.DefaultBHttpClientConnection.sendRequestEntity(DefaultBHttpClientConnection.java:156)
	at org.apache.http.impl.conn.CPoolProxy.sendRequestEntity(CPoolProxy.java:160)
	at org.apache.http.protocol.HttpRequestExecutor.doSendRequest(HttpRequestExecutor.java:238)
	at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doSendRequest(SdkHttpRequestExecutor.java:63)
	at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
	at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1297)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
	... 27 more
[pool-1-thread-5] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Retrying in 10000 milliseconds...
[pool-1-thread-10] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Writing 241 Thrift records to Kinesis stream ******masked******
[pool-1-thread-2] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Writing failed.
com.amazonaws.SdkClientException: Unable to execute HTTP request: Broken pipe (Write failed)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1175)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1121)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.executePutRecords(AmazonKinesisClient.java:2169)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.putRecords(AmazonKinesisClient.java:2140)
	at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.$anonfun$multiPut$1(KinesisSink.scala:286)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Broken pipe (Write failed)
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
	at sun.security.ssl.OutputRecord.writeBuffer(OutputRecord.java:431)
	at sun.security.ssl.OutputRecord.write(OutputRecord.java:417)
	at sun.security.ssl.SSLSocketImpl.writeRecordInternal(SSLSocketImpl.java:879)
	at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:850)
	at sun.security.ssl.AppOutputStream.write(AppOutputStream.java:123)
	at org.apache.http.impl.io.SessionOutputBufferImpl.streamWrite(SessionOutputBufferImpl.java:124)
	at org.apache.http.impl.io.SessionOutputBufferImpl.flushBuffer(SessionOutputBufferImpl.java:136)
	at org.apache.http.impl.io.SessionOutputBufferImpl.write(SessionOutputBufferImpl.java:167)
	at org.apache.http.impl.io.ContentLengthOutputStream.write(ContentLengthOutputStream.java:113)
	at org.apache.http.entity.InputStreamEntity.writeTo(InputStreamEntity.java:144)
	at com.amazonaws.http.RepeatableInputStreamRequestEntity.writeTo(RepeatableInputStreamRequestEntity.java:160)
	at org.apache.http.impl.DefaultBHttpClientConnection.sendRequestEntity(DefaultBHttpClientConnection.java:156)
	at org.apache.http.impl.conn.CPoolProxy.sendRequestEntity(CPoolProxy.java:160)
	at org.apache.http.protocol.HttpRequestExecutor.doSendRequest(HttpRequestExecutor.java:238)
	at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doSendRequest(SdkHttpRequestExecutor.java:63)
	at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
	at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1297)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
	... 27 more
[pool-1-thread-2] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Retrying in 10000 milliseconds...
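
For reference, the failing call in these traces is the batched Kinesis PutRecords write (KinesisSink.multiPut -> AmazonKinesisClient.putRecords). Boiled down to the AWS SDK calls involved, it amounts to something like the sketch below. This is illustrative only and not the collector's actual code; the flush helper, its signature, and the random partition key are made up for the example.

import java.nio.ByteBuffer
import java.util.UUID

import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry}

object PutRecordsSketch {
  // Single Kinesis client, as in the SDK calls visible in the stack trace.
  private val kinesis = AmazonKinesisClientBuilder.standard().withRegion("us-east-1").build()

  // Illustrative flush: wrap each serialized event and send the whole batch
  // in one PutRecords call.
  def flush(streamName: String, payloads: Seq[Array[Byte]]): Unit = {
    val entries = new java.util.ArrayList[PutRecordsRequestEntry]()
    payloads.foreach { bytes =>
      entries.add(
        new PutRecordsRequestEntry()
          .withData(ByteBuffer.wrap(bytes))
          .withPartitionKey(UUID.randomUUID().toString) // random key spreads records across shards
      )
    }
    val request = new PutRecordsRequest()
      .withStreamName(streamName)
      .withRecords(entries)

    // This is the call that fails with "Broken pipe (Write failed)" in the logs above.
    val result = kinesis.putRecords(request)
    println(s"Failed records in this batch: ${result.getFailedRecordCount}")
  }
}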

Hi @digitaltouch. Can you please share the HOCON you’re trying to run the collector with? The Kinesis sink section might be enough, but it could be useful to check the other settings as well.

Apologies @dilyan, I should have shared that.


collector {
  # The collector runs as a web service specified on the following interface and port.
  interface = "0.0.0.0"
  port = 8080

  # optional SSL/TLS configuration
  ssl {
    enable = false
    # whether to redirect HTTP to HTTPS
    redirect = false
    port = 9543
  }

  # The collector responds with a cookie to requests with a path that matches the 'vendor/version' protocol.
  # The expected values are:
  # - com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
  # - r/tp2 for redirects
  # - com.snowplowanalytics.iglu/v1 for the Iglu Webhook
  # Any path that matches the 'vendor/version' protocol will result in a cookie response, for use by custom webhooks
  # downstream of the collector.
  # But you can also map any valid (i.e. two-segment) path to one of the three defaults.
  # Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full
  # valid paths starting with a leading slash.
  # Pass in an empty map to avoid mapping.
  paths {
    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    # "/com.acme/redirect" = "/r/tp2"
    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
  }

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # Cross domain policy configuration.
  # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
  # route.
  crossDomain {
    enabled = false
    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    domains = [ "*" ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    expiration = "365 days"
    # Network cookie name
    name = sp
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out these lines to tie cookies to
    # the collector's full domain.
    # The domain is determined by matching the domains from the Origin header of the request
    # to the list below. The first match is used. If no matches are found, the fallback domain will be used,
    # if configured.
    # If you specify a main domain, all subdomains on it will be matched.
    # If you specify a subdomain, only that subdomain will be matched.
    # TO DO: update once final domains are established
    # Examples:
    # domain.com will match domain.com, www.domain.com and secure.client.domain.com
    # client.domain.com will match secure.client.domain.com but not domain.com or www.domain.com
    domains = [
        "****"
        "****"
        "****"
    ]
    # domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
    # domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
    # ... more domains
    # If specified, the fallback domain will be used if none of the Origin header hosts matches the list of
    # cookie domains configured above. (For example, if there is no Origin header.)
    fallbackDomain = "****"
    secure = false
    httpOnly = false
    # The sameSite attribute is optional. You can choose not to specify it, or you can use `Strict`,
    # `Lax` or `None` to limit the contexts in which the cookie is sent.
    #   Strict: the cookie will only be sent along with "same-site" requests.
    #   Lax: the cookie will be sent with same-site requests, and with cross-site top-level navigation.
    #   None: the cookie will be sent with same-site and cross-site requests.
    sameSite = "Lax"
  }

  # If you have a do not track cookie in place, the Scala Stream Collector can respect it by
  # completely bypassing the processing of an incoming request carrying this cookie; the collector
  # will simply reply with a 200 saying "do not track".
  # The cookie name and value must match the configuration below: the cookie name must match
  # exactly, while the value can be a regular expression.
  doNotTrackCookie {
    enabled = false
    name = "sp-DoNotTrack"
    value = "true"
  }

  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
  # fallbackNetworkId is used instead of generating a new random one.
  cookieBounce {
    enabled = false
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
  }

  # When enabled, redirect prefix `r/` will be enabled and its query parameters resolved.
  # Otherwise the request prefixed with `r/` will be dropped with `404 Not Found`.
  # Custom redirects configured in `paths` can still be used.
  enableDefaultRedirect = true

  # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
  # token. All instances of that token are replaced with the network ID. If the placeholder isn't
  # specified, the default value is `${SP_NUID}`.
  redirectMacro {
    enabled = false
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
  }

  # Customize response handling for requests for the root path ("/").
  # Useful if you need to redirect to web content or privacy policies regarding the use of this collector.
  rootResponse {
    enabled = false
    statusCode = 302
    # Optional, defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
  }

  # Configuration related to CORS preflight requests
  cors {
    # The Access-Control-Max-Age response header indicates how long the results of a preflight
    # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
    accessControlMaxAge = 5 seconds
  }

  # Configuration of prometheus http metrics
  prometheusMetrics {
    # If metrics are enabled then all requests will be logged as prometheus metrics
    # and '/metrics' endpoint will return the report about the requests
    enabled = false
    # Custom buckets for http_request_duration_seconds_bucket duration metric
    #durationBucketsInSeconds = [0.1, 3, 10]
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = "***"

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = "****"

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, google-pub-sub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = kinesis

      # Region where the streams are located
      region = us-east-1

      ## Optional endpoint url configuration to override aws kinesis endpoints,
      ## this can be used to specify local endpoints when using localstack
      # customEndpoint = {{kinesisEndpoint}}
      # customEndpoint = ${?COLLECTOR_STREAMS_SINK_CUSTOM_ENDPOINT}

      # Thread pool size for Kinesis API requests
      threadPoolSize = 10

      # The following are used to authenticate for the Amazon Kinesis sink.
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = "*****"
        secretKey = "*****"
      }

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 1000
        maxBackoff = 10000
      }
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit = 10000000
      recordLimit = 500 # Not supported by Kafka; will be ignored
      timeLimit = 300000
    }
  }

}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
  loglevel = OFF # 'OFF' for no logging, 'DEBUG' for all logging.
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # akka-http is the server the Stream collector uses and has configurable options defined at
  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on
    raw-request-uri-header = on

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      uri-parsing-mode = relaxed
    }
  }

  # By default setting `collector.ssl` relies on JSSE (Java Secure Socket
  # Extension) to enable secure communication.
  # To override the default settings set the following section as per
  # https://lightbend.github.io/ssl-config/ExampleSSLConfig.html
  # ssl-config {
  #   debug = {
  #     ssl = true
  #   }
  #   keyManager = {
  #     stores = [
  #       {type = "PKCS12", classpath = false, path = "/etc/ssl/mycert.p12", password = "mypassword" }
  #     ]
  #   }
  #   loose {
  #     disableHostnameVerification = false
  #   }
  # }
}

Nothing jumps out at me as obviously wrong in this configuration, @digitaltouch.

Just to clarify: is the collector able to write any events to the good Kinesis stream at all, or does this error always happen when it tries to write the events?

The collectors can successfully write to the same stream when we are sending a few records per second (from a different app). This application, when under full load, will send up to 300-400 (larger) unstructured events per second. They are successfully hitting the Scala Stream Collector, but failing to write to the Good Stream.

This leads me to believe that we may need a larger pipe than 40 shards on Kinesis to handle the load, or that we need to flush the buffer with a much smaller byte limit. It seems like there would be something in the Kinesis logs/monitoring stating that the throughput was exceeded, but there is not.

If some messages fail to be inserted (because of throughput limits), you should see a message along the lines of:

Successfully wrote XYZ out of ABC records
Retrying all failed records in X milliseconds...

Also, some Kinesis quotas vary between regions, but the per-shard write limit is, I believe, around 1,000 records per second.

However, there is also a limit on throughput: 1 MB per second per shard (and each individual record is capped at 1 MB). So it might indeed be the case that the batches are too big… Now that I’ve said that, I notice that your byteLimit is 10 MB. Can you try with 1 MB and see if that fixes things?
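
To put rough numbers on it: each shard accepts at most 1 MB per second (and 1,000 records per second), and a single PutRecords request is capped at 500 records and 5 MB in total. So while 40 shards give you roughly 40 MB/s of aggregate capacity, a 10 MB flush is a lot to push through at once relative to those per-request and per-shard caps. A smaller buffer in the streams section would look something like this (excerpt only; the exact values are just a starting point to tune from):

    buffer {
      byteLimit = 1000000   # flush at roughly 1 MB instead of 10 MB
      recordLimit = 500     # 500 is also the PutRecords per-request record cap
      timeLimit = 300000
    }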

Reducing to 1 MB corrected it; however, it’s adding additional load to the servers. We will continue to tweak the thread pool, but it looks like anything over 1 MB causes the “Broken pipe” failures.
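
For anyone who lands on this later, the other knobs we are still tuning alongside the 1 MB byteLimit are the Kinesis thread pool and the backoff policy. The excerpt below is simply what we are starting from; whether raising the thread pool actually helps is still an open question for us:

      # Thread pool size for Kinesis API requests
      threadPoolSize = 10

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 1000
        maxBackoff = 10000
      }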

I appreciate the help. It likely would have taken another day to chase that one down.