ribbon源码之client(代码片段)

zwh1988 zwh1988     2022-11-01     667

关键词:

  客户端模块的核心功能是提供统一的用户请求操作接口

接口定义

  客户端模块的核心是IClient接口,定义了客户端网络请求的方法。

public interface IClient<S extends ClientRequest, T extends IResponse> 
    public T execute(S request, IClientConfig requestConfig) throws Exception; 

  ClientRequest为客户端定义的请求体,存储了请求uri、loadbalancer的key,是否重试、配置。

public class ClientRequest implements Cloneable 
    protected URI uri;
    protected Object loadBalancerKey = null;
    protected Boolean isRetriable = null;
    protected IClientConfig overrideConfig;

  IResponse为客户端定义的响应内容的接口。

public interface IResponse extends Closeable

   public Object getPayload() throws ClientException;
   public boolean hasPayload();
   public boolean isSuccess();
   public URI getRequestedURI();
   public Map<String, ?> getHeaders();   

  IClientConfigAware定义了需要使用IClientConfig初始化IClient的方法。

public interface IClientConfigAware 
    public abstract void initWithNiwsConfig(IClientConfig clientConfig);

  ribbon基于http请求,相关类和接口,HttpRequest为http请求;HttpResponse为http请求返回结果;Restclient是基于jesery的IClient实现。

类图

 

客户端工厂类

  客户端模块提供了一个客户端工厂类(ClientFactory)用于通过配置文件来创建IClient实例和负载均衡器(ILoadBalancer)实例。

public static synchronized IClient getNamedClient(String name) //根据配置获取iclient实例,默认使用DefaultClientConfigImpl配置类return getNamedClient(name, DefaultClientConfigImpl.class);
    
public static synchronized IClient getNamedClient(String name, Class<? extends IClientConfig> configClass) 
...
return createNamedClient(name, configClass);
...
public static synchronized IClient createNamedClient(String name, Class<? extends IClientConfig> configClass) throws ClientException 
IClientConfig config = getNamedConfig(name, configClass);//实例化配置类
return registerClientFromProperties(name, config);//通过配置文件创建iclient
public static synchronized ILoadBalancer getNamedLoadBalancer(String name) 
return getNamedLoadBalancer(name, DefaultClientConfigImpl.class);

public static synchronized ILoadBalancer getNamedLoadBalancer(String name, Class<? extends IClientConfig> configClass)
...
lb = registerNamedLoadBalancerFromProperties(name, configClass);
...
public static synchronized IClient<?, ?> registerClientFromProperties(String restClientName, IClientConfig clientConfig) throws ClientException  
        IClient<?, ?> client = null;
        ILoadBalancer loadBalancer = null;
        ...
            String clientClassName = (String) clientConfig.getProperty(CommonClientConfigKey.ClientClassName);//通过配置文件获取client的实现类
            client = (IClient<?, ?>) instantiateInstanceWithClientConfig(clientClassName, clientConfig); //通过配置文件创建client实例
            boolean initializeNFLoadBalancer = Boolean.parseBoolean(clientConfig.getProperty(
                    CommonClientConfigKey.InitializeNFLoadBalancer, DefaultClientConfigImpl.DEFAULT_ENABLE_LOADBALANCER).toString());
            if (initializeNFLoadBalancer) //如果需要初始化负载均衡器,则通过配置文件创建一个负载均衡器
                loadBalancer  = registerNamedLoadBalancerFromclientConfig(restClientName, clientConfig);
            
            if (client instanceof AbstractLoadBalancerAwareClient) //如果client实现AbstractLoadBalancerAwareClient,则注入负载均衡器
                ((AbstractLoadBalancerAwareClient) client).setLoadBalancer(loadBalancer);
            
        ...return client;
    
public static ILoadBalancer registerNamedLoadBalancerFromclientConfig(String name, IClientConfig clientConfig) throws ClientException 
...
ILoadBalancer lb = null;
...
String loadBalancerClassName = (String) clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerClassName);//
lb = (ILoadBalancer) ClientFactory.instantiateInstanceWithClientConfig(loadBalancerClassName, clientConfig);
...
return lb;
...

//初始化指定的class类
public static Object instantiateInstanceWithClientConfig(String className, IClientConfig clientConfig)
          throws InstantiationException, IllegalAccessException, ClassNotFoundException 
Class clazz = Class.forName(className);
if (IClientConfigAware.class.isAssignableFrom(clazz)) //如果指定的iclient实现了IClientConfigAware,ClientFactory在创建时会使用IClientConfig进行初始化
IClientConfigAware obj = (IClientConfigAware) clazz.newInstance();
obj.initWithNiwsConfig(clientConfig);
return obj;
else
try
if (clazz.getConstructor(IClientConfig.class) != null)
return clazz.getConstructor(IClientConfig.class).newInstance(clientConfig);

catch (Throwable e) // NOPMD


return clazz.newInstance();

  使用客户端工厂类(ClientFactory)涉及的配置:

