rabbitmq:rpc的实现(代码片段)

lonelyxmas lonelyxmas     2023-03-13     351

关键词:

原文:RabbitMQ(四):RPC的实现

一、RPC

  RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。有很多方式可以实现,譬如UNIX RPC、REST API、WCF和SOAP。这些传统的RPC实现方法有共同之处:那就是客户端和服务器端紧密相连,客户端直接连接上服务器,发送一个请求,然后就停下来等待服务器的应答。

  这种点对点的性质模式有很多好处,它使得在小范围内的拓扑变得简单。但是当有众多服务器的时候,客户端如何发现在那台服务器上可以找到其他想要的服务就变的麻烦,SOAP和大多数的企业RPC已经采用复杂的补充协议和服务目录,但也带来了额外的复杂度和众多故障点。

  但是,用RabbitMQ来实现RPC可以无需关心由那台服务器来处理,也不必担心服务器奔溃,只需要简单的发送消息,然后等待响应即可。一般接触RabbitMQ的都是用发后即忘模型,用于发送邮件等通知或者处理其他并行处理事件,也就是AMQP的消息是单向的。如何才能让服务器将处理结果返回给原始的客户端呢?

二、消息应答和私有队列

  RabbitMQ有一个优雅的解决方案:使用消息来发回应答。在每个AMQP消息头里有个字段reply_to.消息的生产者可以通过该字段来确定队列的名称,并监听应答队列等待应答。然后接收消息的RPC服务器能偶检查reply_to字段,并创建包含应答内容的新的消息,并以队列名称为路由键,通过应答队列将处理结果发回给生产者。这里我们不需要创建应答队列的名字也不需要将应答队列绑定到交换器上,这是因为没有声明队列的名称RabbitMQ会自动申明,消息发布到RabbitMQ在没有指名交换器的时候,RabbitMQ就会让位目的地是应答队列,而路由键就是应答队列名称。

  所以RabbitMQ实现RPC需要比一般的消息通信多以下几个步骤:

  1. 生产者创建一个应答队列,并监听该队列。
  2. 生产者为消息头中的Reply_to和CorrelationId字段赋值。reply_to是应答队列的名称,CorrelationId是相关标识由消费者返回后对比确认是返回我们的结果。
  3. 消费者返回生产者发送的消息头,并且不需要绑定交换器,并将Reply_to参数作为路由键发送消息到应答队列。

三、自己实现简单的RPC

  其实简单的讲就是生产者在发送消息后接收消息,消费者在接受消息后发送消息,生产者多了一步接收处理消息,消费者多了一步发送消息。我这里简化了一些操作,争取用最少的代码实现,具体代码如下:

  生产者:

private static void MySelfRPCProducer()

    var conn_factory = new ConnectionFactory()HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672;
    using (var conn = conn_factory.CreateConnection())
    
        using (var channel = conn.CreateModel())
        
            IBasicProperties pro = channel.CreateBasicProperties();
            pro.ReplyTo = channel.QueueDeclare().QueueName;//创建应答队列并返回队列名称,这个方法创建的队列exclusive和auto_delete都是true,这样可以确保没有人能窃取信息
            pro.ContentType = "text/plain";
            string corrId = Guid.NewGuid().ToString();
            pro.CorrelationId = corrId;

            channel.BasicPublish("", "rpc_queue", pro, Encoding.UTF8.GetBytes("小黄"));
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (ea, ch) =>
            
                //比较CorrelationId确认是返回的我们的消息
                if (ch.BasicProperties.CorrelationId == corrId)
                
                    //处理返回结果
                    string msg = Encoding.UTF8.GetString(ch.Body);
                    Console.WriteLine(msg);
                
            ;
            string consumer_tag = channel.BasicConsume(pro.ReplyTo, true, consumer);//监听应答队列
            channel.BasicCancel(consumer_tag);
        
    
    Console.ReadLine();

  消费者:

