自定义rpc的完整实现---深入理解rpc内部原理(代码片段)

author author     2023-01-12     308

关键词:

倘若不使用RPC远端调用的情况下,代码如下:

local.py

# coding:utf-8

# 本地调用除法运算的形式
class InvalidOperation(Exception):
    def __init__(self, message = None):
        self.message = message or ‘involid operation‘

def divide(num1, num2 = 1):
    if num2 == 0:
        raise InvalidOperation
    res = num1 / num2
    return res

try:
    val = divide(200, 100)
except InvalidOperation as e:
    print(e.message)
else:
    print(val)

接下来将使用RPC二进制的形式,远程过程调用上述代码。

service.py 中自定义需要实现消息协议、传输控制,并且实现客户端存根clientStub和服务器端存根serverStub,服务器定义以及channel的定义。

import struct
from io import BytesIO
import socket

class InvalidOperation(BaseException):
    def __init__(self, message = None):
        self.message = message or ‘involid operation‘

class MethodProtocol(object):
    ‘‘‘‘
    解读方法名
    ‘‘‘
    def __init__(self, connection):
        self.conn = connection

    def _read_all(self, size):
        """
        帮助我们读取二进制数据
        :param  size: 想要读取的二进制数据大小
        :return:  二进制数据bytes
        """
        # self.conn
        if isinstance(self.conn, BytesIO):
            buff = self.conn.read(size)
            return buff
        else:
            # 有时候长度大于每次读取的长度
            have = 0
            buff = b‘‘
            while have < size:
                chunk = self.conn.recv(size - have)
                buff += chunk
                l = len(chunk)
                have += l
                if l == 0:
                    # 表示客户端已经关闭了
                    raise EOFError
            return buff

    def get_method_name(self):
        # 读取字符串长度
        buff = self._read_all(4)
        length = struct.unpack(‘!I‘,buff)[0]

        # 读取字符串
        buff = self._read_all(length)
        name = buff.decode()
        return name

