Java heap space error with Snowplow Enrich 3.2.2

Hello,

We are currently running the Stream Enrich flavour of Enrich (v2.0.5) and are in the process of upgrading to the new Enrich Kinesis flavour (v3.2.2). During the upgrade we started seeing Java heap space errors like the following, which never occurred with v2.0.5:

software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Java heap space
	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
	at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
	at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:76)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:105)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$WrappedErrorForwardingResponseHandler.onError(MakeAsyncHttpRequestStage.java:159)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.lambda$notifyError$6(ResponseHandler.java:386)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:249)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$600(ResponseHandler.java:74)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.notifyError(ResponseHandler.java:384)
	at software.amazon.awssdk.http.nio.netty.internal.utils.ExceptionHandlingUtils.tryCatch(ExceptionHandlingUtils.java:42)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onNext(ResponseHandler.java:338)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onNext(ResponseHandler.java:289)
	at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.publishMessage(HandlerPublisher.java:407)
	at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.channelRead(HandlerPublisher.java:383)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:228)
	at software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:199)
	at software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:173)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at software.amazon.awssdk.http.nio.netty.internal.LastHttpContentHandler.channelRead(LastHttpContentHandler.java:43)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.onDataRead(Http2ToHttpInboundAdapter.java:85)
	at software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.channelRead0(Http2ToHttpInboundAdapter.java:49)
	at software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.channelRead0(Http2ToHttpInboundAdapter.java:42)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.handler.codec.http2.AbstractHttp2StreamChannel$Http2ChannelUnsafe.doRead0(AbstractHttp2StreamChannel.java:901)
	at io.netty.handler.codec.http2.AbstractHttp2StreamChannel.fireChildRead(AbstractHttp2StreamChannel.java:555)
	at io.netty.handler.codec.http2.Http2MultiplexHandler.channelRead(Http2MultiplexHandler.java:180)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.http2.Http2FrameCodec.onHttp2Frame(Http2FrameCodec.java:707)
	at io.netty.handler.codec.http2.Http2FrameCodec$FrameListener.onDataRead(Http2FrameCodec.java:646)
	at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:36)
	at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:307)
	at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:415)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:250)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
	at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
	at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.OutOfMemoryError: Java heap space

Could there be an issue on our side with the config for the new Enrich flavour?

Our config file for v2.0.5:

enrich {
  streams {

    in {
      # Stream/topic where the raw events to be enriched are located
      raw = "{{ .Values.environment }}-{{ .Values.tenant }}-collected-good"
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = "{{ .Values.environment }}-enriched-good"
      # Stream/topic where the event that failed enrichment will be stored
      bad = "{{ .Values.environment }}-enriched-bad"
      
      # Stream/topic where the pii transformation events will end up
      #pii = "__TOPIC_PII__"

      # How the output stream/topic will be partitioned.
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint.
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      # Note: Nsq does not make use of partition key.
      partitionKey = "event_id"
    }

    # Configuration shown below is for Kinesis; to use another source/sink, uncomment the
    # appropriate configuration and comment out the other.
    # To use stdin, comment or remove everything in the "enrich.streams.sourceSink" section except
    # "enabled" which should be set to "stdin".
    sourceSink {
      # Sources / sinks currently supported are:
      # 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a
      # Kinesis stream
      # 'kafka' for reading / writing to a Kafka topic
      # 'nsq' for reading / writing to a Nsq topic
      # 'stdin' for reading from stdin and writing to stdout and stderr
      enabled =  "kinesis"

      # Region where the streams are located (AWS region, pertinent to kinesis sink/source type)
      region = eu-central-1
      # region = ${?ENRICH_STREAMS_SOURCE_SINK_REGION}

      ## Optional endpoint url configuration to override aws kinesis endpoints,
      ## this can be used to specify local endpoints when using localstack
      # customEndpoint = {kinesisEndpoint}
      # customEndpoint = ${?ENRICH_STREAMS_SOURCE_SINK_CUSTOM_ENDPOINT}

      ## Optional endpoint url configuration to override aws dynamodb endpoints for Kinesis checkpoints lease table,
      ## this can be used to specify local endpoints when using Localstack
      # dynamodbCustomEndpoint = "http://localhost:4569"
      # dynamodbCustomEndpoint = ${?ENRICH_DYNAMODB_CUSTOM_ENDPOINT}

      # Optional override to disable cloudwatch
      disableCloudWatch = true
      # disableCloudWatch = ${?ENRICH_DISABLE_CLOUDWATCH}

      # AWS credentials
      # If both are set to 'default', use the default AWS credentials provider chain.
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use env variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = default
        secretKey = default
      }

      # Maximum number of records to get from Kinesis per call to GetRecords
      maxRecords = 10000
      # maxRecords = ${?ENRICH_MAX_RECORDS}

      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # AT_TIMESTAMP: start from the record at or after the specified timestamp
      # Note: This only affects the first run of this application on a stream.
      # (pertinent to kinesis source type)
      initialPosition = TRIM_HORIZON
      # initialPosition = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_POSITION}

      # Needs to be specified when initial-position is "AT_TIMESTAMP".
      # Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
      # Ex: "2017-05-17T10:00:00Z"
      # Note: Time needs to be specified in UTC.
      initialTimestamp = "2017-05-17T10:00:00Z"

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 1000
        maxBackoff = 10000
      }

      # Or Kafka (Comment out for other types)
      # brokers = "__KAFKA_BROKERS__"
      # Number of retries to perform before giving up on sending a record
      # retries = 0
      # The kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -> brokers
      # retries             -> retries
      # "buffer.memory"     -> buffer.byteLimit
      # "linger.ms"         -> buffer.timeLimit
      #producerConf {
      #  acks = all
      #  "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
      #  "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
      #}
      # The kafka consumer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#consumerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -> brokers
      # "group.id"          -> appName
      #consumerConf {
      #  "enable.auto.commit" = true
      #  "auto.commit.interval.ms" = 1000
      #  "auto.offset.reset"  = earliest
      #  "session.timeout.ms" = 30000
      #  "key.deserializer"   = "org.apache.kafka.common.serialization.StringDeserializer"
      #  "value.deserializer" = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      #}

      # Or NSQ
      ## Channel name for nsq source
      ## If more than one application is reading from the same NSQ topic at the same time,
      ## all of them must have the same channel name
      #rawChannel = "{nsqSourceChannelName}"
      ## Host name for nsqd
      #host = "{nsqHost}"
      ## TCP port for nsqd, 4150 by default
      #port = {nsqdPort}
      ## Host name for lookupd
      #lookupHost = "{lookupHost}"
      ## HTTP port for nsqlookupd, 4161 by default
      #lookupPort = {nsqlookupdPort}
    }

    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    # A Snowplow enriched event is about 2.25 KB.
    buffer {
      byteLimit = 4500000
      recordLimit = 500 # Not supported by Kafka; will be ignored
      timeLimit = 250
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # Used as the Google PubSub subscription name.
    appName = "{{ .Values.environment }}-{{ .Values.tenant }}-stream-enrich-manifest"
  }

  # The setting below requires an adapter being ready, i.e.: https://github.com/snowplow-incubator/remote-adapter-example
  # remoteAdapters = [
  #    {
  #        vendor: "com.globeandmail"
  #        version: "v1"
  #        url: "http://remote-adapter-example:8995/sampleRemoteAdapter"
  #        connectionTimeout: 1000
  #        readTimeout: 5000
  #    }
  # ]

  # # Optional section for tracking endpoints
  # monitoring {
  #   snowplow {
  #     collectorUri = "{collectorUri}"
  #     collectorUri = ${?ENRICH_MONITORING_COLLECTOR_URI}
  #     collectorPort = 80
  #     collectorPort = ${?ENRICH_MONITORING_COLLECTOR_PORT}
  #     appId = {enrichAppName}
  #     appId = ${?ENRICH_MONITORING_APP_ID}
  #     method = GET
  #     method = ${?ENRICH_MONITORING_METHOD}
  #   }
  # }
}

Our config file for v3.2.2:

{
  # Where to read collector payloads from
  "input": {
    "type": "Kinesis"

    # Optional. Name of the application which the KCL daemon should assume
    "appName": "{{ .Values.environment }}-{{ .Values.tenant }}-stream-enrich-manifest"

    # Name of the Kinesis stream to read from
    "streamName": "{{ .Values.environment }}-{{ .Values.tenant }}-collected-good"

    # Optional. Region where the Kinesis stream is located
    # This field is optional if it can be resolved with AWS region provider chain.
    # It checks places like env variables, system properties, AWS profile file.
    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
    "region": "eu-central-1"

    # Optional, set the initial position to consume the Kinesis stream
    # Must be TRIM_HORIZON, LATEST or AT_TIMESTAMP
    # LATEST: most recent data.
    # TRIM_HORIZON: oldest available data.
    # AT_TIMESTAMP: start from the record at or after the specified timestamp
    "initialPosition": {
      "type": "TRIM_HORIZON"
    }
    # "initialPosition": {
    #   "type": "AT_TIMESTAMP"
    #   "timestamp": "2020-07-17T10:00:00Z" # Required for AT_TIMESTAMP
    # }

    # Optional, set the mode for retrieving records.
    "retrievalMode": {
      "type": "Polling"

      # Maximum size of a batch returned by a call to getRecords.
      # Records are checkpointed after a batch has been fully processed,
      # thus the smaller maxRecords, the more often records can be checkpointed
      # into DynamoDb, but possibly reducing the throughput.
      "maxRecords": 10000
    }
    # "retrievalMode": {
    #   "type": "FanOut"
    # }

    # Optional. Size of the internal buffer used when reading messages from Kinesis,
    # each buffer holding up to maxRecords from above
    #"bufferSize": 3

    # Optional, endpoint url configuration to override aws kinesis endpoints
    # Can be used to specify local endpoint when using localstack
    # "customEndpoint": "http://localhost:4566"

    # Optional, endpoint url configuration to override aws dynamodb endpoint for Kinesis checkpoints lease table
    # Can be used to specify local endpoint when using localstack
    # "dynamodbCustomEndpoint": "http://localhost:4569"

    # Optional, endpoint url configuration to override aws cloudwatch endpoint for metrics
    # Can be used to specify local endpoint when using localstack
    # "cloudwatchCustomEndpoint": "http://localhost:4582"
  }

  "output": {
    # Enriched events output
    "good": {
      "type": "Kinesis"

      # Name of the Kinesis stream to write to
      "streamName": "{{ .Values.environment }}-enriched-good"

      # Optional. Region where the Kinesis stream is located
      # This field is optional if it can be resolved with AWS region provider chain.
      # It checks places like env variables, system properties, AWS profile file.
      # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
      "region": "eu-central-1"

      # Optional. How the output stream/topic will be partitioned in Kinesis
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      "partitionKey": "event_id"

      # Optional. Policy for cats-retry to retry after failures writing to kinesis
      "backoffPolicy": {
        "minBackoff": 1000 milliseconds
        "maxBackoff": 10000 milliseconds
        "maxRetries": 10
      }

      # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
      "maxBufferedTime": 100 millis

      # Optional. The KPL will consider a record failed if it cannot be sent within this deadline.
      # The KPL then yields back to the JVM, which will log the error, and might retry sending.
      "recordTtl": 20 seconds

      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
      "collection": {
        # Maximum number of Kinesis records to pack into a PutRecords request
        "maxCount": 500

        # Maximum amount of data to send with a PutRecords request
        "maxSize": 5242880
      }

      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
      # If not specified, aggregation is deactivated
      #"aggregation": {
      #  # Maximum number of enriched events to pack into an aggregated Kinesis record
      #  "maxCount": 4294967295
      #
      #  # Maximum number of bytes to pack into an aggregated Kinesis record
      #  "maxSize": 51200
      #}

      # Optional. Maximum number of connections to open in the backend.
      # HTTP requests are sent in parallel over multiple connections.
      # Setting this too high may impact latency and consume additional resources
      # without increasing throughput
      "maxConnections": 24

      # Optional. Minimum level of logs for the native KPL daemon.
      # Logs show up on stderr
      # Possible values: trace | debug | info | warning | error
      "logLevel": "warning"

      # Optional. Use a custom Kinesis endpoint.
      # Note this does not accept protocols or paths, only host names or ip addresses.
      # There is no way to disable TLS.
      # Needs to be specified along with customPort
      # "customEndpoint": "localhost"

      # Optional. Server port to connect to for Kinesis.
      # Needs to be specified along with customEndpoint
      # "customPort": 4566

      # Optional. Use a custom Cloudwatch endpoint.
      # Note this does not accept protocols or paths, only host names or ip addresses.
      # There is no way to disable TLS
      # Needs to be specified along with cloudwatchPort
      # "cloudwatchEndpoint": "localhost"

      # Optional. Server port to connect to for CloudWatch.
      # Needs to be specified along with cloudwatchEndpoint
      # "cloudwatchPort": 4582
    }

    # Bad rows output
    "bad": {
      "type": "Kinesis"

      # Name of the Kinesis stream to write to
      "streamName": "{{ .Values.environment }}-enriched-bad"

      # Optional. Region where the Kinesis stream is located
      # This field is optional if it can be resolved with AWS region provider chain.
      # It checks places like env variables, system properties, AWS profile file.
      # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
      "region": "eu-central-1"

      # Optional. Policy for cats-retry to retry after failures writing to kinesis
      "backoffPolicy": {
        "minBackoff": 1000 milliseconds
        "maxBackoff": 10000 milliseconds
        "maxRetries": 10
      }

      # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
      "maxBufferedTime": 100 millis

      # Optional. The KPL will consider a record failed if it cannot be sent within this deadline.
      # The KPL then yields back to the JVM, which will log the error, and might retry sending.
      "recordTtl": 20 seconds

      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
      "collection": {
        # Maximum number of Kinesis records to pack into a PutRecords request
        "maxCount": 500

        # Maximum amount of data to send with a PutRecords request
        "maxSize": 5242880
      }

      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
      # If not specified, aggregation is deactivated
      #"aggregation": {
      #  # Maximum number of enriched events to pack into an aggregated Kinesis record
      #  "maxCount": 4294967295
      #
      #  # Maximum number of bytes to pack into an aggregated Kinesis record
      #  "maxSize": 51200
      #}

      # Optional. Maximum number of connections to open in the backend.
      # HTTP requests are sent in parallel over multiple connections.
      # Setting this too high may impact latency and consume additional resources
      # without increasing throughput
      "maxConnections": 24

      # Optional. Minimum level of logs for the native KPL daemon.
      # Logs show up on stderr
      # Possible values: trace | debug | info | warning | error
      "logLevel": "warning"

      # Optional. Use a custom Kinesis endpoint.
      # Note this does not accept protocols or paths, only host names or ip addresses.
      # There is no way to disable TLS.
      # Needs to be specified along with customPort
      # "customEndpoint": "localhost"

      # Optional. Server port to connect to for Kinesis.
      # Needs to be specified along with customEndpoint
      # "customPort": 4566

      # Optional. Use a custom Cloudwatch endpoint.
      # Note this does not accept protocols or paths, only host names or ip addresses.
      # There is no way to disable TLS
      # Needs to be specified along with cloudwatchPort
      # "cloudwatchEndpoint": "localhost"

      # Optional. Server port to connect to for CloudWatch.
      # Needs to be specified along with cloudwatchEndpoint
      # "cloudwatchPort": 4582
    }
  }

  # Optional. Concurrency of the app
  "concurrency" : {
    # Number of events that can get enriched at the same time within a chunk
    "enrich": 256
    # Number of chunks that can get sunk at the same time
    # WARNING: if greater than 1, records can get checkpointed before they are sunk
    "sink": 1
  }
}
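
If we read the comments in the input section correctly, the default internal buffer size of 3, with each buffer holding up to maxRecords = 10000 records, means considerably more raw data can sit in memory at once than under v2.0.5. A minimal sketch of a more conservative input section we could try (the numbers below are purely our own guesses, not recommendations from the docs):

  "input": {
    "type": "Kinesis"
    "appName": "{{ .Values.environment }}-{{ .Values.tenant }}-stream-enrich-manifest"
    "streamName": "{{ .Values.environment }}-{{ .Values.tenant }}-collected-good"
    "region": "eu-central-1"
    "initialPosition": { "type": "TRIM_HORIZON" }

    "retrievalMode": {
      "type": "Polling"
      # Smaller batches per getRecords call, so less data is buffered in memory at once
      # (1000 is a guess on our part, not a documented recommendation)
      "maxRecords": 1000
    }

    # Hold fewer batches in the internal reading buffer (again, just a guess)
    "bufferSize": 1
  }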

Or is there some other way to solve the Java heap space issue we are currently facing? Maybe someone could point us in the right direction.
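
(A related thought: since Enrich Kinesis runs as a plain JVM application, we could presumably also set an explicit heap limit, e.g. an -Xmx passed via the standard JAVA_TOOL_OPTIONS environment variable that any HotSpot JVM picks up, but we are unsure what a sensible size would be, so guidance there would also be appreciated.)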

Thanks in advance :slight_smile:
Dieter