rabbitmq学习笔记五:路由选择(routing)(代码片段)

RunningFan RunningFan     2023-03-18     246

关键词:

前面已经学习了rabbitmq的消息的发布(publish)和订阅(subscrible),参见http://blog.csdn.net/u010416588/article/details/54667952

一 路由选择(Routing)

前面章节我们创建了一个简单的日志系统,我们可以广播日志消息给所有的接收者。本篇会给日志系统增加新的特性,让日志系统订阅部分消息。eg,我们仅仅将致命的错误写入日志文件,与此同时仍在控制面板上打印所有的其他类型的日志消息。

二 绑定关系(Binding)

在前一章节中我们已经使用过绑定,像下面的代码

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定表示转发器与队列之间的关系,可以简单的理解为队列对该转发器的消息感兴趣。
绑定可以附带一个额外的参数routingKey。为了避免basicPublish方法(发布消息)参数混淆,我们把它叫做绑定键(binding key)。下面展示怎样用绑定键创建一个绑定。

channel.queueBind(queueName, EXCHANGE_NAME, "black");

绑定键的意义依赖于转发器的类型。对于fanout类型,忽略此参数。

三 直接转发

前面我们的日志系统把所有的消息广播给所有的消费者。我们希望对其扩展,允许根据日志的严重性来进行过滤日志。eg,我们希望程序能够把严重的错误日志下载磁盘上,而不是浪费磁盘空间记录所有的warning、info日志。
之前用的fanout类型的转发器,但是灵活性不够,它仅仅简单的转发。

我们将会使用direct类型转发器进行代替。direct类型转换器背后的路由算法很简单:消息会被推送至绑定键(binging key)和发布消息发布的附带选择键(routing key)匹配的队列。

上图,我们可以看到direct类型的转发器与两个队列绑定。第一个队列与绑定键orange绑定,第二个队列与转发器间有两个绑定,一个与绑定键black绑定,另一个与green绑定键绑定。
这样的话,当一个消息附带一个选择键(routing key) orange发布至转发器将会被导向到队列Q1。消息附带一个选择键(routing key)black或者green将会被导向到Q2.所有的其他的消息将会被丢弃。

四 多重绑定


使用一个绑定键(binding key)绑定多个队列是完全合法的。如上图,一个附带选择键(routing key)的消息将会被转发到Q1和Q2。

五 发送日志

我们准备将这种模式用于我们的日志系统。我们将消息发送到direct类型的转发器而不是fanout类型。我们将把日志的严重性作为选择键(routing key)。这样的话,接收程序可以根据严重性来选择接收。我们首先关注发送日志的代码:
像以前一样,我们需要先创建一个转发器:

channel.exchangeDeclare(EXCHANGE_NAME,"direct");

然后我们准备发送一条消息:

channel.basicPublish(EXCHANGE_NAME,severity, null, message.getBytes());

假定‘severity’是‘info’,‘warning’,‘error’中的一个。

六 订阅

接收消息跟之前类似,有一点不一样—-就是我们只对我们感兴趣的严重性日志创建一个绑定。

String queueName = channel.queueDeclare().getQueue();
for(String severity : argv)    
  channel.queueBind(queueName, EXCHANGE_NAME, severity);

七 完整的例子


如上图,定义一个direct类型的转发器,队列1与转发器绑定,绑定键为error,只记录类型为error的日志。队列二绑定转发器,绑定键为info,error,warning,会接收三种类型的消息。
发送消息类EmitLogDirect.java

package com.gta.goldnock.mq.routing;

import java.util.Random;  
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
/**
 * 
* @ClassName: EmitLogDirect
* @Description: TODO(消息发送类,转发器类型direct)
* @author yuhuan.gao
* @date 2017年1月22日 下午5:45:50
*
 */
public class EmitLogDirect  

    //转发器名称
    private static final String EXCHANGE_NAME = "ex_logs_direct";  
    //几种安全级别日志
    private static final String[] SEVERITIES =  "info", "warning", "error" ;  

    public static void main(String[] argv) throws java.io.IOException, TimeoutException  
        // 创建连接和频道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 声明转发器的类型  
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");  

        //发送6条消息  
        for (int i = 0; i < 6; i++)  
            String severity = getSeverity();  
            String message = severity + "_log :" + UUID.randomUUID().toString();  
            // 发布消息至转发器,指定routingkey  
            channel.basicPublish(EXCHANGE_NAME, severity, null, message  
                    .getBytes());  
            System.out.println(" [x] Sent '" + message + "'");  
          

        channel.close();  
        connection.close();  
      

    /** 
     * 随机产生一种日志类型 
     *  
     * @return 
     */  
    private static String getSeverity()  
      
        Random random = new Random();  
        int ranVal = random.nextInt(3);  
        return SEVERITIES[ranVal];  
      
  

接收消息类ReceiveLogsDirect.java

package com.gta.goldnock.mq.routing;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * 
* @ClassName: ReceiveLogsDirect
* @Description: TODO(消息接收类,接收转发器类型为direct)
* @author yuhuan.gao
* @date 2017年1月22日 下午5:55:20
*
 */
