关键词:
一、 RPC基础知识
1.1、RPC是什么
RPC 【Remote Procedure Call】是指远程过程调用,是一种进程间通信方式,是一种技术思想,而不是规范。它允许程序调用另一个地址空间(网络的另一台机器上)的过程或函数,而不用开发人员显式编码这个调用的细节。调用本地方法和调用远程方法一样。
1.2、 RPC基本原理
- 服务调用方 client(客户端)以本地调用方式调用服务;
- client stub(客户端存根:存放服务端地址信息,将客户端的请求参数编组成网络消息,再通过网络发送给服务方)接收到调用后负责将方法、参数等编组成能够进行网络传输的消息体;在Java中就是序列化的过程
- client stub找到服务地址,并将消息通过网络发送到服务端(server);
- server stub(服务端存根:接受客户端发送过来的消息并解组,再调用本地服务,再将本地服务执行结果发送给客户端)收到消息后进行解组;
- server stub根据解组结果调用本地的服务;
- 本地服务执行处理逻辑;
- 本地服务将结果返回给server stub;
- server stub将返回结果编组成网络消息;
- server stub将编组后的消息通过网络并发送客户端
- client stub接收到消息,并进行解组;
- 服务调用方client得到最终结果。
1.3、 RPC协议是什么
RPC调用过程中需要将参数编组为消息进行发送,接受方需要解组消息为参数,过程处理结果同样需要经编组、解组。消息由哪些部分构成及消息的表示形式就构成了消息协议。
RPC调用过程中采用的消息协议称为RPC协议
RPC协议规定请求、响应消息的格式
在TCP(网络传输控制协议)上可选用或自定义消息协议来完成RPC消息交互
我们可以选用通用的标准协议(如:http、https),也也可根据自身的需要定义自己的消息协议。
1.4、 RPC框架是什么
封装好参数编组、消息解组、底层网络通信的RPC程序,可直接在其基础上只需专注与过程业务代码编码,无需再关注其调用细节
目前常见的RPC框架
Dubbo、gRPC、gRPC、Apache Thrift、RMI…等
二、手写RPC框架
下面将一步步来写一个精简版的RPC框架,使项目引入该框架后,通过简单的配置让项目拥有提供远程服务与调用的能力
2.1、 服务端编写
2.1.1、 服务端都需要完成哪些?
首先服务端要停工远程服务,就必须具备服务注册及暴露的能力;在这之后还需要开启网络服务,供客户端连接。有些项目可能即使服务提供者同时又是服务消费者,那么什么时候注册暴露服务,什么时候注入消费服务呢?在这我就引入了一个RPC监听处理器的概念,就有这个处理器来完成服务的注册暴露,以及服务消费注入
2.1.2、具体实现
2.1.2.1、 服务暴露注解
哪些服务需要注册暴露这里使用自定义注解的方式来标注:@Service
/**
* @Author Lijl
* @AnnotationTypeName Service
* @Description 被该注解标记的服务可提供远程访问的能力
* @Date 2022/2/14 14:32
* @Version 1.0
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Component
@Inherited
public @interface Service
String value() default "";
String version() default "";
long timeout() default 0L;
2.1.2.2、服务注册(暴露)
/**
* @Author Lijl
* @InterfaceName ServiceRegister
* @Description 定义服务注册
* @Date 2022/2/15 15:14
* @Version 1.0
*/
public interface ServiceRegister
void register(List<ServiceObject> so) throws Exception;
ServiceObject getServiceObject(String name) throws Exception;
/**
* @Author Lijl
* @ClassName DefaultServiceRegister
* @Description 默认服务注册
* @Date 2022/2/15 15:19
* @Version 1.0
*/
public abstract class DefaultServiceRegister implements ServiceRegister
private Map<String, ServiceObject> serviceMap = new HashMap<>();
protected String protocol;
protected int port;
/**
* @Author lijl
* @MethodName register
* @Description 缓存服务持有对象
* @Date 16:10 2022/3/11
* @Version 1.0
* @param soList
* @return: void
**/
@Override
public void register(List<ServiceObject> soList) throws Exception
if (soList==null&&soList.size()>0)
throw new IllegalAccessException("Service object information cannot be empty");
soList.forEach(so -> this.serviceMap.put(so.getName(), so));
/**
* @Author lijl
* @MethodName getServiceObject
* @Description 获取服务持有对象
* @Date 16:11 2022/3/11
* @Version 1.0
* @param name
* @return: com.huawei.rpc.server.register.ServiceObject
**/
@Override
public ServiceObject getServiceObject(String name)
return this.serviceMap.get(name);
/**
* @Author Lijl
* @ClassName ZookeeperExportServiceRegister
* @Description Zookeeper服务注册,提供服务注册、服务暴露
* @Date 2022/2/15 15:26
* @Version 1.0
*/
public class ZookeeperExportServiceRegister extends DefaultServiceRegister
/**
* zk客户端
*/
private ZkClient client;
public ZookeeperExportServiceRegister(String zkAddress, int port, String protocol)
this.client = new ZkClient(zkAddress);
this.client.setZkSerializer(new ZookeeperSerializer());
this.port = port;
this.protocol = protocol;
/**
* @Author lijl
* @MethodName register
* @Description 缓存服务持有对象,并注册服务
* @Date 15:38 2022/2/15
* @Version 1.0
* @param soList 服务持有者集合
* @return: void
**/
@Override
public void register(List<ServiceObject> soList) throws Exception
super.register(soList);
for (ServiceObject so : soList)
ServiceInfo serviceInfo = new ServiceInfo();
String host = InetAddress.getLocalHost().getHostAddress();
String address = host + ":" + port;
serviceInfo.setAddress(address);
serviceInfo.setName(so.getName());
serviceInfo.setProtocol(protocol);
this.exportService(serviceInfo);
/**
* @Author lijl
* @MethodName exportService
* @Description 暴露服务
* @Date 15:38 2022/2/15
* @Version 1.0
* @param serviceInfo 需要暴露的服务信息
* @return: void
**/
private void exportService(ServiceInfo serviceInfo)
String serviceName = serviceInfo.getName();
String uri = JSON.toJSONString(serviceInfo);
try
uri = URLEncoder.encode(uri, CommonConstant.UTF_8);
catch (UnsupportedEncodingException e)
e.printStackTrace();
String servicePath = CommonConstant.ZK_SERVICE_PATH + CommonConstant.PATH_DELIMITER + serviceName + CommonConstant.PATH_DELIMITER + "service";
if (!client.exists(servicePath))
client.createPersistent(servicePath,true);
String uriPath = servicePath + CommonConstant.PATH_DELIMITER + uri;
if (client.exists(uriPath))
client.delete(uriPath);
client.createEphemeral(uriPath);
这个过程其实没有详说的必要,就是将指定ServiceObject对象序列化后保存到ZK上,供客户端发现。同时会将服务对象缓存起来,在客户端调用服务时,通过缓存的ServiceObject对象反射指定服务,调用方法;其实看方法上打的注释也能看明白每步都做了什么
2.1.2.3、开启网络服务、处理接收到的客户端请求
/**
* @Author Lijl
* @ClassName RpcService
* @Description RPC抽象服务端
* @Date 2022/2/15 16:09
* @Version 1.0
*/
public abstract class RpcServer
/**
* 服务端口
*/
protected int port;
/**
* 服务协议
*/
protected String protocol;
/**
* 请求处理者
*/
protected RequestHandler handler;
public RpcServer(int port, String protocol, RequestHandler handler)
super();
this.port = port;
this.protocol = protocol;
this.handler = handler;
/**
* 开启服务
*/
public abstract void start();
/**
* 停止服务
*/
public abstract void stop();
/**
* @Author Lijl
* @ClassName NettyRpcService
* @Description Netty RPC服务端,提供Netty网络服务开启、关闭,接收客户端请求及消息后的处理
* @Date 2022/2/15 16:28
* @Version 1.0
*/
@Slf4j
public class NettyRpcServer extends RpcServer
private Channel channel;
public NettyRpcServer(int port, String protocol, RequestHandler handler)
super(port,protocol,handler);
@Override
public void start()
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ChannelRequestHandler());
);
//启动服务
ChannelFuture future = sb.bind(port).sync();
log.info("Server starteed successfully.");
channel = future.channel();
//等待服务通道关闭
future.channel().closeFuture().sync();
catch (InterruptedException e)
e.printStackTrace();
finally
//释放资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
@Override
public void stop()
if (this.channel!=null)
this.channel.close();
private class ChannelRequestHandler extends ChannelInboundHandlerAdapter
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
log.info("Channel active: ",ctx);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
log.info("The server receives a message: ",msg);
ByteBuf msgBuf = (ByteBuf) msg;
byte[] req = new byte[msgBuf.readableBytes()];
msgBuf.readBytes(req);
byte[] res = handler.handleRequest(req);
log.info("Send response: ",msg);
ByteBuf respBuf = Unpooled.buffer(res.length);
respBuf.writeBytes(res);
ctx.write(respBuf);
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
ctx.flush();
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
cause.printStackTrace();
log.error("Exception occurred: ",cause.getMessage());
ctx.close();
/**
* @Author Lijl
* @ClassName RequestHandler
* @Description 请求处理器,提供解组请求,编组响应操作
* @Date 2022/2/15 16:10
* @Version 1.0
*/
public class RequestHandler
private MessageProtocol protocol;
private ServiceRegister serviceRegister;
public RequestHandler(MessageProtocol protocol,ServiceRegister serviceRegister)
super();
this.protocol = protocol;
this.serviceRegister = serviceRegister;
/**
* @Author lijl
* @MethodName handleRequest
* @Description 处理客户端请求参数,调用本地服务
* @Date 16:26 2022/2/15
* @Version 1.0
* @param data
* @return: byte[]
**/
public byte[] handleRequest(byte[] data) throws Exception
//1.解组消息
Request request = this.protocol.unmarshallingRequest(data);
//2. 查找服务对象
ServiceObject so = this.serviceRegister.getServiceObject(request.getServiceName());
Response response = null;
if (so==null)
response = Response.builder().status(Status.NOT_FOUND).build();
else
try
//3.反射调用对应的过程方法
Method method = so.getClazz().getMethod(request.getMethod(), request.getParameterTypes());
Object returnVal = method.invoke(so.getObj(), request.getParameters());
response = Response.builder()
.status(Status.SUCCESS)
.returnValue(returnVal)
.build();
catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException |
InvocationTargetException e)
response = Response.builder()
.status(Status.ERROR)
.exception(e)
.build();
return this.protocol.marshallingResponse(response);
这段算是服务端的核心部分,控制服务段Netty网络服务的开启关闭;接收客户端发起的请求,将客户端发送的请求参数解组并查询客户端远程调用的过程业务过程接口,并通过反射调用返回调用结果
2.1.2.3、RPC监听处理器
开始有提到RPC监听处理器的概念,用于服务的注册暴露与服务的消费注入,这里先说下服务开启服务注册,后面说的客户端时在补充服务注入
/**
* @Author Lijl
* @ClassName DefaultRpcProcessor
* @Description rcp监听处理器 负责暴露服务、自动注入
* @Date 2022/2/26 20:43
* @Version 1.0
*/
@Slf4j
public class DefaultRpcProcessor implements ApplicationListener<ContextRefreshedEvent>, DisposableBean
@Autowired
private ClientProxyFactory clientProxyFactory;
@Autowired
private ServiceRegister serviceRegister;
@Autowired
private RpcServer rpcService;
@SneakyThrows
@Override
public void onApplicationEvent(ContextRefreshedEvent event)
ApplicationContext applicationContext = event.getApplicationContext();
if (Objects.isNull(applicationContext.getParent()))
//开启服务
startServer(applicationContext);
//注入Service
injectService(applicationContext);
/**
* @Author lijl
* @MethodName startServer
* @Description 扫描服务注册注解,调用服务注册将服务注册到zookeeper中
* @Date 18:44 2022/2/26
* @Version 1.0
* @param applicationContext
* @return: void
**/
private void startServer(ApplicationContext applicationContext) throws Exception
//过滤出带有服务注册注解的实例
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Service.class);
if (beans.size()!=0)
//遍历服务组装服务注册信息
for (Object o : beans.values())
List<ServiceObject> soList = new ArrayList<>();
Class<?> clazz = o.getClass();
Service service = clazz.getAnnotation(Service.class);
String version = service.version();
Class<?>[] interfaces = clazz.getInterfaces();
if (interfaces.length>1)
//相同接口存在不同版本号,则分别注册
for (Class<?> aClass : interfaces)
String aClassName = aClass.getName();
if (StringUtils.hasLength(version))
aClassName +=":"+version;
soList.add(new ServiceObject(aClassName,aClass,o));
else
Class<?> superClass = interfaces[0];
String aClassName = superClass.getName();
if (StringUtils.hasLength(version))
aClassName +=":"+version;
soList.add(new ServiceObject(aClassName, superClass, o));
//调用服务注册
this.serviceRegister.register(soList);
rpcService.start();
/**
* @Author lijl
* @MethodName injectService
* @Description 注入远程调用服务
* @Date 19:20 2022/2/26
* @Version 1.0
* @param applicationContext
* @return: void
**/
private void 实现一个简易的点对点rpc框架
什么是RPC? RemoteProcedureCall,即远程过程调用,RPC框架可以帮助我们屏蔽网络通讯细节。就使用方而言,让远程调用和本地调用一样简单。本地调用?远程调用? 那么本地调用和远程调用有什么区... 查看详情
netty_06_手写rpc基础版(实践类)(代码片段)
...反射调用等。源码下载:https://www.syjshare.com/res/QL0434P3手写RPC框架是netty的应用,不断往dubbo上靠拢二、整体运行先启动rpc服务端然后启动rpc客户端,看日志然后看rpc 查看详情
一文就读懂rpc远程调用核心原理
...个不同的进程。因此,我们就从跨进程进行访问的角度去理解就行了。Procedure,意思是一串可执行的代码,我们写Java的方法,就是一段课程行的代码。Call,即调用,调用的就是跨了进程的方法。综上,rpc就 查看详情
手写springmvc框架实现简易版mvc框架
前言前面几篇文章中,我们讲解了SpringMVC执⾏的⼤致原理及关键组件的源码解析,今天,我们来模仿它⼿写⾃⼰的mvc框架。先梳理一下需要实现的功能点:tomcat加载配置文件web.xml;调用web.xml中指定的前端控制器DispatcherServlet加... 查看详情
手写一个简单的rpc框架(代码片段)
学习RPC框架,由繁化简,了解其本质原理文章目录项目简介什么是RPC?项目模块项目代码common模块client模块server模块framework模块测试项目简介什么是RPC?RPC(RemoteProcedureCall)即远程过程调用,不同于本... 查看详情
手写rpc框架-第四天超时处理(代码片段)
手写RPC框架-第四天超时处理1.为什么需要超时处理机制超时处理是RPC框架一个比较基本的能力,如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现... 查看详情
手写rpc框架-第四天超时处理(代码片段)
手写RPC框架-第四天超时处理1.为什么需要超时处理机制超时处理是RPC框架一个比较基本的能力,如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现... 查看详情
netty_06_手写rpc基础版(实践类)(代码片段)
文章目录一、前言二、整体运行三、客户端和服务端3.1客户端3.2服务端3.3RpcServerInitializer和RpcClientInitializer四、小结一、前言常用的rpc框架:dubbothriftgRPCrpc定义:remoteproceducercallrpc目的/解决的问题:像调用本地服务一... 查看详情
netty_06_手写rpc基础版(实践类)(代码片段)
文章目录一、前言二、整体运行三、客户端和服务端3.1客户端3.2服务端3.3RpcServerInitializer和RpcClientInitializer四、小结一、前言常用的rpc框架:dubbothriftgRPCrpc定义:remoteproceducercallrpc目的/解决的问题:像调用本地服务一... 查看详情
day480.netty手写dubborpc框架-netty(代码片段)
Netty手写dubboRPC框架一、RPC基本介绍rpc是远程调用的一种行为,在数据传输过程中涉及到传输协议,http就是一种传输协议RPC(RemoteProcedureCall)—远程过程调用,是一个计算机通信协议。该协议允许运行于一台... 查看详情
day480.netty手写dubborpc框架-netty(代码片段)
Netty手写dubboRPC框架一、RPC基本介绍rpc是远程调用的一种行为,在数据传输过程中涉及到传输协议,http就是一种传输协议RPC(RemoteProcedureCall)—远程过程调用,是一个计算机通信协议。该协议允许运行于一台... 查看详情
grpc本地服务搭建(代码片段)
RPCRPC原理主流RPC框架gRPC概述特点服务端创建定义服务生成gRPC代码服务端实现客户端实现踩坑记录源码RPCRPC原理RPC框架的目标就是让远程服务调用更加简单、透明,RPC框架负责屏蔽底层的传输方式(TCP或者UDP)、序列化方式(XML/... 查看详情
rpc原理及rpc实例分析
...远程服务,而让调用方对网络通信这些细节透明常见的RPC框架:阿里巴巴的hsf、dubboFacebook的thriftGoogle的grpcTwitter的fi 查看详情
[架构之路-61]:目标系统-平台软件-基础中间件-远程过程(函数)调用rpc原理与其网络架构(代码片段)
目录第1章远程过程/函数调用RPC概述1.1什么是编程语言原生的函数调用1.2IPC:(InterProcessCommunication)跨进程通信1.4什么是本地过程(函数)调用LPC1.5 什么是远程过程/函数调用RPC第2章远程过程调动的网络架构2... 查看详情
手写模拟dubbo实现一个自己的rpc框架
手写模拟Dubbo代码地址:https://github.com/chenruoyu0319/small-dubbo.git什么是RPC?维基百科是这么定义RPC的:在分布式计算,远程过程调用(英语:RemoteProcedureCall,缩写为RPC)是一个计算机通信协议。该协议允许... 查看详情
thrift原理浅析
...述了对象,对象成员,接口方法等一系列信息。2.通过RPC框架提供的编译器,将接口说明文件编译成对应的语言文件。2.在客户端和服务端分别引用RPC编译器生成的文件,即可像调用本地方法一样远程调用。RPC通信过程如下:1.客... 查看详情
rpc(remoteprocedurecall)及其应用(java版)(代码片段)
...简介1.1什么是rpc1.2rpc的实现方式1.3RPC的使用场景:案例Dubbo框架介绍使用Dubbo框架实现RPC调用添加Dubbo依赖:配置服务:配置服务注册中心配置客户端:服务调用一、简介1.1什么是rpcRPC(RemoteProcedureCall,远程过程调用)... 查看详情
一篇文章了解rpc框架原理
1.RPC框架的概念RPC(RemoteProcedureCall)–远程过程调用,通过网络通信调用不同的服务,共同支撑一个软件系统,微服务实现的基石技术。使用RPC可以解耦系统,方便维护,同时增加系统处理请求的能力。上面是一个简单的软... 查看详情