自定义rpc

zhouyanger zhouyanger     2022-12-09     230

关键词:

测试代码Github地址:https://github.com/zhouyanger/java_demo/tree/master/netty

五.自定义 RPC

5.1 概述

RPC(Remote Procedure Call),即远程过程调用,它是一种通过网络从远程计算机程序

上请求服务,而不需要了解底层网络实现的技术。常见的 RPC 框架有: 源自阿里的 Dubbo, Spring 旗下的 Spring Cloud,Google 出品的 grpc 等等。

技术图片

 

 

1. 服务消费方(client)以本地调用方式调用服务

2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体

3. client stub 将消息进行编码并发送到服务端

4. server stub 收到消息后进行解码

5. server stub 根据解码结果调用本地的服务

6. 本地服务执行并将结果返回给 server stub

7. server stub 将返回导入结果进行编码并发送至消费方

8. client stub 接收到消息并进行解码

9. 服务消费方(client)得到结果

RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地

方法一样即可完成远程服务调用。接下来我们基于 Netty 自己动手搞定一个 RPC。

5.2 设计和实现

5.2.1 结构设计

 技术图片

 

 

l Client(服务的调用方): 两个接口 + 一个包含 main 方法的测试类

l Client Stub: 一个客户端代理类 + 一个客户端业务处理类

l Server(服务的提供方): 两个接口 + 两个实现类

l Server Stub: 一个网络处理服务器 + 一个服务器业务处理类

注意:服务调用方的接口必须跟服务提供方的接口保持一致(包路径可以不一致)

最终要实现的目标是:在 TestNettyRPC 中远程调用 HelloRPCImpl 或 HelloNettyImpl 中的方法

5.2.2 代码实现

l Server(服务的提供方)

public interface HelloNetty

String hello();

public class HelloNettyImpl implements HelloNetty

@Override

public String hello()

return "hello,netty";

public interface HelloRPC

String hello(String name);

public class HelloRPCImpl implements HelloRPC

@Override

public String hello(String name)

return "hello," + name;

上述代码作为服务的提供方,我们分别编写了两个接口和两个实现类,供消费方远程调用。

l Server Stub 部分

//封装类信息

public class ClassInfo implements Serializable

private static final long serialVersionUID = 1L;

private String className;

//类名

private String methodName; //方法名

private Class<?>[] types;

//参数类型

private Object[] objects; //参数列表

//此处省略 getter 和 setter 方法

上述代码作为实体类用来封装消费方发起远程调用时传给服务方的数据。

//服务器端业务处理类

public class InvokeHandler extends ChannelInboundHandlerAdapter

//得到某接口下某个实现类的名字

private String getImplClassName(ClassInfo classInfo) throws Exception

//服务方接口和实现类所在的包路径

String interfacePath="cn.itcast.rpc.server";

int lastDot = classInfo.getClassName().lastIndexOf(".");

String interfaceName=classInfo.getClassName().substring(lastDot);

Class superClass=Class.forName(interfacePath+interfaceName);

Reflections reflections = new Reflections(interfacePath);

//得到某接口下的所有实现类

Set<Class> ImplClassSet=reflections.getSubTypesOf(superClass);

if(ImplClassSet.size()==0)

System.out.println("未找到实现类");

return null;

else if(ImplClassSet.size()>1)

System.out.println("找到多个实现类,未明确使用哪一个");

return null;

else

//把集合转换为数组

Class[] classes=ImplClassSet.toArray(new Class[0]);

return classes[0].getName(); //得到实现类的名字

@Override //读取客户端发来的数据并通过反射调用实现类的方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

ClassInfo classInfo = (ClassInfo) msg;

Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();

Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());

//通过反射调用实现类的方法

Object result = method.invoke(clazz, classInfo.getObjects());

ctx.writeAndFlush(result);

上述代码作为业务处理类,读取消费方发来的数据,并根据得到的数据进行本地调用,然后

把结果返回给消费方。

//网络处理服务器

public class NettyRPCServer

private int port;

public NettyRPCServer(int port)

this.port = port;

public void start()

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, 128)

.childOption(ChannelOption.SO_KEEPALIVE, true)

.localAddress(port).childHandler(

new ChannelInitializer<SocketChannel>()

@Override

protected void initChannel(SocketChannel ch) throws Exception

ChannelPipeline pipeline = ch.pipeline();

//编码器

pipeline.addLast("encoder", new ObjectEncoder());

//解码器

pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE,

ClassResolvers.cacheDisabled(null)));

//服务器端业务处理类

