Unknown Scala Collector Error

Hi ya, I’m running the Scala collector snowplow-stream-collector-kinesis-0.15.0.jar and I got some errors which crashed the collector.

Questions:

  1. Is there a best-practice way to keep the Scala collector running, or to restart it automatically? I’m running on EC2 amzn-linux.
  2. Can anyone understand this error? I can’t seem to understand what’s causing it:
[scala-stream-collector-akka.actor.default-dispatcher-3316] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Writing 47 Thrift records to Kinesis stream snowplow-collector-good
[ERROR] [11/25/2020 16:35:51.616] [scala-stream-collector-akka.actor.default-dispatcher-3316] [akka.actor.ActorSystemImpl(scala-stream-collector)] Error during processing of request: 'Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@36175e36 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@7276936f[Shutting down, pool size = 10, active threads = 10, queued tasks = 1838, completed tasks = 183760767]'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@36175e36 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@7276936f[Shutting down, pool size = 10, active threads = 10, queued tasks = 1838, completed tasks = 183760767]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
	at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
	at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
	at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
	at scala.concurrent.impl.Future$.apply(Future.scala:31)
	at scala.concurrent.Future$.apply(Future.scala:494)
	at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.multiPut(KinesisSink.scala:260)
	at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.sendBatch(KinesisSink.scala:235)
	at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$EventStorage$.flush(KinesisSink.scala:210)
	at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$EventStorage$.store(KinesisSink.scala:196)
	at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$$anonfun$storeRawEvents$1.apply(KinesisSink.scala:217)
	at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$$anonfun$storeRawEvents$1.apply(KinesisSink.scala:217)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.storeRawEvents(KinesisSink.scala:217)
	at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorService.sinkEvent(CollectorService.scala:218)
	at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorService.cookie(CollectorService.scala:113)
	at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorRoute$$anonfun$collectorRoute$1$$anonfun$apply$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$4$$anonfun$apply$5$$anonfun$apply$6$$anonfun$apply$7.apply(CollectorRoute.scala:50)
	at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorRoute$$anonfun$collectorRoute$1$$anonfun$apply$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$4$$anonfun$apply$5$$anonfun$apply$6$$anonfun$apply$7.apply(CollectorRoute.scala:49)
	at akka.http.scaladsl.server.util.ApplyConverterInstances$$anon$1$$anonfun$apply$1.apply(ApplyConverterInstances.scala:14)
	at akka.http.scaladsl.server.util.ApplyConverterInstances$$anon$1$$anonfun$apply$1.apply(ApplyConverterInstances.scala:13)
	at akka.http.scaladsl.server.ConjunctionMagnet$$anon$2$$anonfun$apply$14$$anonfun$apply$15$$anonfun$apply$16.apply(Directive.scala:162)
	at akka.http.scaladsl.server.ConjunctionMagnet$$anon$2$$anonfun$apply$14$$anonfun$apply$15$$anonfun$apply$16.apply(Directive.scala:162)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:61)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:61)
	at akka.http.scaladsl.server.directives.FutureDirectives$$anonfun$onComplete$1$$anonfun$apply$1$$anonfun$apply$2.apply(FutureDirectives.scala:37)
	at akka.http.scaladsl.server.directives.FutureDirectives$$anonfun$onComplete$1$$anonfun$apply$1$$anonfun$apply$2.apply(FutureDirectives.scala:37)
	at akka.http.scaladsl.util.FastFuture$$anonfun$transformWith$extension0$1.apply(FastFuture.scala:37)
	at akka.http.scaladsl.util.FastFuture$$anonfun$transformWith$extension0$1.apply(FastFuture.scala:37)
	at akka.http.scaladsl.util.FastFuture$.akka$http$scaladsl$util$FastFuture$$strictTransform$1(FastFuture.scala:41)
	at akka.http.scaladsl.util.FastFuture$.transformWith$extension1(FastFuture.scala:45)
	at akka.http.scaladsl.util.FastFuture$.transformWith$extension0(FastFuture.scala:37)
	at akka.http.scaladsl.server.directives.FutureDirectives$$anonfun$onComplete$1$$anonfun$apply$1.apply(FutureDirectives.scala:37)
	at akka.http.scaladsl.server.directives.FutureDirectives$$anonfun$onComplete$1$$anonfun$apply$1.apply(FutureDirectives.scala:35)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:61)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:61)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRequestContext$1$$anonfun$apply$1.apply(BasicDirectives.scala:43)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRequestContext$1$$anonfun$apply$1.apply(BasicDirectives.scala:43)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$11.apply(Directive.scala:94)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$11.apply(Directive.scala:93)
	at akka.http.scaladsl.util.FastFuture$.akka$http$scaladsl$util$FastFuture$$strictTransform$1(FastFuture.scala:41)
	at akka.http.scaladsl.util.FastFuture$.transformWith$extension1(FastFuture.scala:45)
	at akka.http.scaladsl.util.FastFuture$.flatMap$extension(FastFuture.scala:26)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93)
	at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
	at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResultWith$1$$anonfun$apply$4.apply(BasicDirectives.scala:67)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResultWith$1$$anonfun$apply$4.apply(BasicDirectives.scala:67)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
	at akka.http.scaladsl.server.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$1.apply(ExecutionDirectives.scala:32)
	at akka.http.scaladsl.server.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$1.apply(ExecutionDirectives.scala:28)
	at akka.http.scaladsl.server.Route$$anonfun$asyncHandler$1.apply(Route.scala:79)
	at akka.http.scaladsl.server.Route$$anonfun$asyncHandler$1.apply(Route.scala:78)
	at akka.stream.impl.fusing.MapAsync$$anon$24.onPush(Ops.scala:1169)
	at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
	at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710)
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
	at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[WARN] [11/25/2020 16:35:51.769] [scala-stream-collector-akka.actor.default-dispatcher-3316] [akka.actor.ActorSystemImpl(scala-stream-collector)] Illegal header: Illegal 'user-agent' header: Invalid input '[', expected 'EOI', product-or-comment, WSP, comment or CRLF (line 1, column 162): Mozilla/5.0 (Linux; Android 10; SM-G965U Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/86.0.4240.198 Mobile Safari/537.36 [FB_IAB/FB4A;FBAV/297.0.0.36.116;]
                                                                                                                                                                 ^
[DEBUG] [11/25/2020 16:35:51.769] [scala-stream-collector-akka.actor.default-dispatcher-3326] [akka://scala-stream-collector/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [11/25/2020 16:35:51.916] [scala-stream-collector-akka.actor.default-dispatcher-3320] [akka://scala-stream-collector/system/IO-TCP/selectors/$a/0] New connection accepted

The above error appears to have happened several times in succession, and then the logs just stop.

Any help would be most appreciated!
Patrick

Hi @pcb,

That seems to be a Kinesis outage. I’m a bit surprised the collector didn’t crash with an OOM error, though, since that is what we’ve observed in the past. Also, 0.15.0 is a very old version (almost 2 years old). In recent versions we’ve added an SQS buffer designed specifically to handle outages like that.

I recommend upgrading to the latest version, 2.1.0.
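
As for the exception itself: the trace shows a task being rejected by a ScheduledThreadPoolExecutor whose state is "Shutting down", which is what the JDK reports once shutdown() has been called on a pool. Below is a minimal sketch of that JDK behaviour in isolation. It is not collector code: the class and method names are hypothetical, and it says nothing about why the collector's pool was shut down in the first place.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Hypothetical standalone demo; not part of the Snowplow collector.
public class RejectedAfterShutdown {

    /** Returns true if a task submitted after shutdown() is rejected. */
    public static boolean isRejectedAfterShutdown() {
        // Pool size 10 matches the "pool size = 10" seen in the log above.
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(10);
        pool.shutdown(); // from here on the pool accepts no new tasks
        try {
            pool.execute(() -> System.out.println("never runs"));
            return false;
        } catch (RejectedExecutionException e) {
            // The default AbortPolicy throws, as in the stack trace above.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + isRejectedAfterShutdown());
    }
}
```

In the log the pool still had 10 active threads and 1838 queued tasks when it was shut down, so every new request that tried to write to Kinesis hit this rejection and was answered with a 500.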