private static void MySelfRPCCousmer()

    var conn_factory = new ConnectionFactory()HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672;
    using (var conn = conn_factory.CreateConnection())
    
        using (var channel = conn.CreateModel())
        
            channel.QueueDeclare("rpc_queue", false, false, false, null);
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicQos(0, 1, false);
            consumer.Received += (ea, ch) =>
            
                string msg = Encoding.UTF8.GetString(ch.Body);
                Console.WriteLine("接收到消息:" + msg);
                //发送处理结果
                channel.BasicPublish("", ch.BasicProperties.ReplyTo, ch.BasicProperties, Encoding.UTF8.GetBytes(msg + "给我回电话了"));
                channel.BasicAck(ch.DeliveryTag, false);
            ;
            string consumer_tag = channel.BasicConsume("rpc_queue", false, consumer);
            Console.ReadLine();//这里先停止运行下面的代码,因为需要持续监听,信道断开就监听不了了
            channel.BasicCancel(consumer_tag);
        
    

 四、RabbitMQ封装好的RPC

  其实RabbitMQ已经封装好了RPC相应的对象,分别是SimpleRpcClient和SimpleRpcServer。客户端在初始化SimpleRpcClient后主要可以通过Call方法发送消息并返回服务端处理结果。服务端的SimpleRpcServer内部定义了很多虚方法,具体的消息处理是我们自己决定的,所以需要继承SimpleRpcServer后实现相应方法,通过实现重写HandleSimpleCall方法可以返回给客户端数据。具体代码如下所示:

  客户端:

private static void RabbitMQRPCProducer()

    var conn_factory = new ConnectionFactory()  HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672 ;
    using (var conn = conn_factory.CreateConnection())
    
        using (var channel = conn.CreateModel())
        
            //创建client的rpc
            SimpleRpcClient client = new SimpleRpcClient(channel, new PublicationAddress(exchangeType: ExchangeType.Direct, exchangeName: string.Empty, routingKey: "rpc_queue"));
            bool flag = true;
            var sendmsg = "";
            while (flag)
            
                Console.WriteLine("请输入要发送的消息");
                sendmsg = Console.ReadLine();
                if (string.IsNullOrWhiteSpace(sendmsg))
                
                    Console.Write("请输入消息");
                    continue;
                
                var msg = client.Call(Encoding.UTF8.GetBytes(sendmsg));
                Console.WriteLine(Encoding.UTF8.GetString(msg));
            
            Console.ReadKey();
        
    

  服务端:

private static void RabbitMQRPCCousmer()


    var conn_factory = new ConnectionFactory()  HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672 ;
    using (var conn = conn_factory.CreateConnection())
    
        //创建返回一个新的频道
        using (var channel = conn.CreateModel())
        
            channel.QueueDeclare("rpc_queue", false, false, false, null);//创建一个rpc queue
            SimpleRpcServer rpc = new MySimpleRpcServer(new Subscription(channel, "rpc_queue"));
            Console.WriteLine("服务端启动成功");
            rpc.MainLoop(); Console.ReadKey();
        
    

  继承实现方法:

class MySimpleRpcServer : SimpleRpcServer

    public MySimpleRpcServer(Subscription subscription) : base(subscription)
    
    
    /// <summary>
    /// 执行完成后进行回调
    /// </summary>
    public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
    
        replyProperties = null;
        return Encoding.UTF8.GetBytes($"给Encoding.UTF8.GetString(body)发送短信成功");
    

 五、小结

  以上就是RabbitMQ对于RPC的最简单的实现,与大家共勉。

RabbitMQ RPC 以异步方式?

】RabbitMQRPC以异步方式?【英文标题】:RabbitMQRPCinanasyncway?【发布时间】:2020-09-0407:08:38【问题描述】:我正在为一个使用RabbitMQ和RPC的客户开发一个项目,我不太了解RabbitMQ,我正在努力在互联网上找到一些不错的示例。我需要... 查看详情

RabbitMQ RPC 关闭最终消息的响应队列

】RabbitMQRPC关闭最终消息的响应队列【英文标题】:RabbitMQRPCcloseresponsequeueonfinalmessage【发布时间】:2016-03-1704:53:58【问题描述】:我计划用RabbitMQ实现一个RPC模式。客户端将发送一条消息以请求来自服务器的数据快照。服务器将... 查看详情

RabbitMQ RPC 教程查询

