[SOLVED] Trouble while sending data from stream collector to Kafka topic

Hello,

I am using “snowplow-stream-collector-kafka-1.0.0” and i am trying to publish incoming messages from the trackers to Kafka topics.
This is my collector’s configuration file:

collector {
  interface = "0.0.0.0"
  interface = ${?COLLECTOR_INTERFACE}
  port = 5359
  port = ${?COLLECTOR_PORT}

  ssl {
    enable = false
    enable = ${?COLLECTOR_SSL}
    # whether to redirect HTTP to HTTPS
    redirect = false
    redirect = ${?COLLECTOR_SSL_REDIRECT}
    port = 9543
    port = ${?COLLECTOR_SSL_PORT}
  }

  paths {
    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    # "/com.acme/redirect" = "/r/tp2"
    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
  }

  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  crossDomain {
    enabled = false
    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
    domains = [ "*" ]
    domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
    secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
  }

  cookie {
    enabled = true
    #enabled = ${?COLLECTOR_COOKIE_ENABLED}
    expiration = "365 days" # e.g. "365 days"
    #expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
    # Network cookie name
    name = charg

    secure = false
    secure = ${?COLLECTOR_COOKIE_SECURE}
    httpOnly = false
    httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}

  }


  doNotTrackCookie {
    enabled = false
    enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
    name = ""
    name = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_NAME}
    value = ""
    value = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_VALUE}
  }

  cookieBounce {
    enabled = false
    enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
    forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
  }

  enableDefaultRedirect = true
  enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}

  redirectMacro {
    enabled = false
    enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
    placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
  }

  rootResponse {
    enabled = false
    enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
    statusCode = 302
    statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
    # Optional, defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",
      Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
    body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
  }

  # Configuration related to CORS preflight requests
  cors {
    # The Access-Control-Max-Age response header indicates how long the results of a preflight
    # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
    accessControlMaxAge = 5 seconds
    accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
  }

  # Configuration of prometheus http metrics
  prometheusMetrics {
    # If metrics are enabled then all requests will be logged as prometheus metrics
    # and '/metrics' endpoint will return the report about the requests
    enabled = false
    # Custom buckets for http_request_duration_seconds_bucket duration metric
    #durationBucketsInSeconds = [0.1, 3, 10]
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = good_sink		
    good = ${?COLLECTOR_STREAMS_GOOD}

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = bad_sink
    bad = ${?COLLECTOR_STREAMS_BAD}

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false
    useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, google-pub-sub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = kafka
  

      brokers = "localhost:9092"
      ## Number of retries to perform before giving up on sending a record
      retries = 0
      # The kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      #"bootstrap.servers" -> brokers
      #retries             -> retries
      #"buffer.memory"     -> buffer.byteLimit
      #"linger.ms"         -> buffer.timeLimit
      producerConf {
        acks = all
      #  "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
      #  "value.serializer"   = "org.apache.kafka.common.serialization.ByteArraySerializer"
      }


    }

    buffer {
      byteLimit = 4500000
      byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 500  # Not supported by Kafka; will be ignored
      recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 60000
      timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
    }
  }

}

akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loglevel = ${?AKKA_LOGLEVEL}
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = [${?AKKA_LOGGERS}]

  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on
    remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}

    raw-request-uri-header = on
    raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
      uri-parsing-mode = relaxed
      uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
    }
  }

}

The collector’s response to the JS tracker is 200 an then in the collector’s console i can see the message:

[scala-stream-collector-akka.actor.default-dispatcher-4] [akka://scala-stream-collector/system/IO-T/$a/0] New connection accepted".

But nothing seems to reach the kafka topic. Then after 1 minute the message from the tracker reaches Kafka and in the collectors console there is the message:

[scala-stream-collector-akka.actor.default-dispatcher-7] [akka://scala-stream-collector/system/StreamSupervisor-0/flow-2-0-detacher] Aborting tcp connection to /0:0:0:0:0:0:0:1:59177 because of upstream failure: akka.http.impl.engine.HttpIdleTimeoutException: HTTP idle-timeout encountered, no bytes passed in the last 1 minute. This is configurable by akka.http.[server|client].idle-timeout.

Can anyone explain why is this happening? Why the collector is waiting for this timeout before pushing the data to Kafka?
Thank you!

Akka looks like it is timing out when trying to publish to Kafka on 0:0:0:0:0:0:0:1 (lookback address). Is Kafka running and available on the local system at the specified port?

Yes, kafka is in the same local machine… After the timeout, the message is being published in kafka. I think that there is not a problem in the connection, right?

Hi @bambachas79 this is due to your timeLimit which is set to 60 seconds currently. The collector will only write downstream once one of the buffer thresholds is hit - either time, event count or byte limit.

Lowering these limits will trigger to push events downstream faster.

Thank you very much josh! I ll check it, but i have a question. What if my tracker is sending data every second? should i lower the timeLimit to 1 second, too? Its a little bit strange to me that the collector publishes data only when one of the buffer thresholds is hit.

Everything here works on micro-batches to ensure the best use of resources. If you want things to arrive faster setting a lower interval will achieve this end. In production we are using 500ms time-limit to flush very aggressively and have data move to the next service faster.

Thank you so much josh! I changed it and works fine now!

1 Like

Thanks for the update great to hear you got it working now!