rabbitmq学习:rabbitmq之扇形交换机主题交换机(代码片段)

wutianqi wutianqi     2023-01-29     692

关键词:

 前言

上篇我们学习了rabbitmq的作用以及直连交换机的代码实现,这篇我们继续看如何用代码实现扇形交换机和主题交换机

一、扇形交换机

  1.生产者

  

/**
 * 生产者
 */
public class LogProducer 
    //交换机名称
    public final static String EXCHANGE_NAME = "logs";

    public static void main(String[] args) 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try 
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

            for (int i = 0; i < 5;i++)
                String message = "Hello Rabbit " + i;
                channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
                System.out.println("EmitLog send message " + message);
            
         catch (IOException e) 
            e.printStackTrace();
         finally 
            try 
                channel.close();
                connection.close();
             catch (IOException e) 
                e.printStackTrace();
            

        
    

  2.消费者

  Consumer1

/**
 * 消费者
 */
public class Consumer1 
    public final static String EXCHANGE_NAME = "logs";

    public static void main(String[] args) 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try 
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            String queueName = channel.queueDeclare().getQueue();
            //声明一个交换机,发布模式为fanout-扇形
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
            //将队列和交换机绑定起来,因为扇形交换机和路由键无关,所以这里路由键设为空字符串即可
            channel.queueBind(queueName,EXCHANGE_NAME,"");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            //当连接断开时,队列会自动被删除
            channel.basicConsume(queueName,true,consumer);
            System.out.println("ReceiveLogTopic1 Waitting for message");
            while (true)
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("ReceiveLogTopic1 receives message " + message);
            
         catch (IOException e) 
            e.printStackTrace();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

  Cosumer2

  

/**
 * 消费者2
 */
public class Consumer2 
    public final static String EXCHANGE_NAME = "logs";

    public static void main(String[] args) 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try 
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            String queueName = channel.queueDeclare().getQueue();
            //声明一个交换机,发布模式为fanout-扇形
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
            //将队列和交换机绑定起来,因为扇形交换机和路由键无关,所以这里路由键设为空字符串即可
            channel.queueBind(queueName,EXCHANGE_NAME,"");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            //当连接断开时,队列会自动被删除
            channel.basicConsume(queueName,true,consumer);
            System.out.println("ReceiveLog2 Waitting for message");
            while (true)
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("ReceiveLog2 receives message " + message);
            
         catch (IOException e) 
            e.printStackTrace();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

 

  先启动Consumer1,Consumer2,再启动LogProducer。结果如下:

  LogProducer:

  技术分享图片

 

  Consumer1:

  技术分享图片

 

  Consumer2:

  技术分享图片

  从输出结果中我们可以看出,扇形交换机所接受到的消息会被分发到所有绑定到该交换机上的队列中,和路由键无关。

 

二、主题交换机

  1.生产者

  

/**
 * 生产者
 */
public class Producer 
    private static final String EXCHANGE_NAME = "topic_logs";
    // 路由关键字
    private static final String[] routingKeys = new String[]
            "quick.orange.rabbit",
            "lazy.orange.elephant",
            "quick.orange.fox",
            "lazy.brown.fox",
            "quick.brown.fox",
            "quick.orange.male.rabbit",
            "lazy.orange.male.rabbit";

    public static void main(String[] args) 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try 
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            //循环发送具有不同routing key的message
            for (String routingKey : routingKeys) 
                String message = routingKey + "--->biu~";
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
                System.out.println("Producer -> routingkey: " + routingKey + ", send message " + message);
            
         catch (IOException e) 
            e.printStackTrace();
         finally 
            try 
                channel.close();
                connection.close();
             catch (IOException e) 
                e.printStackTrace();
            

        
    

2.消费者

  Consumer1

  

/**
 * 消费者1
 */
