springcloud集成stream(代码片段)

大忽悠爱忽悠 大忽悠爱忽悠     2023-01-16     675

关键词:


Stream为什么被引入

常见MQ(消息中间件):

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。(类似于Hibernate)

Cloud Stream是什么?屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。


Stream是什么及Binder介绍

什么是Spring Cloud Stream?

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。

应用程序通过inputs或者 outputs 来与Spring Cloud Streambinder对象交互。

通过我们配置来binding(绑定),而Spring Cloud Streambinder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。

Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

目前仅支持RabbitMQ、 Kafka


Stream的设计思想

标准MQ

  • 生产者/消费者之间靠消息媒介传递信息内容
  • 消息必须走特定的通道 - 消息通道 Message Channel
  • 消息通道里的消息如何被消费呢,谁负责收发处理 - 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅。

为什么用Cloud Stream?

比方说我们用到了RabbitMQKafka,由于这两个消息中间件的架构上的不同,像RabbitMQexchangekafkaTopicPartitions分区。

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候Spring Cloud Stream给我们提供了—种解耦合的方式。


Stream凭什么可以统一底层差异?

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

Binder:

  • INPUT对应于消费者
  • OUTPUT对应于生产者

Stream中的消息通信方式遵循了发布-订阅模式

Topic主题进行广播

  • RabbitMQ就是Exchange
  • Kakfa中就是Topic

Stream编码常用注解简介

Spring Cloud Stream标准流程套路


  • Binder - 很方便的连接中间件,屏蔽差异。
  • Channel - 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • SourceSink - 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

编码API和常用注解

组成说明
Middleware中间件,目前只支持RabbitMQ和Kafka
BinderBinder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener监听队列,用于消费者的队列的消息接收
@EnableBinding指信道channel和exchange绑定在一起

案例

准备RabbitMQ环境

工程中新建三个子模块

  • cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块
  • cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  • cloud-stream-rabbitmq-consumer8803,作为消息接收模块

Stream消息驱动之生产者

新建Module:cloud-stream-rabbitmq-provider8801

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud_Parent</artifactId>
        <groupId>dhy.xpy</groupId>
        <version>520.521.finally</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!--基础配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

YML

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.112.128
                port: 5672
                username: admin
                password: 123
      bindings: # 服务的整合处理
        #生产者
        output: # 这个名字是一个通道的名称
          #在MQ中相当于声明一个交换机
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

主启动

@SpringBootApplication
public class StreamMQMain8801 
    public static void main(String[] args) 
        SpringApplication.run(StreamMQMain8801.class,args);
    

业务类

1.发送消息接口

public interface IMessageProvider 
    public String send();

2.发送消息接口实现类

@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider

    @Resource
    private MessageChannel output; // 消息发送管道

    @Override
    public String send()
    
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("*****serial: "+serial);
        return null;
    

3.Controller

@RestController
public class SendMessageController

    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage() 
    
        return messageProvider.send();
    

测试

启动 7001eureka

启动 RabpitMq

rabbitmq-plugins enable rabbitmq_management:开启图形化web管理界面

http://192.168.112.128:15672/

启动 8801

访问 - http://localhost:8801/sendMessage

后台将打印serial: UUID字符串


Stream消息驱动之消费者

新建Module:cloud-stream-rabbitmq-consumer8802

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud_Parent</artifactId>
        <groupId>dhy.xpy</groupId>
        <version>520.521.finally</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--基础配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

YML

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.112.128
                port: 5672
                username: admin
                password: 123
      bindings: # 服务的整合处理
        #消费者
        input: # 这个名字是一个通道的名称
          #通过指定交换机完成消息的消费
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

主启动类StreamMQMain8802

@SpringBootApplication
public class StreamMQMain8802 
    public static void main(String[] args) 
        SpringApplication.run(StreamMQMain8802.class,args);
    

业务类

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController

    @Value("$server.port")
    private String serverPort;
    
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message)
    
        System.out.println("消费者1号,----->接受到的消息: "+message.getPayload()+"\\t  当前微服务的port: "+serverPort);
    

测试

启动EurekaMain7001

启动StreamMQMain8801

启动StreamMQMain8802

8801发送8802接收消息




Stream之消息重复消费

依照8802,克隆出来一份运行8803 - cloud-stream-rabbitmq-consumer8803。

启动

RabbitMQ

服务注册 - 7001

消息生产 - 8801

消息消费 - 8802

消息消费 - 8802


此时studyexchange交换机会把消息路由到两个与其绑定的队列上

运行后有两个问题

  1. 有重复消费问题
  2. 消息持久化问题

消费

  • http://localhost:8801/sendMessage
  • 目前是8802/8803同时都收到了,存在重复消费问题
  • 如何解决:分组和持久化属性group(重要)




生产实际案例

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决


注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。


Stream之group解决消息重复消费

原理

微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。

不同的组是可以重复消费的,同一个组内会发生竞争关系,只有其中一个可以消费。

8802/8803都变成不同组,group两个不同

group: A_Group、B_Group

8802修改YML

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.112.128
                port: 5672
                username: admin
                password: 123
      bindings: # 服务的整合处理
        #消费者
        input: # 这个名字是一个通道的名称
          #通过指定交换机完成消息的消费
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息

springcloud第二季之stream,sleuth学习笔记(代码片段)

目录 83.Stream为什么被引入84.Stream是什么及Binder介绍85.Stream的设计思想86.Stream编码常用注解简介87.Stream消息驱动之生产者88.Stream消息驱动之消费者89.Stream之消息重复消费90.Stream之group解决消息重复消费91.Stream之消息持久化92.Sleuth... 查看详情

springcloud(12)——基于kafka的stream实现(代码片段)