】RabbitMQRPC教程查询【英文标题】:RabbitMQRPCtutorialquery【发布时间】:2015-05-2106:27:24【问题描述】:我正在阅读RabbitMQhere分享的教程我假设下面的客户端代码while(true)varea=(BasicDeliverEventArgs)consumer.Queue.Dequeue();if(ea.BasicProperties.Correla... 查看详情

如何为 RabbitMQ RPC 请求设置超时?

】如何为RabbitMQRPC请求设置超时?【英文标题】:HowtosetatimeouttoaRabbitMQRPCrequest?【发布时间】:2020-04-1321:45:25【问题描述】:在AMQP(RabbitMQ)RPC模型中向主题发布消息是否存在超时?我不想等待很长时间(超时后)消费者对生产者消... 查看详情

Spring Boot 和异步 RabbitMQ RPC

】SpringBoot和异步RabbitMQRPC【英文标题】:SpringbootandasynchronousRabbitMQRPC【发布时间】:2021-08-2522:23:26【问题描述】:我正在尝试实现RabbitMQRPC模式(请求/响应)。这对我来说是全新的技术。所以我很难过。这是一个网络应用,内置... 查看详情

rabbitmq——rpc

RabbitMQ(六)——RPC (转载请附上本文链接——linhxx)  一、概述RabbitMQ的RPC模式,支持生产者和消费者不在同一个系统中,即允许远程调用的情况。通常,消费者作为服务端,放置在远程的系统中,提供接口,生产... 查看详情

跨多个 rabbitMQ 实例的 RabbitMQ RPC

】跨多个rabbitMQ实例的RabbitMQRPC【英文标题】:RabbitMQRPCacrossmultiplerabbitMQinstances【发布时间】:2015-04-1517:53:26【问题描述】:我有三个客户端,每个客户端都有自己的RabbitMQ实例,我有一个应用程序(我们称之为appA),它有自己... 查看详情

使用 Node.js 的 RabbitMQ RPC

】使用Node.js的RabbitMQRPC【英文标题】:RabbitMQRPCusingNode.js【发布时间】:2017-09-0801:49:39【问题描述】:我使用RabbitMQ在Node.js中实现RPC。我按照教程,我为每个客户端声明相同的队列名称为\'rpc_client\'为了高权限,这里是client.js在... 查看详情

RabbitMQ RPC:排他队列锁定 @ PHP

】RabbitMQRPC:排他队列锁定@PHP【英文标题】:RabbitMQRPC:Exclusivequeueslocking@PHP【发布时间】:2012-06-1915:12:19【问题描述】:我正在尝试使用类似于此示例的RabbitMQ在PHP上构建RPC服务:http://www.rabbitmq.com/tutorials/tutorial-six-java.html我正在... 查看详情

是否可以为 RabbitMQ RPC C# 配置多个/不同的回调队列?

】是否可以为RabbitMQRPCC#配置多个/不同的回调队列?【英文标题】:Isitpossibletoconfiguremultiple/differentCallBackqueueforRabbitMQRPCC#?【发布时间】:2015-08-2106:12:24【问题描述】:我想要这样的东西......客户提出的请求。ThroughtExchange转到rpc_... 查看详情

mysort(选做)的实现(代码片段)

MySort(选做)的实现题目内容注意:研究sort的其他功能,要能改的动代码,需要答辩模拟实现Linux下Sort-t:-k2的功能。要有伪代码,产品代码,测试代码(注意测试用例的设计)参考Sort的实现。提交博客链接。代码框架(题目给出... 查看详情

htmlmvvm框架的实现(代码片段)

查看详情

20165215mysort的实现(代码片段)

MySort的实现模拟实现Linux下Sort-t:-k2的功能。要有伪代码,产品代码,测试代码(注意测试用例的设计)代码packageweek12;importjava.util.*;importjava.lang.Integer;publicclassMySortpublicstaticvoidmain(String[]args)String[]toSort="aaa:10:1:1"," 查看详情

golanggotwitzcliapp上的解析功能实现的核心代码。(代码片段)

查看详情

线程的实现方式二(代码片段)

线程实现方式二:    1.自定义一个类实现runnable接口        classThreadimplementsRunnable         查看源代码,我们可以知道Thread实现了runnable接口        runnable中只有一个run抽象类    2.实... 查看详情

rubyactivejob的规范模式实现(代码片段)

查看详情

javascript懒惰的事件实现(代码片段)

查看详情

phplaravel的apimodel实现(代码片段)

查看详情