public class Consumer1 
    private static final String EXCHANGE_NAME = "topic_logs";
    // 路由关键字
    private static final String[] routingKeys = new String[]"*.orange.*";

    public static void main(String[] args) 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try 
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //声明队列
            String queueName = channel.queueDeclare().getQueue();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            //将队列与交换器用routingkey绑定起来
            for (String routingKey : routingKeys) 
                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
                System.out.println("Consumer1 -> queue: " + queueName + ", exchange_name: " + EXCHANGE_NAME + ", routingKey: " + routingKey);
            

            //接收消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, consumer);
            System.out.println("Consumer1 waitting for message");

            while (true)
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                System.out.println("Consumer1 receive message " + message + ", routingKey: " + routingKey);
            

         catch (IOException e) 
            e.printStackTrace();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

  Consumer2

  

/**
 * 消费者2
 */
public class Consumer2 
    private static final String EXCHANGE_NAME = "topic_logs";
    // 路由关键字
    private static final String[] routingKeys = new String[]"*.*.rabbit", "lazy.#";

    public static void main(String[] args) 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try 
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //声明队列
            String queueName = channel.queueDeclare().getQueue();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            //将队列与交换器用routingkey绑定起来
            for (String routingKey : routingKeys) 
                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
                System.out.println("Consumer2 -> queue: " + queueName + ", exchange_name: " + EXCHANGE_NAME + ", routingKey: " + routingKey);
            

            //接收消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, consumer);
            System.out.println("Consumer2 waitting for message");

            while (true)
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                System.out.println("Consumer2 receive message " + message + ", routingKey: " + routingKey);
            

         catch (IOException e) 
            e.printStackTrace();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

  同样先运行消费者,再运行生产者,结果如下:

  Producer:

  技术分享图片

 

   Consumer1:

  技术分享图片

  Consumer2:

  技术分享图片

  由运行结果我们可以看到:消息被交换机按照模式路由键的规则路由到相应的队列中。

 

  

rabbitmq学习:rabbitmq(消息队列)的作用以及rabbitmq之直连交换机(代码片段)

...行消息发布和接收。因为消息的的接收以及路由都是通过交换机实现的,所以接下来我们要学习如何利用不同的交换机进行消息的发布。最后会再学习如何利用rabbitmq进行rpc的调用。一、rabbitmq(消息队列)的作用1.异 查看详情

rabbitmq学习总结(代码片段)

...码:http://www.cnblogs.com/bluesummer/p/8992225.html因为之前不了解交换机及AMQP协议,上来就研究RabbitMQ,很多概念都有点蒙圈,所以建议大家在学习RabbitMQ之前先对一些概念有基本的了解安装与配置:下载Erlang:http://www.erlang.org/downloads下... 查看详情

rabbitmq学习(下)——发布确认高级幂等性优先级惰性和rabbitmq集群(代码片段)

...、发布确认高级1.1confirm确认模式1.2return退回模式1.3备份交换机二、幂等性、优先级、惰性2.1幂等性2.2优先级2.3惰性三、RabbitMQ集群3.1RabbitMQ集群的搭建3.2RabbitMQ之镜像队列一、发布确认高级在使用RabbitMQ的时候ÿ 查看详情

rabbitmq学习——交换机

