Scala Stream Collector error and 502 Bad Gateway

Hi all,
I’m getting a 502 Bad Gateway error on my collector’s tracking pixel URL, and the collector is logging this error:

ERROR com.snowplowanalytics.snowplow.collectors.scalastream.KafkaCollector$ - REST interface could not be bound to 0.0.0.0:80

I only see this issue after upgrading the collector from v0.9.0 to v0.13.0 (when I switch back to 0.9.0, everything works). I copied the relevant settings over to the new config.hocon.

Here’s my current config.hocon:

# Copyright (c) 2013-2018 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (application.conf.example) contains a template with
# configuration options for the Scala Stream Collector.
#
# To use, copy this to 'application.conf' and modify the configuration options.

# 'collector' contains configuration options for the main Scala collector.
collector {
  # The collector runs as a web service specified on the following interface and port.
  interface = "0.0.0.0"
  port = 80

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # Cross domain policy configuration.
  # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
  # route.
  crossDomain {
    enabled = true
    # Domain that is granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    domain = "*"
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = false
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    expiration = "365 days" #  e.g. 365 days. 3650 days is 10 years, the max a 32-bit browser will likely accept.
    # Network cookie name
    name = _1234_
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out this line to tie cookies to
    # the collector's full domain
    domain = ".mydomain.com"
  }

  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
  # fallbackNetworkId is used instead of generating a new random one.
  cookieBounce {
    enabled = false
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
  }

  # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
  # token. All instances of that token are replaced with the network ID. If the placeholder isn't
  # specified, the default value is `${SP_NUID}`.
  redirectMacro {
    enabled = false
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = snowplow-raw

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = snowplow-enriched-bad

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, googlepubsub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = kafka

      # Region where the streams are located
      region = us-east-1

      # Thread pool size for Kinesis API requests
      threadPoolSize = 10

      # The following are used to authenticate for the Amazon Kinesis sink.
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = iam
        secretKey = iam
      }

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 0
        maxBackoff = 0
      }

      # Or Google Pubsub
      #googleProjectId = ID
      ## Minimum, maximum and total backoff periods, in milliseconds
      ## and multiplier between two backoff
      #backoffPolicy {
      #  minBackoff = {{minBackoffMillis}}
      #  maxBackoff = {{maxBackoffMillis}}
      #  totalBackoff = {{totalBackoffMillis}} # must be >= 10000
      #  multiplier = {{backoffMultiplier}}
      #}

      # Or Kafka
      brokers = "kafka.my.local:29092" 
      ## Number of retries to perform before giving up on sending a record
      retries = 5

      # Or NSQ
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit = 4500000 # 4.5mb
      recordLimit = 500 # Not supported by Kafka; will be ignored. Put something here anyway or there will be an error.
      timeLimit = 5000 # 5 seconds
    }
  }
}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
  loglevel = OFF # 'OFF' for no logging, 'DEBUG' for all logging.
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # akka-http is the server the Stream collector uses and has configurable options defined at
  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on

    raw-request-uri-header = on

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      uri-parsing-mode = relaxed
    }
  }
}

Here’s my Dockerfile:

# Instructions here https://github.com/snowplow/snowplow-docker
FROM snowplow-docker-registry.bintray.io/snowplow/scala-stream-collector-kafka:0.13.0

# Move over configs
COPY config/config.hocon /var/snowplow/config/config.hocon

# Set a custom CMD to add the necessary docker run configs
CMD [ "--config", "/var/snowplow/config/config.hocon" ]

Does anyone have any ideas about what I could change?

Without knowing how you are hosting the collector, could it be that port 80 is restricted? Ports below 1024 usually require root. I don’t see why that would stop working between 0.9 and 0.13, but you could try a different port.
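
If the container setup allows it, you could also keep port 80 on the host side and just publish it to whatever unprivileged port the collector binds inside the container. A rough sketch (the image name is only a placeholder):

# host port 80 -> collector listening on 8000 inside the container
docker run -d -p 80:8000 my-collector-image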

Thanks @asgergb, moving it to port 8000 fixed the REST binding error. However, I’m still getting a 502 Bad Gateway. The Docker container isn’t failing, and here is all of the log output I see from it (DEBUG logging is enabled in config.hocon):

— metrics.num.samples = 2
— metrics.recording.level = INFO
— metrics.sample.window.ms = 30000
— partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
— receive.buffer.bytes = 32768
— reconnect.backoff.max.ms = 1000
— reconnect.backoff.ms = 50
— request.timeout.ms = 30000
— retries = 0
— retry.backoff.ms = 100
— sasl.jaas.config = null
— sasl.kerberos.kinit.cmd = /usr/bin/kinit
— sasl.kerberos.min.time.before.relogin = 60000
— sasl.kerberos.service.name = null
— sasl.kerberos.ticket.renew.jitter = 0.05
— sasl.kerberos.ticket.renew.window.factor = 0.8
— sasl.mechanism = GSSAPI
— security.protocol = PLAINTEXT
— send.buffer.bytes = 131072
— ssl.cipher.suites = null
— ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
— ssl.endpoint.identification.algorithm = null
— ssl.key.password = null
— ssl.keymanager.algorithm = SunX509
— ssl.keystore.location = null
— ssl.keystore.password = null
— ssl.keystore.type = JKS
— ssl.protocol = TLS
— ssl.provider = null
— ssl.secure.random.implementation = null
— ssl.trustmanager.algorithm = PKIX
— ssl.truststore.location = null
— ssl.truststore.password = null
— ssl.truststore.type = JKS
— transaction.timeout.ms = 60000
— transactional.id = null
— value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
— [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.1
— [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : c0518aa65f25317e
— [main] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink - Create Kafka Producer to brokers: kafka.mycloud.local:29092
— [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
— acks = all
— batch.size = 16384
— bootstrap.servers = [kafka.mycloud.local:29092]
— buffer.memory = 4500000
— client.id =
— compression.type = none
— connections.max.idle.ms = 540000
— enable.idempotence = false
— interceptor.classes = null
— key.serializer = class org.apache.kafka.common.serialization.StringSerializer
— linger.ms = 5000
— max.block.ms = 60000
— max.in.flight.requests.per.connection = 5
— max.request.size = 1048576
— metadata.max.age.ms = 300000
— metric.reporters = []
— metrics.num.samples = 2
— metrics.recording.level = INFO
— metrics.sample.window.ms = 30000
— partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
— receive.buffer.bytes = 32768
— reconnect.backoff.max.ms = 1000
— reconnect.backoff.ms = 50
— request.timeout.ms = 30000
— retries = 0
— retry.backoff.ms = 100
— sasl.jaas.config = null
— sasl.kerberos.kinit.cmd = /usr/bin/kinit
— sasl.kerberos.min.time.before.relogin = 60000
— sasl.kerberos.service.name = null
— sasl.kerberos.ticket.renew.jitter = 0.05
— sasl.kerberos.ticket.renew.window.factor = 0.8
— sasl.mechanism = GSSAPI
— security.protocol = PLAINTEXT
— send.buffer.bytes = 131072
— ssl.cipher.suites = null
— ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
— ssl.endpoint.identification.algorithm = null
— ssl.key.password = null
— ssl.keymanager.algorithm = SunX509
— ssl.keystore.location = null
— ssl.keystore.password = null
— ssl.keystore.type = JKS
— ssl.protocol = TLS
— ssl.provider = null
— ssl.secure.random.implementation = null
— ssl.trustmanager.algorithm = PKIX
— ssl.truststore.location = null
— ssl.truststore.password = null
— ssl.truststore.type = JKS
— transaction.timeout.ms = 60000
— transactional.id = null
— value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
— [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.1
— [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : c0518aa65f25317e
— [scala-stream-collector-akka.actor.default-dispatcher-5] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
— [scala-stream-collector-akka.actor.default-dispatcher-2] INFO com.snowplowanalytics.snowplow.collectors.scalastream.KafkaCollector$ - REST interface bound to /0:0:0:0:0:0:0:0:8000

Hosting-wise, it’s a Docker container with NGINX routing traffic to it. Nothing has changed in NGINX since I upgraded from v0.9.0. I currently have one container running 0.9.0 in production and another running 0.13.0 that’s getting the error (I’ve also tried running 0.13.0 on its own, with the same problem).

Anything else I could try changing? I tried disabling crossDomain, and nothing changed.

What exactly is returning the 502? Is it nginx, some LB in front of it, or something else? Did you also update the port mapping from 80 to 8000 for nginx (and any LB in front of it, and so on)?
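
If nginx is the one proxying, its server block would also need to point at the new port. A rough sketch, assuming a setup like yours (the server name and upstream address are placeholders):

server {
  listen 80;
  server_name collector.mydomain.com;   # placeholder

  location / {
    # forward to wherever the collector container's port 8000 is reachable from nginx
    proxy_pass http://127.0.0.1:8000;
    proxy_set_header Host $host;
    proxy_set_header X-Forwarded-For $remote_addr;
    proxy_set_header X-Forwarded-Proto $scheme;
  }
}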

@asgergb I did some additional work on the port mapping like you suggested, and now it’s up and running. Thank you!

For anyone else reading this with a similar problem, my docker-compose is roughly this (I’m not using docker-compose exactly; it’s a different hosting system with similar configuration):

ports:
- container: 8000
  http: 80
  https: 443

and my config.hocon has this:

  # The collector runs as a web service specified on the following interface and port.
  interface = "0.0.0.0"
  port = 8000
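
Put together, the equivalent in plain docker-compose terms would be something like this (the image name is just a placeholder for the image built from the Dockerfile above):

services:
  collector:
    image: my-collector-image      # placeholder
    ports:
      - "80:8000"                  # host port 80 -> the collector's port 8000 in the container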

I still don’t know why the collector won’t receive data on port 80, but using an alternate port is no problem.
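
If it helps anyone verify a similar fix, a quick request to the pixel endpoint through nginx is an easy sanity check; it should return a 200 once the ports line up (the hostname is a placeholder for your collector domain):

# expect "200" once nginx and the collector agree on the port
curl -s -o /dev/null -w "%{http_code}\n" http://collector.mydomain.com/i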