订阅者的Spring pubsub过滤消息

     2023-02-16     59

关键词:

【中文标题】订阅者的Spring pubsub过滤消息【英文标题】:Spring pubsub Filter Messages for subscriber 【发布时间】:2021-12-19 14:28:07 【问题描述】:

我尝试使用 pubsub 实现请求异步响应模式。我正在通过使用弹簧集成来做到这一点。我在两端定义了一个主题和两个订阅(用于异步响应的事件发送者和用于请求的事件消费者)。但是如果消费者发送响应,它会发送给发送者和消费者。但是消费者事件是空的。到目前为止一切都很好。我的问题是如何为 springs pubsub 集成中的消息定义过滤器。它是 Google pubsub 中的一项功能。

【问题讨论】:

有机会看到你的一些代码以更好地了解发生了什么吗? 抱歉,下个工作日可以创建快速入门。到目前为止,我没有任何可用的过滤代码。目前只实现了事件发送。 对,但是到目前为止,您已经确定了要添加过滤的位置。可能简单的 Spring Integration Filter 对你来说就足够了:docs.spring.io/spring-integration/docs/current/reference/html/… 看起来很有希望。我会试一试,然后回来。 这个答案对你有帮助吗? 【参考方案1】:

我建议您使用 Spring 过滤器来让您采取行动,无论是应该丢弃消息还是将消息传递到消息通道。你可以看到更多information。

您可以查看这些示例。

@Filter(inputChannel = "inputChannel", outputChannel = "secretChannel")
boolean filter(Message<?> message) 
    String msg = message.getPayload().toString();
    return msg.contains("secret");

您可以查看这些示例,了解如何在 spring 中实现过滤。你可以看到更多examples。

package com.zj.node.contentcenter.controller.content;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
/**
 * Producer
 *
 * @author 01
 * @date 2019-08-03
 **/
@RestController
@RequiredArgsConstructor
public class TestProducerController 
    private final Source source;
    @GetMapping("/stream-send-msg")
    public String streamSendMsg(String flagHeader) 
        source.output().send(
                MessageBuilder.withPayload("Message Body")
                        // Setting the header for filtering messages
                        .setHeader("flag-header", flagHeader)
                        .build()
        );
        return "send message success!";
    

【讨论】:

pubsub 订阅者出错:超出最大消息大小

】pubsub订阅者出错:超出最大消息大小【英文标题】:Errorinpubsubsubscriber:Maxmessagesizeexceeded【发布时间】:2017-06-2318:45:27【问题描述】:我正在为我的应用程序使用GoogleCloudPubsub。pubsub主题的订阅者是用Javascript编写的,并在Nodejs... 查看详情

javascript发布订阅pubsub模式

