万物互联之~rpc专栏(代码片段)

dunitian dunitian     2023-02-24     742

关键词:

其他专栏最新篇:协程加强之~兼容答疑篇 | 聊聊数据库~SQL环境篇

上篇回顾:万物互联之~深入篇

3.RPC引入

Code:https://github.com/lotapp/BaseCode/tree/master/python/6.net/6.rpc/

3.1.概念

RPC(Remote Procedure Call):分布式系统常见的一种通信方法(远程过程调用),通俗讲:可以一台计算机的程序调用另一台计算机的子程序(可以把它看成之前我们说的进程间通信,只不过这一次的进程不在同一台PC上了)

PS:RPC的设计思想是力图使远程调用中的通讯细节对于使用者透明,调用双方无需关心网络通讯的具体实现

引用一张网上的图:
技术分享图片

HTTP有点相似,你可以这样理解:

  1. 老版本的HTTP/1.0是短链接,而RPC是长连接进行通信
    • HTTP协议(header、body),RPC可以采取HTTP协议,也可以自定义二进制格式
  2. 后来HTTP/1.1支持了长连接(Connection:keep-alive),基本上和RPC差不多了
    • keep-alive一般都限制有最长时间,或者最多处理的请求数,而RPC是基于长连接的,基本上没有这个限制
  3. 后来谷歌直接基于HTTP/2.0建立了gRPC,它们之间的基本上也就差不多了
    • 如果硬是要区分就是:HTTP-普通话RPC-方言的区别了
    • RPC高效而小众,HTTP效率没RPC高,但更通用
  4. PS:RPCHTTP调用不用经过中间件,而是端到端的直接数据交互
    • 网络交互可以理解为基于Socket实现的(RPCHTTP都是Socket的读写操作)

