轻量级rpc设计与实现第二版(代码片段)

maratong maratong     2023-04-17     650

关键词:

在上一个版本中利用netty实现了简单的一对一的RPC,需要手动设置服务地址,限制性较大。
在本文中,利用zookeeper作为服务注册中心,在服务端启动时将本地的服务信息注册到zookeeper中,当客户端发起远程服务调用时,先从zookeeper中获取该服务的地址,然后根据获得的这个地址来利用netty进行网络传送。
在服务端和注册中心之间需要建立监听,当服务信息发生变化或网络连接等问题时需要对注册中心的服务信息进行修改。在本文中创建了服务注册监控中心,利用心跳机制来判断与服务端是否有较稳定的连接,当出现网络不稳定时,则从注册中心中删除属于该服务端的服务信息。在本项目中设定在5分钟内3次以上没有发送心跳包为不稳定状态。

关于心跳机制,之前有一篇文章介绍过:Dubbo心跳机制

zookeeper注册中心

zookeeper是hadoop中的一个重要组件,其主要是作为分布式协调服务
zookeeper采用节点树的数据模型,类似linux文件系统。
每个节点称做一个ZNode,每个ZNode都可以通过路径唯一标识,同时每个节点还可以存储少量数据。
本项目借鉴dubbo的注册中心模型来设计本文的注册中心。
总体上设计了四级节点,在一个节点是一个持久节点/register,表示是记录注册服务的区域。二级节点是服务接口名,三级节点是远程服务ip地址,该节点是临时节点,节点存储的数据是具体的实现类名。
技术图片
在客户端会根据服务接口名在注册中心进行查找,得到远程服务ip地址,并根据节点中存储的具体实现类名进行反射。

首先进行zookeeper初始化,利用了CuratorFramework有关类

private static void init() 
        RetryPolicy retryPolicy = new RetryNTimes(ZKConsts.RETRYTIME, ZKConsts.SLEEP_MS_BEWTEENR_RETRY);
        client = CuratorFrameworkFactory.builder().connectString(ZKConsts.ZK_SERVER_PATH)
                .sessionTimeoutMs(ZKConsts.SESSION_TIMEOUT_MS).retryPolicy(retryPolicy)
                .namespace(ZKConsts.WORK_SPACE).build();
        client.start();
    

服务的注册代码

public static void register(URL url) 
        try 
            String interfaceName = url.getInterfaceName();
            String implClassName = url.getImplClassName();
            Stat stat = client.checkExists().forPath(getPath(interfaceName, url.toString()));
            if (stat != null) 
                System.out.println("该节点已存在!");
                client.delete().forPath(getPath(interfaceName, url.toString()));
            
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    //权限控制,任何连接的客户端都可以操作该属性znode
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(getPath(interfaceName, url.toString()), implClassName.getBytes());
            System.out.println(getPath(interfaceName, url.toString()));
         catch (Exception e) 
            e.printStackTrace();
        
    

根据服务接口名来获取远程服务连接地址

public static URL random(String interfaceName) 
        try 

            System.out.println("开始查找服务节点:" + getPath(interfaceName));
            List<String> urlList = client.getChildren().forPath("/" + interfaceName);
            System.out.println("结果:" + urlList);
            String serviceUrl = urlList.get(0);
            String[] urls = serviceUrl.split(":");
            String implClassName = get(interfaceName, serviceUrl);
            return new URL(urls[0], Integer.valueOf(urls[1]), interfaceName, implClassName);
         catch (Exception e) 
            System.out.println(e);
            e.printStackTrace();
        
        return null;
    

