rpc----基于zookeeper为注册中心实现的rpc(代码片段)

TheWhc TheWhc     2022-12-04     384

关键词:

基于ZooKeeper为注册中心实现的RPC

一、原理

一个能够动态注册和获取服务信息的地方,来统一管理服务名称和其对应的服务器列表信息,称之为服务配置中心。如图所示

  • 服务提供在启动时,将其提供的服务名称、服务器地址注册到服务配置中心
  • 服务消费者通过服务配置中心来获得需要调用的服务的机器列表,通过相应的负载均衡算法,选取其中一台服务器进行调用
  • 当服务器宕机或者下线时,相应的机器需要能够动态地从服务配置中心里面移除,并通知相应地服务消费者

二、统一配置管理

主要把服务名以及服务相关的服务器IP地址注册到注册中心,在使用服务的时候,只需要根据服务名,就可以得到所有服务地址IP,然后根据一定的负载均衡策略来选择IP地址

1、服务的注册

关于服务的注册,其实就是把服务和IP注册到ZooKeeper节点中。

  • 服务名用的是永久节点
  • 服务IP地址用的是临时节点(为后面对节点进行注册监听做铺垫)

(用端口号的不同区别不同的机器)

CuratorUtils类提供createPersistentNode()createEphemeralNode()方法

// 创建服务名永久节点PERSISTENT
public static void createPersistentNode(CuratorFramework zkClient, String path) 
   try 
      // 永久节点已存在
      if (PERSISTENT_REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null) 
         logger.info("永久节点已经存在,永久节点是:[]", path);
       else 
         // 永久节点不存在,则创建永久节点
         //eg: /MyRPC/com.whc.rpc.api.UserService
         zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
         logger.info("永久节点成功被创建,永久节点是:[]", path);
      
      PERSISTENT_REGISTERED_PATH_SET.add(path);
    catch (Exception e) 
      logger.error("创建永久节点失败[]", path);
   


// 创建服务地址为临时节点EPHEMERAL
// 临时节点,当客户端与 Zookeeper 之间的连接或者 session 断掉时会被zk自动删除。开源 Dubbo 框架,使用的就是临时节点
// 优点: 当服务节点下线或者服务节点不可用,Zookeeper 会自动将节点地址信息从注册中心删除
public static void createEphemeralNode(CuratorFramework zkClient, String path) 
   try 
      // 临时节点已存在
      if (EPHEMERAL_REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null) 
         logger.info("临时节点已经存在,临时节点是:[]", path);
       else 
         // 临时节点不存在,则创建临时节点
         //eg: /MyRPC/com.whc.rpc.api.UserService/127.0.0.1:9000
         zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
         logger.info("临时节点成功被创建,临时节点是:[]", path);
      
      EPHEMERAL_REGISTERED_PATH_SET.add(path);
    catch (Exception e) 
      logger.error("创建临时节点失败[]", path);
   

2、服务的发现

服务的发现就是根据服务名来获取ZooKeeper节点中的IP地址

CuratorUtils类提供了getChildrenNodes()方法

// 获取一个节点下的孩子节点
public static List<String> getChildrenNodes(CuratorFramework zkClient, String rpcServiceName) 
   if (SERVICE_ADDRESS_MAP.containsKey(rpcServiceName)) 
      return SERVICE_ADDRESS_MAP.get(rpcServiceName);
   
   List<String> result = null;
   String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
   try 
      result = zkClient.getChildren().forPath(servicePath);
      SERVICE_ADDRESS_MAP.put(rpcServiceName, result);
      // 动态发现服务节点的变化(监听),如果提供服务的服务端上下线,则重新更新服务器列表
      registerWatcher(rpcServiceName, zkClient);
    catch (Exception e) 
      logger.error("获取节点下的孩子节点 [] 失败", servicePath);
   
   return result;

3、测试代码

服务的注册测试

NettyTestServer:测试用Netty服务提供者

public static void main(String[] args)
   UserService userService = new UserServiceImpl();
   BlogService blogService = new BlogServiceImpl();
   // 服务端需要把自己的ip,端口给注册中心
   NettyServer server = new NettyServer("127.0.0.1", 9000, CommonSerializer.PROTOBUF_SERIALIZER);
   server.publishService(userService, UserService.class);
   server.publishService(blogService, BlogService.class);

   server.start();

NettyServer:Netty服务提供者

