Setting up the Scala collector

I am trying to set up the Scala collector on a local machine and have it produce to Kafka, but I am unable to get it to work. Can anyone guide me?

The config file is:

collector {
  # The collector runs as a web service specified on the following interface and port.
  interface = "0.0.0.0"
  interface = ${?COLLECTOR_INTERFACE}
  port = "8092"
  port = ${?COLLECTOR_PORT}

  # optional SSL/TLS configuration
  ssl {
    enable = false
    enable = ${?COLLECTOR_SSL}
    # whether to redirect HTTP to HTTPS
    redirect = false
    redirect = ${?COLLECTOR_SSL_REDIRECT}
    port = 9543
    port = ${?COLLECTOR_SSL_PORT}
  }

  # The collector responds with a cookie to requests with a path that matches the 'vendor/version' protocol.
  # The expected values are:
  # - com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
  # - r/tp2 for redirects
  # - com.snowplowanalytics.iglu/v1 for the Iglu Webhook
  # Any path that matches the 'vendor/version' protocol will result in a cookie response, for use by custom webhooks
  # downstream of the collector.
  # But you can also map any valid (i.e. two-segment) path to one of the three defaults.
  # Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full
  # valid paths starting with a leading slash.
  # Pass in an empty map to avoid mapping.
  paths {
    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    # "/com.acme/redirect" = "/r/tp2"
    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
  }

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # Cross domain policy configuration.
  # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
  # route.
  crossDomain {
    enabled = false
    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
    domains = [ "*" ]
    domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
    secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    enabled = ${?COLLECTOR_COOKIE_ENABLED}
    expiration = "365 days" # e.g. "365 days"
    expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
    # Network cookie name
    name = sp
    name = ${?COLLECTOR_COOKIE_NAME}
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out these lines to tie cookies to
    # the collector's full domain.
    # The domain is determined by matching the domains from the Origin header of the request
    # to the list below. The first match is used. If no matches are found, the fallback domain will be used,
    # if configured.
    # If you specify a main domain, all subdomains on it will be matched.
    # If you specify a subdomain, only that subdomain will be matched.
    # Examples:
    # domain.com will match domain.com, www.domain.com and secure.client.domain.com
    # client.domain.com will match secure.client.domain.com but not domain.com or www.domain.com
    domains = [
        "{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned
        "{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
        # ... more domains
    ]
    domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
    domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
    # ... more domains
    # If specified, the fallback domain will be used if none of the Origin header hosts matches the list of
    # cookie domains configured above. (For example, if there is no Origin header.)
    fallbackDomain = "{{fallbackDomain}}"
    fallbackDomain = ${?FALLBACK_DOMAIN}
    secure = false
    secure = ${?COLLECTOR_COOKIE_SECURE}
    httpOnly = false
    httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
    # The sameSite is optional. You can choose to not specify the attribute, or you can use `Strict`,
    # `Lax` or `None` to limit the cookie sent context.
    #   Strict: the cookie will only be sent along with "same-site" requests.
    #   Lax: the cookie will be sent with same-site requests, and with cross-site top-level navigation.
    #   None: the cookie will be sent with same-site and cross-site requests.
    sameSite = "{{cookieSameSite}}"
    sameSite = ${?COLLECTOR_COOKIE_SAME_SITE}
  }

  # If you have a do not track cookie in place, the Scala Stream Collector can respect it by
  # completely bypassing the processing of an incoming request carrying this cookie, the collector
  # will simply reply by a 200 saying "do not track".
  # The cookie name and value must match the configuration below, where the names of the cookies must
  # match entirely and the value could be a regular expression.
  doNotTrackCookie {
    enabled = false
    enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
    name = no_track_cookie_name
    name = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_NAME}
    value = no_track_cookie_value
    value = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_VALUE}
  }

  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
  # fallbackNetworkId is used instead of generating a new random one.
  cookieBounce {
    enabled = false
    enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
    forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
  }

  # When enabled, redirect prefix `r/` will be enabled and its query parameters resolved.
  # Otherwise the request prefixed with `r/` will be dropped with `404 Not Found`
  # Custom redirects configured in `paths` can still be used.
  enableDefaultRedirect = false
  enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}

  # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
  # token. All instances of that token are replaced with the network ID. If the placeholder isn't
  # specified, the default value is `${SP_NUID}`.
  redirectMacro {
    enabled = false
    enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
    placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
  }

  # Customize response handling for requests for the root path ("/").
  # Useful if you need to redirect to web content or privacy policies regarding the use of this collector.
  rootResponse {
    enabled = false
    enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
    statusCode = 302
    statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
    # Optional, defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",
      Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
    body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
  }

  # Configuration related to CORS preflight requests
  cors {
    # The Access-Control-Max-Age response header indicates how long the results of a preflight
    # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
    accessControlMaxAge = 5 seconds
    accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
  }

  # Configuration of prometheus http metrics
  prometheusMetrics {
    # If metrics are enabled then all requests will be logged as prometheus metrics
    # and '/metrics' endpoint will return the report about the requests
    enabled = false
    # Custom buckets for http_request_duration_seconds_bucket duration metric
    #durationBucketsInSeconds = [0.1, 3, 10]
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = GOOD_STREAM
    good = ${?COLLECTOR_STREAMS_GOOD}

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = BAD_STREAM
    bad = ${?COLLECTOR_STREAMS_BAD}

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false
    useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, google-pub-sub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = kafka
      enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}

      # Region where the streams are located
      #region = {{kinesisRegion}}
      #region = ${?COLLECTOR_STREAMS_SINK_REGION}

      ## Optional endpoint url configuration to override aws kinesis endpoints,
      ## this can be used to specify local endpoints when using localstack
      # customEndpoint = {{kinesisEndpoint}}
      # customEndpoint = ${?COLLECTOR_STREAMS_SINK_CUSTOM_ENDPOINT}

      # Thread pool size for Kinesis API requests
      #threadPoolSize = 10
      #threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}

      # Optional SQS buffer for good and bad events (respectively).
      # When messages can't be sent to Kinesis, they will be sent to SQS.
      # If not configured, sending to Kinesis will be retried.
      #sqsGoodBuffer = {{sqsGoodBuffer}}
      #sqsGoodBuffer = ${?COLLECTOR_STREAMS_SINK_SQS_GOOD_BUFFER}

      #sqsBadBuffer = {{sqsBadBuffer}}
      #sqsBadBuffer = ${?COLLECTOR_STREAMS_SINK_SQS_BAD_BUFFER}

      # The following are used to authenticate for the Amazon Kinesis sink.
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      #aws {
      #  accessKey = iam
      #  accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
      #  secretKey = iam
      #  secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
      #}

      # Minimum and maximum backoff periods, in milliseconds
      #backoffPolicy {
      #  minBackoff = {{minBackoffMillis}}
      #  minBackoff = ${?COLLECTOR_STREAMS_SINK_MIN_BACKOFF}
      #  maxBackoff = {{maxBackoffMillis}}
      #  maxBackoff = ${?COLLECTOR_STREAMS_SINK_MAX_BACKOFF}
      #}

      # Or Google Pubsub
      #googleProjectId = ID
      ## Minimum, maximum and total backoff periods, in milliseconds
      ## and multiplier between two backoff
      #backoffPolicy {
      #  minBackoff = {{minBackoffMillis}}
      #  maxBackoff = {{maxBackoffMillis}}
      #  totalBackoff = {{totalBackoffMillis}} # must be >= 10000
      #  multiplier = {{backoffMultiplier}}
      #}

      # Or Kafka
      brokers = "localhost:9092"
      ## Number of retries to perform before giving up on sending a record
      retries = 10
      # The kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      "bootstrap.servers" = brokers
      "buffer.memory"     = buffer.byteLimit
      "linger.ms"         = buffer.timeLimit
      producerConf {
        acks = all
        "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
        "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
      }

      # Or NSQ
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit = 2000
      byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 1000
      recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 200
      timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
    }
  }

}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loglevel = ${?AKKA_LOGLEVEL}
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = [${?AKKA_LOGGERS}]

  # akka-http is the server the Stream collector uses and has configurable options defined at
  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on
    remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}

    raw-request-uri-header = on
    raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
      uri-parsing-mode = relaxed
      uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
    }
  }

  # By default setting `collector.ssl` relies on JSSE (Java Secure Socket
  # Extension) to enable secure communication.
  # To override the default settings set the following section as per
  # https://lightbend.github.io/ssl-config/ExampleSSLConfig.html
  # ssl-config {
  #   debug = {
  #     ssl = true
  #   }
  #   keyManager = {
  #     stores = [
  #       {type = "PKCS12", classpath = false, path = "/etc/ssl/mycert.p12", password = "mypassword" }
  #     ]
  #   }
  #   loose {
  #     disableHostnameVerification = false
  #   }
  # }
}


# Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (config.hocon.sample) contains a template with
# configuration options for the Scala Stream Collector.
#
# To use, copy this to 'application.conf' and modify the configuration options.

# 'collector' contains configuration options for the main Scala collector.
collector {
  # The collector runs as a web service specified on the following interface and port.
  interface = "0.0.0.0"
  interface = ${?COLLECTOR_INTERFACE}
  port = "8092"
  port = ${?COLLECTOR_PORT}

  # optional SSL/TLS configuration
  ssl {
    enable = false
    enable = ${?COLLECTOR_SSL}
    # whether to redirect HTTP to HTTPS
    redirect = false
    redirect = ${?COLLECTOR_SSL_REDIRECT}
    port = 9543
    port = ${?COLLECTOR_SSL_PORT}
  }

  # The collector responds with a cookie to requests with a path that matches the 'vendor/version' protocol.
  # The expected values are:
  # - com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
  # - r/tp2 for redirects
  # - com.snowplowanalytics.iglu/v1 for the Iglu Webhook
  # Any path that matches the 'vendor/version' protocol will result in a cookie response, for use by custom webhooks
  # downstream of the collector.
  # But you can also map any valid (i.e. two-segment) path to one of the three defaults.
  # Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full
  # valid paths starting with a leading slash.
  # Pass in an empty map to avoid mapping.
  paths {
    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    # "/com.acme/redirect" = "/r/tp2"
    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
  }

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # Cross domain policy configuration.
  # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
  # route.
  crossDomain {
    enabled = false
    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
    domains = [ "*" ]
    domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
    secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    enabled = ${?COLLECTOR_COOKIE_ENABLED}
    expiration = "365 days" # e.g. "365 days"
    expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
    # Network cookie name
    name = sp
    name = ${?COLLECTOR_COOKIE_NAME}
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out these lines to tie cookies to
    # the collector's full domain.
    # The domain is determined by matching the domains from the Origin header of the request
    # to the list below. The first match is used. If no matches are found, the fallback domain will be used,
    # if configured.
    # If you specify a main domain, all subdomains on it will be matched.
    # If you specify a subdomain, only that subdomain will be matched.
    # Examples:
    # domain.com will match domain.com, www.domain.com and secure.client.domain.com
    # client.domain.com will match secure.client.domain.com but not domain.com or www.domain.com
    domains = [
        "{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned
        "{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
        # ... more domains
    ]
    domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
    domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
    # ... more domains
    # If specified, the fallback domain will be used if none of the Origin header hosts matches the list of
    # cookie domains configured above. (For example, if there is no Origin header.)
    fallbackDomain = "{{fallbackDomain}}"
    fallbackDomain = ${?FALLBACK_DOMAIN}
    secure = false
    secure = ${?COLLECTOR_COOKIE_SECURE}
    httpOnly = false
    httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
    # The sameSite is optional. You can choose to not specify the attribute, or you can use `Strict`,
    # `Lax` or `None` to limit the cookie sent context.
    #   Strict: the cookie will only be sent along with "same-site" requests.
    #   Lax: the cookie will be sent with same-site requests, and with cross-site top-level navigation.
    #   None: the cookie will be sent with same-site and cross-site requests.
    sameSite = "{{cookieSameSite}}"
    sameSite = ${?COLLECTOR_COOKIE_SAME_SITE}
  }

  # If you have a do not track cookie in place, the Scala Stream Collector can respect it by
  # completely bypassing the processing of an incoming request carrying this cookie, the collector
  # will simply reply by a 200 saying "do not track".
  # The cookie name and value must match the configuration below, where the names of the cookies must
  # match entirely and the value could be a regular expression.
  doNotTrackCookie {
    enabled = false
    enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
    name = no_track_cookie_name
    name = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_NAME}
    value = no_track_cookie_value
    value = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_VALUE}
  }

  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
  # fallbackNetworkId is used instead of generating a new random one.
  cookieBounce {
    enabled = false
    enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
    forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
  }

  # When enabled, redirect prefix `r/` will be enabled and its query parameters resolved.
  # Otherwise the request prefixed with `r/` will be dropped with `404 Not Found`
  # Custom redirects configured in `paths` can still be used.
  enableDefaultRedirect = false
  enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}

  # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
  # token. All instances of that token are replaced with the network ID. If the placeholder isn't
  # specified, the default value is `${SP_NUID}`.
  redirectMacro {
    enabled = false
    enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
    placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
  }

  # Customize response handling for requests for the root path ("/").
  # Useful if you need to redirect to web content or privacy policies regarding the use of this collector.
  rootResponse {
    enabled = false
    enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
    statusCode = 302
    statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
    # Optional, defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",
      Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
    body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
  }

  # Configuration related to CORS preflight requests
  cors {
    # The Access-Control-Max-Age response header indicates how long the results of a preflight
    # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
    accessControlMaxAge = 5 seconds
    accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
  }

  # Configuration of prometheus http metrics
  prometheusMetrics {
    # If metrics are enabled then all requests will be logged as prometheus metrics
    # and '/metrics' endpoint will return the report about the requests
    enabled = false
    # Custom buckets for http_request_duration_seconds_bucket duration metric
    #durationBucketsInSeconds = [0.1, 3, 10]
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = GOOD_STREAM
    good = ${?COLLECTOR_STREAMS_GOOD}

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = BAD_STREAM
    bad = ${?COLLECTOR_STREAMS_BAD}

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false
    useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, google-pub-sub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = kafka
      enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}

      # Region where the streams are located
      #region = {{kinesisRegion}}
      #region = ${?COLLECTOR_STREAMS_SINK_REGION}

      ## Optional endpoint url configuration to override aws kinesis endpoints,
      ## this can be used to specify local endpoints when using localstack
      # customEndpoint = {{kinesisEndpoint}}
      # customEndpoint = ${?COLLECTOR_STREAMS_SINK_CUSTOM_ENDPOINT}

      # Thread pool size for Kinesis API requests
      #threadPoolSize = 10
      #threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}

      # Optional SQS buffer for good and bad events (respectively).
      # When messages can't be sent to Kinesis, they will be sent to SQS.
      # If not configured, sending to Kinesis will be retried.
      #sqsGoodBuffer = {{sqsGoodBuffer}}
      #sqsGoodBuffer = ${?COLLECTOR_STREAMS_SINK_SQS_GOOD_BUFFER}

      #sqsBadBuffer = {{sqsBadBuffer}}
      #sqsBadBuffer = ${?COLLECTOR_STREAMS_SINK_SQS_BAD_BUFFER}

      # The following are used to authenticate for the Amazon Kinesis sink.
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      #aws {
      #  accessKey = iam
      #  accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
      #  secretKey = iam
      #  secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
      #}

      # Minimum and maximum backoff periods, in milliseconds
      #backoffPolicy {
      #  minBackoff = {{minBackoffMillis}}
      #  minBackoff = ${?COLLECTOR_STREAMS_SINK_MIN_BACKOFF}
      #  maxBackoff = {{maxBackoffMillis}}
      #  maxBackoff = ${?COLLECTOR_STREAMS_SINK_MAX_BACKOFF}
      #}

      # Or Google Pubsub
      #googleProjectId = ID
      ## Minimum, maximum and total backoff periods, in milliseconds
      ## and multiplier between two backoff
      #backoffPolicy {
      #  minBackoff = {{minBackoffMillis}}
      #  maxBackoff = {{maxBackoffMillis}}
      #  totalBackoff = {{totalBackoffMillis}} # must be >= 10000
      #  multiplier = {{backoffMultiplier}}
      #}

      # Or Kafka
      brokers = "localhost:9092"
      ## Number of retries to perform before giving up on sending a record
      retries = 10
      # The kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      "bootstrap.servers" = brokers
      "buffer.memory"     = buffer.byteLimit
      "linger.ms"         = buffer.timeLimit
      producerConf {
        acks = all
        "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
        "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
      }

      # Or NSQ
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit = 2000
      byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 1000
      recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 200
      timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
    }
  }

}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loglevel = ${?AKKA_LOGLEVEL}
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = [${?AKKA_LOGGERS}]

  # akka-http is the server the Stream collector uses and has configurable options defined at
  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on
    remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}

    raw-request-uri-header = on
    raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
      uri-parsing-mode = relaxed
      uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
    }
  }

  # By default setting `collector.ssl` relies on JSSE (Java Secure Socket
  # Extension) to enable secure communication.
  # To override the default settings set the following section as per
  # https://lightbend.github.io/ssl-config/ExampleSSLConfig.html
  # ssl-config {
  #   debug = {
  #     ssl = true
  #   }
  #   keyManager = {
  #     stores = [
  #       {type = "PKCS12", classpath = false, path = "/etc/ssl/mycert.p12", password = "mypassword" }
  #     ]
  #   }
  #   loose {
  #     disableHostnameVerification = false
  #   }
  # }
}

Do you have any error messages you can share with us?

We typically only support running on cloud platforms, AWS or GCP, as it is quite hard to support local environments. Have you tried following the guide here? https://docs.snowplowanalytics.com/docs/setup-snowplow-on-aws/setup-the-snowplow-collector/overview-of-the-scala-stream-collector/

Hi,

I am responding from another account. Sorry for the inconvenience (my other computer has some issues).

Yes, I went through the Scala collector setup twice, but it seems the tracking code doesn’t forward events to the Kafka topics. The network tab shows a GET request being made, which I think means it is sending some request to the endpoint. A POST request is always blocked, hence I had to set post: false in the JS tracking code.

I am not sure why it should be difficult to test in a local environment. Can you shed some more light on that?

Do you get 200 OK responses back from your collector when the tracking fires? Can you check that the /health endpoint on the collector looks OK?
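The health check suggested above can also be scripted rather than done in the browser. The following is only a minimal sketch, assuming Java 11+ and a collector reachable on localhost at the port from the posted config (8092); the class and method names are made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

class CollectorHealthCheck {

    // Builds the health URL for a collector. Host and port are supplied by the caller.
    static URI healthUri(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/health");
    }

    // Sends a GET to /health and returns the HTTP status code.
    static int check(String host, int port) throws Exception {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(healthUri(host, port))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        // Assumed values: a local collector on the port used in the config above.
        System.out.println("status = " + check("localhost", 8092));
    }
}
```

A 200 here only shows that the collector process is up; it says nothing about whether events reach Kafka.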

Are you getting any logging information from the collector that might give a clue if it’s failing? Also, it seems like your configuration is pasted twice above; it’s worth checking it isn’t duplicated in your configuration file too. There is nothing glaringly wrong in the configuration above.

I made a comment about running locally, as Snowplow has been designed as cloud first and running in a nice clean cloud environment (or docker container) removes any oddities that often exist on local development machines.

There are two places where I provide URLs in the tracker JS snippet:

;(function(p,l,o,w,i,n,g){if(!p[i]){p.GlobalSnowplowNamespace=p.GlobalSnowplowNamespace||[]; p.GlobalSnowplowNamespace.push(i);p[i]=function(){(p[i].q=p[i].q||[]).push(arguments) };p[i].q=p[i].q||[];n=l.createElement(o);g=l.getElementsByTagName(o)[0];n.async=1; n.src=w;g.parentNode.insertBefore(n,g)}}(window,document,"script","http://<sp.js filepath>","snowplow"));
// -> This loads up fine and shows the content of the entire sp.js file

window.snowplow('newTracker', 'cf', 'http://<my global ip>:8092', {
  appId: '{{MY-SITE-ID}}',
  cookieDomain: '{{MY-COOKIE-DOMAIN}}',
  post: true // Use POST rather than GET
});

window.snowplow('trackPageView');

The static sp.js file is served fine, and the health endpoint at <my global ip>:8092 returns 200 OK.

Then there is a follow up post request that is blocked:
http://http//global ip:8092/com.snowplowanalytics.snowplow/tp2

If I change the param in the tracker snippet to post: false, the follow-up call is a GET request:

http://http//global ip:8092/i?stm=1603252030631&e=pv&url=http%3A%2F%2Fwww.w3sigma.com%2F&page=Color%20Strokes%20%E2%80%93%20Paint%20with%20me%20and%20get%20handcrafted%20paintings&tv=js-2.16.2&tna=cf&aid=%7B%7BMY-SITE-ID%7D%7D&p=web&tz=my tz&lang=en-US&cs=UTF-8&res=1920x1080&cd=24&cookie=1&eid=ea0b7c7e-3b44-45f6-b45e-4551ad355bfb&dtm=1603217779275&vp=1853x701&ds=1853x701&vid=1&sid=e3e5f5b0-450b-40d8-b8f7-a73fcec5fdb5&duid=571515bb-f963-4b53-a0a3-6b71e5c05ff9
stm 1603252030631
e pv
url my site url
page Page Title
tv js-2.16.2
tna cf
aid {{MY-SITE-ID}}
p web
tz my time zone
lang en-US
cs UTF-8
res 1920x1080
cd 24
cookie 1
eid ea0b7c7e-3b44-45f6-b45e-4551ad355bfb
dtm 1603217779275
vp 1853x701
ds 1853x701
vid 1
sid e3e5f5b0-450b-40d8-b8f7-a73fcec5fdb5
duid 571515bb-f963-4b53-a0a3-6b71e5c05ff9

My local Kafka instance is running on localhost:9092 (I have tested this Kafka instance multiple times and it works fine). I created the GOOD_TOPIC and BAD_TOPIC as per the documentation before running the collector.

Do I need to modify anything on the endpoint? It seems like there is something specific I need to do for the endpoint.

I am facing exactly the same issue. If you solve it, please reach me at: okan@analyticahouse.com

Thanks for your attention

The collector endpoint in your tracker should omit the http protocol. If you look closely, the endpoint you’re sending events to isn’t legitimate: it starts with http://http//, as in the blocked request above.
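To illustrate why the doubled protocol appears, here is a small sketch. It assumes the tracker builds its request URL by prepending the page's scheme to whatever endpoint string it was given, which is consistent with the malformed URL in the thread but is not taken from the tracker source. The host name and helper names are hypothetical.

```java
class CollectorEndpoint {

    // Strips a leading scheme such as "http://" or "https://" from an endpoint string.
    static String stripScheme(String endpoint) {
        return endpoint.trim().replaceFirst("^[a-zA-Z][a-zA-Z0-9+.-]*://", "");
    }

    // Assumed tracker behaviour: scheme + "://" + endpoint + POST path.
    static String trackerUrl(String scheme, String endpoint) {
        return scheme + "://" + endpoint + "/com.snowplowanalytics.snowplow/tp2";
    }

    public static void main(String[] args) {
        String configured = "http://collector.example.com:8092"; // hypothetical host
        // Passing the scheme through yields a doubled protocol.
        System.out.println(trackerUrl("http", configured));
        // Stripping it first yields a well-formed URL.
        System.out.println(trackerUrl("http", stripScheme(configured)));
    }
}
```

In the tracker snippet itself, the equivalent fix is simply to pass the host and port without the protocol prefix.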

OK, checkpoint reached. I can see byte-formatted messages pushed to the Kafka GOOD_STREAM.
The following changes worked:

  1. I was using a global IP but should have used the Docker-exposed IP.
  2. The Kafka (de)serializer should be byte-based (the collector’s default format is Thrift, so string serialization won’t work).
  3. Removed http:// as mentioned by Colm (thanks!).
  4. POST messages now work.

Not bad for 3 days of work getting to know Snowplow and reaching the collector checkpoint… On to enrichment!
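The serializer point above can be sketched as follows. This is a hedged illustration using only the JDK: the property keys and the ByteArrayDeserializer class name come from the standard kafka-clients library, while the group id, helper names and sample bytes are made up. The second method shows why binary payloads such as Thrift do not survive being treated as UTF-8 strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Properties;

class RawPayloadBytes {

    // Consumer settings that keep record keys and values as raw bytes.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("group.id", groupId);
        p.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        p.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return p;
    }

    // True if the bytes are unchanged after decoding to a UTF-8 String and encoding back.
    static boolean survivesStringRoundTrip(byte[] payload) {
        byte[] roundTripped = new String(payload, StandardCharsets.UTF_8)
                .getBytes(StandardCharsets.UTF_8);
        return Arrays.equals(payload, roundTripped);
    }

    public static void main(String[] args) {
        // Made-up binary bytes; 0x80 and 0xff are not valid UTF-8 on their own.
        byte[] binary = {0x0b, 0x00, 0x64, (byte) 0x80, (byte) 0xff};
        System.out.println(survivesStringRoundTrip(binary));
        System.out.println(consumerProps("localhost:9092", "debug-reader"));
    }
}
```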

Quick question to the wonderful team: is there any other format supported other than Thrift? Also, how do I decode this schema: https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4 (I mean, how do I know what each key represents and the enums associated with it?)
Thanks a lot anyway.

No, there are no alternatives; Thrift is what the Collector writes. The Enrich application will read this and then produce a TSV into a good topic. That is what you want to be reading, rather than what the collector outputs.

Then you can use the Analytics SDKs to parse the TSV into JSON.

Cool. I was able to enrich the stream, although I am now struggling to understand the output. There are 3 or 4 schema definition URLs given within the message, but I couldn’t find any reference for the actual data.

This is the Iglu resolver I am using:
{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-4",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": { "http": { "uri": "http://iglucentral.com" } }
      },
      {
        "name": "Iglu Central - GCP Mirror",
        "priority": 1,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": { "http": { "uri": "http://mirror01.iglucentral.com" } }
      }
    ]
  }
}
And the following is one of the enriched messages.

[{"topic":"GOOD_STREAM_ENRICHED","partition":0,"offset":23,"timestamp":1603468799204,"timestampType":"CREATE_TIME","headers":[],"key":"3b16e419-7f5b-4c12-9deb-02f5c455155f","value":"{{MY-SITE-ID}}\tweb\t2020-10-23 15:59:59.197\t2020-10-23 15:59:58.994\t2020-10-23 15:59:58.979\tpage_view\t3b16e419-7f5b-4c12-9deb-02f5c455155f\t\tsp\tjs-2.16.2\tssc-1.0.0-kafka\tstream-enrich-1.4.0-common-1.4.0\t\t192.168.0.184\t\tfa5fe7d1-7276-4a5d-b1d5-65eecebe2721\t1\te01600c1-d336-474b-8968-048a0d5dacd5\t\t\t\t\t\t\t\t\t\t\t\thttp://my-site/\tmy-site-page-heading\thttp://my-site/\thttp\tmy-site\t80\t/\t\t\thttp\tmy-site\t80\t/\t\t\t\t\t\t\t\t\t\t\t{\"schema\":\"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0\",\"data\":[{\"schema\":\"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0\",\"data\":{\"id\":\"58548b84-318f-4c39-abfc-731440c5acd3\"}}]}\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\tMozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:82.0) Gecko/20100101 Firefox/82.0\t\t\t\t\t\ten-US\t\t\t\t\t\t\t\t\t\t1\t24\t1853\t974\t\t\t\tmy-tz\t\t\t1920\t1080\tUTF-8\t1853\t974\t\t\t\t\t\t\t\t\t\t\t\t2020-10-23 15:59:58.981\t\t\t\t2f878cfe-ccc6-4e93-a3fc-2bac0fd12857\t2020-10-23 15:59:58.992\tcom.snowplowanalytics.snowplow\tpage_view\tjsonschema\t1-0-0\t\t"}]

Any input on how I can decode the message?

^^ The Analytics SDKs mentioned above will be the easiest way to parse the data if you’re handling it yourself.

If you’re looking to parse it in order to get it into a storage target, the best way is to use one of our loaders.

@Colm, is there any way I can understand the enriched stream myself before using the SDK?

Apologies, I’m not quite sure I understand your question, but let me try to explain what I’m guessing is the missing information:

If you want to know what the format of the event is, it’s a tab separated string, where some of the fields are self-describing JSON. This is the format that’s most compatible with the Snowplow loaders, for loading to warehouse.

The SDKs turn that format into a JSON. They exist because the TSV format is a nightmare to work with if you’re trying to parse the data yourself and do something with it. So if the aim is to understand the content of your events, (and you’re not loading to warehouse or blob storage), then I’d recommend using the SDK to avoid a frustrating experience.
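For a quick look at the tab-separated layout described above without the SDK, a rough sketch like the following can help. The names given to the first seven columns are an assumption based on the canonical event model and happen to line up with the sample message earlier in the thread; check them against the official field list before relying on them. The class name and sample line are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class EnrichedTsvPeek {

    // Assumed names of the first columns, in order; verify against the Snowplow docs.
    static final String[] LEADING_FIELDS = {
        "app_id", "platform", "etl_tstamp", "collector_tstamp",
        "dvce_created_tstamp", "event", "event_id"
    };

    // Counts columns; the -1 limit keeps trailing empty columns.
    static int columnCount(String line) {
        return line.split("\t", -1).length;
    }

    // Maps the leading columns of one enriched line to their assumed names.
    static Map<String, String> leadingFields(String line) {
        String[] cols = line.split("\t", -1);
        Map<String, String> out = new LinkedHashMap<>();
        for (int i = 0; i < LEADING_FIELDS.length && i < cols.length; i++) {
            out.put(LEADING_FIELDS[i], cols[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        // Shortened, made-up line; a real enriched event has many more columns.
        String line = "my-app\tweb\t2020-10-23 15:59:59.197\t2020-10-23 15:59:58.994\t"
                + "2020-10-23 15:59:58.979\tpage_view\t3b16e419-7f5b-4c12-9deb-02f5c455155f\t\tsp";
        leadingFields(line).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

This only peeks at a handful of plain fields. The self-describing JSON columns and type conversions are where hand-rolled parsing gets painful, which is the case the SDK covers.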

Another option is to spin up a Snowplow Mini instance and view the data using the Elasticsearch Kibana UI.

Yes, my dev environment is Java-only, hence I would like to implement a converter myself. I don’t think there is a Java SDK available?

You can use the Scala SDK in your Java applications to do the conversion.

As a simple guide:

Assuming you’re using Maven, add this to your pom.xml (you can also use Gradle):

    <dependency>
      <groupId>com.snowplowanalytics</groupId>
      <artifactId>snowplow-scala-analytics-sdk_2.12</artifactId>
      <version>2.0.1</version>
    </dependency>

Then some imports in your .java file:

import cats.data.Validated;
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event;
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError;

And to parse the Event string (which I’ve called eventTSVString in this example):

Validated<ParsingError, Event> validatedEvent = Event.parse(eventTSVString);
if (validatedEvent.isValid()) {
    Event ev = validatedEvent.toOption().get();
    // toJson returns a circe Json value; noSpaces() renders it as a compact String
    String jsonString = ev.toJson(false).noSpaces(); // Or toJson(true) if you want lossy output

    // ... use jsonString here ...
}

Thanks Paul. I get the following Error.

java.lang.NoSuchMethodError: scala.collection.immutable.List$.iterableFactory()Lscala/collection/Factory;
at com.snowplowanalytics.snowplow.analytics.scalasdk.Event$.<clinit>(Event.scala:248)
at com.snowplowanalytics.snowplow.analytics.scalasdk.Event.parse(Event.scala)

Code

    DataStream<String> out = stream.map(new MapFunction<String, String>() {
        @Override
        public String map(String s) throws Exception {
            Validated<ParsingError, Event> validatedEvent = Event.parse(s); // <-- line where the error occurs
            Json jsonString = null;
            if (validatedEvent.isValid()) {
                Event ev = validatedEvent.toOption().get();
                jsonString = ev.toJson(false);
            }

            return jsonString.toString();
        }
    });

Are you packaging a fat jar? You’ll need to ensure all the dependencies are bundled.

Assuming you’re using Maven, a simple plugin for this that you could try is onejar-maven-plugin. Simply add this plugin to the <plugins> section of your pom.xml:

      <plugin>
        <groupId>com.jolira</groupId>
        <artifactId>onejar-maven-plugin</artifactId>
        <version>1.4.4</version>
        <executions>
          <execution>
            <goals>
              <goal>one-jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

Or, perhaps better yet, stick with the official Apache plugins and replace maven-jar-plugin with maven-shade-plugin (change mainClass below):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>com.example.App</mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

Actually, I am not packaging a fat jar (and I don’t think I should).

I tried to incorporate the above change, but it doesn’t seem to affect anything.

The above change will only be applicable if packaging a fat jar.

Basically, you’ll need to ensure the Scala libraries that the Analytics SDK uses are available to your application. How to do that can vary depending on how you’re running it.

You might want to try adding the same dependencies that the Scala Analytics SDK uses to your pom.xml as well as the scala library itself.

Yes. Having gone through the Scala SDK code, I think it would take an inordinate amount of effort to implement even a small (limited event size) parser in Java.

It would have been great if I could have used this SDK.

Just going through what could cause this error: it seems most likely to be a version issue (https://stackoverflow.com/questions/35186/how-do-i-fix-a-nosuchmethoderror).
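One way to test the version-mismatch theory is to print which Scala library is actually on the runtime classpath and compare it with the suffix of the SDK artifact (for example _2.12). The sketch below is an assumption-laden illustration: it uses reflection so it compiles without Scala present, the class and helper names are made up, and it relies on scala.util.Properties exposing versionNumberString as a static method, as the Scala standard library does.

```java
import java.util.Optional;

class ScalaVersionCheck {

    // Returns the Scala library version found at runtime, or empty if none is loadable.
    static Optional<String> scalaLibraryVersion() {
        try {
            Class<?> props = Class.forName("scala.util.Properties");
            Object version = props.getMethod("versionNumberString").invoke(null);
            return Optional.of(String.valueOf(version));
        } catch (ReflectiveOperationException | LinkageError e) {
            return Optional.empty();
        }
    }

    // Reduces a full version such as "2.12.10" to its binary version "2.12".
    static String binaryVersion(String fullVersion) {
        String[] parts = fullVersion.split("\\.");
        return parts.length >= 2 ? parts[0] + "." + parts[1] : fullVersion;
    }

    // True if the artifact id carries the suffix for the given Scala version.
    static boolean artifactMatches(String artifactId, String fullScalaVersion) {
        return artifactId.endsWith("_" + binaryVersion(fullScalaVersion));
    }

    public static void main(String[] args) {
        String artifact = "snowplow-scala-analytics-sdk_2.12"; // from the pom.xml above
        Optional<String> runtime = scalaLibraryVersion();
        System.out.println("Scala on classpath: " + runtime.orElse("none found"));
        runtime.ifPresent(v ->
                System.out.println("artifact matches: " + artifactMatches(artifact, v)));
    }
}
```

Running this inside the same job or container that throws the NoSuchMethodError is what matters, since the framework may put its own Scala library ahead of the one declared in the pom.xml.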

While starting up Enrich I am using

"schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",

in the Iglu resolver. Should I be checking the SDK configs to make sure there are no mismatches here?