RDBLoader failing to process SQS messages

Hi,

We started to shred and load about 15 GB of data to Redshift, all in one run. Our RDBLoader is at R35, and the shredding part seems to have gone perfectly fine. A day after the shredding job finished, we noticed many errors in the RDBLoader log like the following:

INFO 2021-05-31 08:25:45.343: Received new message. Total 2301 messages received, 874 loaded, 0 attempts has been made to load current folder
INFO 2021-05-31 08:25:45.343: New data discovery at run=2021-05-23-11-02-27 with following shredded types:
  * iglu:com.snowplowanalytics.snowplow/atomic/jsonschema/1-*-* TSV
.
.
.
INFO 2021-05-31 08:25:45.480: Loading s3://sp-shredded-????/good/run=2021-05-23-11-02-27/
INFO 2021-05-31 08:25:45.481: COPY atomic.events
.
.
.
INFO 2021-05-31 08:27:01.976: Loading finished for s3://sp-shredded-????/good/run=2021-05-23-11-02-27/
May 31, 2021 8:27:02 AM com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper logAndGetAmazonServiceException
SEVERE: AmazonServiceException: deleteMessage. RequestId: d35cb72b-b220-59c6-95a5-ee05123579e9
HTTPStatusCode: 400 AmazonErrorCode: InvalidParameterValue
com.amazonaws.services.sqs.model.AmazonSQSException: Value AQEBctYjgo0mzH4lcIARY9nXkSF4mMhnle26rjiGbJ9X5slj/eApw3EPQtUaMK1eRqAtlLiyBC3DhiaEIVGRiJsxfnJkJ8GjPxoF21mMrr/V6np++LUSDJBkZ2lrxFmQriLbj498DVMeXhTVdSmFHUffFtJxqlVbsaABBF3X+7J4j4Q0LEjixFmY7oyKeK6F1CPWTu0gv9aFGOrR/MJl2aDTvYNtroe7c/CyuVHXcxdyAZPGS7kQWgvKmkl2yNnWGXV1oif/4Ew2+LEPeZSDKqEfwMNfqdG/eLPWs8DzkieaqDc= for parameter ReceiptHandle is invalid. Reason: The receipt handle has expired. (Service: AmazonSQS; Status Code: 400; Error Code: InvalidParameterValue; Request ID: d35cb72b-b220-59c6-95a5-ee05123579e9; Proxy: null)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
	at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:2207)
	at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2174)
	at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2163)
	at com.amazonaws.services.sqs.AmazonSQSClient.executeDeleteMessage(AmazonSQSClient.java:893)
	at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessage(AmazonSQSClient.java:865)
	at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessage(AmazonSQSMessagingClientWrapper.java:159)
	at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.acknowledge(AutoAcknowledger.java:47)
	at com.amazon.sqs.javamessaging.message.SQSMessage.acknowledge(SQSMessage.java:941)
	at com.snowplowanalytics.snowplow.rdbloader.common.Message$.$anonfun$messageDecoder$1(Message.scala:25)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:104)
	at cats.effect.internals.IORunLoop$.restartCancelable(IORunLoop.scala:51)
	at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:100)
	at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
	at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
	at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
	at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:482)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:422)
	at cats.effect.internals.IOStart$.$anonfun$apply$1(IOStart.scala:42)
	at cats.effect.internals.IOStart$.$anonfun$apply$1$adapted(IOStart.scala:29)
	at cats.effect.internals.IORunLoop$RestartCallback.start(IORunLoop.scala:447)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:156)
	at cats.effect.internals.IORunLoop$.restart(IORunLoop.scala:41)
	at cats.effect.internals.IOBracket$.$anonfun$apply$1(IOBracket.scala:48)
	at cats.effect.internals.IOBracket$.$anonfun$apply$1$adapted(IOBracket.scala:34)
	at cats.effect.internals.IOAsync$.$anonfun$apply$1(IOAsync.scala:37)
	at cats.effect.internals.IOAsync$.$anonfun$apply$1$adapted(IOAsync.scala:37)
	at cats.effect.internals.IORunLoop$RestartCallback.start(IORunLoop.scala:447)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:156)
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:463)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:484)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:422)
	at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
	at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:52)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
ERROR 2021-05-31 08:27:02.345: Fatal failure during message processing (base s3://sp-shredded-????/good/run=2021-05-23-11-02-27/), message hasn't been ack'ed. AmazonServiceException: deleteMessage. RequestId: d35cb72b-b220-59c6-95a5-ee05123579e9
HTTPStatusCode: 400 AmazonErrorCode: InvalidParameterValue
INFO 2021-05-31 08:27:02.345: Received new message. Total 2302 messages received, 874 loaded, 0 attempts has been made to load current folder
INFO 2021-05-31 08:27:02.345: New data discovery at run=2021-05-12-04-24-15 with following shredded types:
  * iglu:com.snowplowanalytics.snowplow/atomic/jsonschema/1-*-* TSV

And here is a snapshot of our SQS status:

Our SQS “visibility timeout” is set to 30 seconds and “delivery delay” is zero.

Is there anything wrong with our SQS setup?
We are running the RDBLoader on a single Fargate instance. Would it help to run it on multiple instances in parallel?

Hi @dadasami,

How long was it taking to load a folder? It seems that the visibility timeout is way too low: by the time the Loader tries to acknowledge (delete) the message, it is already “lost” to SQS. In 1.0.0 we set it to 5 minutes and acknowledge only after the folder has been loaded. As a result, if the Loader sees the same message again it is just considered a duplicate and ignored, but that works because we have a manifest in 1.0.0; R35 doesn’t have this feature.
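If upgrading isn’t an option right away, one common SQS pattern for long-running work is to extend the visibility timeout of the in-flight message while the load is still going, so the receipt handle is still valid when the Loader deletes the message. A sketch with the AWS CLI; the queue URL and receipt handle below are hypothetical placeholders, not values from your pipeline:

```shell
# Hypothetical values - substitute your own queue URL and the receipt
# handle returned by the ReceiveMessage call for the in-flight message.
QUEUE_URL="https://sqs.eu-west-1.amazonaws.com/123456789012/rdb-loader-queue"
RECEIPT_HANDLE="AQEB...example-handle..."

# Give the current message another 10 minutes of invisibility while the
# COPY into Redshift is still running, so SQS doesn't make it visible
# again before the Loader gets a chance to delete (acknowledge) it.
aws sqs change-message-visibility \
  --queue-url "$QUEUE_URL" \
  --receipt-handle "$RECEIPT_HANDLE" \
  --visibility-timeout 600
```

In R35 this would have to be wired into the consumer itself (a periodic “heartbeat” while a folder loads); the CLI call above just illustrates the API involved.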

I’m still trying to wrap my head around Shredding EMR spark config (IOException: All datanodes ... are bad) - #5 by dadasami to help you migrate to 1.0.1.

Is there anything wrong with our SQS setup?

I think you can set the visibility timeout only in code? If I’m wrong, I definitely recommend setting it to something higher.
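For what it’s worth, the queue-level default visibility timeout can also be raised outside of application code, e.g. with the AWS CLI. A sketch, assuming a hypothetical queue URL; 300 seconds matches the 5-minute value used in 1.0.0:

```shell
# Hypothetical queue URL - substitute your own.
# Raise the queue's default visibility timeout to 5 minutes so each
# message stays invisible long enough for a full folder load to finish
# before the Loader deletes (acknowledges) it.
aws sqs set-queue-attributes \
  --queue-url "https://sqs.eu-west-1.amazonaws.com/123456789012/rdb-loader-queue" \
  --attributes VisibilityTimeout=300
```

The same attribute can be changed in the SQS console under the queue’s configuration.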

We are running the RDBLoader on a single Fargate instance. Does it help if it runs on multiple instances in parallel?

No, certainly not. The Loader is designed to run as a singleton, to avoid race conditions and overwhelming Redshift.

Hi @anton,

Thanks a lot for your help. I’ll set the visibility timeout to a higher amount and let you know how it changes the pipeline behavior.

One general question: are the RDBLoader and the Shredder version 1.0.0 co-dependent, or can they be updated independently? If the latter, we may be able to benefit from the RDBLoader 1.0.0 manifest while waiting for Shredder 1.0.1 if necessary.

Please feel free to reach out in case you have any questions about our failed update attempt. I cannot say with certainty whether the Shredder was ignoring the shredded files fully or only partially, but we could see multiple versions for some of the runs, and the shredder jobs were taking much longer than we were used to before the update.

No, sorry: in 1.0.0 we made one last major change and combined the good and bad outputs under a single run folder. In the 1.x branch, though, we promise full compatibility: any Shredder will be able to work with any Loader.

I think I do know what happened with your Shredder in the original post - will respond there shortly.