public class ReceiveLogsDirect 

    //转发器名称
    private static final String EXCHANGE_NAME = "ex_logs_direct";  
    //几种安全级别日志
    private static final String[] SEVERITIES =  "info", "warning", "error" ;  

    public static void main(String[] args) throws IOException, TimeoutException 
        // 创建连接和频道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 声明direct类型转发器  
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");  

        String queueName = channel.queueDeclare().getQueue();  
        String severity = getSeverity();  
        // 指定binding_key  
        channel.queueBind(queueName, EXCHANGE_NAME, severity);  
        System.out.println(" [*] Waiting for "+severity+" logs. To exit press CTRL+C");  

        Consumer consumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException 
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                   
        ;
        //指定接收者,第二个参数为自动应答,(消息接收失败不会转发到其他接收者)
        channel.basicConsume(queueName, true, consumer);
    

    private static String getSeverity() 
         Random random = new Random();  
         int ranVal = random.nextInt(3);  
         return SEVERITIES[ranVal];  
    

运行接收消息类,分别启动info、warning、error日志监控,运行发送消息类。
EmitLogDirect控制台输出:

 [x] Sent 'error_log :3249e8ea-1d88-445e-bcf3-3197f4f33563'
 [x] Sent 'info_log :31782811-4e6e-4a30-9e2a-673427f9c33b'
 [x] Sent 'info_log :897961a5-e2ed-4263-a1a1-83d8ed417fed'
 [x] Sent 'warning_log :912a3ce6-c611-4ea9-a4ea-9e3a1436f83f'
 [x] Sent 'error_log :3ae1efc0-110a-492e-a69f-2b2b1f766d8a'
 [x] Sent 'info_log :df08baa2-1b2c-485f-adab-350a67652e42'
 [x] Sent 'info_log :48d254b1-bf8a-490c-8551-c85342ed9694'
 [x] Sent 'error_log :5b275de8-adbe-4d1c-8866-7025721f4031'
 [x] Sent 'info_log :9a9cdad0-2116-4448-b79f-a5339a8f9a40'
 [x] Sent 'info_log :e0ad7687-f70c-480f-961b-bdfc41b8d37f'

接收消息端info:

 [*] Waiting for info logs. To exit press CTRL+C
 [x] Received 'info':'info_log :31782811-4e6e-4a30-9e2a-673427f9c33b'
 [x] Received 'info':'info_log :897961a5-e2ed-4263-a1a1-83d8ed417fed'
 [x] Received 'info':'info_log :df08baa2-1b2c-485f-adab-350a67652e42'
 [x] Received 'info':'info_log :48d254b1-bf8a-490c-8551-c85342ed9694'
 [x] Received 'info':'info_log :9a9cdad0-2116-4448-b79f-a5339a8f9a40'
 [x] Received 'info':'info_log :e0ad7687-f70c-480f-961b-bdfc41b8d37f'

接收消息端warning:

 [*] Waiting for warning logs. To exit press CTRL+C
 [x] Received 'warning':'warning_log :912a3ce6-c611-4ea9-a4ea-9e3a1436f83f'

接收消息端error:

 [*] Waiting for error logs. To exit press CTRL+C
 [x] Received 'error':'error_log :3249e8ea-1d88-445e-bcf3-3197f4f33563'
 [x] Received 'error':'error_log :3ae1efc0-110a-492e-a69f-2b2b1f766d8a'
 [x] Received 'error':'error_log :5b275de8-adbe-4d1c-8866-7025721f4031'

从上面的例子可以看出:
1 接收者可以自定义自己感兴趣类型的日志。
2 发送消息时可以设置routing_key,接收队列与转发器间可以设置binding_key,接收者接收与binding_key与routing_key相同的消息。

rabbitmq学习笔记六:话题(topics)(代码片段)

前一篇我们已经改善了我们的日志系统,不用fanout类型的转发器愚蠢的广播消息,而是使用direct类型的转发器,能够选择性的接收我们想要接收的消息。参见Rabbit学习笔记五:路由选择(Routing)http://blog.cs... 查看详情

rabbitmq学习笔记五:rabbitmq之优先级消息队列

RabbitMQ优先级队列注意点:1、只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效2、RabbitMQ3.5以后才支持优先级队列代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做... 查看详情

rabbitmq学习笔记(自用)(代码片段)

...概念1.1什么是MQ1.2为什么要用MQ1.3MQ的分类1.4MQ的选择二、RabbitMQ2.1RabbitMQ的概念2.2四大核心概念三、简单案例3.1WorkQueues3.2轮训分发消息3.3消息应答3.4RabbitMQ持久化3.5不公平分发3.6预取值分发四、RabbitMQ-发布确认4.1发布确认逻辑4.2发... 查看详情

openstack学习笔记五多节点部署之rabbitmq信息中枢与元数据

元数据 rabbitmq信息中枢rabbitmq信息中枢所有组件通信的时候实用AMQP高级消息队列qpidrabbitmq  端口5672   sll加密 5671192.168.1.201       h1.hequan.com  &nb 查看详情