@Override
public <T> void publishService(T service, Class<T> serviceClass) 
   if(serializer == null) 
      logger.error("未设置序列化器");
      throw new RpcException(RpcError.SERIALIZER_NOT_FOUND);
   
   // com.whc.test.UserServiceImpl,UserService.Class
   serviceProvider.addServiceProvider(service, serviceClass);
   // com.whc.test.UserService,127.0.0.1:9000
   serviceRegistry.register(serviceClass.getCanonicalName(), new InetSocketAddress(host, port));

ServiceRegistry:服务注册接口

/**
 * 服务注册接口
 * 注册:保存服务和地址
 * @ClassName: ServiceRegistry
 * @Author: whc
 * @Date: 2021/06/09/22:29
 */
public interface ServiceRegistry 

   /**
    * 将一个服务注册进注册表
    * @param serviceName 服务名称
    * @param inetSocketAddress 提供服务的地址
    */
   void register(String serviceName, InetSocketAddress inetSocketAddress);

ZKServiceRegistryImpl:服务注册实现类

public class ZKServiceRegistryImpl implements ServiceRegistry 

   @Override
   public void register(String serviceName, InetSocketAddress inetSocketAddress) 
       // /MyRPC/com.whc.test.UserService
      String servicePersistentPath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + serviceName;
       // /MyRPC/com.whc.test.UserService/127.0.0.1:9000
      String serviceEphemeralPath = servicePersistentPath + inetSocketAddress;
      CuratorFramework zkClient = CuratorUtils.getZkClient();
      // 创建服务名永久节点, 服务地址为临时节点
      CuratorUtils.createPersistentNode(zkClient, servicePersistentPath);
      CuratorUtils.createEphemeralNode(zkClient, serviceEphemeralPath);
   


服务的发现测试

NettyClient:发送RpcRequest

// 获取服务地址
InetSocketAddress inetSocketAddress = serviceDiscovery.serviceDiscovery(rpcRequest.getInterfaceName());

ServiceDiscovery:服务发现接口

/**
 * 服务发现接口
 * 查询: 根据服务名查找地址
 * @ClassName: ServiceDiscovery
 * @Author: whc
 * @Date: 2021/06/13/23:52
 */
public interface ServiceDiscovery 

   /**
    * 根据服务名称查找服务实体
    * @param serviceName 服务名称
    * @return 服务实体
    */
   InetSocketAddress serviceDiscovery(String serviceName);

ZKServiceDiscoveryImpl:服务发现实现类

/**
 * 服务发现实现类
 * @ClassName: ZKServiceDiscoveryImpl
 * @Author: whc
 * @Date: 2021/06/14/0:57
 */
public class ZKServiceDiscoveryImpl implements ServiceDiscovery 

   private static final Logger logger = LoggerFactory.getLogger(ZKServiceDiscoveryImpl.class);

   private final LoadBalancer loadBalancer;

   public ZKServiceDiscoveryImpl() 
      this(null);
   

   public  ZKServiceDiscoveryImpl(LoadBalancer loadBalancer) 
      if(loadBalancer == null) 
         this.loadBalancer = new RandomLoadBalance();
       else 
         this.loadBalancer = loadBalancer;
      
   

   @Override
   public InetSocketAddress serviceDiscovery(String serviceName) 
      CuratorFramework zkClient = CuratorUtils.getZkClient();
       // 获取服务地址列表
      List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, serviceName);
      if (serviceUrlList == null || serviceUrlList.size() == 0) 
         throw new RpcException(RpcError.SERVICE_NOT_FOUND, serviceName);
      

      // 负载均衡
      String targetServiceUrl = loadBalancer.balance(serviceUrlList);
      logger.info("通过负载均衡策略,获取到服务地址:[]", targetServiceUrl);
      String[] socketAddressArray = targetServiceUrl.split(":");
      String host = socketAddressArray[0];
      int port = Integer.parseInt(socketAddressArray[1]);
      return new InetSocketAddress(host, port);
   

测试截图

开启服务9000端口,向ZooKeeper注册服务

开启服务9001端口,向ZooKeeper注册服务

开启服务9002端口,向ZooKeeper注册服务

客户端向ZooKeeper获取服务地址

三、负载均衡

常见的负载均衡策略:随机,轮询,最小连接数,一致性Hash

这里只实现了随机轮询方式的负载均衡

1、接口

负载均衡用一个接口抽象出来:

