Unable to deploy Kinesis Scala collector with Elastic Beanstalk

Hi team,

I am completely new to setting up the open-source version of Snowplow. I tried to deploy the Scala Stream Collector on AWS with Elastic Beanstalk: I uploaded a zip file containing the Procfile, the collector jar and the config file, and got the following error:

Executing: /usr/bin/unzip -o -d /var/app/staging /opt/elasticbeanstalk/deploy/appsource/source_bundle
  Archive:  /opt/elasticbeanstalk/deploy/appsource/source_bundle
     creating: /var/app/staging/EB_collector/
    inflating: /var/app/staging/EB_collector/optimy.conf  
    inflating: /var/app/staging/__MACOSX/EB_collector/._optimy.conf  
    inflating: /var/app/staging/EB_collector/.DS_Store  
    inflating: /var/app/staging/__MACOSX/EB_collector/._.DS_Store  
    inflating: /var/app/staging/EB_collector/snowplow-stream-collector-kinesis-2.2.1.jar  
    inflating: /var/app/staging/EB_collector/Procfile  
  Unable to launch application as the source bundle does not contain either a file named application.jar or a Procfile.
  Unable to launch application as the source bundle does not contain either a file named application.jar or a Procfile. (ElasticBeanstalk::ExternalInvocationError)

I have the Procfile in the folder; its entire content is the single line:

web: java -jar snowplow-stream-collector-kinesis-2.2.1.jar --config optimy.conf

I tried both Corretto 11 and Java 8 but they both give the same error.

I wonder if I did something wrong. Any help would be appreciated!!

This is my conf file.

collector {
  # The collector runs as a web service specified on the following interface and port.
  interface = "0.0.0.0"
  interface = ${?COLLECTOR_INTERFACE}
  port = 8080
  port = ${?COLLECTOR_PORT}

  # optional SSL/TLS configuration
  ssl {
    enable = false
    enable = ${?COLLECTOR_SSL}
    # whether to redirect HTTP to HTTPS
    redirect = false
    redirect = ${?COLLECTOR_SSL_REDIRECT}
    port = 9543
    port = ${?COLLECTOR_SSL_PORT}
  }

  # The collector responds with a cookie to requests with a path that matches the 'vendor/version' protocol.
  # The expected values are:
  # - com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
  # - r/tp2 for redirects
  # - com.snowplowanalytics.iglu/v1 for the Iglu Webhook
  # Any path that matches the 'vendor/version' protocol will result in a cookie response, for use by custom webhooks
  # downstream of the collector.
  # But you can also map any valid (i.e. two-segment) path to one of the three defaults.
  # Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full
  # valid paths starting with a leading slash.
  # Pass in an empty map to avoid mapping.
  paths {
    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    # "/com.acme/redirect" = "/r/tp2"
    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
  }

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # Cross domain policy configuration.
  # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
  # route.
  crossDomain {
    enabled = false
    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
    domains = [ "*" ]
    domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
    secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    enabled = ${?COLLECTOR_COOKIE_ENABLED}
    expiration = "365 days" # e.g. "365 days"
    expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
    # Network cookie name
    name = optimy_snowplow
    name = ${?COLLECTOR_COOKIE_NAME}
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out these lines to tie cookies to
    # the collector's full domain.
    # The domain is determined by matching the domains from the Origin header of the request
    # to the list below. The first match is used. If no matches are found, the fallback domain will be used,
    # if configured.
    # If you specify a main domain, all subdomains on it will be matched.
    # If you specify a subdomain, only that subdomain will be matched.
    # Examples:
    # domain.com will match domain.com, www.domain.com and secure.client.domain.com
    # client.domain.com will match secure.client.domain.com but not domain.com or www.domain.com
    domains = [
        "{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned
        "{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
        # ... more domains
    ]
    domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
    domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
    # ... more domains
    # If specified, the fallback domain will be used if none of the Origin header hosts matches the list of
    # cookie domains configured above. (For example, if there is no Origin header.)
    fallbackDomain = "{{fallbackDomain}}"
    fallbackDomain = ${?FALLBACK_DOMAIN}
    secure = false
    secure = ${?COLLECTOR_COOKIE_SECURE}
    httpOnly = false
    httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
    # The sameSite is optional. You can choose to not specify the attribute, or you can use `Strict`,
    # `Lax` or `None` to limit the cookie sent context.
    #   Strict: the cookie will only be sent along with "same-site" requests.
    #   Lax: the cookie will be sent with same-site requests, and with cross-site top-level navigation.
    #   None: the cookie will be sent with same-site and cross-site requests.
    sameSite = "{{cookieSameSite}}"
    sameSite = ${?COLLECTOR_COOKIE_SAME_SITE}
  }

  # If you have a do not track cookie in place, the Scala Stream Collector can respect it by
  # completely bypassing the processing of an incoming request carrying this cookie, the collector
  # will simply reply by a 200 saying "do not track".
  # The cookie name and value must match the configuration below, where the names of the cookies must
  # match entirely and the value could be a regular expression.
  doNotTrackCookie {
    enabled = false
    enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
    name = "collector-do-not-track-cookie" #{{doNotTrackCookieName}}
    name = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_NAME}
    value = "collector-do-not-track-cookie" #{{doNotTrackCookieValue}}
    value = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_VALUE}
  }

  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
  # fallbackNetworkId is used instead of generating a new random one.
  cookieBounce {
    enabled = false
    enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
    forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
  }

  # When enabled, redirect prefix `r/` will be enabled and its query parameters resolved.
  # Otherwise the request prefixed with `r/` will be dropped with `404 Not Found`
  # Custom redirects configured in `paths` can still be used.
  enableDefaultRedirect = false
  enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}

  # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
  # token. All instances of that token are replaced with the network ID. If the placeholder isn't
  # specified, the default value is `${SP_NUID}`.
  redirectMacro {
    enabled = false
    enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
    placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
  }

  # Customize response handling for requests for the root path ("/").
  # Useful if you need to redirect to web content or privacy policies regarding the use of this collector.
  rootResponse {
    enabled = false
    enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
    statusCode = 302
    statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
    # Optional, defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",
      Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
    body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
  }

  # Configuration related to CORS preflight requests
  cors {
    # The Access-Control-Max-Age response header indicates how long the results of a preflight
    # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
    accessControlMaxAge = 5 seconds
    accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
  }

  # Configuration of prometheus http metrics
  prometheusMetrics {
    # If metrics are enabled then all requests will be logged as prometheus metrics
    # and '/metrics' endpoint will return the report about the requests
    enabled = false
    # Custom buckets for http_request_duration_seconds_bucket duration metric
    #durationBucketsInSeconds = [0.1, 3, 10]
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = snowplow-collected-good-events-stream #{{good}}
    good = ${?COLLECTOR_STREAMS_GOOD}

    # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
    # The collector can currently produce two flavours of bad row:
    #  - a size_violation if an event is larger than the Kinesis (1MB) or SQS (256KB) limits;
    #  - a generic_error if a request's querystring cannot be parsed because of illegal characters
    bad = snowplow-collected-bad-events-stream #{{bad}}
    bad = ${?COLLECTOR_STREAMS_BAD}

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false
    useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = kinesis # or sqs
      enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}

      # Region where the streams are located
      region = ca-central-1 # {{kinesisRegion}} # or {{sqsRegion}}
      region = ${?COLLECTOR_STREAMS_SINK_REGION}

      ## Optional endpoint url configuration to override aws kinesis endpoints,
      ## this can be used to specify local endpoints when using localstack
      # customEndpoint = {{kinesisEndpoint}}
      # customEndpoint = ${?COLLECTOR_STREAMS_SINK_CUSTOM_ENDPOINT}

      # Thread pool size for Kinesis and SQS API requests
      threadPoolSize = 10
      threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}

      # Optional SQS buffer for good and bad events (respectively).
      # When messages can't be sent to Kinesis, they will be sent to SQS.
      # If not configured, sending to Kinesis will be retried.
      # This should only be set up for the Kinesis sink, where it acts as a failsafe.
      # For the SQS sink, the good and bad queue should be specified under streams.good and streams.bad, respectively and these settings should be ignored.
      #sqsGoodBuffer = {{sqsGoodBuffer}}
      #sqsGoodBuffer = ${?COLLECTOR_STREAMS_SINK_SQS_GOOD_BUFFER}

      #sqsBadBuffer = {{sqsBadBuffer}}
      #sqsBadBuffer = ${?COLLECTOR_STREAMS_SINK_SQS_BAD_BUFFER}

      # The following are used to authenticate for the Amazon Kinesis and SQS sinks.
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = iam
        accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
        secretKey = iam
        secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
      }

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 43200 #{{minBackoffMillis}}
        minBackoff = ${?COLLECTOR_STREAMS_SINK_MIN_BACKOFF}
        maxBackoff = 43200 #{{maxBackoffMillis}}
        maxBackoff = ${?COLLECTOR_STREAMS_SINK_MAX_BACKOFF}
      }

      # Or Google Pubsub
      #googleProjectId = ID
      ## Minimum, maximum and total backoff periods, in milliseconds
      ## and multiplier between two backoff
      #backoffPolicy {
      #  minBackoff = {{minBackoffMillis}}
      #  maxBackoff = {{maxBackoffMillis}}
      #  totalBackoff = {{totalBackoffMillis}} # must be >= 10000
      #  multiplier = {{backoffMultiplier}}
      #}

      # Or Kafka
      #brokers = "{{kafkaBrokers}}"
      ## Number of retries to perform before giving up on sending a record
      #retries = 0
      # The kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" = brokers
      # "buffer.memory"     = buffer.byteLimit
      # "linger.ms"         = buffer.timeLimit
      #producerConf {
      #  acks = all
      #  "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
      #  "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
      #}

      # Or NSQ
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit = 10000 #{{bufferByteThreshold}}
      byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 1000000 #{{bufferRecordThreshold}} # Not supported by Kafka; will be ignored
      recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 50000 #{{bufferTimeThreshold}}
      timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
    }
  }

}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loglevel = ${?AKKA_LOGLEVEL}
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = [${?AKKA_LOGGERS}]

  # akka-http is the server the Stream collector uses and has configurable options defined at
  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on
    remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}

    raw-request-uri-header = on
    raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
      uri-parsing-mode = relaxed
      uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
    }
  }

  # By default setting `collector.ssl` relies on JSSE (Java Secure Socket
  # Extension) to enable secure communication.
  # To override the default settings set the following section as per
  # https://lightbend.github.io/ssl-config/ExampleSSLConfig.html
  # ssl-config {
  #   debug = {
  #     ssl = true
  #   }
  #   keyManager = {
  #     stores = [
  #       {type = "PKCS12", classpath = false, path = "/etc/ssl/mycert.p12", password = "mypassword" }
  #     ]
  #   }
  #   loose {
  #     disableHostnameVerification = false
  #   }
  # }
}

Hi @Marco_Mai,

It's not going to like the OSX .DS_Store and __MACOSX entries; compress on a Windows machine or use the OSX terminal:

zip -r dir.zip . -x ".*" -x "__MACOSX"

I find it easier just to use a Win10 machine for this, even though most of my work is done on a Mac.

Also ensure that the three components are at the root of the zip, and are not inside a folder.

A bit of further explanation: select the three files and zip them directly, rather than zipping the folder that contains them; see the sketch below.
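For example, running something like this from inside your EB_collector folder should give you a bundle with the three files at the root (the bundle name here is just a placeholder):

cd EB_collector
zip ../collector-bundle.zip Procfile snowplow-stream-collector-kinesis-2.2.1.jar optimy.conf
cd ..
unzip -l collector-bundle.zip   # Procfile, the jar and optimy.conf should be listed with no leading folder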

Hope this helps
Kyle


When I tried to access the Beanstalk URL, I got a 502 Bad Gateway, and the log shows that the Beanstalk role is not authorized to access Kinesis. I have also pasted my policy below. Any help would be appreciated!

web: Exception in thread "main" com.amazonaws.services.kinesis.model.AmazonKinesisException: User: arn:aws:sts::<account_id>:assumed-role/aws-elasticbeanstalk-ec2-role/i-****** is not authorized to perform: kinesis:DescribeStream on resource: arn:aws:kinesis:ca-central-1:<account_id>:stream/snowplow-collected-good-events-stream (Service: AmazonKinesis; Status Code: 400; Error Code: AccessDeniedException; Request ID: ****; Proxy: null)

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "acm:*",
                "autoscaling:*",
                "aws-marketplace:Subscribe",
                "aws-marketplace:Unsubscribe",
                "aws-marketplace:ViewSubscriptions",
                "cloudformation:*",
                "cloudfront:*",
                "cloudwatch:*",
                "dynamodb:*",
                "ec2:*",
                "es:*",
                "elasticbeanstalk:*",
                "elasticloadbalancing:*",
                "elasticmapreduce:*",
                "iam:*",
                "kinesis:*",
                "logs:*",
                "rds:*",
                "redshift:*",
                "s3:*",
                "sns:*"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

Hi @Marco_Mai,

Take a look at the work Andrew Hawker did on IAM permissions here:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:PutRecord"
      ],
      "Resource": [
        "${collector_stream_out_good}",
        "${collector_stream_out_bad}"
      ]
    }
  ]
}
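The ${collector_stream_out_good} / ${collector_stream_out_bad} placeholders need to be replaced with the ARNs of your two streams (for the error above, the snowplow-collected-good-events-stream and snowplow-collected-bad-events-stream ARNs in ca-central-1). One way to attach it, assuming you save the filled-in policy to a local file and your instance role really is aws-elasticbeanstalk-ec2-role (the policy name and file name below are just examples):

aws iam put-role-policy \
  --role-name aws-elasticbeanstalk-ec2-role \
  --policy-name snowplow-collector-kinesis \
  --policy-document file://collector-kinesis-policy.json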

Kyle
