rabbitmq学习:利用rabbitmq实现远程rpc调用(代码片段)

wutianqi wutianqi     2023-01-29     617

关键词:

一、rabbitmq实现rpc调用的原理

·rabbitmq实现rpc的原理是:客户端向一个队列中发送消息,并注册一个回调的队列用于接收服务端返回的消息,该消息需要声明一个叫做correaltionId的属性,该属性将是该次请求的唯一标识。服务端在接受到消息(在需要时可以验证correaltionId)后,处理消息,并将消息发送到客户端注册的回调队列中。原理图如下:  

  技术分享图片

二、代码实现

  下面我们将模拟实现一个rpc客户端和rpc服务端。客户端给服务端发送message,服务端收到后处理message,再将处理后的消息返给客户端

  rpc客户端

  

/**
 * rpc客户端
 */
public class RpcClient 
    //发送消息的队列名称
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try 
           connection = connectionFactory.newConnection();
           channel = connection.createChannel();
           //创建回调队列
           String callbackQueue = channel.queueDeclare().getQueue();
           //创建回调队列,消费者从回调队列中接收服务端传送的消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(callbackQueue,true,consumer);

            //创建消息带有correlationId的消息属性
            String correlationId = UUID.randomUUID().toString();
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(callbackQueue).build();
            String message = "hello rabbitmq";
            channel.basicPublish("",RPC_QUEUE_NAME,basicProperties,message.getBytes());
            System.out.println("RpcClient send message " + message + ", correaltionId = " + correlationId);

            //接收回调消息
            while (true)
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String receivedCorrelationId = delivery.getProperties().getCorrelationId();
                if(correlationId.equals(receivedCorrelationId))
                    System.out.println("RpcClient receive format message " + new String(delivery.getBody(), "UTF-8") + ", correaltionId = " + correlationId);
                    break;
                
            
         catch (IOException e) 
            e.printStackTrace();
         catch (InterruptedException e) 
            e.printStackTrace();
         finally 
            try 
                channel.close();
                connection.close();
             catch (IOException e) 
                e.printStackTrace();
            
        

    

 

   rpc服务端

  

/**
 * rpc服务器
 */
public class RpcServer 
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static String format(String message)
        return "......" + message + "......";
    

    public static void main(String[] args) 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        try 
            connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //声明消费者预取的消息数量
            channel.basicQos(1);
            channel.basicConsume(RPC_QUEUE_NAME,false,consumer);//采用手动回复消息
            System.out.println("RpcServer waitting for receive message");

            while (true)
                //接收并处理消息
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("RpcServer receive message " + message);
                String response = format(message);
                //确认收到消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

                //取出消息的correlationId
                AMQP.BasicProperties properties = delivery.getProperties();
                String correlationId = properties.getCorrelationId();

                //创建具有与接收消息相同的correlationId的消息属性
                AMQP.BasicProperties replyProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).build();
                channel.basicPublish("",properties.getReplyTo(),replyProperties,response.getBytes());
            

         catch (IOException e) 
            e.printStackTrace();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

   先运行服务端,再运行客户端,结果如下:

  RpcClient

  技术分享图片

  RpcServer

  技术分享图片

  

 


rpc使用rabbitmq实现

...在本地实现测试学习双向连接客户端和服务端配置:连接rabbitmq服务器定义消息队列配置发送请求的模板:交换机、消息队列。配置监听处理:监听的队列、消息转换处理配置处理类:处理类、发布的接口。RPC,全称为RemoteProcedu... 查看详情

rabbitmq:消息远程复制(shovel插件)

参考技术A官网参考:https://www.rabbitmq.com/shovel.htmlShovel可以实现在两个rabbitmq之间进行消息的远程复制。利用该组件,可以实现如下业务场景:cmd下运行在Docker下启动rabbitmq容器,并通过--name指定容器名字为rabbitmq1进入容器成功进... 查看详情

rabbitmq学习(远程过程调用(rpc))

...通常被称为远程过程调用或RPC。在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有任何值得分发的耗时任务,我们将创建一个返回斐波那契数字的虚拟RPC服务。 客户端界面 查看详情

rabbitmq初探——用队列实现rpc

rabbitmq构造rpc前言 rpc——remoteprocedurecall远程调用。在我接触的使用过http协议、thrift框架来实现远程调用。其实消息队列rabbitmq也可以实现。 原理 我们称调用远程服务者为Client,远程服务提供者为Server。Client充当生产... 查看详情

