Enricher ERROR com.google.cloud.pubsub.v1.StreamingSubscriberConnection

Hello Everyone,

I have been seeing a lot of these errors in enricher pod logs.

Apr 12, 2022 12:30:32 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: 502:Bad Gateway
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: 502:Bad Gateway
at io.grpc.Status.asRuntimeException(Status.java:533)
... 15 more

I can see several spikes in collector’s output in raw google pub/sub subscription with un-ack message count going over 50000 and above probably because enricher is throwing the above error messages.

I noticed that there are no enricher pod restarts, so not sure what these error messages mean in context.

This is our snowplow setup in google kubernetes engine.
JS tracker → collector(gke) → google pub/sub → enricher → google pub/sub → stream loader → Google Big query

These are the versions used.

Collector : snowplow/scala-stream-collector-pubsub:2.4.5
Enricher  : snowplow/snowplow-enrich-pubsub:2.0.5
Stream_loader : snowplow/snowplow-bigquery-streamloader:1.2.0
Repeater : snowplow/snowplow-bigquery-streamloader:1.2.0
Mutator : snowplow/snowplow-bigquery-mutator:1.2.0

Any help is really appreciated.

Any 502 responses should get automatically retried so that you don’t lose these messages so I’d check whether you are getting the expected number of messages being emitted from enrich into the PubSub stream.

If you are getting consistent spikes of 5xx errors I’d suggest the best course of action would be to contact GCP support as the pipeline isn’t able to do much to address this other than continue to gracefully retry.

Thanks @mike , i am trying to get GCP support to help out.
Meanwhile i can see many of the below error message in enricher pod.

Apr 13, 2022 1:36:43 AM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.InternalException: io.grpc.StatusRuntimeException: INTERNAL: http2 exception
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:67)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.grpc.StatusRuntimeException: INTERNAL: http2 exception
at io.grpc.Status.asRuntimeException(Status.java:533)
… 15 more
Caused by: io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Maximum active streams violated for this endpoint.
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:147)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultEndpoint.checkNewStreamAllowed(DefaultHttp2Connection.java:896)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultEndpoint.createStream(DefaultHttp2Connection.java:748)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultEndpoint.createStream(DefaultHttp2Connection.java:668)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder.writeHeaders0(DefaultHttp2ConnectionEncoder.java:201)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder.writeHeaders(DefaultHttp2ConnectionEncoder.java:167)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DecoratingHttp2FrameWriter.writeHeaders(DecoratingHttp2FrameWriter.java:53)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.StreamBufferingEncoder.writeHeaders(StreamBufferingEncoder.java:153)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.StreamBufferingEncoder.writeHeaders(StreamBufferingEncoder.java:141)
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.createStreamTraced(NettyClientHandler.java:579)
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.createStream(NettyClientHandler.java:562)
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.write(NettyClientHandler.java:323)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1015)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannel.write(AbstractChannel.java:289)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue$AbstractQueuedCommand.run(WriteQueue.java:213)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.flush(WriteQueue.java:128)
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.drainNow(WriteQueue.java:114)
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.goingAway(NettyClientHandler.java:778)
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.access$300(NettyClientHandler.java:91)
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$3.onGoAwayReceived(NettyClientHandler.java:275)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection.goAwayReceived(DefaultHttp2Connection.java:236)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.onGoAwayRead0(DefaultHttp2ConnectionDecoder.java:218)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onGoAwayRead(DefaultHttp2ConnectionDecoder.java:551)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onGoAwayRead(Http2InboundFrameLogger.java:119)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readGoAwayFrame(DefaultHttp2FrameReader.java:591)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:272)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1486)
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1282)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
… 1 more

@ashish_george - did you ever get to the bottom of this issue? I just had a very similar incident with my enricher and I’m trying to understand root cause and figure out how to prevent it happening again.

@Jake_Beresford Sorry no luck with that.