注册中心与服务端进行连接时需要判断是否维持了稳定的连接,如果服务端出现宕机等情况时需要从注册中心中删除这些服务。
以前的一些处理机制,有session机制和wacher机制。
session机制
每个zookeeper注册中心与服务端进行连接时会创建一个session,在设置的sessionTimeout内,服务端会与注册中心进行心跳包的定时发送,从而感知每个客户端是否宕机,如果创建某个临时Znode节点对应的session销毁时,相应的临时节点也会被注册中心删除。
watcher机制
针对每个节点的操作,都有要给监督者进行watcher,当监控的某个节点发生了变化,则会触发watcher事件。注册中心的watcher是一次性的,触发后会被销毁。父节点,子节点增删改都能够触发watcher。触发销毁后,下次需要监听时还需要再注册一次。
本文心跳机制
服务端定时向注册中心发送本机地址,看作心跳数据包,而注册中心监控则维持一个channelId和具体地址的map,并且通过IdleHandler监听空闲事件,到达一定的空闲次数则认为不活跃,当不活跃时,zookeeper删除对应的url节点。该版本实现了上面的内容,后续的步骤在以后的版本实现。
如果10s内没有触发读,就会执行userEventTriggered方法。如果5分钟中出现两次不活跃次数,就认定该连接不稳定,注册中心会移除属于该服务端的服务。你也可以根据实际情况设定不稳定标准。

 service.scheduleAtFixedRate(() -> 
                if (future.channel().isActive()) 
                    int time = new Random().nextInt(5);
                    log.info("本次定时任务获取的随机数:", time);
                    if (time > 3) 
                        log.info("发送本地地址到注册中心:", url);
                        future.channel().writeAndFlush(url);
                    
                
            , 60, 60, TimeUnit.SECONDS);
 @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception 
        if (evt instanceof IdleStateEvent) 
            IdleStateEvent state = (IdleStateEvent)evt;
            if (state.state().equals(IdleState.READER_IDLE)) 
                log.info("读空闲");
             else if (state.state().equals(IdleState.WRITER_IDLE)) 
                log.info("写空闲");
            
            //在一定时间内读写空闲才会关闭链接
            else if (state.state().equals(IdleState.ALL_IDLE)) 
                if (++inActiveCount == 1) 
                    start = System.currentTimeMillis();
                
                int minute = (int)((System.currentTimeMillis() - start) / (60 * 1000)) + 1;
                log.info("第次读写都空闲,计时分钟数", inActiveCount, minute);
                if (inActiveCount > 2 && minute < 5) 
                    log.info("移除不活跃ip");
                    removeAndClose(ctx);
                 else 
                    if (minute >= 5) 
                        log.info("新周期开始");
                        start = 0;
                        inActiveCount = 0;
                    
                
            
        
    

具体实现代码:RPC第二版

轻量级rpc设计与实现第五版(最终版)(代码片段)

...时间里,通过搜集有关资料加上自己的理解,设计了一款轻量级RPC,起了一个名字lightWeightRPC。它拥有一个RPC常见的基本功能。主要功能和特点如下:利用Spring实现依赖注入与参数配置利用Netty来实现客户端与服务端的远程通信利... 查看详情

轻量级rpc设计与实现第三版(代码片段)

在前两个版本中,每次发起请求一次就新建一个netty的channel连接,如果在高并发情况下就会造成资源的浪费,这时实现异步请求就十分重要,当有多个请求线程时,需要设计一个线程池来进行管理。除此之外,当前方法过于依赖... 查看详情

轻量级rpc设计与实现第一版(代码片段)

什么是RPCRPC(RemoteProcedureCallProtocol),远程过程调用,通俗的解释就是:客户端在不知道调用细节的情况下,调用存在于远程计算机上的某个对象,就像调用本地应用程序中的对象一样,不需要了解底层网络技术的协议。简单的整体... 查看详情

《rocketmq技术原理:rocketmq架构设计与实现原理》第二版书籍勘误

...友们,谢谢大家对《RocketMQ技术内幕:RocketMQ架构设计与实现原理》第二版本的支持与厚爱,如果发现书中有什么错误的内容,烦请大家在该文章追加您的评论,我会及时与广大读者朋友们进行互动,共同... 查看详情

《rocketmq技术原理:rocketmq架构设计与实现原理》第二版书籍勘误

...友们,谢谢大家对《RocketMQ技术内幕:RocketMQ架构设计与实现原理》第二版本的支持与厚爱,如果发现书中有什么错误的内容,烦请大家在该文章追加您的评论,我会及时与广大读者朋友们进行互动,共同... 查看详情

使用命名管道承载grpc(代码片段)

...不见得,先看看他的优点:gRPC的主要优点:现代高性能轻量级RPC框架。协定优先API开发,默认使用协议缓冲区,允许与语言无关的实现。可用于多种语言的工具,以生成强类型服务器和客户端。支 查看详情

基于netty和springboot实现一个轻量级rpc框架-client篇(代码片段)