rabbitmq学习:远程结果调用

 场景:我们需要在传输消息时得到结果客服端在发送请求时会发送回调队列,服务端处理事情完成后会将结果返回到回调队列中,在增加关联标志关联每个请求和服务返回客户端代码:publicclassRPCClient{   privatefinal... 查看详情

springboot|rabbitmq实现rpc方式远程同步调用

上篇相关文章 Springboot项目整合Rabbitmq详细教程服务端代码示例@ResourceprivateRabbitTemplaterabbitTemplate;/***同步 对外发送消息的方法*@parammsg 具体的消息内容*@throwsException*/publicStringsyncSend(Stringmsg)MessagePropertiesme 查看详情

springboot|rabbitmq实现rpc方式远程同步调用

上篇相关文章 Springboot项目整合Rabbitmq详细教程服务端代码示例@ResourceprivateRabbitTemplaterabbitTemplate;/***同步 对外发送消息的方法*@parammsg 具体的消息内容*@throwsException*/publicStringsyncSend(Stringmsg)MessagePropertiesme 查看详情

rabbitmq学习笔记-p2(springamqp)(代码片段)

...超级推荐!!SpringAMQP1.初识SpringAMQPSpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。SpringAmqp的官方地址:https://spring.io/projects/spring-am 查看详情

java实现利用rabbitmq发送和消费消息

1.简介   RabbitMQ是一个消息代理。从本质上说,它从生产者接收消息,然后把这些消息传递给消费者。在这期间,它能根据你制定的规则发送,缓存,或者持久化存储这些消息。RabbitMQ使用到的专业术语。1).Pr... 查看详情

rabbitmq利用消息超时和死信交换机实现定时任务

  在RabbitMQ的基础功能中,并没有定时任务或者延时任务这种功能,然而很多业务都有此类需求,但是我们可以依赖RabbitMQ的消息自动超时和死信交换机等基本的属性来实现这类需求,大致思路如下:  1.通过计算任务执行时... 查看详情

内网穿透实现在外远程连接rabbitmq服务(代码片段)

文章目录前言1.安装erlang语言2.安装rabbitMQ3.内网穿透3.1安装cpolar内网穿透(支持一键自动安装脚本)3.2创建HTTP隧道4.公网远程连接5.固定公网TCP地址5.1保留一个固定的公网TCP端口地址5.2配置固定公网TCP端口地址转载自远控源码文章&#x... 查看详情

内网穿透实现在外远程连接rabbitmq服务(代码片段)

文章目录前言1.安装erlang语言2.安装rabbitMQ3.内网穿透3.1安装cpolar内网穿透(支持一键自动安装脚本)3.2创建HTTP隧道4.公网远程连接5.固定公网TCP地址5.1保留一个固定的公网TCP端口地址5.2配置固定公网TCP端口地址转载自远控源码文章&#x... 查看详情

rabbitmq学习:rabbitmq之扇形交换机主题交换机(代码片段)

 前言上篇我们学习了rabbitmq的作用以及直连交换机的代码实现,这篇我们继续看如何用代码实现扇形交换机和主题交换机一、扇形交换机  1.生产者  /***生产者*/publicclassLogProducer//交换机名称publicfinalstaticStringEXCHANGE_NAME="... 查看详情

rabbitmq学习rabbitmq六大核心部分学习(代码片段)

六大核心部分Java代码实现HelloWorld新建Maven工程,引入依赖<!--指定jdk编译版本--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plu 查看详情

rabbitmq学习笔记

1基本概念rabbitmqserver(brokerserver):rabbitmq服务client:包括producers和consumermessage:包括payload和labelexchange:producer发布message的地方queue:messages存放和consumer收取message的地方bindings:将message从exchange到不同queue的实现con 查看详情

rabbitmq学习系列:介绍

1. 介绍    RabbitMQ是一个由erlang开发的基于AMQP(AdvancedMessageQueue)协议的开源实现。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面都非常的优秀。是当前最主流的消息中间件之一。  ... 查看详情

rabbitmq学习系列:介绍

1. 介绍    RabbitMQ是一个由erlang开发的基于AMQP(AdvancedMessageQueue)协议的开源实现。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面都非常的优秀。是当前最主流的消息中间件之一。  ... 查看详情

rabbitmq

h3{background-color:palegreen}一、简介解释RabbitMQ,就不得不提到AMQP(AdvancedMessageQueuingProtocol)协议。AMQP协议是一种基于网络的消息传输协议,它能够在应用或组织之间提供可靠的消息传输。RabbitMQ是该AMQP协议的一种实现,利用它,... 查看详情