Enricher ERROR com.google.cloud.pubsub.v1.StreamingSubscriberConnection

Hello Everyone,

I have been seeing a lot of these errors in enricher pod logs.

Apr 12, 2022 12:30:32 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: 502:Bad Gateway
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: 502:Bad Gateway
at io.grpc.Status.asRuntimeException(Status.java:533)
... 15 more

I can see several spikes in collector’s output in raw google pub/sub subscription with un-ack message count going over 50000 and above probably because enricher is throwing the above error messages.

I noticed that there are no enricher pod restarts, so not sure what these error messages mean in context.

This is our snowplow setup in google kubernetes engine.
JS tracker → collector(gke) → google pub/sub → enricher → google pub/sub → stream loader → Google Big query

These are the versions used.

Collector : snowplow/scala-stream-collector-pubsub:2.4.5
Enricher  : snowplow/snowplow-enrich-pubsub:2.0.5
Stream_loader : snowplow/snowplow-bigquery-streamloader:1.2.0
Repeater : snowplow/snowplow-bigquery-streamloader:1.2.0
Mutator : snowplow/snowplow-bigquery-mutator:1.2.0

Any help is really appreciated.

Any 502 responses should get automatically retried so that you don’t lose these messages so I’d check whether you are getting the expected number of messages being emitted from enrich into the PubSub stream.

If you are getting consistent spikes of 5xx errors I’d suggest the best course of action would be to contact GCP support as the pipeline isn’t able to do much to address this other than continue to gracefully retry.

Thanks @mike , i am trying to get GCP support to help out.
Meanwhile i can see many of the below error message in enricher pod.

Apr 13, 2022 1:36:43 AM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.InternalException: io.grpc.StatusRuntimeException: INTERNAL: http2 exception
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:67)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.grpc.StatusRuntimeException: INTERNAL: http2 exception
at io.grpc.Status.asRuntimeException(Status.java:533)
… 15 more
Caused by: io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Maximum active streams violated for this endpoint.
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:147)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultEndpoint.checkNewStreamAllowed(DefaultHttp2Connection.java:896)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultEndpoint.createStream(DefaultHttp2Connection.java:748)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultEndpoint.createStream(DefaultHttp2Connection.java:668)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder.writeHeaders0(DefaultHttp2ConnectionEncoder.java:201)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder.writeHeaders(DefaultHttp2ConnectionEncoder.java:167)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DecoratingHttp2FrameWriter.writeHeaders(DecoratingHttp2FrameWriter.java:53)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.StreamBufferingEncoder.writeHeaders(StreamBufferingEncoder.java:153)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.StreamBufferingEncoder.writeHeaders(StreamBufferingEncoder.java:141)
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.createStreamTraced(NettyClientHandler.java:579)
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.createStream(NettyClientHandler.java:562)
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.write(NettyClientHandler.java:323)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1015)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannel.write(AbstractChannel.java:289)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue$AbstractQueuedCommand.run(WriteQueue.java:213)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.flush(WriteQueue.java:128)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.drainNow(WriteQueue.java:114)
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.goingAway(NettyClientHandler.java:778)
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.access$300(NettyClientHandler.java:91)
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$3.onGoAwayReceived(NettyClientHandler.java:275)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection.goAwayReceived(DefaultHttp2Connection.java:236)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.onGoAwayRead0(DefaultHttp2ConnectionDecoder.java:218)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onGoAwayRead(DefaultHttp2ConnectionDecoder.java:551)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onGoAwayRead(Http2InboundFrameLogger.java:119)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readGoAwayFrame(DefaultHttp2FrameReader.java:591)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:272)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1486)
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1282)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
… 1 more