rabbitmq学习笔记(代码片段)

rabbitmqAMQP协议安装如何使用AMQP协议生产者消费者队列一个队列存多条消息消息连接发消息和接收消息都是必须先建立连接通道(channel)一个连接有多个通道,消息会在通道里面流转,之后到达消费者交换机(... 查看详情

消息中间件学习笔记——rabbitmq

...性降低系统复杂度提高一致性问题三、常见的MQ产品四、RabbitMQ中的概念五、如何实现RabbitMQ的延迟对列一、消息中间件概述 通过下图我们就可以很好理解消息中间件的概念。二、消息中间件(MQ)的优劣(1).MQ的优势应用... 查看详情

rabbitmq学习笔记(代码片段)

文章目录1.RabbitMq(消息中间件)1.概念:是基于队列模式实现的异步/同步的传输数据。2.传统的Http请求存在哪些缺点?3.mq的使用场景?4.mq服务器如何保证消息不丢失?5.VirtualHost?Exchange分派我们的消息存放在哪个队... 查看详情

rabbitmq学习笔记(代码片段)

文章目录RabbitMq(消息中间件)1.概念:是基于队列模式实现的异步/同步的传输数据。2.传统的Http请求存在哪些缺点?3.mq的使用场景?4.mq服务器如何保证消息不丢失?5.VirtualHost?Exchange分派我们的消息存放在哪个队列... 查看详情

经典排序算法学习笔记五——直接选择排序

一、直接选择排序数据结构数组最差时间复杂度O(n^2)最优时间复杂度O(n^2)平均时间复杂度O(n^2)最差空间复杂度О(n) total, O(1) auxiliary        1、算法思想:首先在未排序序列中找到最小(大... 查看详情

rabbitmq学习笔记3-使用topic交换器

本例使用topic接收警告、错误的日志,并根据匹配的路由规则发送给不同的Queue队列来处理的例子:日志生产者SenderWithTopicExchange1packagecom.yzl.test2;23importjava.util.concurrent.CountDownLatch;4importjava.util.concurrent.ExecutorService;5importjava. 查看详情

hsrp(热备份路由选择协议)---学习笔记

HSRP(hotstandbyroutigprotectol)热备份路由选择协议:是Cisco的一种私有技术。与VRRP工作原理基本相同。一?HSRP的概念(1)概述:通过建立虚拟路由器组和虚拟mac地址实现IP网络冗余备份(三层网络路由),HSRP使组内的Cisco路由器能够互... 查看详情

rabbitmq学习笔记(代码片段)

RabbitMq学习笔记1.消息队列1.1MQ相关概念1.1.1什么是MQ?1.1.2为什么要用MQ?1.2.3MQ的分类1.2.4MQ的选择1.2.5为什么选择RabbitMQ?1.2.5MQ对比分析图在这里插入图片描述1.2RabbitMQ1.2.1.RabbitMQ的概念1.2.2.四大核心概念1.2.3RabbitMQ6大模... 查看详情

pthread学习笔记

POSIX线程,也被称为Pthreads,是一个线程的POSIX标准;pthread.hintpthread_create(pthread_t*thread, pthread_attr_tconst*attr, void*(*start_routine)(void*), void*arg);thread:新线程句柄attr:新线程属性start_routine:线程启动程序arg:线程启动程序参数 查看详情

rabbitmq学习笔记(代码片段)

目录一、引⾔二、RabbitMQ介绍三、RabbitMQ安装四、RabbitMQ架构1.官⽅的简单架构图2.RabbitMQ的完整架构图3.查看图形化界⾯并创建⼀个VirtualHost五、RabbitMQ的队列模式1.RabbitMQ的通讯⽅式2.HelloWorld模式-简单队列模式3.work队列模式:... 查看详情

rabbitmq教程总结(代码片段)

【译】RabbitMQ教程一主要通过HelloWord对RabbitMQ有初步认识【译】RabbitMQ教程二工作队列,即一个生产者对多个消费者循环分发、消息确认、消息持久、公平分发【译】RabbitMQ教程三如何同一个消息同时发给多个消费者开始引入RabbitM... 查看详情

rabbitmq学习笔记

一环境搭建下载地址:ERLANG  http://www.erlang.org/downloadsMQhttps://github.com/rabbitmq/rabbitmq-server/releases/tag/rabbitmq_v3_6_9首先,由于RabbitMQ使用Erlang编写的,需要运行在Erlang运行时环境上,所以在安装RabbitMQServer之前需要安 查看详情

我说这是新手入门最好的rabbitmq学习笔记,谁赞成谁反对?

前言RabbitMQ于2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。RabbitMQ的特点RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实... 查看详情

rabbitmq学习笔记

rabbitMQ学习笔记2017年12月31日星期日Lee 环境:centos7版本:rabbitmq-server-3.7.2-1 准备了3台主机做实验。先配置hosts如下。cat/etc/hosts192.168.5.71 node1192.168.5.72 node72192.168.5.73 node730、基础知识建议看下美团分享的ra 查看详情