前提前置文章:《基于Netty和SpringBoot实现一个轻量级RPC框架-协议篇》《基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇》前一篇文章相对简略地介绍了RPC服务端的编写,而这篇博文最要介绍服务端(Client)的实现。RPC调用一般... 查看详情

《c++并发编程实战第二版》:条件变量唤醒丢失与虚假唤醒(代码片段)

《C++并发编程实战第二版》:条件变量唤醒丢失与虚假唤醒推荐阅读《C++并发编程实战第二版》学习笔记目录本文主要是对《C++并发编程实战第二版》第4章中条件变量部分做进一步探究,主要内容为使... 查看详情

微服务与rpc/grpc(代码片段)

...开发的方法,每个服务程序都在自己的进程中运行,并与轻量级机制(通常是HTTP资源API)进行通信。这些服务是围绕业务功能构建的。可以通过全自动部署机器独立部署。这些服务器可以用不同的编程语言编写,使用不同的数... 查看详情

c语言网络编程—轻量级http服务器设计与实现(代码片段)

目录文章目录目录Tinyhttpd安装TinyhttpdTinyhttpd是一个轻量级的Web服务器,由J.DavidBlackstone在1999年发布。使用C语言编写,源代码只有500多行,非常适用于学习并理解TCP协议、HTTP协议以及HTTP服务器的设计与实现。Tinyhttpd实... 查看详情

c++实现轻量级rpc分布式网络通信框架(代码片段)

前言:本次项目实现了一个轻量级的RPC分布式网络通信框架,因此写下本篇文章梳理一下该框架的实现逻辑和相关知识点。文章中文字内容不少,主要是考虑到读文章的人看代码会比较辛苦,所以写了比较多的字... 查看详情

《rocketmq技术原理:rocketmq架构设计与实现原理》第二版书籍勘误

...友们,谢谢大家对《RocketMQ技术内幕:RocketMQ架构设计与实现原理》第二版本的支持与厚爱,如果发现书中有什么错误的内容,烦请大家在该文章追加您的评论,我会及时与广大读者朋友们进行互动,共同... 查看详情

剑指offer第二版-9.用两个栈实现队列(代码片段)

描述:使用两个栈实现一个队列。队列中实现尾部插入和头部删除函数。思路:stack1负责插入,stack2负责弹出,如果stack2为空了,将stack1的元素依次弹出并存放到stack2中,之后对stack2进行弹出操作。考点:对栈和队列的理解;对... 查看详情

c语言网络编程—轻量级http服务器设计与实现(代码片段)

...目录Tinyhttpd启动HTTP服务器软件设计TinyhttpdTinyhttpd是一个轻量级的Web服务器,由J.DavidBlackstone在1999年发布。使用C语言编写,源代码只有500多行,非常适用于学习并理解TCP协议、HTTP协议以及HTTP服务器的设计与实现。完... 查看详情

第二十篇商城系统-秒杀功能设计与实现(代码片段)

秒杀服务一、商品上架秒杀活动的结构图通过定时任务触发:/***定时上架秒杀商品信息*/@Slf4j@ComponentpublicclassSeckillSkuSchedule@AutowiredSeckillServiceseckillService;@AutowiredRedissonClientredissonClient;/****/@ 查看详情

第二十篇商城系统-秒杀功能设计与实现(代码片段)

秒杀服务一、商品上架秒杀活动的结构图通过定时任务触发:/***定时上架秒杀商品信息*/@Slf4j@ComponentpublicclassSeckillSkuSchedule@AutowiredSeckillServiceseckillService;@AutowiredRedissonClientredissonClient;/****/@ 查看详情

django创建并连接数据库(实现增删改查)--第二版(代码片段)

注意点一:url里面的地址,不只是html页面,准确说是views视图里面对应的函数方法<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><linkrel="stylesheet"href="/static/bootstrap-3.3.7-dist/css/bootstrap.css 查看详情

低代码平台前端的设计与实现设计态画布designcanvas的设计与实现(代码片段)

...力,在ComponentBuildAspectHandler中通过一些逻辑,来完成一个轻量级的设计器画布。这个画布能够实现如下的一个简单的效果。对于所有渲染出来的元素,都会有一个灰色的边框,当我们选中某个元素的时候,就会高亮显示。ElementNo... 查看详情