publicclassSend{ publicstaticvoidmain(String[]args)throwsException{ ConnectionFactoryfactory=newConnectionFactory(); factory.setHost("localhost"); Connectionconnection=factory.newConnection(); Cha 查看详情

rabbitmq学习订阅/发布

...不直接将信息传输到队列中,在生产者和队列中间有一个交换机(Exchange),我们之前没有使用到交换机是应为我们没有配置交换机,使用了默认的交换机。有几个可供选择的交换机类型:直连交换机(di 查看详情

rabbitmq之fanout交换器模式开发(代码片段)

Fanout模式,即广播模式,一个发送到交换机的消息会被转发到与该交换机绑定的所有队列上。一、Provider配置文件1spring.application.name=provider2spring.rabbitmq.host=192.168.50.303spring.rabbitmq.port=56724spring.rabbitmq.username=rabbit5spring.rabbitm 查看详情

rabbitmq之topic交换器模式下开发(代码片段)

Topic交换器,即主题模式,进行规则匹配。一、Provider配置文件1spring.application.name=provider2spring.rabbitmq.host=192.168.50.303spring.rabbitmq.port=56724spring.rabbitmq.username=rabbit5spring.rabbitmq.password=rabbit6#设置交换 查看详情

微服务专题之.net6下集成消息队列-rabbitmq交换机模式代码演示(全)

【微服务专题之】.Net6下集成消息队列-RabbitMQ交换机模式代码演示(全)【微服务专题之】.Net6下集成消息队列-RabbitMQ交换机模式代码演示(全) 查看详情

rabbitmq学习笔记五:rabbitmq之优先级消息队列

RabbitMQ优先级队列注意点:1、只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效2、RabbitMQ3.5以后才支持优先级队列代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做... 查看详情

【rabbitmq】消息队列之rabbitmq

参考技术A消息队列之RabbitMQ(推荐)https://www.jianshu.com/p/79ca08116d57/RabbitMQ集群-镜像模式搭建https://segmentfault.com/a/1190000038432417消息中间件—RabbitMQ(集群原理与搭建篇)https://www.jianshu.com/p/6376936845ff其实对于rabbitmq这种去中心化(通... 查看详情

微服务专题之.net6下集成消息队列-rabbitmq交换机模式代码演示(全)

...果觉得本公众号对您有帮助,欢迎关注].Net中RabbitMQ中交换机模式的使用前文回顾【微服务专题之】.Net6下集成消息队列上-RabbitMQ【微服务专题之】.Net6下集成消息队列2-RabbitMQ【微服务专题之】.Net6中集成消息队列-Rabb 查看详情

springboot学习——springboot快速整合rabbitmq

...单入门的案例,其中一个生产者,一个消费者。无需声明交换机(其实是有个默认的交换机),声明完一个队列以后,生产者往这里 查看详情

rabbitmq高级特性之死信队列通俗易懂超详细内含案例(代码片段)

RabbitMq高级特性之死信队列又称死信交换机DLX当消息成为Deadmessage后,会重新发送到另一个交换机,这个交换机就是DLX消息成为死信的情况公有三种:队列消息长度达到限制消费者拒接消费消息basicNack/basicReject,并且不把消息重新放回... 查看详情

springboot整合rabbitmq

参考技术A直连型交换机,根据消息携带的路由键,将消息转发给对应的队列扇形交换机,接收到消息后会将消息转发到所有队列主题交换机,根据消息携带的路由键和交换机与队列绑定键的规则,将消息转发给对应的队列规则... 查看详情

分布式消息通信之rabbitmq_01(代码片段)

...本介绍3.1AMQP协议3.2RabbitMQ的特性3.3工作模型3.4三种主要的交换机4.JavaAPI编程声明交换机声明队列发布消息5.进阶知识5.1TTL(TimeToLive)5.2死信队列,死信交换机5.3优先级队列5.4延迟队列5.5RPC5.6服务端流控5.6消费端限流6.UI管理界面 查看详情

rabbitmq学习

...生产者,多个消费者发布订阅模:又叫无路由键交换机模式,队列绑定到交换机,当发送消息到交换机时,绑定到该交换机的队列都会监听到Direct模式࿱ 查看详情

rabbitmq学习:主题交换机

 尽管直连交换机能够改善我们的系统,但是它也有它的限制——没办法基于多个标准执行路由操作。为了实现这个目的,接下来我们学习如何使用另一种更复杂的交换机——主题交换机。发送到主题交换机(topicexchange)的... 查看详情

rabbitmq学习笔记(自用)(代码片段)

...MQ-发布确认4.1发布确认逻辑4.2发布确认的策略五、RabbitMQ-交换机5.1Exch 查看详情