Retry for RDB Stream Transformer

Hello,

We are currently evaluating the RDB Stream Transformer and it has already been working great for multiple hours. It would enable us to save a lot of money compared to using EMR, so thanks a lot for that! :slight_smile:

The only remaining issue is that after a few hours we get the following S3 503 exception, which makes the worker restart. As we have a window period of one hour, a single exception leads to the reprocessing of a full hour of data. Hence my question: is there a way to set a retry to avoid such errors? AWS suggests something along those lines: Troubleshoot HTTP 5xx errors from Amazon S3
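For illustration, this is roughly what I have in mind: on the AWS SDK v2, retries with backoff can be configured on the S3 client itself. A minimal sketch in Scala, assuming access to the client builder (the values and the wiring here are hypothetical, not the transformer's actual code):

    import java.time.Duration
    import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
    import software.amazon.awssdk.core.retry.RetryPolicy
    import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy
    import software.amazon.awssdk.services.s3.S3AsyncClient

    // Retry throttled / 5xx responses up to 10 times with jittered exponential backoff.
    val retryPolicy = RetryPolicy.builder()
      .numRetries(10)
      .backoffStrategy(
        FullJitterBackoffStrategy.builder()
          .baseDelay(Duration.ofMillis(500))
          .maxBackoffTime(Duration.ofSeconds(20))
          .build()
      )
      .build()

    // Attach the retry policy to the async S3 client used for sinking.
    val s3Client = S3AsyncClient.builder()
      .overrideConfiguration(
        ClientOverrideConfiguration.builder()
          .retryPolicy(retryPolicy)
          .build()
      )
      .build()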

Thank you :slight_smile:

Best, Christoph

[ioapp-compute-1] ERROR com.snowplowanalytics.snowplow.rdbloader.transformer.kinesis.Shutdown - Error on sinking and checkpointing events
software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 503, Request ID: null, Extended Request ID: H+askDRiXqFW9axRPgHR0dXmsibxRVFB4VHDb4y5hto8GOnQiebxdxrVE584sXQrHG5oAIvHhlDTB3LSHGuW9Gs6mr4F7Aom)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:106)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:84)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:42)
	at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$6(BaseClientHandler.java:232)
	at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler.lambda$prepare$0(AsyncResponseHandler.java:89)
	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler$BaosSubscriber.onComplete(AsyncResponseHandler.java:132)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$FullResponseContentPublisher$1.request(ResponseHandler.java:369)
	at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler$BaosSubscriber.onSubscribe(AsyncResponseHandler.java:110)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$FullResponseContentPublisher.subscribe(ResponseHandler.java:360)
	at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler.onStream(AsyncResponseHandler.java:71)
	at software.amazon.awssdk.core.internal.http.async.AsyncAfterTransmissionInterceptorCallingResponseHandler.onStream(AsyncAfterTransmissionInterceptorCallingResponseHandler.java:86)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$WrappedErrorForwardingResponseHandler.onStream(MakeAsyncHttpRequestStage.java:153)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.channelRead0(ResponseHandler.java:110)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.channelRead0(ResponseHandler.java:69)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:163)
	at software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:173)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at software.amazon.awssdk.http.nio.netty.internal.LastHttpContentHandler.channelRead(LastHttpContentHandler.java:43)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.base/java.lang.Thread.run(Unknown Source)
	at cancelable @ blobstore.util$.$anonfun$liftJavaFuture$1(util.scala:10)
	at flatMap @ blobstore.util$.liftJavaFuture(util.scala:9)
	at flatMap @ blobstore.s3.S3Store.$anonfun$put$1(S3Store.scala:97)
	at main$ @ com.snowplowanalytics.snowplow.rdbloader.transformer.kinesis.Main$.main(Main.scala:22)
[cats-effect-blocker-1] INFO software.amazon.kinesis.coordinator.Scheduler - Worker shutdown requested.

One additional question: have you also encountered errors like the following?

[prefetch-cache-shardId-000000000159-0000] ERROR software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher - production-enriched-good:shardId-000000000159 :  Exception thrown while fetching records from Kinesis
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Java heap space

After some time the RDB Loader always hits the Java heap space limit, even after manually increasing the heap space via Java options :-/

Hi @capchriscap,

For the first question, thanks for reporting it and suggesting a solution! I’ve opened a GitHub issue for that problem. We will look into it :slight_smile:

For the second question, I reckon it might be related to your window period. Since all the items in a window are stored in memory, a one-hour window period might be too long. Could you try a lower window period and check whether you still see the problem, please?
This explanation isn’t correct. Please check below for correction.

Hi @enes_aldemir,
thanks for the fast feedback! :slight_smile:

Thanks for adding the first issue to your investigation list :slight_smile:

Regarding the second question: I thought the transformer would cache and process the number of batches defined in the batchSize parameter. Once the data is processed, it is written directly to S3 and cleared from the cache. The window parameter only determines when the SQS message is written to tell the RDB Loader which folder it should process. Did I misunderstand something there?
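For reference, the setting I mean is the window length in the transformer's HOCON config. A minimal, abridged sketch (only the relevant key shown; names and defaults may differ between versions, so please correct me if I got this wrong):

    {
      # input, output and queue settings omitted for brevity

      # Length of a processing window; at the end of each window the
      # transformer sends the SQS message telling the loader which
      # folder to load.
      "windowing": "10 minutes"
    }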

I guess you meant bufferSize. bufferSize actually relates to fetching items from the Kinesis stream; it is used by the Kinesis library we rely on.

windowing is the parameter used by the application. All items received during the window period are collected and sunk to S3 at the end of the window. Therefore, using a smaller window leads to fewer items in memory and a smaller amount of data to write at the end of each window period.
This explanation isn’t correct. Please check below for correction.

Oh yeah, thank you for the correction @enes_aldemir :slight_smile:

Well, it might also be pretty likely that I simply did not read the documentation 100% properly :slight_smile:

One thing that just made me curious is that S3 files are already written during the window period:

This looks to me like files are written to S3 on the fly, in batches. Therefore, I am a little unsure why the data is still kept in memory even though the files already seem to be written to S3.

Also, from a conceptual point of view, this approach would make sense to me: the cache is cleared right away, and even if a worker breaks down, a new “worker folder” is generated and the worker simply re-processes all the events, since the checkpoint has not been written to the Kinesis manifest yet. So no data is lost and the transformers stay pretty memory efficient. That’s just some background on how I interpreted the service :slight_smile:

Thanks a lot for your support in here, I really appreciate it! :slight_smile:

Hey again @capchriscap,

After your last message, I rechecked the application and found out that I had misremembered the architecture; I can confirm that your understanding is right. Sorry for misleading you initially.

As you said, items shouldn’t be kept in memory after they are written to S3. We will look into this memory problem too.

Thanks again for reporting these issues!


Maybe helpful: we are currently also facing the heap space problem in the Enricher (v3.2.2). Could it be that the memory issue lies in the fs2-aws module, since it was introduced in the new Enricher version and is also part of the RDB Transformer?

That is a helpful observation, thanks for letting us know! We will take it into consideration while looking into this problem.

I was facing the same issue, and it is resolved now. I resolved it by increasing the pod resources to give the pod 6Gi of memory. You can do it as follows.

In deployment.yaml:

    spec:
      containers:
        - name: {{ .Chart.Name }}
          image: {{ .Values.image.repository }}
          resources:
            requests:
              cpu: 2000m
              memory: 6Gi
            limits:
              cpu: 2000m
              memory: 6Gi

By default the JVM inside the pod only uses 25% of the allocated memory (the JVM default for -XX:MaxRAMPercentage is 25), so you have to set an environment variable to make the pod use the full resources assigned. You can do it in two ways.

  1. By setting it in the Dockerfile: ENV JAVA_TOOL_OPTIONS="-Xms6G -Xmx6G"
  2. In deployment.yaml:
    spec:
      containers:
        - name: {{ .Chart.Name }}
          image: {{ .Values.image.repository }}
          env:
            - name: JAVA_TOOL_OPTIONS
              value: {{ .Values.config.streamTransformer.envValue }}
          imagePullPolicy: Always

Hope it helps for the Enricher too; I did not face this issue there because I was already using 6Gi of memory for the Enricher.
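As an alternative (just a sketch of another option, not something from my setup): instead of hard-coding -Xms/-Xmx, you can let the JVM size its heap relative to the container limit, so the Java options don't have to be kept in sync with the pod resources:

    env:
      - name: JAVA_TOOL_OPTIONS
        # With a 6Gi container limit this gives the JVM roughly a 4.5Gi heap.
        value: "-XX:MaxRAMPercentage=75.0"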