拉谷歌 PubSub 消息时 io.grpc 帧大小超过最大值

     2023-02-16     150

关键词:

【中文标题】拉谷歌 PubSub 消息时 io.grpc 帧大小超过最大值【英文标题】:io.grpc frame size exceeds maximum when pull google PubSub message 【发布时间】:2017-04-30 06:06:57 【问题描述】:

有人知道在使用 Google PubSub 服务时如何增加 io.grpc maxMessageSize 吗?

现在,当我拉出一条大消息时,有时会收到此异常:

Exception in thread "main" com.google.cloud.pubsub.PubSubException: io.grpc.StatusRuntimeException: INTERNAL: Exception deframing message
    at com.google.cloud.pubsub.spi.DefaultPubSubRpc$1.apply(DefaultPubSubRpc.java:193)
    at com.google.cloud.pubsub.spi.DefaultPubSubRpc$1.apply(DefaultPubSubRpc.java:187)
    at com.google.common.util.concurrent.Futures$CatchingFuture.doFallback(Futures.java:842)
    at com.google.common.util.concurrent.Futures$CatchingFuture.doFallback(Futures.java:834)
    at com.google.common.util.concurrent.Futures$AbstractCatchingFuture.run(Futures.java:789)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:634)
    at com.google.api.gax.grpc.RetryingCallable$RetryingResultFuture.setException(RetryingCallable.java:202)
    at com.google.api.gax.grpc.RetryingCallable$Retryer.onFailure(RetryingCallable.java:147)
    at com.google.common.util.concurrent.Futures$6.run(Futures.java:1764)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:634)
    at com.google.api.gax.grpc.ExceptionTransformingCallable$ExceptionTransformingFuture.onFailure(ExceptionTransformingCallable.java:113)
    at com.google.common.util.concurrent.Futures$6.run(Futures.java:1764)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:634)
    at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:466)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:442)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:481)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:398)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:513)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
    at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:154)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.api.gax.grpc.ApiException:        io.grpc.StatusRuntimeException: INTERNAL: Exception deframing message
    ... 20 more
Caused by: io.grpc.StatusRuntimeException: INTERNAL: Exception deframing message
    at io.grpc.Status.asRuntimeException(Status.java:545)
    ... 13 more
Caused by: io.grpc.StatusRuntimeException: INTERNAL: Frame size 5454271 exceeds maximum: 4194304. If this is normal, increase the maxMessageSize in the channel/server builder
    at io.grpc.Status.asRuntimeException(Status.java:536)
    at io.grpc.internal.MessageDeframer.processHeader(MessageDeframer.java:338)
    at io.grpc.internal.MessageDeframer.deliver(MessageDeframer.java:240)
    at io.grpc.internal.MessageDeframer.deframe(MessageDeframer.java:176)
    at io.grpc.internal.AbstractStream.deframe(AbstractStream.java:276)
    at io.grpc.internal.AbstractClientStream.inboundDataReceived(AbstractClientStream.java:150)
    at io.grpc.internal.Http2ClientStream.transportDataReceived(Http2ClientStream.java:137)
    at io.grpc.netty.NettyClientStream.transportDataReceived(NettyClientStream.java:180)
    at io.grpc.netty.NettyClientHandler.onDataRead(NettyClientHandler.java:251)
    at io.grpc.netty.NettyClientHandler.access$700(NettyClientHandler.java:92)
    at io.grpc.netty.NettyClientHandler$FrameListener.onDataRead(NettyClientHandler.java:588)
    at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:256)
    at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
    at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:410)
    at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:244)
    at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:155)
    at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
    at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:113)
    at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:333)
    at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:393)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:411)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1066)
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:900)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:411)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more

该应用是用 java 编写的,使用 http://googlecloudplatform.github.io/google-cloud-java/0.6.0/apidocs/index.html?com/google/cloud/pubsub/PubSub.html 并在 Google Compute Engine 实例中运行。

【问题讨论】:

【参考方案1】:

您可以通过NettyChannelBuilder.maxMessageSize() 之类的方式配置频道。

在 grpc v1.1 中,您将能够使用 ManagedChannelBuilder.maxInboundMessageSize() 并避免直接使用 NettyChannelBuilder。您还可以在 CallOptionsAbstractStub 上使用 withMaxInboundMessageSize() 对每个呼叫进行配置。

【讨论】:

数据流中的错误:io.grpc.StatusRuntimeException:不可用

...8:22【问题描述】:我正在尝试构建一个从BigQuery读取并向Pubsub主题发送消息的Dataflow作业。我一直在与相互依赖作斗争,这是最新一期。Dataflow作业开始正常 查看详情

谷歌 PubSub 存在拉式订阅者设计缺陷?

】谷歌PubSub存在拉式订阅者设计缺陷?【英文标题】:GooglePubSubwithpullsubscriberdesignflaw?【发布时间】:2021-06-3000:37:47【问题描述】:我们使用googlessteamingpull订阅者设计如下我们正在做将文件从FE(前端)发送到BE(后端)将该文... 查看详情

Google PubSub - 获取最后一条消息

...【发布时间】:2016-10-1221:31:41【问题描述】:我开始使用谷歌应用引擎及其提供的服务。我特别感兴趣的一项服务是云发布订阅。我的计划是有一个nodejssocket.io服务器订阅某个pubsub主题,并且每当主题收到发布时,它都会将该发... 查看详情

JSON 数据作为 PubSub 的消息

...PubSub【发布时间】:2021-01-2309:36:30【问题描述】:我根据谷歌“将消息发布到主题”指南编写了以下代码:import("context""fmt""io""cloud.google.com/go/pubsub")funcpublishMessage(wio.Writer,projectID,topicID)errormsg:= 查看详情

