Scala stream collector error and 502 bad gateway


#1

Hi all,
I’m getting a 502 Bad Gateway error for my collector’s tracking pixel URL, and the app is logging this error:

ERROR com.snowplowanalytics.snowplow.collectors.scalastream.KafkaCollector$ - REST interface could not be bound to 0.0.0.0:80

I am only seeing this issue after upgrading the collector from v0.9.0 to v0.13.0 (when I switch back to 0.9.0, everything works). I copied the relevant settings over to config.hocon.

Here’s my current config.hocon:

# Copyright (c) 2013-2018 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (application.conf.example) contains a template with
# configuration options for the Scala Stream Collector.
#
# To use, copy this to 'application.conf' and modify the configuration options.

# 'collector' contains configuration options for the main Scala collector.
collector {
  # The collector runs as a web service specified on the following interface and port.
  interface = "0.0.0.0"
  port = 80

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # Cross domain policy configuration.
  # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
  # route.
  crossDomain {
    enabled = true
    # Domain that is granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    domain = "*"
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = false
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    expiration = "365 days" #  e.g. 365 days. 3650 days is 10 years, the max a 32-bit browser will likely accept.
    # Network cookie name
    name = _1234_
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out this line to tie cookies to
    # the collector's full domain
    domain = ".mydomain.com"
  }

  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
  # fallbackNetworkId is used instead of generating a new random one.
  cookieBounce {
    enabled = false
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
  }

  # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
  # token. All instances of that token are replaced with the network ID. If the placeholder isn't
  # specified, the default value is `${SP_NUID}`.
  redirectMacro {
    enabled = false
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = snowplow-raw

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = snowplow-enriched-bad

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, googlepubsub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = kafka

      # Region where the streams are located
      region = us-east-1

      # Thread pool size for Kinesis API requests
      threadPoolSize = 10

      # The following are used to authenticate for the Amazon Kinesis sink.
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = iam
        secretKey = iam
      }

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 0
        maxBackoff = 0
      }

      # Or Google Pubsub
      #googleProjectId = ID
      ## Minimum, maximum and total backoff periods, in milliseconds
      ## and multiplier between two backoff
      #backoffPolicy {
      #  minBackoff = {{minBackoffMillis}}
      #  maxBackoff = {{maxBackoffMillis}}
      #  totalBackoff = {{totalBackoffMillis}} # must be >= 10000
      #  multiplier = {{backoffMultiplier}}
      #}

      # Or Kafka
      brokers = "kafka.my.local:29092" 
      ## Number of retries to perform before giving up on sending a record
      retries = 5

      # Or NSQ
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit = 4500000 # 4.5mb
      recordLimit = 500 # Not supported by Kafka; will be ignored. Put something here anyway or there will be an error.
      timeLimit = 5000 # 5 seconds
    }
  }
}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
  loglevel = OFF # 'OFF' for no logging, 'DEBUG' for all logging.
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # akka-http is the server the Stream collector uses and has configurable options defined at
  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on

    raw-request-uri-header = on

    # Define the maximum URI length (the default is 2048)
    parsing {
      max-uri-length = 32768
      uri-parsing-mode = relaxed
    }
  }
}

Here’s my dockerfile:

# Instructions here https://github.com/snowplow/snowplow-docker
FROM snowplow-docker-registry.bintray.io/snowplow/scala-stream-collector-kafka:0.13.0

# Move over configs
COPY config/config.hocon /var/snowplow/config/config.hocon

# Set a custom CMD to add the necessary docker run configs
CMD [ "--config", "/var/snowplow/config/config.hocon" ]

Does anyone have any ideas about what I could change?


#2

Without knowing how you are hosting the collector, could it be that port 80 is restricted? Ports below 1024 usually require root. I don’t see why that would change between 0.9 and 0.13, but you could try a different port.


#3

Thanks @asgergb. I moved it to port 8000 and the REST binding error appears to be fixed. However, I’m still getting a 502 gateway error. The Docker container is not failing, and here is all of the log output I see from it (DEBUG mode is enabled in config.hocon):

metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : c0518aa65f25317e
[main] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink - Create Kafka Producer to brokers: kafka.mycloud.local:29092
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [kafka.mycloud.local:29092]
buffer.memory = 4500000
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 5000
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : c0518aa65f25317e
[scala-stream-collector-akka.actor.default-dispatcher-5] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
[scala-stream-collector-akka.actor.default-dispatcher-2] INFO com.snowplowanalytics.snowplow.collectors.scalastream.KafkaCollector$ - REST interface bound to /0:0:0:0:0:0:0:0:8000

Hosting-wise, it’s a Docker container with NGINX routing traffic to it. Nothing has changed in NGINX since I upgraded from v0.9.0. I currently have a container running 0.9.0 in production and another container running 0.13.0 that’s getting the error (I’ve also tried running 0.13.0 on its own, with the same problems).

Is there anything else I can try changing? I tried disabling crossDomain, and nothing changed.


#4

What exactly is returning the 502? nginx, some LB in front of it, or something else? Did you also update the port mapping from 80 to 8000 for nginx (and any LB in front of it)?
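
One way to narrow that down is to hit the collector directly on its port and compare with going through nginx, something like this (hostnames and ports are just examples):

# Straight to the collector (assuming port 8000 is reachable from where you run this)
curl -i http://localhost:8000/i

# Through nginx / the LB, using whatever domain the tracker points at
curl -i http://collector.mydomain.com/i

If the first returns the pixel but the second returns the 502, the problem is in the proxy/port mapping rather than the collector itself.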


#5

@asgergb I did some additional work on the port mapping like you suggested, and now it’s up and running. Thank you!

For anyone else reading this with a similar problem, my docker-compose setup looks something like this (although I’m not using docker-compose exactly; it’s a different hosting system with a similar config format):

ports:
- container: 8000
  http: 80
  https: 443

and my config.hocon has this configuration:

  # The collector runs as a web service specified on the following interface and port.
  interface = "0.0.0.0"
  port = 8000
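
For anyone on plain Docker, the rough equivalent of the mapping above would be publishing a host port to container port 8000 (the image name here is a placeholder, and in my setup TLS for the 443 route is terminated in front of the container):

docker run -d --name collector -p 80:8000 my-scala-stream-collector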

I still don’t know why the collector won’t receive data on port 80, but using an alternate port is no problem.