/**
 * 负载均衡接口
 * 给服务器地址列表,根据不同的负载均衡策略选择一个
 * @ClassName: LoadBalancer
 * @Author: whc
 * @Date: 2021/06/12/22:08
 */
public interface LoadBalancer 
   String balance(List<String> serviceAddresses);

负载均衡抽象类

public abstract class AbstractLoadBalance implements LoadBalancer 

   @Override
   public String balance(List<String> serviceAddresses) 
      if (serviceAddresses == null || serviceAddresses.size() == 0) 
         return null;
      
      if (serviceAddresses.size() == 1) 
         return serviceAddresses.get(0);
      
      return doSelect(serviceAddresses);
   

   protected abstract String doSelect(List<String> serviceAddresses);

2、随机、轮询代码

  • 随机

    /**
     * 随机负载均衡
     * @ClassName: RandomLoadBalance
     * @Author: whc
     * @Date: 2021/06/12/22:11
     */
    public class RandomLoadBalance extends AbstractLoadBalance 
    
       @Override
       protected String doSelect(List<String> serviceAddresses) 
          return serviceAddresses.get(new Random().nextInt(serviceAddresses.size()));
       
    
    
    
  • 轮询

    public class RoundLoadBalance extends AbstractLoadBalance 
    
       private int index = 0;
    
       @Override
       protected String doSelect(List<String> serviceAddresses) 
          if(index >= serviceAddresses.size()) 
             index %= serviceAddresses.size();
          
          return serviceAddresses.get(index++);
       
    
    

3、客户端服务发现代码

@Override
public InetSocketAddress serviceDiscovery(String serviceName) 
   CuratorFramework zkClient = CuratorUtils.getZkClient();
   // 获取服务地址列表
   List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, serviceName);
   if (serviceUrlList == null || serviceUrlList.size() == 0) 
      throw new RpcException(RpcError.SERVICE_NOT_FOUND, serviceName);
   

   // 负载均衡
   String targetServiceUrl = loadBalancer.balance(serviceUrlList);
   logger.info("通过负载均衡策略,获取到服务地址:[]", targetServiceUrl);
   String[] socketAddressArray = targetServiceUrl.split(":");
   String host = socketAddressArray[0];
   int port = Integer.parseInt(socketAddressArray[1]);
   return new InetSocketAddress(host, port);

三、动态感知服务器状态

1、文字描述

在实际的生成环境中一般都是集群环境部署,同一个程序会部署在相同的几台服务器上,这时就可以通过负载均衡服务器去调度,但是我们并不能很快速的获知哪台服务器挂掉了,这时我们就可以使用ZooKeeper来解决这个问题。

  • 感知上线

    当服务器启动的时候通过程序知道后会同时在zookeeper的service节点下创建一个新的短暂节点来存储当前服务器的信息。客户端通过对service节点的watch可以立马知道有新的服务器上线了

  • 感知下线

    当我们有个服务器下线后,对应的service下的短暂节点会被删除,此时watch service节点的客户端也能立马知道哪个服务器下线了,能够及时将访问列表中对应的服务器信息移除,从而实现及时感知服务器的变化。

2、代码部分实现

CuratorUtils提供:

  • createEphemeralNode()创建临时节点
  • registerWatcher()监听节点
// 创建服务地址为临时节点EPHEMERAL
// 临时节点,当客户端与 Zookeeper 之间的连接或者 session 断掉时会被zk自动删除。开源 Dubbo 框架,使用的就是临时节点
// 优点: 当服务节点下线或者服务节点不可用,Zookeeper 会自动将节点地址信息从注册中心删除
public static void createEphemeralNode(CuratorFramework zkClient, String path) 
   try 
      // 临时节点已存在
      if (EPHEMERAL_REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null) 
         logger.info("临时节点已经存在,临时节点是:[]", path);
       else 
         // 临时节点不存在,则创建临时节点
         //eg: /MyRPC/com.whc.rpc.api.UserService/127.0.0.1:9000
         zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
         logger.info("临时节点成功被创建,临时节点是:[]", path);
      
      EPHEMERAL_REGISTERED_PATH_SET.add(path);
    catch (Exception e) 
      logger.error("创建临时节点失败[]", path);
   

