Open source Kinesis Scala Stream Collector

Hi team,

I am an absolute beginner setting up the open source version of Snowplow, using the following GitHub documentation.

I am trying to set up the Scala Stream Collector on an Ubuntu 18.04 machine. I have installed the correct JDK version (1.8.0_252) and have snowplow-stream-collector-kinesis-1.0.0.jar stored on my machine, yet I still receive the following error when running this command:

java -jar snowplow-stream-collector-kinesis-1.0.0.jar --config my.conf
Exception in thread "main" com.typesafe.config.ConfigException$Parse: my.conf: 24: expecting a close parentheses ')' here, not: '{'

Any help on this would be much appreciated!


Can you paste your configuration file? It sounds like there is an incorrect value in your configuration.

The config is as below. I have not altered it yet, so it is all the default values.

# Copyright (c) 2013-2019 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (config.hocon.sample) contains a template with
# configuration options for the Scala Stream Collector.
#
# To use, copy this to 'application.conf' and modify the configuration options.

# 'collector' contains configuration options for the main Scala collector.
collector {
  # The collector runs as a web service specified on the following interface and port.
  interface = "0.0.0.0"
  interface = ${?COLLECTOR_INTERFACE}
  port = {{collectorPort}}
  port = ${?COLLECTOR_PORT}

  # optional SSL/TLS configuration
  ssl {
    enable = false
    enable = ${?COLLECTOR_SSL}
    # whether to redirect HTTP to HTTPS
    redirect = false
    redirect = ${?COLLECTOR_SSL_REDIRECT}
    port = 9543
    port = ${?COLLECTOR_SSL_PORT}
  }

  # The collector responds with a cookie to requests with a path that matches the 'vendor/version' protocol.
  # The expected values are:
  # - com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
  # - r/tp2 for redirects
  # - com.snowplowanalytics.iglu/v1 for the Iglu Webhook
  # Any path that matches the 'vendor/version' protocol will result in a cookie response, for use by custom webhooks
  # downstream of the collector.
  # But you can also map any valid (i.e. two-segment) path to one of the three defaults.
  # Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full
  # valid paths starting with a leading slash.
  # Pass in an empty map to avoid mapping.
  paths {
    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    # "/com.acme/redirect" = "/r/tp2"
    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
  }

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # Cross domain policy configuration.
  # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
  # route.
  crossDomain {
    enabled = false
    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
    domains = [ "*" ]
    domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
    secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    enabled = ${?COLLECTOR_COOKIE_ENABLED}
    expiration = {{cookieExpiration}} # e.g. "365 days"
    expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
    # Network cookie name
    name = {{collectorCookieName}}
    name = ${?COLLECTOR_COOKIE_NAME}
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out these lines to tie cookies to
    # the collector's full domain.
    # The domain is determined by matching the domains from the Origin header of the request
    # to the list below. The first match is used. If no matches are found, the fallback domain will be used,
    # if configured.
    # If you specify a main domain, all subdomains on it will be matched.
    # If you specify a subdomain, only that subdomain will be matched.
    # Examples:
    # domain.com will match domain.com, www.domain.com and secure.client.domain.com
    # client.domain.com will match secure.client.domain.com but not domain.com or www.domain.com
    domains = [
        "{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned
        "{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
        # ... more domains
    ]
    domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
    domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
    # ... more domains
    # If specified, the fallback domain will be used if none of the Origin header hosts matches the list of
    # cookie domains configured above. (For example, if there is no Origin header.)
    fallbackDomain = "{{fallbackDomain}}"
    fallbackDomain = ${?FALLBACK_DOMAIN}
    secure = false
    secure = ${?COLLECTOR_COOKIE_SECURE}
    httpOnly = false
    httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
    # The sameSite is optional. You can choose to not specify the attribute, or you can use `Strict`,
    # `Lax` or `None` to limit the cookie sent context.
    #   Strict: the cookie will only be sent along with "same-site" requests.
    #   Lax: the cookie will be sent with same-site requests, and with cross-site top-level navigation.
    #   None: the cookie will be sent with same-site and cross-site requests.
    sameSite = "{{cookieSameSite}}"
    sameSite = ${?COLLECTOR_COOKIE_SAME_SITE}
  }

  # If you have a do not track cookie in place, the Scala Stream Collector can respect it by
  # completely bypassing the processing of an incoming request carrying this cookie, the collector
  # will simply reply by a 200 saying "do not track".
  # The cookie name and value must match the configuration below, where the names of the cookies must
  # match entirely and the value could be a regular expression.
  doNotTrackCookie {
    enabled = false
    enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
    name = {{doNotTrackCookieName}}
    name = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_NAME}
    value = {{doNotTrackCookieValue}}
    value = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_VALUE}
  }

  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
  # fallbackNetworkId is used instead of generating a new random one.
  cookieBounce {
    enabled = false
    enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
    forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
  }

  # When enabled, redirect prefix `r/` will be enabled and its query parameters resolved.
  # Otherwise the request prefixed with `r/` will be dropped with `404 Not Found`
  # Custom redirects configured in `paths` can still be used.
  enableDefaultRedirect = true
  enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}

  # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
  # token. All instances of that token are replaced with the network ID. If the placeholder isn't
  # specified, the default value is `${SP_NUID}`.
  redirectMacro {
    enabled = false
    enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
    placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
  }

  # Customize response handling for requests for the root path ("/").
  # Useful if you need to redirect to web content or privacy policies regarding the use of this collector.
  rootResponse {
    enabled = false
    enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
    statusCode = 302
    statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
    # Optional, defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",
      Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
    body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
  }

  # Configuration related to CORS preflight requests
  cors {
    # The Access-Control-Max-Age response header indicates how long the results of a preflight
    # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
    accessControlMaxAge = 5 seconds
    accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
  }

  # Configuration of prometheus http metrics
  prometheusMetrics {
    # If metrics are enabled then all requests will be logged as prometheus metrics
    # and '/metrics' endpoint will return the report about the requests
    enabled = false
    # Custom buckets for http_request_duration_seconds_bucket duration metric
    #durationBucketsInSeconds = [0.1, 3, 10]
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = {{good}}
    good = ${?COLLECTOR_STREAMS_GOOD}

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = {{bad}}
    bad = ${?COLLECTOR_STREAMS_BAD}

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false
    useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, googlepubsub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = kinesis
      enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}

      # Region where the streams are located
      region = {{kinesisRegion}}
      region = ${?COLLECTOR_STREAMS_SINK_REGION}

      ## Optional endpoint url configuration to override aws kinesis endpoints,
      ## this can be used to specify local endpoints when using localstack
      # customEndpoint = {{kinesisEndpoint}}
      # customEndpoint = ${?COLLECTOR_STREAMS_SINK_CUSTOM_ENDPOINT}

      # Thread pool size for Kinesis API requests
      threadPoolSize = 10
      threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}

      # The following are used to authenticate for the Amazon Kinesis sink.
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = iam
        accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
        secretKey = iam
        secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
      }

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = {{minBackoffMillis}}
        minBackoff = ${?COLLECTOR_STREAMS_SINK_MIN_BACKOFF}
        maxBackoff = {{maxBackoffMillis}}
        maxBackoff = ${?COLLECTOR_STREAMS_SINK_MAX_BACKOFF}
      }

      # Or Google Pubsub
      #googleProjectId = ID
      ## Minimum, maximum and total backoff periods, in milliseconds
      ## and multiplier between two backoff
      #backoffPolicy {
      #  minBackoff = {{minBackoffMillis}}
      #  maxBackoff = {{maxBackoffMillis}}
      #  totalBackoff = {{totalBackoffMillis}} # must be >= 10000
      #  multiplier = {{backoffMultiplier}}
      #}

      # Or Kafka
      #brokers = "{{kafkaBrokers}}"
      ## Number of retries to perform before giving up on sending a record
      #retries = 0
      # The kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -> brokers
      # retries             -> retries
      # "buffer.memory"     -> buffer.byteLimit
      # "linger.ms"         -> buffer.timeLimit
      #producerConf {
      #  acks = all
      #  "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
      #  "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
      #}

      # Or NSQ
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit = {{bufferByteThreshold}}
      byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = {{bufferRecordThreshold}} # Not supported by Kafka; will be ignored
      recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = {{bufferTimeThreshold}}
      timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
    }
  }

}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loglevel = ${?AKKA_LOGLEVEL}
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = [${?AKKA_LOGGERS}]

  # akka-http is the server the Stream collector uses and has configurable options defined at
  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on
    remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}

    raw-request-uri-header = on
    raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
      uri-parsing-mode = relaxed
      uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
    }
  }

  # By default setting `collector.ssl` relies on JSSE (Java Secure Socket
  # Extension) to enable secure communication.
  # To override the default settings set the following section as per
  # https://lightbend.github.io/ssl-config/ExampleSSLConfig.html
  # ssl-config {
  #   debug = {
  #     ssl = true
  #   }
  #   keyManager = {
  #     stores = [
  #       {type = "PKCS12", classpath = false, path = "/etc/ssl/mycert.p12", password = "mypassword" }
  #     ]
  #   }
  #   loose {
  #     disableHostnameVerification = false
  #   }
  # }
}

Hello David,
Welcome to Discourse – Snowplow.

The raw config file has some gaps that you have to fill in before it is usable.
Notation like {{collectorPort}} is a placeholder that you have to replace with real data. So the error at line 24 means you need to set the port in the config file, e.g. port = 8080

Every {{...}} needs to be updated.
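For example, once the placeholders are replaced, the relevant lines could look something like this (the values below are purely illustrative; use your own port, stream names and region):

port = 8080
good = "my-good-stream"      # name of your Kinesis good stream
bad = "my-bad-stream"        # name of your Kinesis bad stream
region = "eu-west-1"
minBackoff = 3000            # milliseconds
maxBackoff = 600000          # milliseconds
byteLimit = 4500000
recordLimit = 500
timeLimit = 5000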

I hope this helps. If you have any more questions, feel free to ask!
All the Best,
Luke


Hi Luke,

Thank you for the previous assistance; it was very helpful and got me past the error I was getting.

I have now configured the config file as below but am receiving more errors.

I ran the following command and received this error:

java -jar snowplow-stream-collector-kinesis-1.0.0.jar --config my.conf
Exception in thread "main" com.amazonaws.SdkClientException: Unable to marshall request to JSON: Jackson jackson-core/jackson-dataformat-cbor incompatible library version detected.
You have two possible resolutions:

  1. Ensure the com.fasterxml.jackson.core:jackson-core & com.fasterxml.jackson.dataformat:jackson-dataformat-cbor libraries on your classpath have the same version number
  2. Disable CBOR wire-protocol by passing the -Dcom.amazonaws.sdk.disableCbor property or setting the AWS_CBOR_DISABLE environment variable (warning this may affect performance)

After this, I added the flag to disable CBOR but received the following error:

java -jar -Dcom.amazonaws.sdk.disableCbor snowplow-stream-collector-kinesis-1.0.0.jar --config my.conf
Exception in thread "main" com.amazonaws.SdkClientException: The requested metadata is not found at http://169.254.169.254/latest/meta-data/iam/security-credentials/

How do I ensure that the com.fasterxml.jackson.core:jackson-core and com.fasterxml.jackson.dataformat:jackson-dataformat-cbor libraries on my classpath have the same version number? Or is there something else I have missed in the config file?

Here is the updated config file:

# Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (config.hocon.sample) contains a template with
# configuration options for the Scala Stream Collector.
#
# To use, copy this to 'application.conf' and modify the configuration options.

# 'collector' contains configuration options for the main Scala collector.
collector {
  # The collector runs as a web service specified on the following interface and port.
  interface = "0.0.0.0"
  interface = ${?COLLECTOR_INTERFACE}
  port = 8080
  port = ${?COLLECTOR_PORT}

  # optional SSL/TLS configuration
  ssl {
    enable = false
    enable = ${?COLLECTOR_SSL}
    # whether to redirect HTTP to HTTPS
    redirect = false
    redirect = ${?COLLECTOR_SSL_REDIRECT}
    port = 9543
    port = ${?COLLECTOR_SSL_PORT}
  }

  # The collector responds with a cookie to requests with a path that matches the 'vendor/version' protocol.
  # The expected values are:
  # - com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
  # - r/tp2 for redirects
  # - com.snowplowanalytics.iglu/v1 for the Iglu Webhook
  # Any path that matches the 'vendor/version' protocol will result in a cookie response, for use by custom webhooks
  # downstream of the collector.
  # But you can also map any valid (i.e. two-segment) path to one of the three defaults.
  # Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full
  # valid paths starting with a leading slash.
  # Pass in an empty map to avoid mapping.
  paths {
    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    # "/com.acme/redirect" = "/r/tp2"
    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
  }

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # Cross domain policy configuration.
  # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
  # route.
  crossDomain {
    enabled = false
    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
    domains = [ "*" ]
    domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
    secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    enabled = ${?COLLECTOR_COOKIE_ENABLED}
    expiration = "365 days" # e.g. "365 days"
    expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
    # Network cookie name
    name = "userEvent"
    name = ${?COLLECTOR_COOKIE_NAME}
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out these lines to tie cookies to
    # the collector's full domain.
    # The domain is determined by matching the domains from the Origin header of the request
    # to the list below. The first match is used. If no matches are found, the fallback domain will be used,
    # if configured.
    # If you specify a main domain, all subdomains on it will be matched.
    # If you specify a subdomain, only that subdomain will be matched.
    # Examples:
    # domain.com will match domain.com, www.domain.com and secure.client.domain.com
    # client.domain.com will match secure.client.domain.com but not domain.com or www.domain.com
    domains = [
        "{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned
        "{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
        # ... more domains
    ]
    domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
    domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
    # ... more domains
    # If specified, the fallback domain will be used if none of the Origin header hosts matches the list of
    # cookie domains configured above. (For example, if there is no Origin header.)
    fallbackDomain = "{{fallbackDomain}}"
    fallbackDomain = ${?FALLBACK_DOMAIN}
    secure = false
    secure = ${?COLLECTOR_COOKIE_SECURE}
    httpOnly = false
    httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
    # The sameSite is optional. You can choose to not specify the attribute, or you can use `Strict`,
    # `Lax` or `None` to limit the cookie sent context.
    #   Strict: the cookie will only be sent along with "same-site" requests.
    #   Lax: the cookie will be sent with same-site requests, and with cross-site top-level navigation.
    #   None: the cookie will be sent with same-site and cross-site requests.
    sameSite = "{{cookieSameSite}}"
    sameSite = ${?COLLECTOR_COOKIE_SAME_SITE}
  }

  # If you have a do not track cookie in place, the Scala Stream Collector can respect it by
  # completely bypassing the processing of an incoming request carrying this cookie, the collector
  # will simply reply by a 200 saying "do not track".
  # The cookie name and value must match the configuration below, where the names of the cookies must
  # match entirely and the value could be a regular expression.
  doNotTrackCookie {
    enabled = false
    enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
    name = "doNotTrackCookie"
    name = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_NAME}
    value = "doNotTrackCookie"
    value = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_VALUE}
  }

  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
  # fallbackNetworkId is used instead of generating a new random one.
  cookieBounce {
    enabled = false
    enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
    forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
  }

  # When enabled, redirect prefix `r/` will be enabled and its query parameters resolved.
  # Otherwise the request prefixed with `r/` will be dropped with `404 Not Found`
  # Custom redirects configured in `paths` can still be used.
  enableDefaultRedirect = true
  enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}

  # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
  # token. All instances of that token are replaced with the network ID. If the placeholder isn't
  # specified, the default value is `${SP_NUID}`.
  redirectMacro {
    enabled = false
    enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
    placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
  }

  # Customize response handling for requests for the root path ("/").
  # Useful if you need to redirect to web content or privacy policies regarding the use of this collector.
  rootResponse {
    enabled = false
    enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
    statusCode = 302
    statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
    # Optional, defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",
      Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
    body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
  }

  # Configuration related to CORS preflight requests
  cors {
    # The Access-Control-Max-Age response header indicates how long the results of a preflight
    # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
    accessControlMaxAge = 5 seconds
    accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
  }

  # Configuration of prometheus http metrics
  prometheusMetrics {
    # If metrics are enabled then all requests will be logged as prometheus metrics
    # and '/metrics' endpoint will return the report about the requests
    enabled = false
    # Custom buckets for http_request_duration_seconds_bucket duration metric
    #durationBucketsInSeconds = [0.1, 3, 10]
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = collector.streams.good
    good = ${?COLLECTOR_STREAMS_GOOD}

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = collector.streams.bad
    bad = ${?COLLECTOR_STREAMS_BAD}

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false
    useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, google-pub-sub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = kinesis
      enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}

      # Region where the streams are located
      region = "ap-southeast-2"
      region = ${?COLLECTOR_STREAMS_SINK_REGION}

      ## Optional endpoint url configuration to override aws kinesis endpoints,
      ## this can be used to specify local endpoints when using localstack
      # customEndpoint = {{kinesisEndpoint}}
      # customEndpoint = ${?COLLECTOR_STREAMS_SINK_CUSTOM_ENDPOINT}

      # Thread pool size for Kinesis API requests
      threadPoolSize = 10
      threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}

      # The following are used to authenticate for the Amazon Kinesis sink.
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = iam
        accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
        secretKey = iam
        secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
      }

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 43200
        minBackoff = ${?COLLECTOR_STREAMS_SINK_MIN_BACKOFF}
        maxBackoff = 86400
        maxBackoff = ${?COLLECTOR_STREAMS_SINK_MAX_BACKOFF}
      }

      # Or Google Pubsub
      #googleProjectId = ID
      ## Minimum, maximum and total backoff periods, in milliseconds
      ## and multiplier between two backoff
      #backoffPolicy {
      #  minBackoff = {{minBackoffMillis}}
      #  maxBackoff = {{maxBackoffMillis}}
      #  totalBackoff = {{totalBackoffMillis}} # must be >= 10000
      #  multiplier = {{backoffMultiplier}}
      #}

      # Or Kafka
      #brokers = "{{kafkaBrokers}}"
      ## Number of retries to perform before giving up on sending a record
      #retries = 0
      # The kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" = brokers
      # "buffer.memory"     = buffer.byteLimit
      # "linger.ms"         = buffer.timeLimit
      #producerConf {
      #  acks = all
      #  "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
      #  "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
      #}

      # Or NSQ
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit = 10000
      byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 1000000 # Not supported by Kafka; will be ignored
      recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 50000
      timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
    }
  }

}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loglevel = ${?AKKA_LOGLEVEL}
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = [${?AKKA_LOGGERS}]

  # akka-http is the server the Stream collector uses and has configurable options defined at
  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on
    remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}

    raw-request-uri-header = on
    raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
      uri-parsing-mode = relaxed
      uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
    }
  }

  # By default setting `collector.ssl` relies on JSSE (Java Secure Socket
  # Extension) to enable secure communication.
  # To override the default settings set the following section as per
  # https://lightbend.github.io/ssl-config/ExampleSSLConfig.html
  # ssl-config {
  #   debug = {
  #     ssl = true
  #   }
  #   keyManager = {
  #     stores = [
  #       {type = "PKCS12", classpath = false, path = "/etc/ssl/mycert.p12", password = "mypassword" }
  #     ]
  #   }
  #   loose {
  #     disableHostnameVerification = false
  #   }
  # }
}

Hi David_D, it looks like disabling CBOR got rid of the related error and you’re now getting an error about credentials. I can see that you are using IAM roles for authentication. Can you please double check that all required roles and policies have been created and the app has access to them? Alternatively, have you tested using the default provider chain or an environment variable for passing credentials?


Hi @David_D,

You need to run inside an AWS EC2 instance to use the iam authentication option, since it relies on the EC2 metadata service, which is only available in EC2 deployments.

As @dilyan suggested, for local deployment, I’d recommend using alternative authentication options, as explained in the config template.
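For local testing, for example, you could switch the sink credentials to the env option, roughly like this:

aws {
  accessKey = env
  secretKey = env
}

and then export AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the shell that runs the collector. Setting both values to default instead falls back to the standard AWS credentials provider chain (e.g. ~/.aws/credentials).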


Hi all,

Thank you again for all your help so far! The issue was an incompatibility between CBOR and the snowplow-stream-collector-kinesis-.jar version; I fixed this by using the latest stream-collector-kinesis-.jar.

The collector is now set up and logging events into S3 through a Kinesis stream. However, the JSON landing in the bucket isn't showing the values that the front-end developers are trying to send through. They are trying to send this:

trackSelfDescribingEvent({ // Self-describing JSON for the custom event
  'schema': 'iglu:com.acme/event/jsonschema/1-0-0',
  'data': {'message': 'hello world'}
});

But what is coming through to the bucket is:

{"schema":"iglu:com.snowplowanalytics.snowplow\/payload_data\/jsonschema\/1-0-4","data":[{"p":"mob","eid":"d2fa12b1-476f-414e-af95-6c64710752c6","tv":"andr-1.4.2","e":"ue","cx":"eyJzY2hlb...J9fV19","tna":"frugl","tz":"Australia\/Brisbane","ue_px":"eyJzY2hlbWEiOiJpZ2...19fQ==","dtm":"159362974","lang":"English","aid":"frugl-arimac","stm":"159434511"}

Could this be the collector not being configured correctly?

The front-end documentation they are using is the @snowplow/react-native-tracker documentation.

Here are the parameters they have set:

import Tracker from '@snowplow/react-native-tracker';
async setupSnowPlow (){
        try{
            Tracker.initialize(
                'endpoint:8080',
                'post',
                'http',
                'frugl',
                'frugl-arimac', {
                setPlatformContext: true,
                setBase64Encoded: true,
                setApplicationContext:true,
                setLifecycleEvents: true,
                setScreenContext: true,
                setSessionContext: true,
                foregroundTimeout: 10,
                backgroundTimeout: 10,
                checkInterval: 5,
                setInstallEvent: true
                });
        }catch(err){
            console.log("setupSnowPlow Error",err)
        }
}
async trackSampleEvent () {
    try{
        Tracker.trackSelfDescribingEvent({
            'schema': 'iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4',
            'data': {'message': 'hello world'}
        },
        []);
    }catch(err){
        console.log("setupSnowPlow Error",err)
    }
}
Any help is very much appreciated!

@David_D, if you base64-decode the value of ue_px you will see your data. It is encoded as per your tracker settings, specifically setBase64Encoded: true. Check the Snowplow Tracker Protocol documentation to find the answer to your question.
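For example, a quick way to inspect it from a Node console (the ue_px above is truncated, so paste your full value):

// decode the URL-safe base64 payload of the self-describing event
const uePx = '<full ue_px value here>';
const decoded = Buffer.from(uePx, 'base64').toString('utf8');
console.log(JSON.parse(decoded));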

One more thing, though. The data you are trying to send is not quite what your tracking code shows. You showed this code:

trackSelfDescribingEvent({ // Self-describing JSON for the custom event
  'schema': 'iglu:com.acme/event/jsonschema/1-0-0',
  'data': {'message': 'hello world'}

but then also showed this:

Tracker.trackSelfDescribingEvent({
                'schema': 'iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4',
                'data': {'message': 'hello world'}
            }

These are two different schemas. Your tracking code should indeed use 'schema': 'iglu:com.acme/event/jsonschema/1-0-0' (if that is what you want), and you also have to host that schema on your Iglu server.
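Assuming the com.acme schema is the one you want (and it is deployed to your Iglu server), the tracking call would then look roughly like this:

Tracker.trackSelfDescribingEvent({
    'schema': 'iglu:com.acme/event/jsonschema/1-0-0',
    'data': {'message': 'hello world'}
},
[]);   // no custom contexts attached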