属性 实现 默认值
clientname.ribbon.ClientClassName client使用的IClient实现类 com.netflix.niws.client.http.RestClient
clientname.ribbon.InitializeNFLoadBalancer 是否初始化负载均衡器 true
clientname.ribbon.NFLoadBalancerClassName 负载均衡器的实现类 com.netflix.loadbalancer.ZoneAwareLoadBalancer

类图

客户端实现类

  AbstractLoadBalancerAwareClient实现了通过负载均衡器进行请求调用。LoadBalancerCommand对负载均衡器操作进行了模版,对请求调用提供了回调函数。

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException 
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
     ...return command.submit(
                new ServerOperation<T>() 
                    @Override
                    public Observable<T> call(Server server) 
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);//设置最终的调用uritry 
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                         //调用execute方法执行请求调用catch (Exception e) 
                            return Observable.error(e);
                        
                    
                )
                .toBlocking()
                .single();
        ...
        
    

  LoadBalancerCommand调用了负载均衡器获得了一个server,然后调用回调函数执行请求。此外还提供了各个关键节点的监听器和异常重试机制。

public Observable<T> submit(final ServerOperation<T> operation) 
        final ExecutionInfoContext context = new ExecutionInfoContext();
        if (listenerInvoker != null) //执行回调接口
            try 
                listenerInvoker.onExecutionStart();
             catch (AbortExecutionException e) 
                return Observable.error(e);
            
        
        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))//调用负载均衡器获得目标server
                .concatMap(new Func1<Server, Observable<T>>() 
                    ...return operation.call(server).doOnEach(new Observer<T>() 
                                            ....
                                        );
                    ...
                        if (maxRetrysSame > 0) 
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    
                );
            
        if (maxRetrysNext > 0 && server == null) 
            o = o.retry(retryPolicy(maxRetrysNext, false));
        
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() 
            @Override
            public Observable<T> call(Throwable e) 
                if (context.getAttemptCount() > 0) 
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) 
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                "Number of retries on next server exceeded max " + maxRetrysNext
                                + " retries, while making a call for: " + context.getServer(), e);
                    
                    else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) 
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                "Number of retries exceeded max " + maxRetrysSame
                                + " retries, while making a call for: " + context.getServer(), e);
                    
                
                if (listenerInvoker != null) 
                    listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                
                return Observable.error(e);
            
        );
    

 

 

private Observable<Server> selectServer() 
        return Observable.create(new OnSubscribe<Server>() 
            @Override
            public void call(Subscriber<? super Server> next) 
                try 
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                 catch (Exception e) 
                    next.onError(e);
                
            
        );
    

 

  子类 HttpRequest用于http请求,内部定义了http请求的各个内容,并且使用了builder模式。

public class HttpRequest extends ClientRequest protected CaseInsensitiveMultiMap httpHeaders = new CaseInsensitiveMultiMap();//head参数
    protected Multimap<String, String> queryParams = ArrayListMultimap.create();//query参数
    private Object entity;//消息体
    protected Verb verb;//http请求的method:post get head delete等。默认get

   

类图

 