class DivideProtocol(object):
    """
    divide过程消息协议转换工具
    """
    def args_encode(self, num1, num2=1):
        """
        将原始调用的请求参数转换打包成二进制消息数据
        :param num1: int
        :param num2: int
        :return: bytes 二进制消息数据
        """
        name = ‘divide‘

        # 处理函数名
        buff = struct.pack(‘!I‘, 6) # 无符号int
        buff += name.encode()

        # 处理参数1
        buff2 = struct.pack(‘!B‘, 1) # 无符号byte
        buff2 += struct.pack(‘!i‘, num1)

        # 处理参数2
        if num2 != 1:
            # 没有传参的时候
            buff2 += struct.pack(‘!B‘, 2)
            buff2 += struct.pack(‘!i‘, num2)

        # 处理参数边界和组合成完整数据
        buff += struct.pack(‘!I‘,len(buff2))
        buff += buff2

        return buff

    def _read_all(self, size):
        """
        帮助我们读取二进制数据
        :param  size: 想要读取的二进制数据大小
        :return:  二进制数据bytes
        """
        # self.conn
        if isinstance(self.conn, BytesIO):
            buff = self.conn.read(size)
            return buff
        else:
            # 有时候长度大于每次读取的长度
            have = 0
            buff = b‘‘
            while have < size:
                chunk = self.conn.recv(size - have)
                buff += chunk
                l = len(chunk)
                have +=  l
                if l == 0:
                    # 表示客户端已经关闭了
                    raise EOFError
            return buff

    def args_decode(self, connection):
        """
        接受调用请求数据病进行解析
        :param connection: 链接请求数据 socket  BytesIO
        :return: 因为有多个参数,定义为字典
        """
        param_len_map = 
            1:4,
            2:4,
        

        param_fmt_map = 
            1:‘!i‘,
            2:‘!i‘,
        

        param_name_map = 
            1: ‘num1‘,
            2: ‘num2‘,
        

        # 保存用来返回的参数字典
        args = 

        self.conn = connection
        # 处理方法的名字,已经提前被处理,稍后处理

        # 处理消息边界
        # 1) 读取二进制数据----read  , ------ByteIO.read
        # 2) 将二进制数据转换为python的数据类型
        buff = self._read_all(4)
        length = struct.unpack(‘!I‘,buff)[0]

        # 记录已经读取的长度值
        have = 0

        # 处理第一个参数
        # 解析参数序号
        buff = self._read_all(1)
        have += 1
        param_seq = struct.unpack(‘!B‘, buff)[0]

        # 解析参数值
        param_len = param_len_map[param_seq]
        buff = self._read_all(param_len)
        have += param_len
        param_fmt = param_fmt_map[param_seq]
        param = struct.unpack(param_fmt,buff)[0]

        # 设置解析后的字典
        param_name = param_name_map[param_seq]
        args[param_name] = param

        if have >= length:
            return args
        # 处理第二个参数
        # 解析参数序号
        buff = self._read_all(1)
        param_seq = struct.unpack(‘!B‘, buff)[0]

        # 解析参数值
        param_len = param_len_map[param_seq]
        buff = self._read_all(param_len)
        param_fmt = param_fmt_map[param_seq]
        param = struct.unpack(param_fmt, buff)[0]

        # 设置解析后的字典
        param_name = param_name_map[param_seq]
        args[param_name] = param
        return args

    def result_encode(self, result):
        """
        将原始结果数据转换为消息协议二进制数据
        :param result:
        :return:
        """
        if  isinstance(result,float):
            # 处理返回值类型
            buff = struct.pack(‘!B‘, 1)
            buff += struct.pack(‘!f‘, result)
            return buff
        else:
            buff = struct.pack(‘!B‘, 2)
            # 处理返回值
            length = len(result.message)
            # 处理字符串长度
            buff += struct.pack(‘!I‘, length)
            buff += result.message.encode()
            return buff

    def result_decode(self, connection):
        """
        将返回值消息数据转换为原始返回值
        :param connection: socket BytesIo
        :return: float InvalidOperation对象
        """
        self.conn = connection
        # 处理返回值类型
        buff = self._read_all(1)
        result_type = struct.unpack(‘!B‘, buff)[0]

        if result_type == 1:
            #正常情况
            buff = self._read_all(4)
            val = struct.unpack(‘!f‘, buff)[0]
            return val
        else:
            buff = self._read_all(4)
            length = struct.unpack(‘!I‘, buff)[0]
            # 读取字符串
            buff = self._read_all(length)
            message = buff.decode(buff)
            return InvalidOperation(message)