是否可以在同步拉取 pubsub 时取消确认消息

】是否可以在同步拉取pubsub时取消确认消息【英文标题】:Isitpossibletounacknowledgemessagesinsynchronouspullforpubsub【发布时间】:2021-09-2813:37:55【问题描述】:我正在通过订阅提取pubsub消息,并且需要在处理之前确认这些消息,因为我... 查看详情

使用 Google Cloud PubSub 不断收到“向 Cloud PubSub 发送测试消息时出错...”

】使用GoogleCloudPubSub不断收到“向CloudPubSub发送测试消息时出错...”【英文标题】:Keepgetting\'ErrorsendingtestmessagetoCloudPubSub...\'withGoogleCloudPubSub【发布时间】:2015-11-1005:14:03【问题描述】:我正在尝试将Google的推送PubSub设置到我的... 查看详情

没有消息时的 Google Cloud PubSub 费用

】没有消息时的GoogleCloudPubSub费用【英文标题】:CostofGoogleCloudPubSubwhentherearenomessages【发布时间】:2019-07-3112:47:58【问题描述】:我正在查看Pub/Sub定价,如果订阅者点击端点但没有收到任何消息,我无法获得价格。因为我的想法... 查看详情

Google PubSub 如何在流式拉取消息时处理搜索到的消息

】GooglePubSub如何在流式拉取消息时处理搜索到的消息【英文标题】:GooglePubSubhowtoprocessseekedmessageswhilststreaming-pullmessages【发布时间】:2021-06-1910:13:29【问题描述】:我正在尝试在Python中处理seek(timestamp)返回的消息。我使用流式拉... 查看详情

云功能不向 PubSub 发送消息

】云功能不向PubSub发送消息【英文标题】:Cloudfunctiondoesn\'tsendmessagestoPubSub【发布时间】:2021-09-1900:46:05【问题描述】:总结:您好,我正在使用云功能作为由PubSub触发的异步后台工作程序。我有2个云函数,第一个将向CloudSQL发... 查看详情

GCP Pubsub 中的消息丢失和重复

】GCPPubsub中的消息丢失和重复【英文标题】:MessagelostandduplicatesinGCPPubsub【发布时间】:2017-10-2316:11:58【问题描述】:我在从Dataflow读取GCPPubSub时遇到问题,当在短时间内发布大量消息时,Dataflow将收到大部分发送的消息,除了一... 查看详情

在 MessageReciever 之外确认 pubSub 消息

】在MessageReciever之外确认pubSub消息【英文标题】:AckpubSubmessageoutsideoftheMessageReciever【发布时间】:2021-10-1722:41:38【问题描述】:我正在使用asyncPull从pupSub主题中提取消息,进行一些处理并将消息发送到ActiveMQ主题。使用pupSub的当... 查看详情

错误 403:向 Cloud PubSub 发送测试消息时出错:用户无权执行此操作

】错误403:向CloudPubSub发送测试消息时出错:用户无权执行此操作【英文标题】:Error403:ErrorsendingtestmessagetoCloudPubSub:Usernotauthorizedtoperformthisaction【发布时间】:2018-02-1809:12:41【问题描述】:我想设置推送通知手表,但收到错误响... 查看详情

GCP PubSub Spring Boot 重复提取消息

】GCPPubSubSpringBoot重复提取消息【英文标题】:GCPPubSubSpringBootrepeatextractmessage【发布时间】:2019-10-2508:47:35【问题描述】:我需要有关gcppub/sus问题的帮助。我有一个进程向pubsub发送100条带有过滤器的消息,另一个应用程序(在spri... 查看详情

Google PubSub 排序密钥问题,未启用消息排序

】GooglePubSub排序密钥问题,未启用消息排序【英文标题】:GooglePubSuborderingkeyissue,messageorderingnotenabled【发布时间】:2021-12-0515:12:11【问题描述】:我在将googlecloudpubsub消息发布到启用排序的订阅时遇到问题。我有一个pubsub主题:t... 查看详情

为啥使用 PUBSUB 订阅时无法 PING?

】为啥使用PUBSUB订阅时无法PING?【英文标题】:WhycanInotPINGwhenSubscribedusingPUBSUB?为什么使用PUBSUB订阅时无法PING?【发布时间】:2014-07-1613:48:22【问题描述】:我在Azure上使用PUBSUB时遇到问题。Azure防火墙将关闭空闲时间不限的连... 查看详情

GCP Pubsub 未传递消息的数量不会改变

】GCPPubsub未传递消息的数量不会改变【英文标题】:GCPPubsubnumofundeliveredmessageswontchange【发布时间】:2021-08-2423:59:27【问题描述】:我正在使用pubsub来触发我定义为最多具有10个实例的云功能。当大约300条或更多消息到达主题并开... 查看详情

具有 Cloud pubsub 的 Cloud Functions 触发消息排队

】具有Cloudpubsub的CloudFunctions触发消息排队【英文标题】:CloudFunctionswithCloudpubsubtriggerqueuingthemessages【发布时间】:2021-09-0115:53:39【问题描述】:我们有一个带有Pubsub触发器的CloudFunctions,它将根据消息调用HTTP端点上的应用程序... 查看详情

将 pubsub 消息转换为 websocket 事件

】将pubsub消息转换为websocket事件【英文标题】:Convertingpubsubmessagestowebsocketevents【发布时间】:2021-11-2505:49:58【问题描述】:当前架构我有一个为特定资源生成事件的微服务(我们称之为发布者)。有一个Web应用程序显示资源的... 查看详情