I am following this setup to get my alternative data store ready in S3.
But the config file looks similar to the stream enrich config file. Is it safe to assume that if we are using stream enrich to enrich our data, then all we need to do is assign the S3 region, bucket and format?
If not, and if we do have to create a new config file and run the loader separately, then what would be the input stream and output stream value for the S3 loader setup? Would it be the good enriched stream as input?
Also, with my setup I have the Scala Stream Collector, then Stream Enrich and the S3 loader, so I will need two Kinesis streams for the collector (good and bad), two for Stream Enrich (enriched good and failed enrichment), and one for the S3 loader (failed storage). Would this be right?
Thanks.
The Snowplow S3 loader has its own configuration file that you can specify to the jar.
The input stream could be your collector payloads stream, or your enriched stream (or another stream). The output stream is only used to store events that failed to sink, so you will likely want a dedicated stream for it, as you mentioned.
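If you do run the loader separately, a rough sketch of the relevant parts of its config, assuming the good enriched stream as input (key and stream names here are illustrative, not taken from your setup; double-check them against the sample config shipped with your snowplow-s3-loader version):

```
# Hypothetical fragment, modelled on the snowplow-s3-loader sample config.
source = "kinesis"
sink   = "kinesis"

aws {
  accessKey = "iam"
  secretKey = "iam"
}

kinesis {
  initialPosition = "LATEST"
  region = "us-east-2"
  appName = "snowplowpoc-s3-loader"   # DynamoDB lease table name
}

streams {
  # Read from the good enriched stream produced by Stream Enrich
  inStreamName = "snowplowpoc_enrich_good"
  # Events that fail to sink to S3 go to this dedicated stream
  outStreamName = "snowplowpoc_s3_bad"

  buffer {
    byteLimit = 5000
    recordLimit = 100
    timeLimit = 5000
  }
}

s3 {
  region = "us-east-2"
  bucket = "my-snowplow-bucket"       # placeholder bucket name
  format = "gzip"                     # or "lzo"
}
```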
Thanks.
When I run stream-enrich-0.12.0, I get the following error:
configuration error: Cannot convert configuration to a com.snowplowanalytics.snowplow.enrich.stream.model$EnrichConfig. Failures are:
at 'streams.nsq.host':
  - (file:/home/ec2-user/snowplow/3-enrich/stream-enrich/target/scala-2.11/enrich.config.sample:99) Key not found.
at 'streams.nsq.lookupHost':
  - (file:/home/ec2-user/snowplow/3-enrich/stream-enrich/target/scala-2.11/enrich.config.sample:99) Key not found.
at 'streams.nsq.lookupPort':
  - (file:/home/ec2-user/snowplow/3-enrich/stream-enrich/target/scala-2.11/enrich.config.sample:99) Key not found.
at 'streams.nsq.port':
  - (file:/home/ec2-user/snowplow/3-enrich/stream-enrich/target/scala-2.11/enrich.config.sample:99) Key not found.
at 'streams.nsq.rawChannel':
  - (file:/home/ec2-user/snowplow/3-enrich/stream-enrich/target/scala-2.11/enrich.config.sample:99) Key not found.
This happens even though I have specified my source and sink as "kinesis" at the top of the config file. When I run the exact same config file on another instance running stream-enrich-0.11.1, I do not see this issue. Is there a known issue with the 0.12.0 jar and Kinesis?
Thanks.
enrich {
  source = kinesis
  sink = kinesis

  aws {
    accessKey = iam
    secretKey = iam
  }

  streams {
    in {
      raw = "snowplowpoc_good"
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = snowplowpoc_enrich_good
      # Stream/topic where the events that failed enrichment will be stored
      bad = snowplowpoc_enrich_bad
      # How the output stream/topic will be partitioned.
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint.
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      # Note: Nsq does not make use of partition key.
      partitionKey = event_id
    }

    kinesis {
      # Region where the streams are located
      region = us-east-2
      # Maximum number of records to get from Kinesis per call to GetRecords
      maxRecords = 10000
      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
      # Note: This only affects the first run of this application on a stream.
      initialPosition = LATEST
      # Needs to be specified when initialPosition is "AT_TIMESTAMP".
      # Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
      # Ex: "2017-05-17T10:00:00Z"
      # Note: Time needs to be specified in UTC.
      initialTimestamp = "initialTimestamp"
      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 5000
        maxBackoff = 20000
      }
    }

    # Kafka configuration
    kafka {
      brokers = "kafkaBrokers"
      # Number of retries to perform before giving up on sending a record
      retries = 0
    }

    # config for nsq
    nsq {
      # Channel name for nsq source
      # If more than one application is reading from the same NSQ topic at the same time,
      # all of them must have the same channel name
      //rawChannel = "{{nsqSourceChannelName}}"
      # Host name for nsqd
      //host = "{{nsqHost}}"
      # TCP port for nsqd, 4150 by default
      //port = {{nsqdPort}}
      # Host name for lookupd
      //lookupHost = "{{lookupHost}}"
      # HTTP port for nsqlookupd, 4161 by default
      //lookupPort = {{nsqlookupdPort}}
    }

    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    buffer {
      byteLimit = 5000
      recordLimit = 100 # Not supported by Kafka; will be ignored
      timeLimit = 5000
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # You can set it automatically using: "SnowplowEnrich-$\\{enrich.streams.in.raw\\}"
    appName = "snowplowPOC"
  }
}
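As an aside, the comments in the buffer section describe three flush triggers (record count, byte size, elapsed time). A minimal sketch of that policy in Python, purely illustrative and not Snowplow's actual Scala implementation:

```python
import time


class Buffer:
    """Illustrative sketch of the flush policy described in the config comments."""

    def __init__(self, byte_limit=5000, record_limit=100, time_limit_ms=5000):
        self.byte_limit = byte_limit
        self.record_limit = record_limit
        self.time_limit_ms = time_limit_ms
        self.records = []
        self.bytes_stored = 0
        self.last_flush = time.time()

    def add(self, record: bytes) -> bool:
        """Store a record; return True if the buffer flushed as a result."""
        self.records.append(record)
        self.bytes_stored += len(record)
        elapsed_ms = (time.time() - self.last_flush) * 1000
        # Flush on any of the three conditions from the config comments.
        if (len(self.records) >= self.record_limit
                or self.bytes_stored >= self.byte_limit
                or elapsed_ms >= self.time_limit_ms):
            self.flush()
            return True
        return False

    def flush(self):
        # A real sink would send self.records to Kinesis/Kafka here.
        self.records = []
        self.bytes_stored = 0
        self.last_flush = time.time()
```

The `timeLimit` check fires only when a new event arrives, which matches the "when a new event enters the buffer" wording above.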
When I uncomment the nsq settings, that portion looks like this:
nsq {
  # Channel name for nsq source
  # If more than one application is reading from the same NSQ topic at the same time,
  # all of them must have the same channel name
  rawChannel = "{{nsqSourceChannelName}}"
  # Host name for nsqd
  host = "{{nsqHost}}"
  # TCP port for nsqd, 4150 by default
  port = 4150
  # Host name for lookupd
  lookupHost = "{{lookupHost}}"
  # HTTP port for nsqlookupd, 4161 by default
  lookupPort = 4161
}
And with that change I get this error:
[main] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Using workerId: ip-172-31-27-251.us-east-2.compute.internal:418f0eab-5aa8-42e6-9b6a-3905e34756cc
[main] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Running: snowplowPOC.
[main] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Processing raw input stream: snowplowpoc_good
Exception in thread "main" com.amazonaws.SdkClientException: Unable to marshall request to JSON: Jackson jackson-core/jackson-dataformat-cbor incompatible library version detected.
You have two possible resolutions:
1) Ensure the com.fasterxml.jackson.core:jackson-core & com.fasterxml.jackson.dataformat:jackson-dataformat-cbor libraries on your classpath have the same version number
2) Disable CBOR wire-protocol by passing the -Dcom.amazonaws.sdk.disableCbor property or setting the AWS_CBOR_DISABLE environment variable (warning this may affect performance)
at com.amazonaws.services.kinesis.model.transform.DescribeStreamRequestProtocolMarshaller.marshall(DescribeStreamRequestProtocolMarshaller.java:58)
at com.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:729)
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:714)
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:750)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.streamExists(KinesisSink.scala:82)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.&lt;init&gt;(KinesisSink.scala:70)
at com.snowplowanalytics.snowplow.enrich.stream.sources.AbstractSource$$anon$1.initialValue(AbstractSource.scala:148)
at com.snowplowanalytics.snowplow.enrich.stream.sources.AbstractSource$$anon$1.initialValue(AbstractSource.scala:139)
at java.lang.ThreadLocal.setInitialValue(ThreadLocal.java:180)
at java.lang.ThreadLocal.get(ThreadLocal.java:170)
at com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource.run(KinesisSource.scala:88)
at com.snowplowanalytics.snowplow.enrich.stream.EnrichApp$.run(EnrichApp.scala:166)
at com.snowplowanalytics.snowplow.enrich.stream.EnrichApp$.main(EnrichApp.scala:124)
at com.snowplowanalytics.snowplow.enrich.stream.EnrichApp.main(EnrichApp.scala)
Caused by: java.lang.RuntimeException: Jackson jackson-core/jackson-dataformat-cbor incompatible library version detected.
You have two possible resolutions:
1) Ensure the com.fasterxml.jackson.core:jackson-core & com.fasterxml.jackson.dataformat:jackson-dataformat-cbor libraries on your classpath have the same version number
2) Disable CBOR wire-protocol by passing the -Dcom.amazonaws.sdk.disableCbor property or setting the AWS_CBOR_DISABLE environment variable (warning this may affect performance)
at com.amazonaws.protocol.json.SdkCborGenerator.getBytes(SdkCborGenerator.java:68)
at com.amazonaws.protocol.json.internal.JsonProtocolMarshaller.finishMarshalling(JsonProtocolMarshaller.java:188)
at com.amazonaws.protocol.json.internal.NullAsEmptyBodyProtocolRequestMarshaller.finishMarshalling(NullAsEmptyBodyProtocolRequestMarshaller.java:53)
at com.amazonaws.services.kinesis.model.transform.DescribeStreamRequestProtocolMarshaller.marshall(DescribeStreamRequestProtocolMarshaller.java:56)
... 13 more
Caused by: java.lang.NoSuchMethodError: com.fasterxml.jackson.dataformat.cbor.CBORGenerator.getOutputContext()Lcom/fasterxml/jackson/core/json/JsonWriteContext;
at com.fasterxml.jackson.dataformat.cbor.CBORGenerator.close(CBORGenerator.java:903)
at com.amazonaws.protocol.json.SdkJsonGenerator.close(SdkJsonGenerator.java:253)
at com.amazonaws.protocol.json.SdkJsonGenerator.getBytes(SdkJsonGenerator.java:268)
at com.amazonaws.protocol.json.SdkCborGenerator.getBytes(SdkCborGenerator.java:66)
This same config file works fine on another instance with an older version of Stream Enrich.
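The stack trace itself points at a jackson-core / jackson-dataformat-cbor version clash on the classpath, and offers two resolutions. Until the dependency versions are aligned, option 2 is the usual workaround: disable the CBOR wire protocol when launching the jar. A sketch (the jar and config file names are placeholders for your actual artifacts):

```shell
# Option 2 from the error message: disable the CBOR wire protocol.
# Either export the environment variable before launching...
export AWS_CBOR_DISABLE=true
# ...or pass the JVM system property (jar/config names are placeholders):
echo 'java -Dcom.amazonaws.sdk.disableCbor -jar snowplow-stream-enrich-0.12.0.jar --config enrich.conf'
```

As the error message warns, disabling CBOR may affect performance, so treat this as a stopgap rather than a fix.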