首先使用数组缓存订阅者订阅的消息,当订阅者订阅消息的时候,把订阅的消息push到指定消息的队列中,当发布者发布消息的时候,我们遍历执行push到指定消息队列中的回调事件。varPubsub=(function(){vareventObj={};return{subscribe:functio... 查看详情

GCP PubSub Spring Boot 重复提取消息

】GCPPubSubSpringBoot重复提取消息【英文标题】:GCPPubSubSpringBootrepeatextractmessage【发布时间】:2019-10-2508:47:35【问题描述】:我需要有关gcppub/sus问题的帮助。我有一个进程向pubsub发送100条带有过滤器的消息,另一个应用程序(在spri... 查看详情

[已解决]Pubsub 推送订阅不确认消息

】[已解决]Pubsub推送订阅不确认消息【英文标题】:Pubsubpushsubscriptionnotacknowledgingmessages【发布时间】:2021-03-0502:55:27【问题描述】:这是我的设置。订阅A是一种推送订阅,可将消息发布到云运行部署。该部署公开了一个HTTP端点... 查看详情

Google Cloud PubSub 不确认消息

...9-07-0223:38:09【问题描述】:我们有基于GCPPubSub的发布者和订阅者系统。订阅者处理单个消息的时间很长,大约1分钟。我们已经将订阅者确认截止时间设置为600秒(10分钟)(最长1秒),以确保pubsub不会过早开始重新交付,因为... 查看详情

为啥 PubSub 订阅在保留期到期后向死信主题发布消息

...要在PubSub中跟踪未传递的消息。但是,当PubSubPull订阅的订阅者在保留期后不可 查看详情

如何使用当前的 pubsub 订阅者从 google Pub/Sub 系统获取消息

】如何使用当前的pubsub订阅者从googlePub/Sub系统获取消息【英文标题】:HowtogetmessagesfromgooglesPub/Subsytsembyusingthecurrentpubsubsubsciber【发布时间】:2019-06-0909:09:58【问题描述】:我需要使用基于python的订阅者从googlesPub/Sub系统接收已发... 查看详情

在 PubSub 订阅确认截止日期和重新传递的上下文中,未发送给订阅者的消息是啥意思?

...PubSub订阅确认截止日期和重新传递的上下文中,未发送给订阅者的消息是啥意思?【英文标题】:WhatisthemeaningofmessagesoutstandingtoasubscriberinthecontextofPubSubsubscriptionacknowledgedeadlinesandre-delivery?在PubSub订阅确认截止日期和重新传递的上... 查看详情

如何将 GCP Pubsub 订阅的消息转发到另一个主题?

】如何将GCPPubsub订阅的消息转发到另一个主题?【英文标题】:HowcanIforwardaGCPPubsubsubscription\'smessagestoanothertopic?【发布时间】:2017-04-0111:35:34【问题描述】:我希望能够将GoogleCloudPlatform(GCP)Pubsub订阅的消息转发到另一个GCP主题,... 查看详情

如何在使用 triggerTopic 创建云功能时设置发布订阅消息过滤器

...模板来创建它。但是我找不到任何方法来为该云功能设置订阅者消息过滤器。我尝试在创 查看详情

谷歌 PubSub 存在拉式订阅者设计缺陷?

】谷歌PubSub存在拉式订阅者设计缺陷?【英文标题】:GooglePubSubwithpullsubscriberdesignflaw?【发布时间】:2021-06-3000:37:47【问题描述】:我们使用googlessteamingpull订阅者设计如下我们正在做将文件从FE(前端)发送到BE(后端)将该文... 查看详情

基于特定电子邮件 ID 的 Google Pubsub 订阅

】基于特定电子邮件ID的GooglePubsub订阅【英文标题】:GooglePubsubSubscriptionbasedonparticularemailid【发布时间】:2021-01-1702:33:43【问题描述】:我对GCP平台比较陌生。我需要创建一个系统,让我的团队在收到来自客户端的电子邮件时收... 查看详情

没有消息时的 Google Cloud PubSub 费用

...019-07-3112:47:58【问题描述】:我正在查看Pub/Sub定价,如果订阅者点击端点但没有收到任何消息,我无法获得价格。因为我的想法是24小时运行工人。【问题讨论】:【参考方案1】:使用今天的定价模式,如果您的订阅者发送Pull... 查看详情

vue2.0—消息订阅与发布pubsub(二十)

【Vue2.0】—消息订阅与发布pubsub(二十) 查看详情

Google pubsub golang 订阅者在空闲几个小时后停止接收新发布的消息

】Googlepubsubgolang订阅者在空闲几个小时后停止接收新发布的消息【英文标题】:Googlepubsubgolangsubscriberstopsreceivingnewpublishedmessage(s)afterbeingidleforafewhours【发布时间】:2019-08-2518:16:02【问题描述】:我在googlepubsub中创建了一个TOPIC,... 查看详情

如何使用 GCP 在 pubsub 模型中一次向所有订阅者发送消息

】如何使用GCP在pubsub模型中一次向所有订阅者发送消息【英文标题】:HowtosendmessagetoallsubscribersatonceinpubsubmodelusingGCP【发布时间】:2021-03-1602:53:30【问题描述】:使用google云平台实现pubsub模型,使用函数创建topic、subscriber、publish... 查看详情

Google Pubsub - 接收推送订阅的传递尝试

】GooglePubsub-接收推送订阅的传递尝试【英文标题】:GooglePubsub-Receivedeliveryattemptforpushsubscription【发布时间】:2021-06-0120:57:17【问题描述】:我有一个由Pubsub推送订阅触发的Google云功能。我想知道给定消息的当前传递尝试。在请... 查看详情

Pubsub.pull 请求无法正常工作 - 去吧

...。但是即使订阅中存在消息,消息也不会拉取请求请求。订阅者正在等待处理所有消息。我正在尝试使用基本代码,一次提取一条消息。我使用了两个实例,并在两个实例的后台运行脚本( 查看详情