简单概括一下RPC的优缺点就是:

  1. 优点:
    1. 效率更高(可以自定义二进制格式)
    2. 发起RPC调用的一方,在编写代码时可忽略RPC的具体实现(跟编写本地函数调用一般
  2. 缺点:
    • 通用性不如HTTP(方言普及程度肯定不如普通话),如果传输协议不是HTTP协议格式,调用双方就需要专门实现通信库

PS:HTTP更多是ClientServer的通讯;RPC更多是内部服务器间的通讯

3.2.引入

上面说这么多,可能还没有来个案例实在,我们看个案例:

本地调用sum()

def sum(a, b):
    """return a+b"""
    return a + b

def main():
    result = sum(1, 2)
    print(f"1+2=result")

if __name__ == "__main__":
    main()

输出:(这个大家都知道)

1+2=3

1.xmlrpc案例

官方文档:

https://docs.python.org/3/library/xmlrpc.client.html
https://docs.python.org/3/library/xmlrpc.server.html

都说RPC用起来就像本地调用一样,那么用起来啥样呢?看个案例:

服务端:(CentOS7:192.168.36.123:50051)

from xmlrpc.server import SimpleXMLRPCServer

def sum(a, b):
    """return a+b"""
    return a + b

# PS:50051是gRPC默认端口
server = SimpleXMLRPCServer(('', 50051))
# 把函数注册到RPC服务器中
server.register_function(sum)
print("Server启动ing,Port:50051")
server.serve_forever()

客户端:(Win10:192.168.36.144

from xmlrpc.client import ServerProxy

stub = ServerProxy("http://192.168.36.123:50051")
result = stub.sum(1, 2)
print(f"1+2=result")

输出:(Client用起来是不是和本地差不多?就是通过代理访问了下RPCServer而已)

1+2=3

技术分享图片

PS:CentOS服务器不是你绑定个端口就一定能访问的,如果不能记让防火墙开放对应的端口

这个之前在说MariaDB环境的时候有详细说:https://www.cnblogs.com/dotnetcrazy/p/9887708.html#_map4

# 添加 --permanent永久生效(没有此参数重启后失效)
firewall-cmd --zone=public --add-port=80/tcp --permanent

2.ZeroRPC案例:

zeroRPC用起来和这个差不多,也简单举个例子吧:

把服务的某个方法注册到RPCServer中,供外部服务调用

import zerorpc

class Test(object):
    def say_hi(self, name):
        return f"Hi,My Name isname"


# 注册一个Test的实例
server = zerorpc.Server(Test())
server.bind("tcp://0.0.0.0:50051")
server.run()

调用服务端代码

import zerorpc

client = zerorpc.Client("tcp://192.168.36.123:50051")
result = client.say_hi("RPC")
print(result)

3.3.简单版自定义RPC

看了上面的引入案例,是不是感觉RPC不过如此?NoNoNo,要是真这么简单也就谈不上RPC架构了,上面两个是最简单的RPC服务了,可以这么说:生产环境基本上用不到,只能当案例练习罢了,对Python来说,最常用的RPC就两个gRPC and Thrift

PS:国产最出名的是Dubbo and Tars,Net最常用的是gRPCThriftSurging

1.RPC服务的流程

要自己实现一个RPC Server那么就得了解整个流程了:

  1. Client(调用者)以本地调用的方式发起调用
  2. 通过RPC服务进行远程过程调用(RPC的目标就是要把这些步骤都封装起来,让使用者感觉不到这个过程)
    1. 客户端的RPC Proxy组件收到调用后,负责将被调用的方法名、参数等打包编码成自定义的协议
    2. 客户端的RPC Proxy组件在打包完成后通过网络把数据包发送给RPC Server
    3. 服务端的RPC Proxy组件把通过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数
    4. 服务端的RPC Proxy组件根据方法名和参数进行本地调用
    5. RPC Server(被调用者)本地执行后将结果返回给服务端的RPC Proxy
    6. 服务端的RPC Proxy组件将返回值打包编码成自定义的协议数据包,并通过网络发送给客户端的RPC Proxy组件
    7. 客户端的RPC Proxy组件收到数据包后,进行拆包解码,把数据返回给Client
  3. Client(调用者)得到本次RPC调用的返回结果

用一张时序图来描述下整个过程:
技术分享图片

PS:RPC Proxy有时候也叫Stub(存根):(Client Stub,Server Stub)

为屏蔽客户调用远程主机上的对象,必须提供某种方式来模拟本地对象,这种本地对象称为存根(stub),存根负责接收本地方法调用,并将它们委派给各自的具体实现对象

PRC服务实现的过程中其实就两核心点:

  1. 消息协议:客户端调用的参数和服务端的返回值这些在网络上传输的数据以何种方式打包编码和拆包解码
    • 经典代表:Protocol Buffers
  2. 传输控制:在网络中数据的收发传输控制具体如何实现(TCP/UDP/HTTP

2.手写RPC

下面我们就根据上面的流程来手写一个简单的RPC:

1.Client调用:

# client.py
from client_stub import ClientStub

def main():
    stub = ClientStub(("192.168.36.144", 50051))

    result = stub.get("sum", (1, 2))
    print(f"1+2=result")

    result = stub.get("sum", (1.1, 2))
    print(f"1.1+2=result")

    time_str = stub.get("get_time")
    print(time_str)

if __name__ == "__main__":
    main()

输出:

1+2=3
1.1+2.2=3.1
Wed Jan 16 22

2.Client Stub,客户端存根:(主要有打包解包、和RPC服务器通信的方法)

# client_stub.py
import socket

class ClientStub(object):
    def __init__(self, address):
        """address ==> (ip,port)"""
        self.socket = socket.socket()
        self.socket.connect(address)

    def convert(self, obj):
        """根据类型转换成对应的类型编号"""
        if isinstance(obj, int):
            return 1
        if isinstance(obj, float):
            return 2
        if isinstance(obj, str):
            return 3

    def pack(self, func, args):
        """打包:把方法和参数拼接成自定义的协议
        格式:func:函数名@params:类型-参数,类型2-参数2...
        """
        result = f"func:func"
        if args:
            params = ""
            # params:类型-参数,类型2-参数2...
            for item in args:
                params += f"self.convert(item)-item,"
            # 去除最后一个,
            result += f"@params:params[:-1]"
        # print(result)  # log 输出
        return result.encode("utf-8")

    def unpack(self, data):
        """解包:获取返回结果"""
        msg = data.decode("utf-8")
        # 格式应该是"data:xxxx"
        params = msg.split(":")
        if len(params) > 1:
            return params[1]
        return None

    def get(self, func, args=None):
        """1.客户端的RPC Proxy组件收到调用后,负责将被调用的方法名、参数等打包编码成自定义的协议"""
        data = self.pack(func, args)
        # 2.客户端的RPC Proxy组件在打包完成后通过网络把数据包发送给RPC Server
        self.socket.send(data)
        # 等待服务端返回结果
        data = self.socket.recv(2048)
        if data:
            return self.unpack(data)
        return None

简要说明下:(我根据流程在Code里面标注了,看起来应该很轻松)

之前有说到核心其实就是消息协议and传输控制,我客户端存根的消息协议是自定义的格式(后面会说简化方案):func:函数名@params:类型-参数,类型2-参数2...,传输我是基于TCP进行了简单的封装


3.Server端:(实现很简单)

# server.py
import socket
from server_stub import ServerStub

class RPCServer(object):
    def __init__(self, address, mycode):
        self.mycode = mycode
        # 服务端存根(RPC Proxy)
        self.server_stub = ServerStub(mycode)
        # TCP Socket
        self.socket = socket.socket()
        # 端口复用
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 绑定端口
        self.socket.bind(address)

    def run(self):
        self.socket.listen()
        while True:
            # 等待客户端连接
            client_socket, client_addr = self.socket.accept()
            print(f"来自client_addr的请求:
")
            # 交给服务端存根(Server Proxy)处理
            self.server_stub.handle(client_socket, client_addr)

if __name__ == "__main__":
    from server_code import MyCode
    server = RPCServer(('', 50051), MyCode())
    print("Server启动ing,Port:50051")
    server.run()

为了简洁,服务端代码我单独放在了server_code.py中:

# 5.RPC Server(被调用者)本地执行后将结果返回给服务端的RPC Proxy
class MyCode(object):
    def sum(self, a, b):
        return a + b

    def get_time(self):
        import time
        return time.ctime()

4.然后再看看重头戏Server Stub:

# server_stub.py
import socket

class ServerStub(object):
    def __init__(self, mycode):
        self.mycode = mycode

    def convert(self, num, obj):
        """根据类型编号转换类型"""
        if num == "1":
            obj = int(obj)
        if num == "2":
            obj = float(obj)
        if num == "3":
            obj = str(obj)
        return obj

    def unpack(self, data):
        """3.服务端的RPC Proxy组件把通过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数"""
        msg = data.decode("utf-8")
        # 格式应该是"格式:func:函数名@params:类型编号-参数,类型编号2-参数2..."
        array = msg.split("@")
        func = array[0].split(":")[1]
        if len(array) > 1:
            args = list()
            for item in array[1].split(":")[1].split(","):
                temps = item.split("-")
                # 类型转换
                args.append(self.convert(temps[0], temps[1]))
            return (func, tuple(args))  # (func,args)
        return (func, )

    def pack(self, result):
        """打包:把方法和参数拼接成自定义的协议"""
        # 格式:"data:返回值"
        return f"data:result".encode("utf-8")

    def exec(self, func, args=None):
        """4.服务端的RPC Proxy组件根据方法名和参数进行本地调用"""
        # 如果没有这个方法则返回None
        func = getattr(self.mycode, func, None)
        if args:
            return func(*args)  # 解包
        else:
            return func()  # 无参函数

    def handle(self, client_socket, client_addr):
        while True:
            # 获取客户端发送的数据包
            data = client_socket.recv(2048)
            if data:
                try:
                    data = self.unpack(data)  # 解包
                    if len(data) == 1:
                        data = self.exec(data[0])  # 执行无参函数
                    elif len(data) > 1:
                        data = self.exec(data[0], data[1])  # 执行带参函数
                    else:
                        data = "RPC Server Error Code:500"
                except Exception as ex:
                    data = "RPC Server Function Error"
                    print(ex)
                # 6.服务端的RPC Proxy组件将返回值打包编码成自定义的协议数据包,并通过网络发送给客户端的RPC Proxy组件
                data = self.pack(data)  # 把函数执行结果按指定协议打包
                # 把处理过的数据发送给客户端
                client_socket.send(data)
            else:
                print(f"客户端:client_addr已断开
")
                break

再简要说明一下:里面方法其实主要就是解包执行函数返回值打包

输出图示:
技术分享图片

再贴一下上面的时序图:
技术分享图片

课外拓展:

HTTP1.0、HTTP1.1 和 HTTP2.0 的区别
https://www.cnblogs.com/heluan/p/8620312.html

简述分布式RPC框架
https://blog.csdn.net/jamebing/article/details/79610994

分布式基础—RPC
http://www.dataguru.cn/article-14244-1.html

下节预估:RPC服务进一步简化与演变手写一个简单的REST接口

手把手写c++服务器(25):万物皆可文件之socketfd(代码片段)

...【更新中】 前言:大家一定听说过在Linux当中,万物皆是文件,任何客观的存在都是以文件形式呈现。前面讲socket编程的时候(手把手写C++服务器(21):Linuxsocket网络编程入门基础、手把手写C+& 查看详情

elk专栏之es快速入门-01(代码片段)

...?数据库做搜索的弊端站内搜索(垂直搜索)互联网搜索全文检索、倒排索引和Lucene全文检索Lucene什么是ElasticSearch 查看详情

万物之源泉14内置函数二(代码片段)

一.lamda匿名函数为了解决一些简单的需求而设计的一句话函数#计算n的n次方#先用之前的函数办法deffunc(n):returnn**2print(func(9))#用lambda方法:a=lambdan:n*nprint(a(9))lambda表示的是匿名函数不需要用def声明语法:  函数名=lambda参数:返回值... 查看详情

elk专栏之javaapi操作-02(代码片段)

ELK专栏之JavaAPI操作-02JavaAPI实现文档管理ES的技术特点Java客户端简单获取数据JavaAPI文档准备工作使用Java客户端操作ES结合Spring-boot-test测试文档准备工作查询新增修改删除批量增删改bulkJavaAPI实现文档管理ES的技术特点●ES技术比... 查看详情

elk专栏之es索引-04(代码片段)

ELK专栏之ES索引-04索引Index入门为什么我们要手动创建索引?索引管理创建索引查询索引修改索引删除索引定制分词器默认分词器修改分词器的位置定制自己的分词器type底层结构及弃用原因type是什么?ES中不同的type存储... 查看详情

万物互联的背后,有一座“数据围城”

...段的物联网也被称作数据“泛在聚合”意义上的物联网。万物互联的时代造就了庞大的数据海洋,Kevin认为应该通过对其中每个数据进行属性的精确标识,全面实现数据的资源化。如果不能合理、合规、合情的利用这些数据,我... 查看详情

elk专栏之es内部机制-03(代码片段)

ELK专栏之ES内部机制-03ES内部机制ES分布式基础ES对复杂分布式机制的透明隐藏特性ES的垂直扩容和水平扩容增加和减少节点,数据重新分配master节点节点对等的分布式架构分片shard、副本replica机制单Node(节点)环境下... 查看详情

java设计模式实战,适配器模式之万物拟人化(代码片段)

非常感谢你阅读本文,欢迎【👍点赞】【⭐收藏】【📝评论】~放弃不难,但坚持一定很酷!希望我们大家都能每天进步一点点!🎉本文由二当家的白帽子https://le-yi.blog.csdn.net/博客原创,转载请注... 查看详情

java设计模式实战,适配器模式之万物拟人化(代码片段)

非常感谢你阅读本文,欢迎【👍点赞】【⭐收藏】【📝评论】~放弃不难,但坚持一定很酷!希望我们大家都能每天进步一点点!🎉本文由二当家的白帽子https://le-yi.blog.csdn.net/博客原创,转载请注... 查看详情

esp8266+blinker的万物互联(智能家居篇)(代码片段)

...ESP8266将温湿度等上传到阿里云平台,于是我想起来了万物互联,就想先做个有关智能家居的,通过查阅资料发现了Blinker(轻松物联网)。下面是演示视频Esp8266物联网文章目录前言一、准备二、操作步骤1.Blinke... 查看详情

消息总线扩展之集成thrift-rpc(代码片段)

本文主要探讨了消息总线支持ThriftRPC的实现过程。鉴于RabbitMQ官方的JavaClient提供了基于RabbitMQ的JSON-RPC,消息总线也顺道提供了JSON-RPC的API。然后也尝试了为消息总线增加对Thrift-RPC的扩展支持,希望此举能让消息总线同时... 查看详情

万物皆对象,你信吗?(代码片段)

先来回答:万物皆对象?大学上课的时候,老湿一上来就给在座的各位单身狗说:万物皆对象。当时就惊喜万分说:那如何拥有对象?new出来。年少不懂事,如今今非昔比,那么万物皆对象真的是... 查看详情

spark源码研读-散篇记录:spark内置rpc框架之transportconf(代码片段)

1Spark版本Spark2.1.0。2说明去年在网易之初,已经开发了一个完整的RPC框架,其中使用的核心技术也是Netty,所以当看到Spark的RPC框架时,并不觉得太陌生,关于个人开发的这个RPC框架,真正完全可用是在今年,明年会完善一下,开... 查看详情

服务化实战之dubbodubboxmotanthriftgrpc等rpc框架比较及选型(代码片段)

概述前段时间项目要做服务化,所以我比较了现在流行的几大RPC框架的优缺点以及使用场景,最终结合本身项目的实际情况选择了使用dubbox作为rpc基础服务框架。下面就简单介绍一下RPC框架技术选型的过程。RPC简述该系... 查看详情

引点科技私房菜专栏之第一个python小爬虫(代码片段)

文章目录第一个Python小爬虫分析网页编写爬虫结果第一个Python小爬虫分析网页打开网站猫眼电影TOP100,打开开发者工具,查看网页源代码,找到列表代码,也可以直接看下边的代码。<dd><iclass 查看详情

微服务之协议概述(代码片段)

微服务协议互联网协议很多,TCPIP是基础协议,在它之上有众多应用层协议,这里关注的微服务以什么协议向外提供服务,即以什么方式,或者说以什么手段,通过什么媒介来提供面向用户或者其他服务提供他们所需要的服务。传统... 查看详情

netty框架之协议应用二(rpc开发实战之dubbo)(代码片段)

前言netty框架马上就进入尾声了,小编没有特别深入的讲解,第一是网络编程确实挺难的,第二用好netty其实是挺不容易的一件事情,尤其都是异步的情况下,今天小编继续为大家带来开发实战,上次分享... 查看详情

netty框架之协议应用二(rpc开发实战之dubbo)(代码片段)

前言netty框架马上就进入尾声了,小编没有特别深入的讲解,第一是网络编程确实挺难的,第二用好netty其实是挺不容易的一件事情,尤其都是异步的情况下,今天小编继续为大家带来开发实战,上次分享... 查看详情