class Channel(object):
    """
    用于客户端建立网络链接
    """
    def __init__(self, host, port):
        self.host = host
        self.port = port

    def get_connection(self):
        """
        获取链接对象
        :return: 与服务器通讯的socket
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((self.host, self.port))
        return sock

class Server(object):
    """
    RPC服务器
    """
    def __init__(self, host, port, handlers):
        sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

        # 地址复用
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        self.host = host
        self.port = port
        # 绑定地址
        sock.bind((self.host, self.port))

        #  因为在启动的方法中才开启监听,所以不在此处开启
        # sock.listen(128)
        self.sock = sock
        self.handlers = handlers

    def serve(self):
        """
        开启服务器运行,提供RPC服务
        :return:
        """
        # 开启服务器的监听,等待客户端的链接请求
        self.sock.listen(128)
        print("服务器开启监听,ip地址为%s,port为%d..." % (self.host,self.port))
        while True:
            # 不断的接收客户端的链接请求
            client_sock, client_addr = self.sock.accept()
            print("与客户端%s建立连接" % str(client_addr))

            # 交个ServerStub,完成客户端的具体的RPC的调用请求
            stub = ServerStub(client_sock, self.handlers)
            try:
                while True:
                    # 不断的接收
                    stub.process()
            except EOFError:
                # 表示客户端关闭了连接
                print(‘客户端关闭了连接‘)
                client_sock.close()

class ClientStub(object):
    """
    用来帮助客户端完成远程过程调用 RPC调用

    stub = ClientStub()
    stub.divide(200, 100)
    """
    def __init__(self, channel):
        self.channel = channel
        self.conn = self.channel.get_connection()

    def divide(self, num1, num2 = 1):
        # 将调用的参数打包成消息协议的数据
        proto = DivideProtocol()
        args = proto.args_encode(num1, num2)
        # 将消息数据通过网络发送给服务器
        self.conn.sendall(args)

        # 接受服务器返回的消息数据,并进行解析
        result = proto.result_decode(self.conn)

        # 将结果之(正常float 或 异常InvalidOperation)返回给客户端
        if isinstance(result,float):
            return result
        else:
            raise result

class ServerStub(object):
    """
    服务端存根
    帮助服务端完成远端过程调用
    """
    def __init__(self, connection, handlers):
        """
        :param connection: 与客户端的链接
        :param handlers: 真正的本地函数路由
        此处不以map的形式处理,实现类的形式
        class Handler:
            @staticmethod
            def divide():
                pass
            @staticmethod
            def add():
                pass
        """
        self.conn = connection
        self.method_proto = MethodProtocol(self.conn)
        self.process_map = 
            ‘divide‘: self._process_divide,
            ‘add‘: self._process_add
        
        self.handlers = handlers

    def process(self):
        """
        当服务端接受了客户的链接,建立好链接后,完成远端调用的处理
        :return:
        """
        # 接收消息数据,并解析方法的名字
        name = self.method_proto.get_method_name()
        # 根据解析获得的方法名,调用相应的过程协议,接收并解析消息数据
        self.process_map[name]()

    def _process_divide(self):
        """
        处理除法过程调用
        :return:
        """
        proto = DivideProtocol()
        args = proto.args_decode(self.conn)
        # args = ‘num1‘:xxx, ‘num2‘:xxx
        # 除法过程的本地调用------------------->>>>>>>>>
        # 将本地调用过程的返回值(包括可能的异常)打包成消息协议的数据,通过网络返回给客户端
        try:
            val = self.handlers.divide(**args)
        except InvalidOperation as e:
            ret_message = proto.result_encode(e)
        else:
            ret_message = proto.result_encode(val)
        self.conn.sendall(ret_message)

    def _process_add(self):
        """
        处理加法过程调用
        此方法暂时不识闲
        :return:
        """
        pass

if __name__ == ‘__main__‘:
    # 目的:消息协议测试,模拟网络传输
    # 构造消息数据
    proto = DivideProtocol()

    # 测试一
    # divide(200,100)
    # message = proto.args_encode(200,100)

    # 测试二
    message = proto.args_encode(200)

    conn = BytesIO()
    conn.write(message)
    conn.seek(0)

    # 解析消息数据
    method_proto = MethodProtocal(conn)
    name = method_proto.get_method_name()
    print(name)

    args = proto.args_decode(conn)
    print(args)

接下来,只需要创建服务器实例和使用客户端发起请求

server.py

from services import InvalidOperation
from services import Server

class Handlers:
    @staticmethod
    def divide(num1, num2 = 1):
        if num2 == 0:
            raise InvalidOperation(‘ck_god_err‘)
        val = num1/num2
        return val

if __name__ == ‘__main__‘:
    # 开启服务器
    _server = Server(‘127.0.0.1‘, 8000, Handlers)
    _server.serve()

client.py

ffrom services import ClientStub
from services import Channel
from services import InvalidOperation

# 创建与服务器的连接
channel = Channel(‘127.0.0.1‘, 8000)

# 创建用于rpc调用的工具
stub = ClientStub(channel)

# 进行调用
for i in range(5):
    try:
        # val = stub.divide(i * 100,100)
        # val = stub.divide(i * 100)
        val = stub.divide( 100, 0)
    except InvalidOperation as e:
        print(e.message)
    else:
        print(val)

分布式理论,架构设计自定义rpc(代码片段)

分布式理论,架构设计(四)自定义RPC自定义RPCRMI基于netty实现RPC框架代码实现服务端代码客户端代码自定义RPC在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现... 查看详情

深入浅出rpc——深入篇(转载)

...载自这里是原文《深入篇》我们主要围绕RPC的功能目标和实现考量去展开,一个基本的RPC框架应该提供什么功能,满足什么要求以及如何去实现它?RPC功能目标RPC的主要功能呢个目标是让构建分布式计算更加容易,在提供强大的... 查看详情

apachethrift-使用,内部实现及构建一个可扩展的rpc框架

...ook开发的远程服务调用框架,它采用接口描述语言(IDL)定义并创建服务,支 查看详情

服务之间的调用之rpc深入理解

...用本地服务(方法)一样调用服务器的服务(方法)。通常的实现有XML-RPC,JSON-RPC,通信方式基本相同,所不同的只是传输数据的格式.RPC是分布式架构的核心,按响应方式分如下两种:同步调用:客户端调用服务方方法,等待直到服务方... 查看详情

自定义rpc

...地址:https://github.com/zhouyanger/java_demo/tree/master/netty五.自定义RPC5.1概述RPC(RemoteProcedureCall),即远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络实现的技术。常见的RPC框架有:源自阿里的Dub... 查看详情

分布式理论,架构设计自定义rpc(代码片段)

分布式理论,架构设计(四)自定义RPC自定义RPCRMI基于netty实现RPC框架代码实现服务端代码客户端代码自定义RPC在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现... 查看详情

分布式理论,架构设计自定义rpc(代码片段)

分布式理论,架构设计(四)自定义RPC自定义RPCRMI基于netty实现RPC框架代码实现服务端代码客户端代码自定义RPC在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现... 查看详情

深入理解java注解原理

...Java内置了一些注解(如@Override、@Deprecated等),还支持自定义注解,一些知名的框架Struts、Hibernate、Spring都有自己实现的自定 查看详情

深入浅出rpc-深入篇

《深入篇》我们主要围绕RPC的功能目标和实现考量去展开,一个基本的RPC框架应该提供什么功能,满足什么要求以及如何去实现它? RPC功能目标RPC的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用... 查看详情

深入理解aqs(代码片段)

文章目录深入理解AQSAQS概念特点AOS自定义实现锁ReentrantLock原理非公平锁实现原理加锁解锁原理竞争失败原理RenntrantLock可重入的原理可打断原理可打断模式公平锁实现原理非公平锁实现公平锁实现读写锁ReentrantReadWriteLock注意事项... 查看详情

rpc

参考:为什么需要RPC,而不是简单的HTTP接口深入浅出RPC-浅出篇深入浅出RPC-深入篇概念RPC的全称是RemoteProcedureCall是一种进程间通信方式。它允许程序调用另一个地址空间(通常是共享网络的另一台机器上)的过程或函数,而不... 查看详情

深入理解rpc框架的序列化方案

👇推荐大家关注一个公众号👇点击上方"JavaEdge"关注, 星标或置顶一起成长后台回复“面试”有惊喜礼包!               这是一个纷杂而无规则的世界,越想忘掉的事情,越难忘记。 查看详情

一文搞懂rpc的基本原理和层次架构(代码片段)

...协议。但在很多性能要求较高的场景各大企业内部也会自定义的RPC协议。举个例子,就是相当于各个省不但用官方普通话,还都有自己的方言,RPC就相当于是一个方言。RPC 的全称是Rem 查看详情

rpc核心原理

...调用方,被调用的叫做服务提供方,为了实现远程调用,一个完整的RPC会涉及哪些步骤呢?RPC需要通过网络传输数据,并且通常用于系统之间的交互,所以需要保证可靠性,在网络协议上一般会采用TCP,常用的HTTP就是TCP协议之上的.对于调... 查看详情

微服务治理平台的rpc方案实现

...好处是什么。同时也会介绍用友RPC框架的基本结构以及在实现时所用到的一些关键技术。希望通过本文读者能够一窥用友rpc框架的原理,并藉此开发出更优秀的微服务应用。一、rpc在微服务中的重要性  随着越来越多的公司向... 查看详情

微服务治理平台的rpc方案实现

...好处是什么。同时也会介绍用友RPC框架的基本结构以及在实现时所用到的一些关键技术。希望通过本文读者能够一窥用友rpc框架的原理,并藉此开发出更优秀的微服务应用。一、rpc在微服务中的重要性  随着越来越多的公司向... 查看详情

rpc实现原理

RPC实现原理 查看详情

rpc实现原理

RPC实现原理 查看详情