pipeline.addLast(new InvokeHandler());

);

ChannelFuture future = serverBootstrap.bind(port).sync();

System.out.println("......Server is ready......");

future.channel().closeFuture().sync();

catch (Exception e)

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

public static void main(String[] args) throws Exception

new NettyRPCServer(9999).start();

上述代码是用 Netty 实现的网络服务器,采用 Netty 自带的 ObjectEncoder 和 ObjectDecoder 作为编解码器(为了降低复杂度,这里并没有使用第三方的编解码器),当然实际开发时也可以采用 JSON 或 XML。

l Client Stub 部分

//客户端业务处理类

public class ResultHandler extends ChannelInboundHandlerAdapter

private Object response;

public Object getResponse()

return response;

@Override //读取服务器端返回的数据(远程调用的结果)

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

response = msg;

ctx.close();

上述代码作为客户端的业务处理类读取远程调用返回的数据

//客户端代理类

public class NettyRPCProxy

//根据接口创建代理对象

public static Object create(Class target)

return Proxy.newProxyInstance(target.getClassLoader(), new Class[]target, new InvocationHandler()

@Override

public Object invoke(Object proxy, Method method, Object[] args)

throws Throwable

//封装 ClassInfo

ClassInfo classInfo = new ClassInfo();

classInfo.setClassName(target.getName());

classInfo.setMethodName(method.getName());

classInfo.setObjects(args);

classInfo.setTypes(method.getParameterTypes());

//开始用 Netty 发送数据

EventLoopGroup group = new NioEventLoopGroup();

ResultHandler resultHandler = new ResultHandler();

try

Bootstrap b = new Bootstrap();

b.group(group)

.channel(NioSocketChannel.class)

.handler(new ChannelInitializer<SocketChannel>()

@Override

public void initChannel(SocketChannel ch) throws Exception

ChannelPipeline pipeline = ch.pipeline();

//编码器

pipeline.addLast("encoder", new ObjectEncoder());

//解码器

pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE,

ClassResolvers.cacheDisabled(null)));

//客户端业务处理类

pipeline.addLast("handler", resultHandler);

);

ChannelFuture future = b.connect("127.0.0.1", 9999).sync();

future.channel().writeAndFlush(classInfo).sync();

future.channel().closeFuture().sync();

finally

group.shutdownGracefully();

return resultHandler.getResponse();

);

上述代码是用 Netty 实现的客户端代理类,采用 Netty 自带的 ObjectEncoder 和 ObjectDecoder 作为编解码器(为了降低复杂度,这里并没有使用第三方的编解码器),当然实际开发时也可以采用 JSON 或 XML。

l Client(服务的调用方-消费方)

//服务调用方

public class TestNettyRPC

public static void main(String [] args)

//第 1 次远程调用

HelloNetty helloNetty=(HelloNetty) NettyRPCProxy.create(HelloNetty.class);

System.out.println(helloNetty.hello());

//第 2 次远程调用

HelloRPC helloRPC = (HelloRPC) NettyRPCProxy.create(HelloRPC.class);

System.out.println(helloRPC.hello("zhouyang"));

消费方不需要知道底层的网络实现细节,就像调用本地方法一样成功发起了两次远程调用。

运行效果如下图所示:

 技术图片

 

 

这样是简单的实现了rpc,当然是不完善的。

如何在 Go Rpc 中使用自定义错误