ribbon源码之客户端(代码片段)

  客户端模块的核心功能是提供统一的用户请求操作接口。接口定义  客户端模块的核心是IClient接口,定义了客户端网络请求的方法。publicinterfaceIClient<SextendsClientRequest,TextendsIResponse>publicTexecute(Srequest,IClientConfigrequestCon... 查看详情

客户端负载均衡ribbon之源码解析(代码片段)

什么是负载均衡器?假设有一个分布式系统,该系统由在不同计算机上运行的许多服务组成。但是,当用户数量很大时,通常会为服务创建多个副本。每个副本都在另一台计算机上运行。此时,出现“LoadBalancer(负载均衡器)”... 查看详情

ribbon源码之概述

  ribbon主要功能是提供客户侧负载均衡算法。  源码结构包括一下几个部分:负载均衡器  负载均衡器是ribbon的核心实现类,提供了负载均衡的功能,具体参见ribbon源码之负载均衡器。服务器存活检测  检测负载均衡器... 查看详情

聊聊ribbon源码解读(代码片段)

聊聊Ribbon源码解读要说当今最流行的组件当然是SpringCloud,要说框架中最流行的负载均衡组件,非Ribbon莫属了@LoadBalanced注解当我们使用Ribbon的时候,spring中注入RestTemplate,并在上边添加@LoadBalanced,这样使用RestTemplate发送请求的... 查看详情

springcloud之负载均衡组件ribbon原理分析(代码片段)

SpringCloud之负载均衡组件Ribbon原理分析前言一个问题引发的思考Ribbon的简单使用Ribbon原理分析@LoadBalanced注解@Qualifier注解LoadBalancerAutoConfiguration自动装配RestTemplateCustomizerLoadBalancerInterceptorRibbonLoadBalancerClient#ex 查看详情

springcloud之ribbon,feign,hystrix,gateway介绍(代码片段)

一、Ribbon1.Ribbon概述Ribbon是Netflix提供的一个基于HTTP和TCP的客户端负载均衡工具。Ribbon主要有两个功能:简化远程调用负载均衡服务端负载均衡负载均衡算法在服务端由负载均衡器维护服务地址列表客户端负载均衡负载均衡算... 查看详情

微服务架构整理-(六springcloud实战之ribbon)(代码片段)

SpringCloud实战之Ribbon负责均衡介绍Ribbon概念Ribbon中的负载均衡算法Ribbon使用添加一个服务提供者添加Ribbon区别服务提供者访问服务提供者运行结果总结负责均衡介绍负载均衡分为硬件负载均衡和软件负载均衡。常用见的如下:... 查看详情

springcloud系列之ribbonopenfeign(代码片段)

SpringCloud系列(二)之Ribbon、OpenFeign一、Ribbon负载均衡服务调用1.1Ribbon负载均衡实现过程1.2Ribbon的负载均衡策略1.3负载均衡算法原理分析二、OpenFeign服务接口调用2.1OpenFeign使用步骤2.2超时控制2.3日志打印使用SpringCloud版本&... 查看详情

ribbon源码载均衡算法(代码片段)

  负载均衡算法模块主要的功能是从负载均衡器中获取服务器列表信息,根据算法选取出一个服务器。IRule  负载均衡算法接口publicinterfaceIRulepublicServerchoose(Objectkey);//选择一个服务器publicvoidsetLoadBalancer(ILoadBalancerlb);//设置负... 查看详情

ribbon核心api源码解析:ribbon-coreiclient请求客户端-02(代码片段)

Ribbon核心API源码解析:ribbon-core(一)IClient请求客户端-02前言正文IClientClientRequestIResponse本地测试环境搭建配置key管理IClientConfigKeyCommonClientConfigKey示例总结前言上篇文章整体上对Ribbon做了介绍,可能有小伙伴的有... 查看详情

ribbon源码之获取服务列表

ServerListUpdater  动态更新ServerListUpdater的执行策略。核心方法是start,updateAction参数是更新的具体方法。voidstart(UpdateActionupdateAction);PollingServerListUpdater  是ServerListUpdater的一个实现类,内部使用ScheduledThreadPoolExecutor来执 查看详情

ribbon源码之负载均衡算法

IRule  负载均衡器用来选择服务器的规则。publicinterfaceIRule{publicServerchoose(Objectkey);publicvoidsetLoadBalancer(ILoadBalancerlb);publicILoadBalancergetLoadBalancer();}  通过BaseLoadBalancer的setRule或构造函数来为BaseLoadBa 查看详情

ribbon源码之serverlistchangelistener

ServerListChangeListener  用来监听负载均衡器的服务器列表变化。publicinterfaceServerListChangeListener{/***当BaseLoadBalancer的server列表变化时被调用。*/publicvoidserverListChanged(List<Server>oldList,List<Server>newLi 查看详情

ribbon源码之ipingstrategy

IPingStrategy  IPingStrategy用来探测注册在BaseLoadBalancer中的server的存活情况。publicinterfaceIPingStrategy{boolean[]pingServers(IPingping,Server[]servers);}BaseLoadBalancer实现了一个默认的序列化执行的实现类,我们也可以实现自己的并发策略。privates 查看详情

springcloud-springcloudnetflix之ribbon(代码片段)

...发准备(二)_MinggeQingchun的博客-CSDN博客SpringCloudRibbon是一套基于NetflixRibbon实现的客户端负载均衡和服务调用工具Ribbon是Netflix公司发布的开源组件,其主要功能是提供客户端的负载均衡算法和服务调用。S 查看详情

一起学源码-微服务ribbon源码一:ribbon概念理解及demo调试(代码片段)

...经梳理清楚了Eureka相关的概念及源码,接下来开始研究下Ribbon的实现原理。我们都知道Ribbon在springcloud中担当负载均衡的角色,当两个EurekaClient互相调用的时候,Ribbon能够做到调用时的负载,保证多节点的客户端均匀接收请求。(... 查看详情

一起学源码-微服务ribbon源码四:进一步探究ribbon的irule和iping(代码片段)

前言前情回顾上一讲深入的讲解了Ribbon的初始化过程及Ribbon与Eureka的整合代码,与Eureka整合的类就是DiscoveryEnableNIWSServerList,同时在DynamicServerListLoadBalancer中会调用PollingServerListUpdater进行定时更新Eureka注册表信息到BaseLoadBalancer中... 查看详情

[c++][原创]websocketcpp编译demo之echo_client正确方法(代码片段)

测试环境:ubuntu18.04源码地址:GitHub-zaphoyd/websocketpp:C++websocketclient/serverlibrary下载源码后随便放一个文件夹,建议是非中文空格路径,其实看到源码后没什么文件。我们先把example文件夹里面echo_client复制出来随便放一个位置。安... 查看详情