Error when running the Scala Stream Collector


#1

Hi all,

I have done the basic setup of Snowplow.
When I try to run the Scala Stream Collector using the command below:

java -jar snowplow-stream-collector-0.11.0.jar --config my.conf

I get the following error:

[scala-stream-collector-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
[main] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Creating thread pool of size 10
Exception in thread "main" com.amazonaws.SdkClientException: Unable to marshall request to JSON: 
Jackson jackson-core/jackson-dataformat-cbor incompatible library version detected.
You have two possible resolutions:
            1) Ensure the com.fasterxml.jackson.core:jackson-core & com.fasterxml.jackson.dataformat:jackson-dataformat-cbor libraries on your classpath have the same version number
            2) Disable CBOR wire-protocol by passing the -Dcom.amazonaws.sdk.disableCbor property or setting the AWS_CBOR_DISABLE environment variable (warning this may affect performance)
    at com.amazonaws.services.kinesis.model.transform.DescribeStreamRequestProtocolMarshaller.marshall(DescribeStreamRequestProtocolMarshaller.java:58)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:711)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:696)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:732)
    at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.streamExists(KinesisSink.scala:123)
    at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.<init>(KinesisSink.scala:114)
    at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$.createAndInitialize(KinesisSink.scala:48)
    at com.snowplowanalytics.snowplow.collectors.scalastream.Collector$.run(Collector.scala:80)
    at com.snowplowanalytics.snowplow.collectors.scalastream.Collector$.main(Collector.scala:63)
    at com.snowplowanalytics.snowplow.collectors.scalastream.Collector.main(Collector.scala)
  Caused by: java.lang.RuntimeException: Jackson jackson-core/jackson-dataformat-cbor incompatible library version detected.
You have two possible resolutions:
            1) Ensure the com.fasterxml.jackson.core:jackson-core & com.fasterxml.jackson.dataformat:jackson-dataformat-cbor libraries on your classpath have the same version number
            2) Disable CBOR wire-protocol by passing the -Dcom.amazonaws.sdk.disableCbor property or setting the AWS_CBOR_DISABLE environment variable (warning this may affect performance)
    at com.amazonaws.protocol.json.SdkCborGenerator.getBytes(SdkCborGenerator.java:68)
    at com.amazonaws.protocol.json.internal.JsonProtocolMarshaller.finishMarshalling(JsonProtocolMarshaller.java:185)
    at com.amazonaws.protocol.json.internal.NullAsEmptyBodyProtocolRequestMarshaller.finishMarshalling(NullAsEmptyBodyProtocolRequestMarshaller.java:53)
    at com.amazonaws.services.kinesis.model.transform.DescribeStreamRequestProtocolMarshaller.marshall(DescribeStreamRequestProtocolMarshaller.java:56)
    ... 9 more
  Caused by: java.lang.NoSuchMethodError: com.fasterxml.jackson.dataformat.cbor.CBORGenerator.getOutputContext()Lcom/fasterxml/jackson/core/json/JsonWriteContext;
    at com.fasterxml.jackson.dataformat.cbor.CBORGenerator.close(CBORGenerator.java:903)
    at com.amazonaws.protocol.json.SdkJsonGenerator.close(SdkJsonGenerator.java:253)
    at com.amazonaws.protocol.json.SdkJsonGenerator.getBytes(SdkJsonGenerator.java:268)
    at com.amazonaws.protocol.json.SdkCborGenerator.getBytes(SdkCborGenerator.java:66)
    ... 12 more

My configuration file is as follows:

	# Copyright (c) 2013-2017 Snowplow Analytics Ltd. All rights reserved.
	#
	# This program is licensed to you under the Apache License Version 2.0, and
	# you may not use this file except in compliance with the Apache License
	# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
	# http://www.apache.org/licenses/LICENSE-2.0.
	#
	# Unless required by applicable law or agreed to in writing, software
	# distributed under the Apache License Version 2.0 is distributed on an "AS
	# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
	# implied.  See the Apache License Version 2.0 for the specific language
	# governing permissions and limitations there under.

	# This file (application.conf.example) contains a template with
	# configuration options for the Scala Stream Collector.
	#
	# To use, copy this to 'application.conf' and modify the configuration options.

	# 'collector' contains configuration options for the main Scala collector.
	collector {
	  # The collector runs as a web service specified on the following
	  # interface and port.
	  interface = "0.0.0.0"
	  port = 8080

	  # Configure the P3P policy header.
	  p3p {
		policyRef = "/w3c/p3p.xml"
		CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
	  }

	  # The collector returns a cookie to clients for user identification
	  # with the following domain and expiration.
	  cookie {
		enabled = true
		expiration = "365 days" # e.g. "365 days"
		# Network cookie name
		name = UnilogAnalytics
		# The domain is optional and will make the cookie accessible to other
		# applications on the domain. Comment out this line to tie cookies to
		# the collector's full domain
		domain = "collector.cookie.domain"
	  }

	  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
	  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
	  # fallbackNetworkId is used instead of generating a new random one.
	  cookieBounce {
		enabled = false
		# The name of the request parameter which will be used on redirects checking that third-party
		# cookies work.
		name = "n3pc"
		# Network user id to fallback to when third-party cookies are blocked.
		fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
	  }

	  streams {
		# Events which have successfully been collected will be stored in the good stream/topic
		good = GoodStream

		# Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
		bad = BadStream

		# Whether to use the incoming event's ip as the partition key for the good stream/topic
		# Note: Nsq does not make use of partition key.
		useIpAddressAsPartitionKey = false

		# Enable the chosen sink by uncommenting the appropriate configuration
		sink {
		  # Choose between kinesis, kafka, nsq, or stdout
		  # To use stdout comment everything
		  enabled = kinesis

		  # Region where the streams are located
		  region = us-east-1

		  # Thread pool size for Kinesis API requests
		  threadPoolSize = 10

		  # The following are used to authenticate for the Amazon Kinesis sink.
		  # If both are set to 'default', the default provider chain is used
		  # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
		  # If both are set to 'iam', use AWS IAM Roles to provision credentials.
		  # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
		  aws {
			accessKey = xxx
			secretKey = xxxxxxx
		  }

		  # Minimum and maximum backoff periods
		  backoffPolicy {
			minBackoff = 3000
			maxBackoff = 600000
		  }

		  # Or Kafka
		  #brokers = "{{kafkaBrokers}}"
		  ## Number of retries to perform before giving up on sending a record
		  #retries = 0

		  # Or NSQ
		  ## Host name for nsqd
		  #host = "{{nsqHost}}"
		  ## TCP port for nsqd, 4150 by default
		  #port = {{nsqdPort}}
		}

		# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
		# Note: Buffering is not supported by NSQ.
		# The buffer is emptied whenever:
		# - the number of stored records reaches record-limit or
		# - the combined size of the stored records reaches byte-limit or
		# - the time in milliseconds since the buffer was last emptied reaches time-limit
		buffer {
		  byteLimit = 4500000
		  recordLimit = 500 # Not supported by Kafka; will be ignored
		  timeLimit = 60000
		}
	  }
	}

	# Akka has a variety of possible configuration options defined at
	# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
	akka {
	  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
	  loggers = ["akka.event.slf4j.Slf4jLogger"]

	  # akka-http is the server the Stream collector uses and has configurable options defined at
	  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
	  http.server {
		# To obtain the hostname in the collector, the 'remote-address' header
		# should be set. By default, this is disabled, and enabling it
		# adds the 'Remote-Address' header to every request automatically.
		remote-address-header = on

		raw-request-uri-header = on

		# Define the maximum request length (the default is 2048)
		parsing {
		  max-uri-length = 32768
		  uri-parsing-mode = relaxed
		}
	  }
	}

Please help me to resolve this error.


#2

Please help me to resolve this error.


#3

@sandesh you need to run:

java -Dcom.amazonaws.sdk.disableCbor -jar snowplow-stream-collector-0.11.0.jar --config my.conf

This fix is suggested in the error message itself: the shaded jar contains incompatible versions of jackson-core and jackson-dataformat-cbor, and disabling the CBOR wire protocol sidesteps the conflict.
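If you would rather not pass a JVM system property, the error message also mentions an AWS_CBOR_DISABLE environment variable as an equivalent workaround. A sketch, assuming the same jar and config file names as above:

```shell
# Alternative to -Dcom.amazonaws.sdk.disableCbor: disable CBOR via the
# environment variable named in the error message. Jar/config names below
# are taken from the original command and are assumptions about your setup.
export AWS_CBOR_DISABLE=true
java -jar snowplow-stream-collector-0.11.0.jar --config my.conf
```

Either form disables the CBOR wire protocol for Kinesis requests; the error message notes this may have a performance cost.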
