java实现利用rabbitmq发送和消费消息

wang_dong_yang wang_dong_yang     2022-12-04     674

关键词:

1.简介

     RabbitMQ 是一个消息代理。从本质上说,它从生产者接收消息,然后把这些消息传递给消费者。在这期间,它能根据你制定的规则发送,缓存,或者持久化存储这些消息。

RabbitMQ 使用到的专业术语。

1).Producing 的意思不仅仅是发送消息。发送消息的程序叫做producer。我们像下图一样描绘它。

2).Queue 是一个消息盒子的名称。它存活在 RabbitMQ 里。虽然消息流经 RabbitMQ 和你的应用程序,但是他们只能在 Queue 里才能被保存。Queue 没有任何边界的限制,你想存多少消息都可以,它本质上是一个无限的缓存。许多生产者都可以向一个 Queue 里发送消息,许多消费者都可以从一个 Queue 里接收消息。我们下图一样描绘它。


3).Consuming 的意思和接收类似。一个消费者主要是指等待接收消息的程序。我们下图一样描绘它。

   

注意:生产者,消费者和代理不一定非要在同一台机器上,在大多数应用中的确也是这样的。


2."Hello World"

在这部分的使用指南中,我们要用 java 写两个程序;一个是生产者,他发送一个消息,另一个是消费者,它接收消息,并且把消息打印出来。我们将会忽略一些Java API 的细节,而是将注意力主要放在我们将要做的这件事上,这件事就是发送一个 "Hello World" 消息。

在下面的图中,"P" 代表生产者,而 "C" 代表消费者。中间的就是一个 Queue,一个消息缓存区。 

  

java 客户端类库

AMQP 是一个开源的,通用的消息协议。有几个用不同语言编写的 AMQP 的客户端。我们将要使用 RabbitMQ 提供的 java 版本的客户端。