】如何在GoRpc中使用自定义错误【英文标题】:HowtoUseCustomErrorsinGoRpc【发布时间】:2020-11-1203:06:39【问题描述】:我正在使用go标准库中的rpc包,我希望能够通过RPC调用返回我的自定义错误。我有下面的自定义错误类:const(UNAVAILA... 查看详情

分布式理论,架构设计自定义rpc(代码片段)

分布式理论,架构设计(四)自定义RPC自定义RPCRMI基于netty实现RPC框架代码实现服务端代码客户端代码自定义RPC在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现... 查看详情

分布式理论,架构设计自定义rpc(代码片段)

分布式理论,架构设计(四)自定义RPC自定义RPCRMI基于netty实现RPC框架代码实现服务端代码客户端代码自定义RPC在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现... 查看详情

自定义rpc的完整实现---深入理解rpc内部原理(代码片段)

倘若不使用RPC远端调用的情况下,代码如下:local.py#coding:utf-8#本地调用除法运算的形式classInvalidOperation(Exception):def__init__(self,message=None):self.message=messageor‘involidoperation‘defdivide(num1,num2=1):ifnum2==0:raiseInv 查看详情

具有自定义端点行为的 WCF ChannelFactory (Json-Rpc)

】具有自定义端点行为的WCFChannelFactory(Json-Rpc)【英文标题】:WCFChannelFactorywithCustomEndpointBehavior(Json-Rpc)【发布时间】:2013-08-2317:10:09【问题描述】:我一直在为WCFJson-RpcServiceModel努力工作。我对WCF和WCF可扩展性还很陌生,但最后... 查看详情

为啥我的自定义 XML RPC 获取配置不起作用?

】为啥我的自定义XMLRPC获取配置不起作用?【英文标题】:WhydoesmyCustomXMLRPCget-confignotwork?为什么我的自定义XMLRPC获取配置不起作用?【发布时间】:2021-03-1114:39:36【问题描述】:我正在尝试编写自己的RPC来获取配置。我知道get-co... 查看详情

log4j2自定义appender(输出到文件/rpc服务中)

...等。但是有时候确难免完全适合自己,此时我们就需要自定义Appender,使日志输出到指定的位置上。本文,将通过两个例子说明自定义APPender,一个是将日志写入文件中,另一个是将日志发送到远程Thrift服务中。本文代码详见:ht... 查看详情

轻量级rpc

①自定义一个协议接口继承VersionedProtocol②自定义协议类实现上面的接口,完善功能需求③服务端④客户端二:模拟一个namenode  查看详情

分布式理论,架构设计自定义rpc(代码片段)

分布式理论,架构设计(四)自定义RPC自定义RPCRMI基于netty实现RPC框架代码实现服务端代码客户端代码自定义RPC在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现... 查看详情

金字塔 jsonrpc 中的自定义错误消息

】金字塔jsonrpc中的自定义错误消息【英文标题】:Customerrormessageinpyramidjsonrpc【发布时间】:2015-08-2610:56:46【问题描述】:我有一个用pyramid_rpc编写的应用程序。我们通过json_rpc在网络上调用我们的函数。有时用户输入中有错误(... 查看详情

手写基于http的rpc框架

...、​httpclient_rpc_pojo​​​使用顶级模块是因为rpc协议自定义了两种方式,现在介绍的是​​httpclient_rpc​​这种方式二、pom依赖1.new_rpc<groupId&g 查看详情

自研发rpc调用框架

自主研发设计RPC远程调用框架,实现服务自动注册,服务发现,远程RPC调用,后续实现服务负载均衡主要包括:客户端服务,服务端,服务发现,服务注册    github地址:https://github.com/btshoutn/rpc_project  查看详情

rpc手动实现(基于socket)

1.实现的主要思路按着下面图形实现https://github.com/xinyanggit/rpc_demo具体可以查看相应的代码 分为三个模块apiproviderclient2.主要用到的技术socket通信jdk动态代理自定义注解 查看详情

dubbo自定义过滤器,打印接口调用信息

...样,我们可以在dubbo提供的服务提供方和消费方都可以自定义过滤器,从而可以获得方法调用的时间或参数、返回结果及异常信息。我们可以利用log打印出来。而且,这个过滤器机制,也是分布式跟踪系统的一部分。  &nbs... 查看详情

rpc----基于netty实现的rpc(代码片段)

...erverHandler2、客户端client2.1NettyClient2.2NettyClientHandler二、自定义协议和编解码器1、协议2、编码器3、解码器4、拆包器5、补充知识5.1TCP粘包拆包问题三、序列化接口1、JsonSerializer2、Hessian 查看详情

13.rpc的socket实现(阻塞式)与netty实现(非阻塞式)(代码片段)

【README】1.本文总结了RPC的概念,包括定义,RPC实现,及其优缺点;2.本文po出了RPC的简单代码实现,总结自B站《netty-尚硅谷》;3.本文部分内容总结自:WhatIsRemoteProcedureCall(RPC)?DefinitionfromSearchAppArchitectu... 查看详情

13.rpc的socket实现(阻塞式)与netty实现(非阻塞式)(代码片段)

【README】1.本文总结了RPC的概念,包括定义,RPC实现,及其优缺点;2.本文po出了RPC的简单代码实现,总结自B站《netty-尚硅谷》;3.本文部分内容总结自:WhatIsRemoteProcedureCall(RPC)?DefinitionfromSearchAppArchitectu 查看详情

远程调用(rpc)

RPC:RemoteProduceCall远程过程调用类似的还有RMI。自定义数据格式,基于原生TCP通信,速度快,效率高。早期的webservice,现在热门的dubbo,都是RPC的典型.该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无... 查看详情