Scala Stream Collector + Stream Enrich + S3 Loader Setup

I am following this setup to get my alternative data store ready in S3.

But the config file looks similar to the Stream Enrich config file. Is it safe to assume that if we are using Stream Enrich to enrich our data, all we need to do is assign the S3 region, bucket and format?
If not, and we do have to create a new config file and run the loader separately, what would the input stream and output stream values for the S3 loader setup be? Would the good enriched stream be the input?
Also, with my setup I have the Scala Stream Collector, then Stream Enrich, then the S3 loader, so I will need two Kinesis streams for the collector (good and bad), two for Stream Enrich (enriched good and failed enrichment) and one for the S3 loader (failed storage). Would this be right?
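For reference, I was planning to provision those five streams with the AWS CLI, something like this (the stream names and shard counts are just placeholders for my POC):

```shell
# Create the five Kinesis streams described above (names/shard counts illustrative)
for s in snowplowpoc_good snowplowpoc_bad \
         snowplowpoc_enrich_good snowplowpoc_enrich_bad \
         snowplowpoc_s3_failed; do
  aws kinesis create-stream --stream-name "$s" --shard-count 1 --region us-east-2
done
```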
Thanks.

The Snowplow S3 Loader has its own configuration file that you can pass to the jar.

The input stream could be your collector payloads stream, or your enriched stream (or another stream). The output stream is only used to store events that failed to sink, so you will likely want a dedicated stream for this, as you mentioned.
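As an illustration, a minimal standalone S3 loader config might look like the sketch below. This is only an outline: the exact key names differ between S3 loader releases, so verify every key (and the stream/bucket names, which are placeholders here) against the sample config shipped with your version of the jar.

```hocon
# Sketch of a standalone S3 loader config -- key names and values are
# illustrative; check them against your release's reference config.
source = "kinesis"
sink = "kinesis"          # where events that fail to sink to S3 are sent

aws {
  accessKey = "iam"
  secretKey = "iam"
}

kinesis {
  initialPosition = "TRIM_HORIZON"
  maxRecords = 10000
  region = "us-east-2"
  appName = "snowplowpoc-s3-loader"   # DynamoDB table used for checkpointing
}

streams {
  inStreamName = "snowplowpoc_enrich_good"   # e.g. the good enriched stream
  outStreamName = "snowplowpoc_s3_failed"    # dedicated failed-to-sink stream
  buffer {
    byteLimit = 1048576
    recordLimit = 100
    timeLimit = 60000
  }
}

s3 {
  region = "us-east-2"
  bucket = "my-snowplow-bucket"
  format = "gzip"           # or "lzo"
  maxTimeout = 300000
}
```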

Thanks.
When I run stream-enrich-0.12.0, I am getting the following error:
configuration error: Cannot convert configuration to a com.snowplowanalytics.snowplow.enrich.stream.model$EnrichConfig. Failures are:
at 'streams.nsq.host':
- (file:/home/ec2-user/snowplow/3-enrich/stream-enrich/target/scala-2.11/enrich.config.sample:99) Key not found.
at 'streams.nsq.lookupHost':
- (file:/home/ec2-user/snowplow/3-enrich/stream-enrich/target/scala-2.11/enrich.config.sample:99) Key not found.
at 'streams.nsq.lookupPort':
- (file:/home/ec2-user/snowplow/3-enrich/stream-enrich/target/scala-2.11/enrich.config.sample:99) Key not found.
at 'streams.nsq.port':
- (file:/home/ec2-user/snowplow/3-enrich/stream-enrich/target/scala-2.11/enrich.config.sample:99) Key not found.
at 'streams.nsq.rawChannel':
- (file:/home/ec2-user/snowplow/3-enrich/stream-enrich/target/scala-2.11/enrich.config.sample:99) Key not found.

This happens even though I have specified my source and sink as 'kinesis' at the start of the config file.
When I run the exact same config file on another instance where I have stream-enrich-0.11.1, I do not see this issue. Is there a known issue with the 0.12.0 jar and Kinesis?
Thanks.

Could you post your stream enricher configuration? Do you happen to have the NSQ keys commented out (lines 103-115)?

enrich {
  source = kinesis

  sink = kinesis

  aws {
    accessKey = iam
    secretKey = iam
  }

  streams {
    in {
      raw = "snowplowpoc_good"
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = snowplowpoc_enrich_good
      # Stream/topic where the event that failed enrichment will be stored
      bad = snowplowpoc_enrich_bad

      # How the output stream/topic will be partitioned.
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint.
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      # Note: Nsq does not make use of partition key.
      partitionKey = event_id
    }

    kinesis {
      # Region where the streams are located
      region = us-east-2

      # Maximum number of records to get from Kinesis per call to GetRecords
      maxRecords = 10000

      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
      # Note: This only affects the first run of this application on a stream.
      initialPosition = LATEST

      # Need to be specified when initial-position is "AT_TIMESTAMP".
      # Timestamp format need to be in "yyyy-MM-ddTHH:mm:ssZ".
      # Ex: "2017-05-17T10:00:00Z"
      # Note: Time need to specified in UTC.
      initialTimestamp = "initialTimestamp"

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 5000
        maxBackoff = 20000
      }
    }

    # Kafka configuration
    kafka {
      brokers = "kafkaBrokers"

      # Number of retries to perform before giving up on sending a record
      retries = 0
    }

    # config for nsq
    nsq {
      # Channel name for nsq source
      # If more than one application is reading from the same NSQ topic at the same time,
      # all of them must have the same channel name
      //rawChannel = "{{nsqSourceChannelName}}"

      # Host name for nsqd
      //host = "{{nsqHost}}"

      # TCP port for nsqd, 4150 by default
      //port = {{nsqdPort}}

      # Host name for lookupd
      //lookupHost = "{{lookupHost}}"

      # HTTP port for nsqlookupd, 4161 by default
      //lookupPort = {{nsqlookupdPort}}
    }

    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    buffer {
      byteLimit = 5000
      recordLimit = 100 # Not supported by Kafka; will be ignored
      timeLimit = 5000
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # You can set it automatically using: "SnowplowEnrich-$\\{enrich.streams.in.raw\\}"
    appName = "snowplowPOC"
  }
}

When I uncomment the NSQ settings, that portion looks like this:
nsq {
  # Channel name for nsq source
  # If more than one application is reading from the same NSQ topic at the same time,
  # all of them must have the same channel name
  rawChannel = "{{nsqSourceChannelName}}"

  # Host name for nsqd
  host = "{{nsqHost}}"

  # TCP port for nsqd, 4150 by default
  port = 4150

  # Host name for lookupd
  lookupHost = "{{lookupHost}}"

  # HTTP port for nsqlookupd, 4161 by default
  lookupPort = 4161
}

And here I get the error as:
[main] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Using workerId: ip-172-31-27-251.us-east-2.compute.internal:418f0eab-5aa8-42e6-9b6a-3905e34756cc
[main] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Running: snowplowPOC.
[main] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Processing raw input stream: snowplowpoc_good
Exception in thread "main" com.amazonaws.SdkClientException: Unable to marshall request to JSON: Jackson jackson-core/jackson-dataformat-cbor incompatible library version detected.
You have two possible resolutions:
1) Ensure the com.fasterxml.jackson.core:jackson-core & com.fasterxml.jackson.dataformat:jackson-dataformat-cbor libraries on your classpath have the same version number
2) Disable CBOR wire-protocol by passing the -Dcom.amazonaws.sdk.disableCbor property or setting the AWS_CBOR_DISABLE environment variable (warning this may affect performance)
at com.amazonaws.services.kinesis.model.transform.DescribeStreamRequestProtocolMarshaller.marshall(DescribeStreamRequestProtocolMarshaller.java:58)
at com.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:729)
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:714)
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:750)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.streamExists(KinesisSink.scala:82)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.(KinesisSink.scala:70)
at com.snowplowanalytics.snowplow.enrich.stream.sources.AbstractSource$$anon$1.initialValue(AbstractSource.scala:148)
at com.snowplowanalytics.snowplow.enrich.stream.sources.AbstractSource$$anon$1.initialValue(AbstractSource.scala:139)
at java.lang.ThreadLocal.setInitialValue(ThreadLocal.java:180)
at java.lang.ThreadLocal.get(ThreadLocal.java:170)
at com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource.run(KinesisSource.scala:88)
at com.snowplowanalytics.snowplow.enrich.stream.EnrichApp$.run(EnrichApp.scala:166)
at com.snowplowanalytics.snowplow.enrich.stream.EnrichApp$.main(EnrichApp.scala:124)
at com.snowplowanalytics.snowplow.enrich.stream.EnrichApp.main(EnrichApp.scala)
Caused by: java.lang.RuntimeException: Jackson jackson-core/jackson-dataformat-cbor incompatible library version detected.
You have two possible resolutions:
1) Ensure the com.fasterxml.jackson.core:jackson-core & com.fasterxml.jackson.dataformat:jackson-dataformat-cbor libraries on your classpath have the same version number
2) Disable CBOR wire-protocol by passing the -Dcom.amazonaws.sdk.disableCbor property or setting the AWS_CBOR_DISABLE environment variable (warning this may affect performance)
at com.amazonaws.protocol.json.SdkCborGenerator.getBytes(SdkCborGenerator.java:68)
at com.amazonaws.protocol.json.internal.JsonProtocolMarshaller.finishMarshalling(JsonProtocolMarshaller.java:188)
at com.amazonaws.protocol.json.internal.NullAsEmptyBodyProtocolRequestMarshaller.finishMarshalling(NullAsEmptyBodyProtocolRequestMarshaller.java:53)
at com.amazonaws.services.kinesis.model.transform.DescribeStreamRequestProtocolMarshaller.marshall(DescribeStreamRequestProtocolMarshaller.java:56)
… 13 more
Caused by: java.lang.NoSuchMethodError: com.fasterxml.jackson.dataformat.cbor.CBORGenerator.getOutputContext()Lcom/fasterxml/jackson/core/json/JsonWriteContext;
at com.fasterxml.jackson.dataformat.cbor.CBORGenerator.close(CBORGenerator.java:903)
at com.amazonaws.protocol.json.SdkJsonGenerator.close(SdkJsonGenerator.java:253)
at com.amazonaws.protocol.json.SdkJsonGenerator.getBytes(SdkJsonGenerator.java:268)
at com.amazonaws.protocol.json.SdkCborGenerator.getBytes(SdkCborGenerator.java:66)

This same config file works fine on another instance with an older version of Stream Enrich.

See above — the error message itself lists the two possible resolutions.
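For anyone hitting this later, resolution 2 from the error message can be applied when launching the enricher, for example (the jar and config file names here are illustrative):

```shell
# Disable the AWS SDK's CBOR wire protocol via a JVM system property
java -Dcom.amazonaws.sdk.disableCbor=true \
  -jar snowplow-stream-enrich-0.12.0.jar \
  --config enrich.conf \
  --resolver file:resolver.json

# Or set the equivalent environment variable before launching:
export AWS_CBOR_DISABLE=true
```

Resolution 1 (aligning the jackson-core and jackson-dataformat-cbor versions on the classpath) is the cleaner fix if you control the build.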


Thanks Mike, that helped :slight_smile: