关键词:
本文将分享一个高可用的池化 Thrift Client 及其源码实现,欢迎阅读源码(Github)并使用,同时欢迎提出宝贵的意见和建议,本人将持续完善。
本文的主要目标读者是对 Thrift 有一定了解并使用的童鞋,如对 Thrift 的基础知识了解不多或者想重温一下基础知识,推荐先阅读本站文章《和 Thrift 的一场美丽邂逅》。
下面进入正题。
为什么我们需要这么一个组件?
我们知道,Thrift 是一个 RPC 框架体系,可以非常方便的进行跨语言 RPC 服务的开发和调用。然而,它并没有提供针对多个 Server 的 Smart Client【1】。比如,你有一个服务 service,分别部署在 116.31.1.1 和 116.31.1.2 两台服务器上,当你需要从 Client 端调用该 service 的某个远程方法的时候,你只能在代码中显式指定使用 116.31.1.1 或者 116.31.1.2 其中的一个。这种情况下,你调用的时候无法预知所指定 IP 对应的服务是否可用,并且当该服务不可用时,无法隐式自动切换到调用另外一个 IP 对应的服务。也就是说,服务的状态对你并不是透明的,并且无法做到服务的负载均衡和高可用。
此外,当你调用远程方法时,每次你都得新建一个连接,当请求量很大时,不断的创建、删除连接所耗费的服务资源是巨大的。
因此,我们需要这么一个组件,使服务状态透明化并底层实现负载均衡和高可用,让你可以专注于业务逻辑的实现,提升工作效率和服务的质量。下面我们就对该组件(ThrifJ)进行详细的剖析。
它到底能做些什么?
特性
- 链式调用API,简洁直观
- 完善的默认配置,无需担心调用时配置不全导致抛错
- 池化连接对象,高效管理连接的生命周期
- 异常服务自动隔离与恢复
- 多种可配置的负载均衡策略,支持随机、轮询、权重和哈希
- 多种可配置的服务级别,并自动根据服务级别进行服务降级
该如何使用它?
目前最新版本为1.0.1(点此关注最新版本的更新),首先在项目中引入 thriftj-1.0.1.jar,或在 Maven 依赖中加入:
<dependency> <groupId>com.github.cyfonly</groupId> <artifactId>thriftj</artifactId> <version>1.0.1</version> </dependency>
需要注意的是,ThriftJ 基于 slf4j 构建,因此你需要在项目中增加具体日志实现的依赖,比如 log4j 或 logback。
然后在项目中,参照以下这段代码进行调用:
//Thrift server 列表 private static final String servers = "127.0.0.1:10001,127.0.0.1:10002"; //TTransport 验证器 ConnectionValidator validator = new ConnectionValidator() { @Override public boolean isValid(TTransport object) { return object.isOpen(); } }; //连接对象池配置 GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig(); //failover 策略 FailoverStrategy failoverStrategy = new FailoverStrategy(); //构造 ThriftClient 对象并配置 final ThriftClient thriftClient = new ThriftClient(); thriftClient.servers(servers) .loadBalance(Constant.LoadBalance.RANDOM) .connectionValidator(validator) .poolConfig(poolConfig) .failoverStrategy(failoverStrategy) .connTimeout(5) .backupServers("") .serviceLevel(Constant.ServiceLevel.NOT_EMPTY) .start(); //打印从 ThriftClient 获取到的可用服务列表 List<ThriftServer> servers = thriftClient.getAvailableServers(); for(ThriftServer server : servers){ System.out.println(server.getHost() + ":" + server.getPort()); } //服务调用 if(servers.size()>0){ try{ TestThriftJ.Client client = thriftClient.iface(TestThriftJ.Client.class); QryResult result = client.qryTest(1); System.out.println("result[code=" + result.code + " msg=" + result.msg + "]"); }catch(Throwable t){ logger.error("-------------exception happen", t); } }
友情提示:除 servers 必须配置外,其他配置均为可选(使用默认配置)
它是如何设计并实现的呢?
整体设计
连接池对象工厂及连接对象的管理
基于 commons-pool2 中的 KeyedPooledObjectFactory,以 ThriftServer 为 key,TTransport 为 value 进行实现。关键代码如下:
@Override public PooledObject<TTransport> makeObject(ThriftServer thriftServer) throws Exception { TSocket tsocket = new TSocket(thriftServer.getHost(), thriftServer.getPort()); tsocket.setTimeout(timeout); TFramedTransport transport = new TFramedTransport(tsocket); transport.open(); DefaultPooledObject<TTransport> result = new DefaultPooledObject<TTransport>(transport); logger.trace("Make new thrift connection: {}:{}", thriftServer.getHost(), thriftServer.getPort()); return result; } @Override public boolean validateObject(ThriftServer thriftServer, PooledObject<TTransport> pooledObject) { boolean isValidate; try { if (failoverChecker == null) { isValidate = pooledObject.getObject().isOpen(); } else { ConnectionValidator validator = failoverChecker.getConnectionValidator(); isValidate = pooledObject.getObject().isOpen() && (validator == null || validator.isValid(pooledObject.getObject())); } } catch (Throwable e) { logger.warn("Fail to validate tsocket: {}:{}", new Object[]{thriftServer.getHost(), thriftServer.getPort(), e}); isValidate = false; } if (failoverChecker != null && !isValidate) { failoverChecker.getFailoverStrategy().fail(thriftServer); } logger.info("ValidateObject isValidate:{}", isValidate); return isValidate; } @Override public void destroyObject(ThriftServer thriftServer, PooledObject<TTransport> pooledObject) throws Exception { TTransport transport = pooledObject.getObject(); if (transport != null) { transport.close(); logger.trace("Close thrift connection: {}:{}", thriftServer.getHost(), thriftServer.getPort()); } }
在使用连接对象时,根据用户的自定义连接池配置创建连接池,并实现连接对象的获取、回池、清除以及连接池的关闭操作。关键代码如下:
public DefaultThriftConnectionPool(KeyedPooledObjectFactory<ThriftServer, TTransport> factory, GenericKeyedObjectPoolConfig config) { connections = new GenericKeyedObjectPool<>(factory, config); } @Override public TTransport getConnection(ThriftServer thriftServer) { try { return connections.borrowObject(thriftServer); } catch (Exception e) { logger.warn("Fail to get connection for {}:{}", new Object[]{thriftServer.getHost(), thriftServer.getPort(), e}); throw new RuntimeException(e); } } @Override public void returnConnection(ThriftServer thriftServer, TTransport transport) { connections.returnObject(thriftServer, transport); } @Override public void returnBrokenConnection(ThriftServer thriftServer, TTransport transport) { try { connections.invalidateObject(thriftServer, transport); } catch (Exception e) { logger.warn("Fail to invalid object:{},{}", new Object[] { thriftServer, transport, e }); } } @Override public void close() { connections.close(); } @Override public void clear(ThriftServer thriftServer) { connections.clear(thriftServer); }
异常服务自动隔离与恢复
需要实现服务状态的透明化,就必须在底层实现服务的监测、隔离和恢复。在 ThriftJ 中,调用 ThriftClient 时会启动一个线程对服务进行异步监测,用户可以指定检验规则(对应配置为 ConnectionValidator)以及 failover 策略(对应配置为 FailoverStrategy,可以指定失败的次数、失效持续时间和恢复持续时间)。默认情况下,服务验证规则为判断 TTransport 是否处于开启状态,即:
if (this.validator == null) { this.validator = new ConnectionValidator() { @Override public boolean isValid(TTransport object) { return object.isOpen(); } }; }
而默认的 failover 策略为
- 失败次数:10(次),表示通过 ConnectionValidator 检验失败 10 次后才考虑将该服务失效,需要配合失效持续时间一起使用
- 时效持续时间:1(分钟),表示在一个检验周期内,首次检验失败的时间持续达到该值后才考虑将该服务失效,配合失败次数一起使用
- 恢复持续时间:1(分钟),表示在判定某服务失效并隔离后,经过该值后将服务重新恢复
以上功能基于 Guava cache 实现,关键代码如下:
/** * 使用默认 failover 策略 */ public FailoverStrategy() { this(DEFAULT_FAIL_COUNT, DEFAULT_FAIL_DURATION, DEFAULT_RECOVER_DURATION); } /** * 自定义 failover 策略 * @param failCount 失败次数 * @param failDuration 失效持续时间 * @param recoverDuration 恢复持续时间 */ public FailoverStrategy(final int failCount, long failDuration, long recoverDuration) { this.failDuration = failDuration; this.failedList = CacheBuilder.newBuilder().weakKeys().expireAfterWrite(recoverDuration, TimeUnit.MILLISECONDS).build(); this.failCountMap = CacheBuilder.newBuilder().weakKeys().build(new CacheLoader<T, EvictingQueue<Long>>() { @Override public EvictingQueue<Long> load(T key) throws Exception { return EvictingQueue.create(failCount); } }); } public void fail(T object) { logger.info("Server {}:{} failed.", ((ThriftServer)object).getHost(),((ThriftServer)object).getPort()); boolean addToFail = false; try { EvictingQueue<Long> evictingQueue = failCountMap.get(object); synchronized (evictingQueue) { evictingQueue.add(System.currentTimeMillis()); if (evictingQueue.remainingCapacity() == 0 && evictingQueue.element() >= (System.currentTimeMillis() - failDuration)) { addToFail = true; } } } catch (ExecutionException e) { logger.error("Ops.", e); } if (addToFail) { failedList.put(object, Boolean.TRUE); logger.info("Server {}:{} failed. Add to fail list.", ((ThriftServer)object).getHost(), ((ThriftServer)object).getPort()); } } public Set<T> getFailed() { return failedList.asMap().keySet(); }
负载均衡
ThriftJ 提供了四种可选的负载均衡策略:
- 随机
- 轮询
- 权重
- 哈希
在用户不显式指定的情况下,默认采用随机算法。具体算法的实现在此就不再进行过多的描述了。
需要注意的是,ThriftJ 严格规范了调用的语义,比如使用哈希策略时,必须要指定 hash key;当使用非哈希的其他策略时,一定不能指定 key,避免造成理解的二义性。
服务级别与服务降级
ThriftJ 提供了多种可配置的服务级别,并根据服务级别进行服务降级处理,其对应关系如下:
- SERVERS_ONLY:最高级别,仅返回配置的 servers 列表中可用的服务
- ALL_SERVERS:中等级别,当 servers 列表中的服务全部不可用时,返回 backupServers 列表中的可用服务
- NOT_EMPTY:最低级别,当 servers 和 backupServers 列表中的服务全部不可用时,返回 servers 列表中的所有服务
其中 ThriftJ 默认使用的服务级别是 NOT_EMPTY。服务降级处理的关键代码如下:
private List<ThriftServer> getAvailableServers(boolean all) { List<ThriftServer> returnList = new ArrayList<>(); Set<ThriftServer> failedServers = failoverStrategy.getFailed(); for (ThriftServer thriftServer : serverList) { if (!failedServers.contains(thriftServer)) returnList.add(thriftServer); } if (this.serviceLevel == Constant.ServiceLevel.SERVERS_ONLY) { return returnList; } if ((all || returnList.isEmpty()) && !backupServerList.isEmpty()) { for (ThriftServer thriftServer : backupServerList) { if (!failedServers.contains(thriftServer)) returnList.add(thriftServer); } } if (this.serviceLevel == Constant.ServiceLevel.ALL_SERVERS) { return returnList; } if(returnList.isEmpty()){ returnList.addAll(serverList); } return returnList; }
我还有话要说
技术的提升源自无私的分享,好的技术或工具分享出来,并不会让自己失去什么,反而可以在大家共同研究和沟通后使之获得更好的完善。不要担心自己写的工具不够好,不要害怕自己的技术不够牛,谁能一步就登天呢?
请热爱你的热爱!
【1】Smart Client:比如 MongoClient,可自动发现集群服务节点、自动故障转移和负载均衡。
池化与随时间的池化
】池化与随时间的池化【英文标题】:PoolingvsPooling-over-time【发布时间】:2018-07-1100:08:45【问题描述】:我从概念上理解max/sum池中发生的事情作为CNN层操作,但我看到这个术语“随时间变化的最大池”或“随时间变化的总和池”... 查看详情
tensorflow中的池化函数解析
1.池化原理2.tensorflow中的池化函数2.1tf.nn.max_pool(1)函数功能描述:axpooling是CNN当中的最大值池化操作(2)函数原型:tf.nn.max_pool(value,ksize,strides,padding,name=None)(3)函数 查看详情
含源码解析,深入java线程池原理
...来减少系统消耗,提升系统性能。在编程领域,比较典型的池化技术有:线程池、连接池、内存池、对象池等。对象池通过复用对象来减少创建对象、垃圾回收的开销;连接池(数据库连接池、Redis连接池和HTTP连接池等)通过复... 查看详情
线程池源码解析(代码片段)
什么是池化技术常见的池化技术有:连接池、对象池、内存池、线程池等。池化技术的核心是复用。线程池的概念系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。使用线程池可以很好的提高性能... 查看详情
再聊线程池
...程池有一个全新的认识。池化这里池化并不是深度学习中的池化,而是将资源交给池来管理的这一过程。我们在开发中经常回接触到池化资源的技术,最常见的当然是数据库连接池,以及我们今天要讲 查看详情
java装箱==的池化坑
原创作品,出自“晓风残月xj”博客,欢迎转载,转载时请务必注明出处(http://blog.csdn.net/xiaofengcanyuexj)。由于各种原因,可能存在诸多不足,欢迎斧正! 今天读《EffectiveJava》,读到“基... 查看详情
神经网络中的池化层(pooling)
在卷积神经网络中,我们经常会碰到池化操作,而池化层往往在卷积层后面,通过池化来降低卷积层输出的特征向量,同时改善结果(不易出现过拟合)。为什么可以通过降低维度呢?因为图像具有一种“静态性”的属性,... 查看详情
rocketmq源码分析——高可用
概述本文主要解析Namesrv、Broker如何实现高可用,Producer、Consumer怎么与它们通信保证高可用。Namesrv高可用启动多个Namesrv实现高可用。相较于Zookeeper、Consul、Etcd等,Namesrv是一个超轻量级的注册中心,提供命名服务。2.1Broker注册到... 查看详情
keepalived高可用服务总结分享
...监控LVS集群中各个集群的节点状态,后来又加入了vrrp高可用功能,因此keepalived不但可以管理LVS集群节点,还可做其他服务(Nginx、Mysql、Haproxy等)的高可用解决方案二、keepalived高可用工作原理keepalived高可用服务之间的故障转移... 查看详情
xen-server实现单点故障转移高可用(ha)
Xen-Server实现单点故障转移高可用(HA)新建一个资源池(本处的池为服务器Xen-server池) 650)this.width=650;"height="72"src="https://a3.qpic.cn/psb?/594581eb-e62e-4426-a878-953c87dd5729/*sHhFUdfLMMUJ6CyuLqkfInqlZgvPdX5eRq0qjlcRkE! 查看详情
微信群分享预告kubernetes结合lvs实现高可用负载均衡与集群外服务访问
主题介绍Kubernetes结合LVS实现高可用负载均衡与集群外服务访问分享时间: 9月26日 (周二)20:00-20:40(40分钟)互动时间:20:40-21:00(20分钟)分享群:有容云Docker技术交流群参与方式:关注公众... 查看详情
tensorflow中的池化函数解析
1.池化原理2.tensorflow中的池化函数2.1tf.nn.max_pool(1)函数功能描述:axpooling是CNN当中的最大值池化操作(2)函数原型:tf.nn.max_pool(value,ksize,strides,padding,name=None)(3)函数参数介绍:参数是四... 查看详情
再聊线程池(代码片段)
...程池有一个全新的认识。池化这里池化并不是深度学习中的池化,而是将 查看详情
再聊线程池(代码片段)
...程池有一个全新的认识。池化这里池化并不是深度学习中的池化,而是将 查看详情
netty源码之内存池(代码片段)
...rectBuffer和HeapBuffer对外直接内存缓冲堆内存缓冲IO二、Netty的池化池化的好处netty的缓冲池使用1、池化Buffer2、非池化Buffer三、内存分配1、PooledDirectByteBuf对象池的使用2、回收池Recycler原理3、堆外内存的分配四、apache的对象池commons-... 查看详情
rocketmq源码—rocketmq高可用(代码片段)
高可用究竟指的是什么?请参考:关于高可用的系统RocketMQ做了以下的事情来保证系统的高可用多master部署,防止单点故障消息冗余(主从结构),防止消息丢失故障恢复(本篇暂不讨论)那么问题来了:怎么支持多broker的写?... 查看详情
微信群分享预告kubernetes结合lvs实现高可用负载均衡与集群外服务访问
主题介绍Kubernetes结合LVS实现高可用负载均衡与集群外服务访问分享时间: 9月26日 (周二)20:00-20:40(40分钟)互动时间:20:40-21:00(20分钟)分享群:有容云Docker技术交流群参与方式:关注公众... 查看详情
大厂分布式面试题分享:zookeeper集群如何实现高可用部署?
...调服务,是Dubbo等服务框架的注册中心等。原理在介绍高可用部署前,我们先了解下Zookeeper的基本知识,这对充分理解它的高可用部署非常重要。架构下图是Zookeeper的架构图,ZooKeeper集群中包含Leader、Follower以及Observer三个角色:... 查看详情