基于ApacheKafka的Stream实现如果你的应用使用了ApacheKafka,你需要把它和SpringCloud进行整合。需要在应用中添加如下依赖。<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-ka 查看详情

springcloud笔记七stream(代码片段)

stream介绍为什么使用springcloudstream屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。BinderImplementations绑定器通过绑定器Binder作为中间件,实现了应用程序与消息中间件细节的解耦。Input对应消息生产... 查看详情

springcloud(11)——基于rocketmq的stream实现(代码片段)

基于RocketMQ的Stream实现SpringCloudStream是一个消息收发的框架,它提供了一套标准,应用程序只需要按照它的标准进行消息的收发,而不用关注具体的实现机制。具体的实现可以基于不同的消息中间件进行不同的实现,比如Kafka的实... 查看详情

springcloud-springcloud之stream构建消息驱动微服务框架(十九)(代码片段)

阅读本文前可先参考​​​​​​SpringCloud-SpringCloud根/父项目,开发准备(二)_MinggeQingchun的博客-CSDN博客在微服务的开发过程中,经常用到消息中间件,通过消息中间件在服务与服务之间传递消息,不管... 查看详情

springcloud——stream(学习与使用)(代码片段)

什么是SpringCloudStreamSpringCloudStream是用于构建消息驱动微服务应用程序的框架,该框架提供了一个灵活的编程模型,提供了来自多家供应商的中间件的合理配置,包括publish-subscrbe,消息分组和消息分区的支持。可以做到... 查看详情

springcloud——stream(学习与使用)(代码片段)

什么是SpringCloudStreamSpringCloudStream是用于构建消息驱动微服务应用程序的框架,该框架提供了一个灵活的编程模型,提供了来自多家供应商的中间件的合理配置,包括publish-subscrbe,消息分组和消息分区的支持。可以做到... 查看详情

springcloud-stream3.x版本使用教程及如何整合rabbitmq(代码片段)

...gBoot如何整合RabbitMQ中我们留了一个坑,就是如何使用SpringCloud-Stream来使用RabbitMQ。看名称就知道这个技术是属于SpringCloud家族的一员,SpringCloud从发家起干的就是提供抽象的活,被Netflix晃了一下 查看详情

springcloud集成ribbon(代码片段)

SpringCloud集成RibbonRibbon入门介绍LB负载均衡(LoadBalance)是什么Ribbon本地负载均衡客户端VSNginx服务端负载均衡区别集中式LB进程内LBRibbon的负载均衡和Rest调用架构说明POMRestTemplate探究Ribbon默认自带的负载规则Ribbon负载规则替换原理简... 查看详情

springcloud集成hystrix(代码片段)

SpringCloud集成Hystrix分布式系统面临的问题服务雪崩Hystrix是什么Hystrix停更进维Hystrix的服务降级熔断限流概念服务降级哪些情况会出发降级服务熔断服务限流Hystrix支付微服务构建JMeter高并发压测后卡顿正式测试订单微服务调用支... 查看详情

jenkins+docker+springcloud微服务持续集成(下)(代码片段)

Jenkins+Docker+SpringCloud微服务持续集成(下)Jenkins+Docker+SpringCloud部署方案优化Jenkins+Docker+SpringCloud集群部署流程说明修改所有微服务配置修改注册中心的配置其他微服务配置设计Jenkins集群项目的构建参数J... 查看详情

springcloud集成gateway(代码片段)

SpringCloud集成GatewayGateway是什么微服务架构中网关的位置GateWay非阻塞异步模型Gateway工作流程三大核心概念Gateway工作流程Gateway9527搭建gateway网关的pom文件配置,不需要web和actuor的依赖,否则会报错Gateway配置路由的两种方式... 查看详情

jenkins+docker+springcloud微服务持续集成(上)(代码片段)

Jenkins+Docker+SpringCloud微服务持续集成(上)Jenkins+Docker+SpringCloud持续集成说明概述服务器列表微服务项目说明环境准备Harbor的安装安装DockerCompose安装Harbor访问Harbor在Harbor中创建用户和项目创建用户给私有项目... 查看详情

springcloud的stream消息组件的使用@streamlistener(代码片段)

常见问题(使用rabbitmq)消息分组防止多实例重复消费在一个服务多实例场景下使用默认使用@StreamListener监听消息消费,yml中没有特殊配置的话是会导致消息重复消费的,原因是此时每个实例都是匿名在rabbitmq... 查看详情

springcloud学习之路:springcloud集成zuul网关(代码片段)

网关就是做一下过滤或拦截操作让我们的服务更加安全用户访问我们服务的时候就要先通过网关然后再由网关转发到我们的微服务1.新建一个网关服务Module 2.依然选择springboot工程 3.老规矩起个名字 4.勾选注册中心客户... 查看详情

springcloud使用stream配置rabbitmq实现延时消息(代码片段)

...m-rabbit</artifactId></dependency>消息通道packagecom.fchan.springcloudstream.service;importorg.springframework.cloud.stream.annotation.Input;importorg.springframework.cloud.stream.annotation.Output;importorg.springframework.messaging 查看详情

springcloud-springcloudalibaba之gateway集成sentinel(代码片段)

阅读本文前可先参考SpringCloud-SpringCloud之Gateway网关(十三)_MinggeQingchun的博客-CSDN博客_spring.cloud.gateway.routes[0]SpringCloud-SpringCloudAlibaba之Sentinel规则持久化(十)_MinggeQingchun的博客-CSDN 查看详情

springcloud集成sleuth(代码片段)

Springcloud集成SleuthSleuth是什么Sleuth之zipkin搭建安装Sleuth链路监控展现Sleuth是什么为什么会出现这个技术?要解决哪些问题?在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的的服务节点调用来... 查看详情