Scala Kinesis Enricher: Cannot convert configuration to Enrich Config (Key not found: streams.sourceSink.enabled)


#1

Hi. I’m trying to run Stream Enrich on Kinesis. Here’s the command line I’m using:

$ java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar kinesis/target/scala-2.11/snowplow-stream-enrich-kinesis-0.17.0.jar --config etc/config.hocon --resolver file:etc/iglu_resolver.json --enrichments file:etc/enrichments/

And here’s its output:

$ java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar kinesis/target/scala-2.11/snowplow-stream-enrich-kinesis-0.17.0.jar --config etc/config.hocon --resolver file:etc/iglu_resolver.json --enrichments file:etc/enrichments/
Cannot convert configuration to a com.snowplowanalytics.snowplow.enrich.stream.model$EnrichConfig. Failures are:
  at 'streams.sourceSink.enabled':
    - (file:/home/aldrin/work/snowplow-r106-acropolis/3-enrich/stream-enrich/etc/config.hocon:48) Key not found.

Here’s my config:

# Copyright (c) 2013-2018 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (application.conf.example) contains a template with
# configuration options for Stream Enrich.

enrich {

  streams {

    in {
      # Stream/topic where the raw events to be enriched are located
      raw = 'sp-prod-c-good'
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = 'sp-prod-e-enriched'
      # Stream/topic where the events that failed enrichment will be stored
      bad = 'sp-prod-e-bad'
      # Stream/topic where the pii transformation events will end up
      pii = 'sp-prod-e-pii'

      # How the output stream/topic will be partitioned.
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint.
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      # Note: Nsq does not make use of partition key.
      partitionKey = 'event_id'
    }

    # Configuration shown is for Kafka, to use another uncomment the appropriate configuration
    # and comment out the other
    # To use stdin, comment or remove everything in the "enrich.streams.sourceSink" section except
    # "enabled" which should be set to "stdin".
    sourceSink {
      # Sources / sinks currently supported are:
      # 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a
      # Kinesis stream
      # 'googlepubsub' for reading / writing to a Google PubSub topic
      # 'kafka' for reading / writing to a Kafka topic
      # 'nsq' for reading / writing to a Nsq topic
      # 'stdin' for reading from stdin and writing to stdout and stderr
      type =  kinesis

      # Region where the streams are located (AWS region, pertinent to kinesis sink/source type)
      region = 'us-east-1'

      # AWS credentials (pertinent to kinesis sink/source type)
      # If both are set to 'default', use the default AWS credentials provider chain.
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use env variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = default
        secretKey = default
      }

      # Maximum number of records to get from Kinesis per call to GetRecords
      maxRecords = 10000

      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
      # Note: This only affects the first run of this application on a stream.
      # (pertinent to kinesis source type)
      initialPosition = TRIM_HORIZON

      # Needs to be specified when initialPosition is "AT_TIMESTAMP".
      # Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
      # Ex: "2017-05-17T10:00:00Z"
      # Note: Time needs to be specified in UTC.
      #initialTimestamp = "{{initialTimestamp}}"

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 256
        maxBackoff = 65535
      }

      # Or Google PubSub
      #googleProjectId = my-project-id
      ## Size of the subscriber thread pool
      #threadPoolSize = 4
      ## Minimum, maximum and total backoff periods, in milliseconds
      ## and multiplier between two backoffs
      #backoffPolicy {
      #  minBackoff = {{enrichStreamsOutMinBackoff}}
      #  maxBackoff = {{enrichStreamsOutMaxBackoff}}
      #  totalBackoff = {{enrichStreamsOutTotalBackoff}} # must be >= 10000
      #  multiplier = {{enrichStreamsOutTotalBackoff}}
      #}

      # Or Kafka (Comment out for other types)
      #brokers = "{{kafkaBrokers}}"
      # Number of retries to perform before giving up on sending a record
      retries = 128

      # Or NSQ
      ## Channel name for nsq source
      ## If more than one application is reading from the same NSQ topic at the same time,
      ## all of them must have the same channel name
      #rawChannel = "{{nsqSourceChannelName}}"
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
      ## Host name for lookupd
      #lookupHost = "{{lookupHost}}"
      ## HTTP port for nsqlookupd, 4161 by default
      #lookupPort = {{nsqlookupdPort}}
    }

    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    buffer {
      byteLimit = 33554432
      recordLimit = 2048 # Not supported by Kafka; will be ignored
      timeLimit = 60000
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # Used as the Google PubSub subscription name.
    appName = 'sp-prod-e'
  }

  # Optional section for tracking endpoints
  #monitoring {
  #  snowplow {
  #    collectorUri = "{{collectorUri}}"
  #    collectorPort = 80
  #    appId = {{enrichAppName}}
  #    method = GET
  #  }
  #}
}


#2

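The enabled key of sourceSink was recently replaced with type, and the config you posted uses the new name. The error shows that your 0.17.0 jar still looks for streams.sourceSink.enabled, so try changing type = kinesis back to enabled = kinesis and see if that works.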
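
For illustration, the top of the sourceSink section would then look roughly like this. It is an untested sketch based on the config posted above, with only the key name changed; everything else in the section is assumed to stay as it is:

```
sourceSink {
  # The 0.17.0 jar reports 'streams.sourceSink.enabled' as missing,
  # so use the old key name here instead of 'type'
  enabled = kinesis

  # Unchanged from the posted config
  region = 'us-east-1'

  # (remaining sourceSink settings unchanged)
}
```

If the same error comes back with a different key name, the config template and the jar are probably from different releases, and it may be easier to take the example config from the same release as the jar.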