关键词:
rabbitmq构造rpc
前言
rpc——remote procedure call 远程调用。在我接触的使用过http协议、thrift框架来实现远程调用。其实消息队列rabbitmq也可以实现。
原理
我们称调用远程服务者为Client,远程服务提供者为Server。
Client充当生产者,将请求发送到rabbitmq队列中,Server作为消费者,处理Client请求产生结果数据result,此刻Server作为生产者,将result
通过rabbitmq队列传递到Client,Client作为结果数据的消费者,得到result。
代码
rpc_client.php
<?php /** * Created by PhpStorm. * User: 王大西 * Date: 2017/10/23 * Time: 16:36 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class RpcClient { private $connection = null; private $channel = null; private $callbackQueue = null; private $response = null; private $corrId = null; public function __construct() { $this->connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $this->channel = $this->connection->channel(); list($this->callbackQueue, ,) = $this->channel->queue_declare("", false, false, true, false); $this->channel->basic_consume($this->callbackQueue, '', false, false, false, false, array($this, 'onResponse')); } public function onResponse($rep) { if ($rep->get('correlation_id') == $this->corrId) { $this->response = $rep->body; } } public function call($n) { $this->response = null; $this->corrId = uniqid(); $msg = new AMQPMessage((string) $n, array( 'correlation_id' => $this->corrId, 'reply_to' => $this->callbackQueue )); $this->channel->basic_publish($msg, '', 'rpc_queue1'); while (!$this->response) { $this->channel->wait(); } return intval($this->response); } } $number = isset($argv[1]) ? $argv[1] : 30; $objRpcClient = new RpcClient(); $response = $objRpcClient->call($number); echo " RPC result $response\n";
rpc_server.php
<?php /** * rpc server * Created by PhpStorm. * User: 王大西 * Date: 2017/10/23 * Time: 16:36 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('rpc_queue1', false, false, false, false); function fib($n){ if ($n == 0) { return 0; } if ($n == 1) { return 1; } return fib($n-1) + fib($n-2); } echo " [x] Awaiting RPC requests\n"; $callback = function($req){ $n = intval($req->body); //todo $n empty return echo " [.] fib(", $n, ")\n"; $msg = new AMQPMessage((string) fib($n), array('correlation_id' => $req->get("correlation_id")) ); $req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to')); $req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('rpc_queue1', '', false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
测试
server
client
rpc使用rabbitmq实现
...在本地实现测试学习双向连接客户端和服务端配置:连接rabbitmq服务器定义消息队列配置发送请求的模板:交换机、消息队列。配置监听处理:监听的队列、消息转换处理配置处理类:处理类、发布的接口。RPC,全称为RemoteProcedu... 查看详情
rabbitmq学习:利用rabbitmq实现远程rpc调用(代码片段)
一、rabbitmq实现rpc调用的原理·rabbitmq实现rpc的原理是:客户端向一个队列中发送消息,并注册一个回调的队列用于接收服务端返回的消息,该消息需要声明一个叫做correaltionId的属性,该属性将是该次请求的唯一标识。服务端在... 查看详情
rabbitmq:rpc的实现(代码片段)
原文:RabbitMQ(四):RPC的实现一、RPC RPC(RemoteProcedureCall)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。有很多方式可以实现,譬如UNIXRPC、RESTAPI、WCF和SOAP。这些传统... 查看详情
rabbitmq中rpc的实现及其通信机制(代码片段)
RabbitMQ中RPC的实现:客户端发送请求消息,服务端回复响应消息,为了接受响应response,客户端需要发送一个回调队列的地址来接受响应,每条消息在发送的时候会带上一个唯一的correlation_id,相应的服务端处理计算后会将结果返... 查看详情
rabbitmq消息队列rpc应用2
基于RabbitMQRPC实现的主机异步管理地址原文:http://blog.51cto.com/baiying/2065436,作者大大,我把原文贴出来了啊。不要告我[email protected]:~/workspace#treeManageHost/ManageHost/├──environment│ ├──base_dir.py│ ├ 查看详情
rpc和消息队列(代码片段)
目录RPC和消息队列1消息队列2Rabbitmq2.1安装2.2基本使用2.3消息确认机制2.4持久化2.5闲置消费2.6发布订阅2.7关键字2.8模糊匹配2.9rabbitmq实现rpc3python中的rpc框架3.1SimpleXMLRPCServer3.2ZeroRPC实现rpcRPC和消息队列1消息队列1两个服务调用:restf... 查看详情
rabbitmq——rabbitmq的高级特性(ttl死信队列延迟队列优先级队列rpc)
摘要本博文将介绍数据可靠性的一些细节,并展示RabbitMQ的几种已具备或衍生的高级特性,包括TTL、死信队列、延迟队列、优先级队列、RPC等,这些功能在实际使用中可以让某些应用的实现变得事半功倍。同时对源码进行详细的... 查看详情
rabbitmq初探——消息分发
...ker进程(消费者)处理耗时长,所以就有了任务的积压。rabbitmq提供了任务分发的机制。流程弱化如下图:可以接入多个消费者,rabbitmq会将消息均匀的分发给每一个消费者。耗时任务 我们可以在consumer端用sleep() 查看详情
rabbitmq学习rabbitmq初探(代码片段)
Rabbitmq学习(一)Rabbitmq初探理论定义消息队列:在消息的传输过程中保存消息的的容器。这是一个较为经典的消费-生产者模型,说起来比较抽象,打个比方:A线程需要给B线程发送消息(A、B线程不一定是在同一台机器上的),A线... 查看详情
rabbitmq普通使用
rabbitmq是一种消息队列服务,可以实现rpc,一般情况下openstack的rpc用的就是用它做的,它有很多的用途。除了Qpid以外它是唯一实现了AMQP标准的代理服务器1.安装安装rabbitmq是一件十分容易的事情,在yum源正常的情况下直接即可yumi... 查看详情
RabbitMQ RPC:排他队列锁定 @ PHP
】RabbitMQRPC:排他队列锁定@PHP【英文标题】:RabbitMQRPC:Exclusivequeueslocking@PHP【发布时间】:2012-06-1915:12:19【问题描述】:我正在尝试使用类似于此示例的RabbitMQ在PHP上构建RPC服务:http://www.rabbitmq.com/tutorials/tutorial-six-java.html我正在... 查看详情
springboot+rabbitmq实现rpc调用(代码片段)
...都是RESTfulAPI、Dubbo、WebService、JavaRMI、CORBA等。其实,RabbitMQ也给我们提供了RPC功能,并且使用起来很简单。今天松哥通过一个简单的案例来和大家分享一下SpringBoot+RabbitMQ如何实现一个简单的RPC调用。注意关于RabbitMQ实... 查看详情
springboot+rabbitmq实现rpc调用(代码片段)
...都是RESTfulAPI、Dubbo、WebService、JavaRMI、CORBA等。其实,RabbitMQ也给我们提供了RPC功能,并且使用起来很简单。今天松哥通过一个简单的案例来和大家分享一下SpringBoot+RabbitMQ如何实现一个简单的RPC调用。注意关于RabbitMQ实... 查看详情
基于rabbitmq的rpc框架(代码片段)
...理消息,但是,它会接受数据、存储消息数据、转发消息。RabbitMQ术语生产者:消息发送者,在MQ中被称为生产者(producer),一个发送消息的应用也被叫做生产者,用P表示消费者:生产者“生产”出消息后,最终由谁消费呢... 查看详情
是否可以为 RabbitMQ RPC C# 配置多个/不同的回调队列?
】是否可以为RabbitMQRPCC#配置多个/不同的回调队列?【英文标题】:Isitpossibletoconfiguremultiple/differentCallBackqueueforRabbitMQRPCC#?【发布时间】:2015-08-2106:12:24【问题描述】:我想要这样的东西......客户提出的请求。ThroughtExchange转到rpc_... 查看详情
rabbitmq初探——发布与订阅
...息推送给所有订阅到该频道的用户。这里我们就需要用到rabbitmq的发布与订阅(publishandsubscribe) 原理前面我们弱化rabbitmq,只抽象出了生产者、队列、消费者三个概念。现在需要介绍rabbitmq的整体数据流转过程。数据 查看详情
rabbitmq学习:rabbitmq(消息队列)的作用以及rabbitmq之直连交换机(代码片段)
前言 上篇介绍了AMQP的基本概念,组成及其与rabbitmq的关系。了解了这些东西后,下面我们开始学习rabbitmq(消息队列)的作用以及用java代码和rabbitmq通讯进行消息发布和接收。因为消息的的接收以及路由都是通过交换机实现... 查看详情
rabbitmq初探——helloworld
HelloWorld前言 这里我们弱化broker内部构造。将整体分为三部分。P:producer。生产者。C:Consumer。消费者。queue:队列。后面的代码都依赖于 the php-amqplib API composer安装php-amqplib 1.新建composer.json{"require":{"php- 查看详情