rabbitmq初探——用队列实现rpc

王大西 王大西     2022-09-24     492

关键词:

rabbitmq构造rpc

前言


 

rpc——remote procedure call 远程调用。在我接触的使用过http协议、thrift框架来实现远程调用。其实消息队列rabbitmq也可以实现。

 

原理


 

我们称调用远程服务者为Client,远程服务提供者为Server。

Client充当生产者,将请求发送到rabbitmq队列中,Server作为消费者,处理Client请求产生结果数据result,此刻Server作为生产者,将result

通过rabbitmq队列传递到Client,Client作为结果数据的消费者,得到result。

 

代码


rpc_client.php

 

<?php
/**
 * Created by PhpStorm.
 * User: 王大西
 * Date: 2017/10/23
 * Time: 16:36
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RpcClient
{
    private $connection = null;
    private $channel = null;
    private $callbackQueue = null;
    private $response = null;
    private $corrId = null;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        list($this->callbackQueue, ,) = $this->channel->queue_declare("", false, false, true, false);
        $this->channel->basic_consume($this->callbackQueue, '', false, false, false, false, array($this, 'onResponse'));
    }

    public function onResponse($rep)
    {
        if ($rep->get('correlation_id') == $this->corrId) {
            $this->response = $rep->body;
        }
    }

    public function call($n)
    {
        $this->response = null;
        $this->corrId = uniqid();

        $msg = new AMQPMessage((string) $n, array(
            'correlation_id' => $this->corrId,
            'reply_to' => $this->callbackQueue
        ));

        $this->channel->basic_publish($msg, '', 'rpc_queue1');
        while (!$this->response) {
            $this->channel->wait();
        }
        return intval($this->response);
    }

}

$number = isset($argv[1]) ? $argv[1] : 30;
$objRpcClient = new RpcClient();
$response = $objRpcClient->call($number);

echo " RPC result $response\n";

 rpc_server.php

<?php
/**
 * rpc server
 * Created by PhpStorm.
 * User: 王大西
 * Date: 2017/10/23
 * Time: 16:36
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('rpc_queue1', false, false, false, false);

function fib($n){
    if ($n == 0) {
        return 0;
    }
    if ($n == 1) {
        return 1;
    }
    return fib($n-1) + fib($n-2);
}

echo " [x] Awaiting RPC requests\n";
$callback = function($req){
    $n = intval($req->body);
    //todo $n empty return
    echo " [.] fib(", $n, ")\n";

    $msg = new AMQPMessage((string) fib($n), array('correlation_id' => $req->get("correlation_id")) );
    $req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to'));

    $req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue1', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

 

测试


server

 

client

 

 

rpc使用rabbitmq实现

...在本地实现测试学习双向连接客户端和服务端配置:连接rabbitmq服务器定义消息队列配置发送请求的模板:交换机、消息队列。配置监听处理:监听的队列、消息转换处理配置处理类:处理类、发布的接口。RPC,全称为RemoteProcedu... 查看详情

rabbitmq学习:利用rabbitmq实现远程rpc调用(代码片段)

一、rabbitmq实现rpc调用的原理·rabbitmq实现rpc的原理是:客户端向一个队列中发送消息,并注册一个回调的队列用于接收服务端返回的消息,该消息需要声明一个叫做correaltionId的属性,该属性将是该次请求的唯一标识。服务端在... 查看详情

rabbitmq:rpc的实现(代码片段)

原文:RabbitMQ(四):RPC的实现一、RPC  RPC(RemoteProcedureCall)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。有很多方式可以实现,譬如UNIXRPC、RESTAPI、WCF和SOAP。这些传统... 查看详情

rabbitmq中rpc的实现及其通信机制(代码片段)

RabbitMQ中RPC的实现:客户端发送请求消息,服务端回复响应消息,为了接受响应response,客户端需要发送一个回调队列的地址来接受响应,每条消息在发送的时候会带上一个唯一的correlation_id,相应的服务端处理计算后会将结果返... 查看详情

rabbitmq消息队列rpc应用2

基于RabbitMQRPC实现的主机异步管理地址原文:http://blog.51cto.com/baiying/2065436,作者大大,我把原文贴出来了啊。不要告我[email protected]:~/workspace#treeManageHost/ManageHost/├──environment│  ├──base_dir.py│  ├ 查看详情

rpc和消息队列(代码片段)

目录RPC和消息队列1消息队列2Rabbitmq2.1安装2.2基本使用2.3消息确认机制2.4持久化2.5闲置消费2.6发布订阅2.7关键字2.8模糊匹配2.9rabbitmq实现rpc3python中的rpc框架3.1SimpleXMLRPCServer3.2ZeroRPC实现rpcRPC和消息队列1消息队列1两个服务调用:restf... 查看详情

rabbitmq——rabbitmq的高级特性(ttl死信队列延迟队列优先级队列rpc)

摘要本博文将介绍数据可靠性的一些细节,并展示RabbitMQ的几种已具备或衍生的高级特性,包括TTL、死信队列、延迟队列、优先级队列、RPC等,这些功能在实际使用中可以让某些应用的实现变得事半功倍。同时对源码进行详细的... 查看详情

rabbitmq初探——消息分发

...ker进程(消费者)处理耗时长,所以就有了任务的积压。rabbitmq提供了任务分发的机制。流程弱化如下图:可以接入多个消费者,rabbitmq会将消息均匀的分发给每一个消费者。耗时任务 我们可以在consumer端用sleep() 查看详情

rabbitmq学习rabbitmq初探(代码片段)

Rabbitmq学习(一)Rabbitmq初探理论定义消息队列:在消息的传输过程中保存消息的的容器。这是一个较为经典的消费-生产者模型,说起来比较抽象,打个比方:A线程需要给B线程发送消息(A、B线程不一定是在同一台机器上的),A线... 查看详情

rabbitmq普通使用

rabbitmq是一种消息队列服务,可以实现rpc,一般情况下openstack的rpc用的就是用它做的,它有很多的用途。除了Qpid以外它是唯一实现了AMQP标准的代理服务器1.安装安装rabbitmq是一件十分容易的事情,在yum源正常的情况下直接即可yumi... 查看详情

RabbitMQ RPC:排他队列锁定 @ PHP

】RabbitMQRPC:排他队列锁定@PHP【英文标题】:RabbitMQRPC:Exclusivequeueslocking@PHP【发布时间】:2012-06-1915:12:19【问题描述】:我正在尝试使用类似于此示例的RabbitMQ在PHP上构建RPC服务:http://www.rabbitmq.com/tutorials/tutorial-six-java.html我正在... 查看详情

springboot+rabbitmq实现rpc调用(代码片段)

...都是RESTfulAPI、Dubbo、WebService、JavaRMI、CORBA等。其实,RabbitMQ也给我们提供了RPC功能,并且使用起来很简单。今天松哥通过一个简单的案例来和大家分享一下SpringBoot+RabbitMQ如何实现一个简单的RPC调用。注意关于RabbitMQ实... 查看详情

springboot+rabbitmq实现rpc调用(代码片段)

...都是RESTfulAPI、Dubbo、WebService、JavaRMI、CORBA等。其实,RabbitMQ也给我们提供了RPC功能,并且使用起来很简单。今天松哥通过一个简单的案例来和大家分享一下SpringBoot+RabbitMQ如何实现一个简单的RPC调用。注意关于RabbitMQ实... 查看详情

基于rabbitmq的rpc框架(代码片段)

...理消息,但是,它会接受数据、存储消息数据、转发消息。RabbitMQ术语生产者:消息发送者,在MQ中被称为生产者(producer),一个发送消息的应用也被叫做生产者,用P表示消费者:生产者“生产”出消息后,最终由谁消费呢... 查看详情

是否可以为 RabbitMQ RPC C# 配置多个/不同的回调队列?

】是否可以为RabbitMQRPCC#配置多个/不同的回调队列?【英文标题】:Isitpossibletoconfiguremultiple/differentCallBackqueueforRabbitMQRPCC#?【发布时间】:2015-08-2106:12:24【问题描述】:我想要这样的东西......客户提出的请求。ThroughtExchange转到rpc_... 查看详情

rabbitmq初探——发布与订阅

...息推送给所有订阅到该频道的用户。这里我们就需要用到rabbitmq的发布与订阅(publishandsubscribe) 原理前面我们弱化rabbitmq,只抽象出了生产者、队列、消费者三个概念。现在需要介绍rabbitmq的整体数据流转过程。数据 查看详情

rabbitmq学习:rabbitmq(消息队列)的作用以及rabbitmq之直连交换机(代码片段)

前言  上篇介绍了AMQP的基本概念,组成及其与rabbitmq的关系。了解了这些东西后,下面我们开始学习rabbitmq(消息队列)的作用以及用java代码和rabbitmq通讯进行消息发布和接收。因为消息的的接收以及路由都是通过交换机实现... 查看详情

rabbitmq初探——helloworld

HelloWorld前言 这里我们弱化broker内部构造。将整体分为三部分。P:producer。生产者。C:Consumer。消费者。queue:队列。后面的代码都依赖于 the php-amqplib API composer安装php-amqplib  1.新建composer.json{"require":{"php- 查看详情