What are configurable enrichments?

What is a configurable enrichment? Is it necessary, or can we run the enrichment process without it?
If it is necessary, then please help me to configure it.

Hi @JeetuChoudhary,

The --enrichments parameter, which passes the configurable enrichments to EmrEtlRunner, is optional.

Please read the relevant wiki pages to find out more.

In short, these are extra enrichments which you can apply to your events during the enrichment process. Some of them add extra values to your atomic.events table, while others require dedicated tables of their own to load the data into.

They are called configurable as the corresponding JSON files require adjustments to be used with your events. This is because you would need to either

  • have an account with a third-party provider of the data relevant to the enrichment (e.g. location info derived from the user’s IP)

and/or

  • have specific requirements for the outcome of the enrichment (e.g. hide only the last octet of the captured IP)

If used, the value passed with the --enrichments parameter is the directory where you store your configurable JSON files. Examples of the files can be found here.
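
For illustration, here is roughly what one of those JSON files looks like, taking the IP anonymization enrichment as an example (the exact schema version may differ between releases):

{
  "schema": "iglu:com.snowplowanalytics.snowplow/anon_ip/jsonschema/1-0-0",
  "data": {
    "name": "anon_ip",
    "vendor": "com.snowplowanalytics.snowplow",
    "enabled": true,
    "parameters": {
      "anonOctets": 1
    }
  }
}

With anonOctets set to 1, only the last octet of each captured IP is masked, matching the second example above. You would then point the runner at the directory holding such files, along the lines of --enrichments enrichments/ (an illustrative path).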

Hopefully this clarifies the term.

Regards,
Ihor

Thanks Ihor.

So in Stream Enrich I don’t need configurable enrichments?

I am running the Stream Enrich process and I got the following exception:

Exception in thread "main" com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'app-name'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
    at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:206)
    at com.snowplowanalytics.snowplow.enrich.kinesis.KinesisEnrichConfig.<init>(KinesisEnrichApp.scala:350)
    at com.snowplowanalytics.snowplow.enrich.kinesis.KinesisEnrichApp$delayedInit$body.apply(KinesisEnrichApp.scala:125)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at com.snowplowanalytics.snowplow.enrich.kinesis.KinesisEnrichApp$.main(KinesisEnrichApp.scala:76)
    at com.snowplowanalytics.snowplow.enrich.kinesis.KinesisEnrichApp.main(KinesisEnrichApp.scala)

And my configuration file is:

# Default Configuration for Scala Kinesis Enrich.

enrich {
  # Sources currently supported are:
  # 'kinesis' for reading Thrift-serialized records from a Kinesis stream
  # 'stdin' for reading Base64-encoded Thrift-serialized records from stdin
  source = "kinesis"

  # Sinks currently supported are:
  # 'kinesis' for writing enriched events to one Kinesis stream and invalid events to another.
  # 'stdouterr' for writing enriched events to stdout and invalid events to stderr.
  #    Using "sbt assembly" and "java -jar" is recommended to disable sbt
  #    logging.
  sink = "kinesis"

  # AWS credentials
  #
  # If both are set to 'cpf', a properties file on the classpath is used.
  # http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/ClasspathPropertiesFileCredentialsProvider.html
  #
  # If both are set to 'iam', use AWS IAM Roles to provision credentials.
  #
  # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  aws {
    access-key: "*********"
    secret-key: "*************"
  }

  streams {
    in: {
      raw: "{StreamName}"

      # After enrichment, events are accumulated in a buffer before being sent to Kinesis.
      # The buffer is emptied whenever:
      # - the number of stored records reaches record-limit or
      # - the combined size of the stored records reaches byte-limit or
      # - the time in milliseconds since it was last emptied exceeds time-limit when
      #   a new event enters the buffer
      buffer: {
        byte-limit: 4500000
        record-limit: 500
        time-limit: 5000
      }
    }

    out: {
      enriched: "GoodStreamName"
      bad: "BadStreamName"

      # Minimum and maximum backoff periods
      # - Units: Milliseconds
      backoffPolicy: {
        minBackoff: 10
        maxBackoff: 100
      }
    }

    # "app-name" is used for a DynamoDB table to maintain stream state.
    # You can set it automatically using: "SnowplowKinesisEnrich-"${enrich.streams.in.raw}
    #app-name: "{{enrichStreamsAppName}}"

    # LATEST: most recent data.
    # TRIM_HORIZON: oldest available data.
    # Note: This only affects the first run of this application
    # on a stream.
    #initial-position = "TRIM_HORIZON"

    region: "us-east-1"
  }
}

No @JeetuChoudhary, you don’t. Configurable enrichments are optional regardless of whether it is a batch or a real-time pipeline.

As for the error message you get when trying to start Stream Enrich, it is due to a misconfiguration.

In your config.hocon, you commented out the app-name parameter. It refers to the DynamoDB table which is used to maintain stream-processing state. It is essential, as a Kinesis consumer periodically stores its current position in the stream in the backing DynamoDB table. This allows the system to recover from failures and continue processing where the application left off.

You could simply give it the value recommended in the comment above it, that is:

app-name: "SnowplowKinesisEnrich-"${enrich.streams.in.raw}
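
In context, it sits at the streams level of your file, alongside region. Note the substitution is kept outside the quotes, as HOCON does not resolve ${...} inside quoted strings:

streams {
  # in / out settings as above ...

  # The KCL creates this DynamoDB table on the first run if it does not already exist
  app-name: "SnowplowKinesisEnrich-"${enrich.streams.in.raw}

  region: "us-east-1"
}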

–Ihor

After running it, the following errors are shown, repeating:

[main] INFO com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator - With failover time 10000ms and epsilon 25ms, LeaseCoordinator will renew leases every 3308ms and take leases every 20050ms
[main] WARN com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Received configuration for both region name as us-east-1, and Amazon Kinesis endpoint as https://kinesis.us-east-1.amazonaws.com. Amazon Kinesis endpoint will overwrite region name.
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization attempt 1
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initializing LeaseCoordinator
[main] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Caught exception when initializing LeaseCoordinator
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization attempt 2
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initializing LeaseCoordinator
[main] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Caught exception when initializing LeaseCoordinator
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization attempt 3
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initializing LeaseCoordinator
[main] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Caught exception when initializing LeaseCoordinator
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization attempt 4
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initializing LeaseCoordinator

Thanks

Hi Jeetu,

The KCL’s error logging is a bit lacking here. These repeated failed LeaseCoordinator initializations are often a sign that something is wrong with your credentials. Could you double check that your access key and secret key are correct?
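
If you want to rule out a copy-and-paste mistake, one option (per the comments in your own config) is to set both values to "env" and export the keys in your shell instead:

aws {
  access-key: "env"   # read from the AWS_ACCESS_KEY_ID environment variable
  secret-key: "env"   # read from the AWS_SECRET_ACCESS_KEY environment variable
}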

Thanks,
Fred

Thanks Fred, it is working.

What are datums? How do we know that it is working properly?
Can we see the data after the enrichment process?

Hi @JeetuChoudhary - datums simply refer to metrics about the KCL app’s performance, which are published to CloudWatch (see the sketch at the end of this post). A couple of options to review the contents of the enriched event stream:

  1. Set up Elasticsearch and the Kinesis Elasticsearch Sink
  2. Write a simple Kinesis worker (e.g. in Python) to “tail” the enriched event stream, as sketched below
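
For option 2, here is a minimal sketch of such a worker using boto3. It assumes a single-shard stream, that "GoodStreamName" is the enriched stream from your config, and that AWS credentials are available in your environment; enriched events are tab-separated lines of text:

import time

import boto3

STREAM = "GoodStreamName"  # the enriched "out" stream from config.hocon

kinesis = boto3.client("kinesis", region_name="us-east-1")

# Grab an iterator for the first shard, starting from the newest records
shard_id = kinesis.describe_stream(StreamName=STREAM)["StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=STREAM, ShardId=shard_id, ShardIteratorType="LATEST"
)["ShardIterator"]

while True:
    resp = kinesis.get_records(ShardIterator=iterator, Limit=100)
    for record in resp["Records"]:
        # Enriched events are UTF-8, tab-separated values
        print(record["Data"].decode("utf-8"))
    iterator = resp["NextShardIterator"]
    time.sleep(1)  # stay under the per-shard read limits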

Hope this helps.
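
As for the datums, a quick way to confirm they are being published is to list the metrics in your app’s CloudWatch namespace. A sketch, assuming the KCL default of using the application name as the namespace:

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

APP_NAME = "SnowplowKinesisEnrich-YourRawStream"  # substitute your actual app-name value

# The KCL publishes its metrics under the application name by default
resp = cloudwatch.list_metrics(Namespace=APP_NAME)
for metric in resp["Metrics"]:
    print(metric["MetricName"])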