// 对节点进行注册监听, 用的是PathChildrenCache
private static void registerWatcher(String rpcServiceName, CuratorFramework zkClient) throws Exception 
   String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
   // 1. 创建监听对象
   PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true);

   // 2. 绑定监听器
   pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() 
      @Override
      public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception 
         // 重新获取节点的孩子节点, 即重新获取服务列表信息
         List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath);
         // 更新客户端本地服务缓存
         SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses);
         logger.info("服务地址列表:", SERVICE_ADDRESS_MAP.get(rpcServiceName));
      
   );

   // 3. 开启
   pathChildrenCache.start();

3、测试截图

假设提供服务的9000端口对应的机器关闭服务,由于ZooKeeper创建的是临时节点,所以断开连接后,超过一定时间后,会关闭会话,临时节点会被删除,此时监听节点的监听器会收到删除事件的信息,于是让客户端重新获取服务地址信息,同时更新客户端本地缓存服务信息。

服务器下线

如图是关闭端口为9001的服务器

服务器上线

重启端口为9001的服务器

四、总结

ZooKeeper上所形成的节点树如图所示:

  • 服务注册与发现 & 负载均衡

    • 服务提供者在启动时,将其提供的服务名称、
      服务器地址,以节点的形式注册到服务配置中心
    • 服务消费者通过服务配置中心来获得需要调用的服务名称节点下的机器列表节点。通过负载均衡算法,选取其中一台服务器进行调用。
  • 动态感知服务器状态

    • 一旦服务器与ZooKeeper断开连接,节点也就不存在了,通过注册相应的watcher,服消费者能够在第一时间获知服务提供者机器信息的变更。利用其znode的特点和watcher机制,将其作为动态注册和获取服务信息的配置中心,统一管理服务名称和其对应的服务器列表信息,能够近乎实时地感知到后端的服务器的状态(上线、下线、宕机)

      在这个过程中,服务消费者只有在第一次调用服务时需要查询服务配置中心,然后将查询到的服务信息缓存到本地,后面的调用直接使用本地缓存的服务地址列表信息,而不需要重新发起请求到服务配置中心去获取相应的服务地址列表,直到服务的地址列表有变更(机器上线或者下线),变更行为会触发服务消费者注册的相应的watcher进行服务地址的重新查询。这种无中心化的结构,使得服务消费者在服务信息没有变更时,几乎不依赖配置中心,解决以往由负载均衡设备所导致的单点故障的问题,并且大大降低了服务配置中心的压力。
      (ZooKeeper集群间通过Zab协议,服务配置信息能够保持一致,而ZooKeeper本身容错性和leader选举机制,能保证我们方便地进行扩容)
      

五、版本三特

zookeeper--基于watcher原理实现带注册中心的rpc框架(代码片段)

