Issue while sending data to Kinesis data stream

  1. I have created a Scala Stream Collector running on port 8080
  2. Then I created a Python Flask application and initialized the tracker like this:
    e = Emitter("http://localhost:8080/", port=8080)
  3. I have created good and bad streams in my AWS account
  4. I started my Flask application and the Scala Stream Collector locally, and both are running.
  5. But when I check my streams in AWS, they are not receiving any data.
    Can you please help?
    Thanks.

Hey @ank29,

Your tracker should take the endpoint address without the protocol and port:

e = Emitter("localhost:8080", port=8080)
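
For reference, a fuller initialization might look like the sketch below. This is only a sketch assuming the pre-1.0 Python tracker API (argument names and signatures may differ in your installed version), and the app_id and page URL are placeholder values:

from snowplow_tracker import Emitter, Tracker

# Endpoint is just the host; the protocol and port are passed separately
e = Emitter("localhost", protocol="http", port=8080)

# The tracker wraps the emitter; app_id is an arbitrary label for your app
t = Tracker(e, app_id="flask-test", namespace="sp")

# Fire a simple page view to check that events reach the collector
t.track_page_view("http://example.com/hello", page_title="Hello")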

Best,

Thank you Colm. It's working fine now. But when I look at the data in S3 it's unreadable. So how do I validate it before putting it into the Kinesis stream from the collector?

The short answer is that normally you don't - the raw stream is a very unfriendly format: it's Thrift-encoded, and when dumped to S3 it's LZO compressed.

It is possible to write something to read this and spit out a more readable format, but the amount of work involved would be quite a lot more than I would consider worth it.

You can think of the collector, enrich, and their associated Kinesis streams as one system. If the data is somehow in a format that enrich can't deal with, it will end up in the bad stream with a failure message that says as much. But I would be very, very surprised if that were to happen, so I would say the best path forward is to just plug it into enrich and work from there.
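
For what it's worth, once events are flowing through enrich, the enriched stream is plain tab-separated data, which is much easier to inspect. As a rough sketch (assuming the snowplow-analytics-sdk Python package; the function name here is just illustrative), you could turn one enriched TSV line into a dict like this:

from snowplow_analytics_sdk.event_transformer import transform

def enriched_to_dict(enriched_tsv):
    # enriched_tsv: one tab-separated enriched event read from the enriched stream
    return transform(enriched_tsv)  # should return a dict mapping field names to values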

Totally agreed. Thank you so much Colm for your help.

  1. I am trying to implement the enricher now.
  2. I have already configured my tracker and collector, and events are getting into the Kinesis streams.
  3. But it's throwing an error for "Initializing LeaseCoordinator". Is it compulsory to create a DynamoDB table for the enricher to work?
    Below is my config file for the enricher:

enrich {

streams {

in {
  # Stream/topic where the raw events to be enriched are located
  raw = good
  raw = ${?ENRICH_STREAMS_IN_RAW}
}

out {
  # Stream/topic where the events that were successfully enriched will end up
  enriched = enriched
  enriched = ${?ENRICH_STREAMS_OUT_ENRICHED}
  # Stream/topic where the event that failed enrichment will be stored
  bad = bad
  bad = ${?ENRICH_STREAMS_OUT_BAD}
  # Stream/topic where the pii tranformation events will end up
  pii = outpii
  pii = ${?ENRICH_STREAMS_OUT_PII}
  partitionKey = event_id
  partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
}

sourceSink {
  enabled = kinesis
  enabled = ${?ENRICH_STREAMS_SOURCE_SINK_ENABLED}

  # Region where the streams are located (AWS region, pertinent to kinesis sink/source type)
  region = ap-southeast-1
  region = ${?ENRICH_STREAMS_SOURCE_SINK_REGION}

  aws {
    accessKey = "***********"
    accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
    secretKey = "**********"
    secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
  }

  # Maximum number of records to get from Kinesis per call to GetRecords
  maxRecords = 10
  maxRecords = ${?ENRICH_MAX_RECORDS}

  # LATEST: most recent data.
  # TRIM_HORIZON: oldest available data.
  # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
  # Note: This only affects the first run of this application on a stream.
  # (pertinent to kinesis source type)
  initialPosition = AT_TIMESTAMP
  initialPosition = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_POSITION}

  # Needs to be specified when initial-position is "AT_TIMESTAMP".
  # Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
  # Ex: "2017-05-17T10:00:00Z"
  # Note: The time needs to be specified in UTC.
  initialTimestamp = "2020-06-23T10:00:00Z"
  initialTimestamp = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_TIMESTAMP}

  # Minimum and maximum backoff periods, in milliseconds
  backoffPolicy {
    minBackoff = 3000
    minBackoff = ${?COLLECTOR_STREAMS_SINK_MIN_BACKOFF}
    maxBackoff = 600000
    maxBackoff = ${?COLLECTOR_STREAMS_SINK_MAX_BACKOFF}
  }


}


buffer {
  byteLimit = 5000
  byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
  recordLimit = 1000  # Not supported by Kafka; will be ignored
  recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT}
  timeLimit = 10000
  timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
}

# Used for a DynamoDB table to maintain stream state.
# Used as the Kafka consumer group ID.
# Used as the Google PubSub subscription name.
appName = ""
appName = ${?ENRICH_STREAMS_APP_NAME}

}

}


Enrich will create the DynamoDB table for you if it doesn't exist; you don't need to create the table manually. DynamoDB is used for the KCL (Kinesis consumer) checkpointing, so it needs this table.
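
If you want to confirm that the lease table was created once enrich is running, a quick check with boto3 might look like the sketch below. The table name matches whatever you set as appName in the enrich config; "my-enrich-app" here is a hypothetical value:

import boto3

# The KCL names its lease table after the enrich appName setting
dynamodb = boto3.client("dynamodb", region_name="ap-southeast-1")
resp = dynamodb.describe_table(TableName="my-enrich-app")  # hypothetical appName
print(resp["Table"]["TableStatus"])  # ACTIVE once the table is ready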

Thanks Mike. It's working fine now.