下载这个客户端 (http://www.rabbitmq.com/java-client.html) ,把它解压到你的工作目录下,并且找到jar文件。

$ unzip rabbitmq-java-client-bin-*.zip
$ cp rabbitmq-java-client-bin-*/*.jar ./

这个客户端在 Maven 的中心仓库中也有。

groupId:com.rabbitmq

artifactId:amqp-client

现在我们有了客户端和依赖,我们开始写代码。

Sending

 

我们把消息发送者叫 Send,消息接收者叫 Recv。消息发送者连接 RabbitMQ ,发送一个消息,然后退出。

创建 Send.java 文件,并导入我们需要的类。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

创建给 Queue 命名

public class Send 
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException 
      ...
  

然后我们创建一个连接。

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

到这,我们就和本机的代理建立了连接。如果我们想要连接到不同机器上的代理,我们只需要制定那台机器的域名或 IP 地址即可。

接下来,我们创建一个管道。为了发送消息,我们还需要声明一个 Queue ,然后我们就能发布消息到 Queue 上了。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

Queue 声明的过程是幂等的,只有在它不存在的情况下才会被创建出来。消息的内容是字节数组,这意味着你能够使用任何你想使用的编码。

最后,我们关闭这个管道和连接。

下面是我们完整的 Send.java 文件

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send
private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
       

Receiving

不像消息的发送者只是发送一个消息,我们的接收者需要不断的监听消息,并把它们打印出来。



在 Recv.java 这个文件中需要导入的类和 Send.java 中类似。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
DefaultConsumer 是一个实现了 Consumer 接口的类,我们将会用它来缓存从服务器推送过来的消息。

创建 Recv.java 文件,打开一个连接和一个管道,并且声明一个我们将要消费消息的 Queue。注意:这个 Queue 的名字要和 Send.java 文件中的一致。

public class Recv 
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException 

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    

注意到,我们在这又声明了一个 Queue,这是因为我们可能在运行 Send.java 文件之前运行 Recv.java 文件,所以我们要确保在从 Queue 消费消息之前,

它是已经存在的。

我们告诉服务器从 Queue 中向我们传递消息,由于它是以异步的方式向我们传递消息,所以我们采用从消息的缓存对象回调的方式,直到我们已经消费了这些消

息。这就是 DefaultConsumer 子类所做的事。

 Consumer consumer = new DefaultConsumer(channel) 
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
          throws IOException 
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      
    ;
    channel.basicConsume(QUEUE_NAME, true, consumer);

下面是完整的 Recv.java 文件。

import com.rabbitmq.client.*;
import java.io.IOException;

public class Recv
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    Consumer consumer = new DefaultConsumer(channel)
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
          throws IOException
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
     
    ;
    channel.basicConsume(QUEUE_NAME, true, consumer);


Putting it all together

编译文件:

$ javac -cp rabbitmq-client.jar Send.java Recv.java

运行时,我们需要 rabbitmq-client.jar,打开终端,运行 Send 文件:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send

然后运行 Recv 文件:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

消息的接收者将会打印出发送者发送的消息,接收者将保持运行的状态,并且等待消息,使用 Ctrl+C 可以结束运行。因此,尽可能从另一个终端运行发送者程序。

原文地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.html







rabbitmq笔记(代码片段)

RabbitMQ1MQ引言1.1什么是MQMQ(MessageQuene):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接... 查看详情

rabbitmq笔记(代码片段)

RabbitMQ1MQ引言1.1什么是MQMQ(MessageQuene):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接... 查看详情

rabbitemq特性-发送方确认和消费放确认

...​​​思考​​​​1.为什么要进行消息确认?​​​​2.rabbitmq消息确认机制是什么样的?​​​​3.发送的消息什么样才算成功或失败?如何确认?​​​​4.消费方如何告知rabbitmq消息消费成功或失败?​​​​发送方确认​​​​... 查看详情

基于rabbitmq实现分布式延时任务调度(代码片段)

...特殊处理(加分布式锁)避免任务被重复执行。然而使用RabbitMQ实现延时任务可以天然解决分布式环境下重复执行的问题(利用mq中消息只会被一个消费者消费这一特性可以让延时任务只会被一个消费者执行)。基于RabbitMQ做延时... 查看详情

rabbitmq简介

RabbitMQ简介MQ全称为MessageQueue,即消息队列,RabbitMQ使用Erlang语言开发。它基于AMQP协议(AdvancedMessageQueue高级消息队列协议)实现的消息队列。它是一种应用程序之间通信的方式,在分布式系统中应用十分广泛。应用场景1、任务... 查看详情

rabbitmq使用

...收者1接收者2生产者booleanautoAck=true;//自动确认模式,一旦rabbitmq将消息分发给消费者,该消息就会从内存中删除。这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息booleanautoAck=false;//手动模式,如果一个消费者... 查看详情

2019年12道rabbitmq高频面试题你都会了吗?(含答案解析)

RabbitMQ面试题1、什么是rabbitmq2、为什么要使用rabbitmq3、使用rabbitmq的场景4、如何确保消息正确地发送至RabbitMQ?如何确保消息接收方消费了消息?5.如何避免消息重复投递或重复消费?6、消息基于什么传输?7、消息如何分发?8... 查看详情

聊一聊rabbitmq和kafka的应用场景

参考技术A我们知道常用的2款消息中间件是rabbitmq和kafka,他们2者都有什么各自的特点和应用场景呢?我们下面就聊一聊。rabbitmq消息的发送,首先经过exchange,然后由exchange根据路由把消息投递到绑定的队列中,exchange有3中类型... 查看详情

rabbitmq--顺序消费模式和迅速消息发送模式

MQ使用过程中,有些业务场景需要我们保证顺序消费,而如果一个Producer,一个Queue,多个Consumer的情况下是无法保证顺序的举例:  1、业务上产生三条消息,分别是对数据的增加、修改、删除,而如果没有保证顺序消费,结果... 查看详情

rabbitmq利用消息超时和死信交换机实现定时任务

  在RabbitMQ的基础功能中,并没有定时任务或者延时任务这种功能,然而很多业务都有此类需求,但是我们可以依赖RabbitMQ的消息自动超时和死信交换机等基本的属性来实现这类需求,大致思路如下:  1.通过计算任务执行时... 查看详情

利用rabbitmq的死信队列实现延时消息(代码片段)

mq基本的消息模型mq死信队列的消息模型简单的说就是先弄一个正常队列,然后不要设置消费者,接着给这个正常队列绑定一个死信队列,这个死信队列设置方式和正常队列没啥区别。然后监听这个死信队列的消费.一... 查看详情

rabbitmq消息发布和消费的确认机制

前言新公司项目使用的消息队列是RabbitMQ,之前其实没有在实际项目上用过RabbitMQ,所以对它的了解都谈不上入门。趁着周末休息的时间也猛补习了一波,写了两个窗体应用,一个消息发布端和消息消费端。园子里解释RabbitMQ基础... 查看详情

javaspringboot集成rabbitmq实战和总结(代码片段)

...和RetryTemplateRPC模式的消息(不常用)关于消费模型关于RabbitMq客户端的线程模型在公司里一直在用RabbitMQ,由于api已经封装的很简单,关于RabbitMQ本身还有封装的实现没有了解,最近在看RabbitMQ实战这本书,结合网上 查看详情

rabbitmq消息可靠性的机制

RabbitMQ消息可靠性一、发布确认机制。生成者发送消息,Exchange路由消息到队列,RabbitMQ就会给生产者发送确认Ack。(注意:发布确认机制不能和事务机制一起使用)注意:多消息发布确认机制情况下,倘若要发送100条消息,发送... 查看详情

rabbitmq基础概念详解——环境配置及模拟生产者和消费者简单消息发送(代码片段)

一、简介:RabbitMq 是实现了高级消息队列协议(AMQP)的开源消息代理中间件。消息队列是一种应用程序对应用程序的通行方式,应用程序通过写消息,将消息传递于队列,由另一应用程序读取完成通信。而作为中间件的Rabbit... 查看详情

rabbitmq和kafka对于消费失败处理总结

...重试发送消息。kafka:https://github.com/huangxiongbiao12/kafka.gitrabbitmq:https://github.com/huangxiongbiao12/rabbitmq-demo.git 查看详情

消息中间件系列三:使用rabbitmq原生java客户端进行消息通信(消费者(接收方)自动确认模式消费者(接收方)自行确认模式生产者(发送方)确认模式)

准备工作:1)安装RabbitMQ,参考文章:消息中间件系列二:RabbitMQ入门(基本概念、RabbitMQ的安装和运行)2.)分别新建名为OriginalRabbitMQProducer和OriginalRabbitMQConsumer的maven工程在pom.xml文件里面引入如下依赖:<dependency><groupId&... 查看详情

day393&394&395.rabbitmq实战-rabbitmq(代码片段)

RabbitMQ实战1.MQ引言1.1什么是MQMQ(MessageQuene):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发... 查看详情