...ry;importorg.apache.curator.retry.ExponentialBackoffRetry;importorg.apache.zookeeper.CreateMode;publicclassRegisterCenterimplementsIRegisterCenter privateCuratorFrameworkcuratorFramework; publicRegisterCenter() curatorFramework=CuratorFrameworkFactory.builder().connectString(ZooConfig.CONNECTION... 查看详情

基于zookeeper实现rpc注册中心

...增减节点时,客户端无需改动重启。krpc目前已经增加了zookeeper作为注册中心(如果不配置zk信息,则不使用)。针对服务节点意外宕掉的解决方案,zk的临时节点天然的符合这个场景。zk中当创建临时节点的链接断掉,该临时节... 查看详情

轻量级rpc设计与实现第二版(代码片段)

...,需要手动设置服务地址,限制性较大。在本文中,利用zookeeper作为服务注册中心,在服务端启动时将本地的服务信息注册到zookeeper中,当客户端发起远程服务调用时,先从zookeeper中获取该服务的地址,然后根据获得的这个地址... 查看详情

dubbo的rpc远程过程调用+dubbo的负载均衡+zookeeper注册中心(代码片段)

文章目录Dubbo的RPC远程过程调用+Dubbo的负载均衡+Zookeeper注册中心分布式基础理论应用架构演变单一应用架构垂直应用架构分布式服务架构RPC远程过程调用dubbo核心概念环境搭建_zookeeper注册中心环境搭建_管理控制台编写提供... 查看详情

基于zookeeper的dubbo注册中心

Zookeeper注册中心安装建议使用dubbo-2.3.3以上版本的zookeeper注册中心客户端。Zookeeper是ApacheHadoop的子项目,强度相对较好,建议生产环境使用该注册中心。Dubbo未对Zookeeper服务器端做任何侵入修改,只需安装原生的Zookeeper服务器即... 查看详情

基于zookeeper的dubbo注册中心

Zookeeper注册中心安装建议使用dubbo-2.3.3以上版本的zookeeper注册中心客户端。Zookeeper是ApacheHadoop的子项目,强度相对较好,建议生产环境使用该注册中心。Dubbo未对Zookeeper服务器端做任何侵入修改,只需安装原生的Zookeeper服务器即... 查看详情

基于zookeeper与netty实现的分布式rpc服务(代码片段)

文章目录前言架构前置知识服务接口API用户服务接口用户对象RPC请求对象RPC响应对象服务提供者RPC服务端RPC业务处理暴露接口服务注册服务消费者RPC客户端RPC客户端处理类RPC客户端代理类服务发现web接口前言大部分的互联网公司... 查看详情

轻量级rpc设计与实现第五版(最终版)(代码片段)

...客户端与服务端的远程通信利用Hessian来实现序列化设置Zookeeper作为注册中心新设监控器,通过心跳机制来判断服务端与监控器的网络连接状况,当出现不稳定时,认为服务端出现了问题,在注册中心删除相关的服务信息。利用Ne... 查看详情

rpc和注册中心的简介

...高系统的可伸缩性4、集中管理服务;常见的注册中心:zookeeper,Redis;服务的发现:引入注册中心 查看详情

利用zookeeper实现分布式锁及服务注册中心(代码片段)

原文:利用Zookeeper实现分布式锁及服务注册中心对于Zookeeper的定义以及原理,网上已经有很多的优秀文章对其进行了详细的介绍,所以本文不再进行这方面的阐述。本文主要介绍一些基本的准备工作以及zookeeper.net的使用。本文源... 查看详情

分布式专题

...异常.处理,阻止ID生成,这可能导致服务不可用。为什么Zookeeper可以用来作为注册中心可以利用Zookeeper的临时节点和watch机制来实现注册中心的自动注册和发现,另外Zookeeper中的数据都是存在内存中的,并且Zookeeper底层采用了nio... 查看详情

基于zookeeper实现配置中心

在Zookeeper的主要应用场景中,其中之一是作为分布式系统的配置中心。实现原理在Zookeeper建立一个根节点,比如/CONFIG,代表某个配置文件。将配置文件中的信息作为根节点的子节点存储,比如配置项timeout=3000,在Zookeeper中展现... 查看详情

dubbo注册中心

...能。Dubbo支持的注册中心主要包括:其中dubbo官方推荐用Zookeeper作为注册中心,下面介绍ZookeeperRegistry。Dubbo在Registry层实现服务的注册于发现,主要包括如下几个类:流程说明:RegistryProtocol是对需要暴露服务到注册中心的一层封... 查看详情

dubbo的底层实现原理和机制

...!作为SOA:具有服务治理功能,提供服务的注册和发现!用zookeeper实现注册中心!启动时候服务端会把所有接口注册到注册中心,并且订阅configurators,服务消费端订阅provide,configurators,routers,订阅变更时,zk会推送providers,configuators,... 查看详情

dubbo——使用zookeeper注册中心实现dubbo(代码片段)

1.写在前面有关Zookeeper注册中心的相关内容,可以参考我的这篇博客:https://blog.csdn.net/weixin_43823808/article/details/1173390442.案例分析这里使用注册中心仍然需要三个maven工程,第一个是mavenjava工程,后两个是mavenweb工... 查看详情

关于zookeeper

Zookeeper是分布式协调工具应用场景   命名服务(注册中心)Dubbo注册中心  分布式配置中心(SpringCloudconfig)动态管理配置文件信息  消息中间件事件通知(类似发布订阅)  分布式事务(全局协... 查看详情

大鹏rpc流程分析(代码片段)

大鹏RPC1.概述采用Zookeeper作为注册中心,记录服务提供者IP端口信息.客户端读取Zookeeper上已注册的服务提供方信息.服务器与客户端采用Netty通讯.序列化方式为TProtocol2.通讯协议项层协议为:TProtocol2.1.数据包4bytes1byte1byte1byte4bytesheaderb... 查看详情

基于zookeeper实现服务注册与发现(代码片段)

简介ZooKeeper官网在分布式系统中,服务注册与发现是一项重要的技术,本文提供Java代码基于ZooKeeper来实现的服务注册与发现的功能.服务注册packagecom.fanqiechaodan.service;importorg.apache.zookeeper.*;importjava.io.IOException;/***@authorfanqiechaodan*&#... 查看详情