分布式链路监控系统(代码片段)

xinyuan_java xinyuan_java     2022-12-30     765

关键词:

本文通过阿里的Eagleeye(鹰眼)和开源的Skywalking,从数据模型、数据埋点以及数据存储三个方面介绍分布式链路监控系统的实现细节,其中将重点介绍Skywalking字节码增强的实现方案。

背景

传统的大型单体系统随着业务体量的增大已经很难满足市场对技术的需求,通过对将整块业务系统拆分为多个互联依赖的子系统并针对子系统进行独立优化,能够有效提升整个系统的吞吐量。在进行系统拆分之后,完整的业务事务逻辑所对应的功能会部署在多个子系统上,此时用户的一次点击请求会触发若干子系统之间的相互功能调用,如何分析一次用户请求所触发的多次跨系统的调用过程、如何定位存在响应问题的调用链路等等问题是链路追踪技术所要解决的问题。

举一个网络搜索的示例,来说明这样一个链路监控系统需要解决的一些挑战。当用户在搜索引擎中输入一个关键词后,一个前端服务可能会将这次查询分发给数百个查询服务,每个查询服务在其自己的索引中进行搜索。该查询还可以被发送到许多其他子系统,这些子系统可以处理敏感词汇、检查拼写、用户画像分析或寻找特定领域的结果,包括图像、视频、新闻等。所有这些服务的结果有选择地组合在一起,最终展示在搜索结果页面中,我们将这个模型称为一次完整的搜索过程。

在这样一次搜索过程中,总共可能需要数千台机器和许多不同的服务来处理一个通用搜索查询。此外,在网络搜索场景中,用户的体验和延迟紧密相关,一次搜索延时可能是由于任何子系统的性能不佳造成的。开发人员仅考虑延迟可能知道整个系统存在问题,但却无法猜测哪个服务有问题,也无法猜测其行为不良的原因。首先,开发人员可能无法准确知道正在使用哪些服务,随时都可能加入新服务和修改部分服务,以增加用户可见的功能,并改进性能和安全性等其他方面;其次,开发人员不可能是庞大系统中每个内部微服务的专家,每一个微服务可能有不同团队构建和维护;另外,服务和机器可以由许多不同的客户端同时共享,因此性能问题可能是由于另一个应用的行为引起。

Dapper简介

在分布式链路追踪方面,Google早在2010年针对其内部的分布式链路跟踪系统Dapper[1],发表了相关论文对分布式链路跟踪技术进行了介绍(强烈推荐阅读)。其中提出了两个基本要求。第一,拥有广泛的覆盖面。针对庞大的分布式系统,其中每个服务都需要被监控系统覆盖,即使是整个系统的一小部分没有被监控到,该链路追踪系统也可能是不可靠的。第二,提供持续的监控服务。对于链路监控系统,需要7*24小时持续保障业务系统的健康运行,保证任何时刻都可以及时发现系统出现的问题,并且通常情况下很多问题是难以复现的。根据这两个基本要求,分布式链路监控系统的有如下几个设计目标:

  • 应用级透明

链路监控组件应该以基础通用组件的方式提供给用户,以提高稳定性,应用开发者不需要关心它们。对于Java语言来说,方法可以说是调用的最小单位,想要实现对调用链的监控埋点势必对方法进行增强。Java中对方法增强的方式有很多,比如直接硬编码、动态代理、字节码增强等等。应用级透明其实是一个比较相对的概念,透明度越高意味着难度越大,对于不同的场景可以采用不同的方式。

  • 低开销

低开销是链路监控系统最重要的关注点,分布式系统对于资源和性能的要求本身就很苛刻,因此监控组件必须对原服务的影响足够小,将对业务主链路的影响降到最低。链路监控组件对于资源的消耗主除了体现在增强方法的消耗上,其次还有网络传输和数据存储的消耗,因为对于链路监控系统来说,想要监控一次请求势必会产生出请求本身外的额外数据,并且在请求过程中,这些额外的数据不仅会暂时保存在内存中,在分布式场景中还会伴随着该请求从上游服务传输至下游服务,这就要求产生的额外数据尽可能地少,并且在伴随请求进行网络传输的时候只保留少量必要的数据。

  • 扩展性和开放性


无论是何种软件系统,可扩展性和开放性都是衡量其质量优劣的重要标准。对于链路监控系统这样的基础服务系统来说,上游业务系统对于链路监控系统来说是透明的,在一个规模较大的企业中,一个基础服务系统往往会承载成千上万个上游业务系统。每个业务系统由不同的团队和开发人员负责,虽然使用的框架和中间件在同一个企业中有大致的规范和要求,但是在各方面还是存在差异的。因此作为一个基础设施,链路监控系统需要具有非常好的可扩展性,除了对企业中常用中间件和框架的支撑外,还要能够方便开发人员针对特殊的业务场景进行定制化的开发。

数据模型

OpenTracing规范

Dapper将请求按照三个维度划分为Trace、Segment、Span三种模型,该模型已经形成了OpenTracing[2]规范。OpenTracing是为了描述分布式系统中事务的语义,而与特定下游跟踪或监控系统的具体实现细节无关,因此描述这些事务不应受到任何特定后端数据展示或者处理的影响。大的概念就不多介绍了,重点看一下Trace、Segment、Span这三种模型到底是什么。

  • Trace


表示一整条调用链,包括跨进程、跨线程的所有Segment的集合。

  • Segment


表示一个进程(JVM)或线程内的所有操作的集合,即包含若干个Span。

  • Span

表示一个具体的操作。Span在不同的实现里可能有不同的划分方式,这里介绍一个比较容易理解的定义方式:


1、Entry Span:入栈Span。Segment的入口,一个Segment有且仅有一个Entry Span,比如HTTP或者RPC的入口,或者MQ消费端的入口等。
2、Local Span:通常用于记录一个本地方法的调用。
3、Exit Span:出栈Span。Segment的出口,一个Segment可以有若干个Exit Span,比如HTTP或者RPC的出口,MQ生产端,或者DB、Cache的调用等。

按照上面的模型定义,一次用户请求的调用链路图如下所示:

唯一id

每个请求有唯一的id还是很必要的,那么在海量的请求下如何保证id的唯一性并且能够包含请求的信息?Eagleeye的traceId设计如下:

根据这个id,我们可以知道这个请求在2022-10-18 10:10:40发出,被11.15.148.83机器上进程号为14031的Nginx(对应标识位e)接收到。其中的四位原子递增数从0-9999,目的是为了防止单机并发造成traceId碰撞。

关系描述

将请求划分为Trace、Segment、Span三个层次的模型后,如何描述他们之间的关系?
从【OpenTracing规范】一节的调用链路图中可以看出,Trace、Segment可以作为整个调用链路中的逻辑结构,而Span才是真正串联起整个链路的单元,系统可以通过若干个Span串联起整个调用链路。


在Java中,方法是以入栈、出栈的形式进行调用,那么系统在记录Span的时候就可以通过模拟出栈、入栈的动作来记录Span的调用顺序,不难发现最终一个链路中的所有Span呈现树形关系,那么如何描述这棵Span树?Eagleeye中的设计很巧妙,EagleEye设计了RpcId来区别同一个调用链下多个网络调用的顺序和嵌套层次。如下图所示:

RpcId用0.X1.X2.X3.....Xi来表示,根节点的RpcId固定从0开始,id的位数("."的数量)表示了Span在这棵树中的层级,Id最后一位表示了Span在这一层级中的顺序。那么给定同一个Trace中的所有RpcId,便可以很容易还原出一个完成的调用链:

- 0  - 0.1    - 0.1.1    - 0.1.2      - 0.1.2.1  - 0.2    - 0.2.1  - 0.3    - 0.3.1      - 0.3.1.1    - 0.3.2

跨进程传输

再进一步,在整个调用链的收集过程中,不可能将整个Trace信息随着请求携带到下个应用中,为了将跨进程传输的trace信息减少到最小,每个应用(Segment)中的数据一定是分段收集的,这样在Eagleeye的实现下跨Segment的过程只需要携带traceId和rpcid两个简短的信息即可。在服务端收集数据时,数据自然也是分段到达服务端的,但由于种种原因分段数据可能存在乱序和丢失的情况:

如上图所示,收集到一个Trace的数据后,通过rpcid即可还原出一棵调用树,当出现某个Segment数据缺失时,可以用第一个子节点替代。

数据埋点

如何进行方法增强(埋点)是分布式链路追系统的关键因素,在Dapper提出的要求中可以看出,方法增强同时要满足应用级透明和低开销这两个要求。之前我们提到应用级透明其实是一个比较相对的概念,透明度越高意味着难度越大,对于不同的场景可以采用不同的方式。本文我们介绍阿里的Eagleye和开源的SkyWalking来比较两种埋点方式的优劣。

编码


阿里Eagleye的埋点方式是直接编的码方式,通过中间件预留的扩展点实现。但是按照我们通常的理解来说,编码对于Dapper提出的扩展性和开放性似乎并不友好,那为什Eagleye么要采用这样的方式?个人认为有以下几点:


1、阿里有中间件的使用规范,不是想用什么就用什么,因此对于埋点的覆盖范围是有限的;
2、阿里有给力的中间件团队专门负责中间件的维护,中间件的埋点对于上层应用来说也是应用级透明的,对于埋点的覆盖是全面的;
3、阿里应用有接入Eagleye监控系统的要求,因此对于可插拔的诉求并没有非常强烈。
从上面几点来说,编码方式的埋点完全可以满足Eagleye的需要,并且直接编码的方式在维护、性能消耗方面也是非常有优势的。

字节码增强


相比于Eagleye,SkyWalking这样开源的分布式链路监控系统,在开源环境下就没有这么好做了。开源环境下面临的问题其实和阿里集团内部的环境正好相反:


1、开源环境下每个开发者使用的中间件可能都不一样,想用什么就用什么,因此对于埋点的覆盖范围几乎是无限的;
2、开源环境下,各种中间件都由不同组织或个人进行维护,甚至开发者还可以进行二次开发,不可能说服他们在代码中加入链路监控的埋点;
3、开源环境下,并不一定要接入链路监控体系,大多数个人开发者由于资源有限或其他原因没有接入链路监控系统的需求。

从上面几点来说,编码方式的埋点肯定是无法满足SkyWalking的需求的。针对这样的情况,Skywalking采用如下的开发模式:

Skywalking提供了核心的字节码增强能力和相关的扩展接口,对于系统中使用到的中间件可以使用官方或社区提供的插件打包后植入应用进行埋点,如果没有的话甚至可以自己开发插件实现埋点。Skywalking采用字节码增强的方式进行埋点,下面简单介绍字节码增强的相关知识和Skywalking的相关实现。

两种方式


对Java应用实现字节码增强的方式有Attach和Javaagent两种,本文做一个简单的介绍。


Attach


Attach是一种相对动态的方式,在阿尔萨斯(Arthas)这样的诊断系统中广泛使用,利用JVM提供的Attach API可以实现一个JVM对另一个运行中的JVM的通信。用一个具体的场景举例:我们要实现Attach JVM对一个运行中JVM的监控。如下图所示:

1、Attach JVM利用Attach API获取目标JVM的实例,底层会通过socketFile建立两个JVM间的通信;
2、Attach JVM指定目标JVM需要挂载的agent.jar包,挂载成功后会执行agent包中的agentmain方法,此时就可以对目标JVM中类的字节码进行修改;
3、Attach JVM通过Socket向目标JVM发送命令,目标JVM收到后会进行响应,以达到监控的目的。

虽然Attach可以灵活地对正在运行中的JVM进行字节码修改,但在修改时也会受到一些限制,比如不能增减父类、不能增加接口、不能调整字段等。


Javaagent


Javaagent大家应该相对熟悉,他的启动方式是在启动命令中加入javaagent参数,指定需要挂载的agent:

java -javaagent:/path/agent.jar=key1=value1,key2=value2 -jar myJar.jar

Javaagent在IDE的Debug模式、链路监控系统等场景中广泛使用。它的核心是在目标JVM执行main方法前执行agent的premain方法,以插入前置逻辑:

1、目标JVM通过javaagent参数启动后找到指定的agent,执行agent的premain方法;
2、agent中通过JVM暴露的接口添加一个Transformer,顾名思义它可以Transform字节码;
3、目标JVM在类加载的时候会触发JVM内置的事件,回调Transformer以实现字节码的增强。

和Attach方式相比,Javaagent只能在main方法之前执行。但是在修改字节码时较为灵活,甚至可以修改JDK的核心类库。

字节码增强类库

Java提供了很多字节码增强类库,比如大家耳熟能详的cglib、Javassist,原生的Jdk Proxy还有底层的ASM等。在2014年,一款名为Byte Buddy[3]的字节码增强类库横空出世,并在2015年获得Duke's Choice award。Byte Buddy兼顾高性能、易用、功能强大3个方面,下面是摘自其官网的一张常见字节码增强类库性能比较图(单位: 纳秒):

上图中的对比项我们可以大致分为两个方面:生成快速代码(方法调用、父类方法调用)和快速生成代码(简单类创建、接口实现、类型扩展),我们理所应当要优先选择前者。从数据可以看出Byte Buddy在纳秒级的精度下,在方法调用和父类方法调用上和基线基本没有差距,而位于其后的是cglib。


Byte Buddy和cglib有较为出色的性能得益于它们底层都是基于ASM构建,如果将ASM也加入对比那么它的性能一定是最高的。但是用过ASM的同学虽然不一定能感受到它的高性能,但一定能感受到它噩梦般的开发体验:

mv.visitFieldInsn(GETSTATIC, "java/lang/System", "out", "Ljava/io/PrintStream;");mv.visitLdcInsn("begin of sayhello().");mv.visitMethodInsn(INVOKEVIRTUAL, "java/io/PrintStream", "println", "(Ljava/lang/String;)V", false);

Skywalking案例分析

介绍了这么多,下面结合Skywalking中使用Byte Buddy的案例和大家一起体验下字节码增强的开发过程,其中只简单介绍相关主流程代码,各种细节就不介绍了。

插件模型


Skywalking为开发者提供了简单易用的插件接口,对于开发者来说不需要知道怎么增强方法的字节码,只需要关心以下几点:

  • 要增强哪个类的哪个方法?


Skywalking提供了ClassMatch,支持各种类、方法的匹配方式。包括类名、前缀、正则、注解等方式的匹配,除此之外还提供了与、或、非逻辑链接,以支持用户通过各种方式精确定位到一个具体的方法。我们看一个插件中的代码:

这段逻辑表示需要增强不带annotation1注解,并且带有annotaion2注解或annotaion3注解的方法的字节码。ClassMatch通过Builder模式提供用户流式编程的方式,最终Skywalking会将用户提供的一串ClassMatch构建出一个内部使用的类匹配逻辑。

  • 需要添加/修改什么逻辑?


知道了需要增强哪个类的哪个方法,那下一步就是如何增强。Java中的方法可以分为静态方法、实例方法和构造方法三类方法,Skywalking对于这三种方法的增强逻辑为用户提供了不同的扩展点:

以实例方法为例,Skywalking提供了如下实例方法拦截器:

public interface InstanceMethodsAroundInterceptor     // 方法执行前置扩展点    void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,                      MethodInterceptResult result) throws Throwable;
    // 方法执行后置扩展点    Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,                       Object ret) throws Throwable;
    // 方法抛出异常时扩展点    void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,                               Class<?>[] argumentsTypes, Throwable t);

开发者通过实现该接口即可对一个实例方法进行逻辑扩展(字节码增强)。方法参数列表中的第一个类型为EnhancedInstance的参数其实就是当前对象(this),Skywalking中所有实例方法或构造方法被增强的类都会实现EnhancedInstance接口。


假设我们有一个Controller,里面只有一个sayHello方法返回"Hello",经过Skywalking增强后,反编译一下它被增强后的字节码文件:

可以看到:


1、Skywalking在其中插入了一个名为_$EnhancedClassField_ws的字段,开发者在某些场合可以合理利用该字段存储一些信息。比如存储Spring MVC中Controller的跟路径,或者Jedis、HttpClient链接中对端信息等。
2、原来的syHello方法名被修改了但仍保存下来,并且新生成了一个增强后的sayHello方法,静态代码块里将经过字节码增强后的sayHello方法存入缓存字段。


增强的前置条件是什么?


在某些时候,并不是只要引入了对应插件就一定会对相关的代码进行字节码增强。比如我们想对Spring MVC的Controller进行埋点,我们使用的是Spring 4.x版本,但是插件却是 5.x 版本的,如果直接对源码进行增强可能会因为版本的差别带来意料之外的问题。Skywalking提供了一种witness机制,简单来说就是当我们的代码中存在指定的类或方式时,当前插件才会进行字节码增强。比如Spring 4.x版本中需要witness这两个类:

如果粒度不够,还可以对方法进行witness。比如Elastic Search 6.x版本中witness了这个方法:

意思就是SearchHits类中必须有名为getTotalHits、参数列表为空并且返回long的方法。

除了上面的扩展点外,Skywalking还支持对jdk核心类库的字节码增强,比如对Callable和Runnable进行增强已支持异步模式下的埋点透传。这就需要和BootstrapClassLoader打交道了,Skywalking帮我们完成了这些复杂的逻辑。Skywalking Agent部分整体的模型如下图所示:

左侧SPI部分是Skywalking暴露的插件规范接口,开发者根据这些接口实现插件。右侧Core部分负责加载插件并且利用Byte Buddy提供的字节码增强逻辑对应用中指定类和方法的字节码进行增强。

主流程源码


介绍了Skywalking的插件模型后,下面从Javaagent的入口premain开始介绍下主要的流程:

上面的流程主要做了两件事:


1、从指定的目录加载所有插件到内存中;
2、构建Byte Buddy核心的AgentBuilder插桩到JVM的Instrumentation API上,包括需要增强哪些类以及核心的增强逻辑Transformer。

private static class Transformer implements AgentBuilder.Transformer   private PluginFinder pluginFinder;
  Transformer(PluginFinder pluginFinder)     this.pluginFinder = pluginFinder;  
  /**   * 这个方法在类加载的过程中会由JVM调用(Byte Buddy做了封装)   * @param builder       原始类的字节码构建器   * @param typeDescription  类描述信息   * @param classLoader      这个类的类加载器    * @param module         jdk9中模块信息    * @return           修改后的类的字节码构建器     */  @Override  public DynamicType.Builder<?> transform(final DynamicType.Builder<?> builder,                      final TypeDescription typeDescription,                      final ClassLoader classLoader,                      final JavaModule module)     LoadedLibraryCollector.registerURLClassLoader(classLoader);    // 根据类信息找到针对这个类进行字节码增强的插件,可能有多个    List<AbstractClassEnhancePluginDefine> pluginDefines = pluginFinder.find(typeDescription);    if (pluginDefines.size() > 0)       DynamicType.Builder<?> newBuilder = builder;      EnhanceContext context = new EnhanceContext();      for (AbstractClassEnhancePluginDefine define : pluginDefines)         // 调用插件的define方法得到新的字节码        DynamicType.Builder<?> possibleNewBuilder = define.define(          typeDescription, newBuilder, classLoader, context);        if (possibleNewBuilder != null)           newBuilder = possibleNewBuilder;                    // 返回增强后的字节码给JVM,完成字节码增强      return newBuilder;        return builder;  

JVM在类加载的时候会触发JVM内置事件,回调Transformer传入原始类的字节码、类加载器等信息,从而实现对字节码的增强。其中的AbstractClassEnhancePluginDefine就是一个插件的抽象。

public abstract class AbstractClassEnhancePluginDefine   public DynamicType.Builder<?> define(TypeDescription typeDescription, DynamicType.Builder<?> builder,                     ClassLoader classLoader, EnhanceContext context) throws PluginException     // witness机制    WitnessFinder finder = WitnessFinder.INSTANCE;
    //通过类加载器找witness类,没有就直接返回,不进行字节码的改造    String[] witnessClasses = witnessClasses();    if (witnessClasses != null)       for (String witnessClass : witnessClasses)         if (!finder.exist(witnessClass, classLoader))           return null;                  
    //通过类加载器找witness方法,没有就直接返回,不进行字节码的改造    List<WitnessMethod> witnessMethods = witnessMethods();    if (!CollectionUtil.isEmpty(witnessMethods))       for (WitnessMethod witnessMethod : witnessMethods)         if (!finder.exist(witnessMethod, classLoader))           return null;                  
    // enhance开始修改字节码    DynamicType.Builder<?> newClassBuilder = this.enhance(typeDescription, builder, classLoader, context);
    // 修改完成,返回新的字节码    context.initializationStageCompleted();    return newClassBuilder;  
  protected DynamicType.Builder<?> enhance(TypeDescription typeDescription, DynamicType.Builder<?> newClassBuilder,                       ClassLoader classLoader, EnhanceContext context) throws PluginException     // 增强静态方法    newClassBuilder = this.enhanceClass(typeDescription, newClassBuilder, classLoader);    // 增强实例方法& 构造方法    newClassBuilder = this.enhanceInstance(typeDescription, newClassBuilder, classLoader, context);    return newClassBuilder;         

通过witness机制检测满足条件后,对静态方法、实例方法和构造方法进行字节码增强。我们以实例方法和构造方法为例:

public abstract class ClassEnhancePluginDefine extends AbstractClassEnhancePluginDefine   protected DynamicType.Builder<?> enhanceInstance(TypeDescription typeDescription,                           DynamicType.Builder<?> newClassBuilder, ClassLoader classLoader,                           EnhanceContext context) throws PluginException     // 获取插件定义的构造方法拦截点ConstructorInterceptPoint    ConstructorInterceptPoint[] constructorInterceptPoints = getConstructorsInterceptPoints();    // 获取插件定义的实例方法拦截点InstanceMethodsInterceptPoint    InstanceMethodsInterceptPoint[] instanceMethodsInterceptPoints = getInstanceMethodsInterceptPoints();    String enhanceOriginClassName = typeDescription.getTypeName();    // 非空校验    boolean existedConstructorInterceptPoint = false;    if (constructorInterceptPoints != null && constructorInterceptPoints.length > 0)       existedConstructorInterceptPoint = true;        boolean existedMethodsInterceptPoints = false;    if (instanceMethodsInterceptPoints != null && instanceMethodsInterceptPoints.length > 0)       existedMethodsInterceptPoints = true;        if (!existedConstructorInterceptPoint && !existedMethodsInterceptPoints)       return newClassBuilder;    
    // 这里就是之前提到的让类实现EnhancedInstance接口,并添加_$EnhancedClassField_ws字段    if (!typeDescription.isAssignableTo(EnhancedInstance.class))       if (!context.isObjectExtended())         // Object类型、private volatie修饰符、提供方法进行访问        newClassBuilder = newClassBuilder.defineField(          "_$EnhancedClassField_ws", Object.class, ACC_PRIVATE | ACC_VOLATILE)          .implement(EnhancedInstance.class)          .intercept(FieldAccessor.ofField("_$EnhancedClassField_ws"));        context.extendObjectCompleted();          
    // 构造方法增强    if (existedConstructorInterceptPoint)       for (ConstructorInterceptPoint constructorInterceptPoint : constructorInterceptPoints)         // jdk核心类        if (isBootstrapInstrumentation())           newClassBuilder = newClassBuilder.constructor(constructorInterceptPoint.getConstructorMatcher())            .intercept(SuperMethodCall.INSTANCE.andThen(MethodDelegation.withDefaultConfiguration()                                  .to(BootstrapInstrumentBoost                                  .forInternalDelegateClass(constructorInterceptPoint                                                // 非jdk核心类                                                                                                         .getConstructorInterceptor()))));              else           // 找到对应的构造方法,并通过插件自定义的InstanceConstructorInterceptor进行增强          newClassBuilder = newClassBuilder.constructor(constructorInterceptPoint.getConstructorMatcher())            .intercept(SuperMethodCall.INSTANCE.andThen(MethodDelegation.withDefaultConfiguration()                                  .to(new ConstructorInter(constructorInterceptPoint                                               .getConstructorInterceptor(), classLoader))));                  
    // 实例方法增强    if (existedMethodsInterceptPoints)       for (InstanceMethodsInterceptPoint instanceMethodsInterceptPoint : instanceMethodsInterceptPoints)         // 找到插件自定义的实例方法拦截器InstanceMethodsAroundInterceptor        String interceptor = instanceMethodsInterceptPoint.getMethodsInterceptor();        // 这里在插件自定义的匹配条件上加了一个【不为静态方法】的条件        ElementMatcher.Junction<MethodDescription> junction = not(isStatic()).and(instanceMethodsInterceptPoint.getMethodsMatcher());
        // 需要重写入参        if (instanceMethodsInterceptPoint.isOverrideArgs())           // jdk核心类          if (isBootstrapInstrumentation())             newClassBuilder = newClassBuilder.method(junction)              .intercept(MethodDelegation.withDefaultConfiguration()                     .withBinders(Morph.Binder.install(OverrideCallable.class))                     .to(BootstrapInstrumentBoost.forInternalDelegateClass(interceptor)));            // 非jdk核心类            else               newClassBuilder = newClassBuilder.method(junction)                .intercept(MethodDelegation.withDefaultConfiguration()                       .withBinders(Morph.Binder.install(OverrideCallable.class))                       .to(new InstMethodsInterWithOverrideArgs(interceptor, classLoader)));                  // 不需要重写入参         else           // jdk核心类              if (isBootstrapInstrumentation())               newClassBuilder = newClassBuilder.method(junction)                .intercept(MethodDelegation.withDefaultConfiguration()                       .to(BootstrapInstrumentBoost.forInternalDelegateClass(interceptor)));          // 非jdk核心类                                                                              else             // 找到对应的实例方法,并通过插件自定义的InstanceMethodsAroundInterceptor进行增强            newClassBuilder = newClassBuilder.method(junction)                .intercept(MethodDelegation.withDefaultConfiguration()                       .to(new InstMethodsInter(interceptor, classLoader)));                            
    return newClassBuilder;          

根据是否要重写入参、是否是核心类走到不同的逻辑分支,大致的增强逻辑大差不差,就是根据用户自定义的插件找到需要增强的方法和增强逻辑,利用Byte Buddy类库进行增强。


用户通过方法拦截器实现增强逻辑,但是它是面向用户的,并不能直接用来进行字节码增强,Skywalking加了一个中间层来连接用户逻辑和Byte Buddy类库。上述代码中的XXXInter便是中间层,比如针对实例方法的InstMethodsInter:

InstMethodsInter封装用户自定义的逻辑,并且对接ByteBuddy的核心类库,当执行到被字节码增强的方法时会执行InstMethodsInter的intercept方法(可以和上面反编译被增强后类的字节码文件进行对比):

public class InstMethodsInter     private static final ILog LOGGER = LogManager.getLogger(InstMethodsInter.class);
    // 用户在插件中定义的实例方法拦截器    private InstanceMethodsAroundInterceptor interceptor;
    public InstMethodsInter(String instanceMethodsAroundInterceptorClassName, ClassLoader classLoader)         try             // 加载用户在插件中定义的实例方法拦截器            interceptor = InterceptorInstanceLoader.load(instanceMethodsAroundInterceptorClassName, classLoader);         catch (Throwable t)             throw new PluginException("Can't create InstanceMethodsAroundInterceptor.", t);            
    /**     * 当执行被增强方法时,会执行该intercept方法     *     * @param obj          实例对象(this)     * @param allArguments 方法入参     * @param method       参数描述     * @param zuper        原方法调用的句柄    *  @param method       被增强后的方法的引用      * @return             方法返回值     */    @RuntimeType    public Object intercept(@This Object obj, @AllArguments Object[] allArguments, @SuperCall Callable<?> zuper,        @Origin Method method) throws Throwable         EnhancedInstance targetObject = (EnhancedInstance) obj;
        MethodInterceptResult result = new MethodInterceptResult();        try             // 拦截器前置逻辑            interceptor.beforeMethod(targetObject, method, allArguments, method.getParameterTypes(), result);         catch (Throwable t)             LOGGER.error(t, "class[] before method[] intercept failure", obj.getClass(), method.getName());        
        Object ret = null;        try             // 是否中断方法执行            if (!result.isContinue())                 ret = result._ret();             else                 // 执行原方法                ret = zuper.call();                // 为什么不能走method.invoke?因为method已经是被增强后方法,调用就死循环了!                // 可以回到之前的字节码文件查看原因,看一下该intercept执行的时机                     catch (Throwable t)             try                  // 拦截器异常时逻辑                interceptor.handleMethodException(targetObject, method, allArguments, method.getParameterTypes(), t);             catch (Throwable t2)                 LOGGER.error(t2, "class[] handle method[] exception failure", obj.getClass(), method.getName());                        throw t;         finally             try                 // 拦截器后置逻辑                ret = interceptor.afterMethod(targetObject, method, allArguments, method.getParameterTypes(), ret);             catch (Throwable t)                 LOGGER.error(t, "class[] after method[] intercept failure", obj.getClass(), method.getName());                            return ret;    

上述逻辑其实就是下图中红框中的逻辑:

Byte Buddy提供了声明式方式,通过几个注解就可以实现字节码增强逻辑。

数据收集

下一步就是将收集到的Trace数据发送到服务端。为了将对主链路的影响降到最小,一般都采用先存本地、再异步采集的方式。Skywalking和Eagleeye的实现有所不同,我们分别介绍:

存储

Eagleeye


鹰眼采用并发环形队列存储Trace数据,如下图所示:

环形队列在很多日志框架的异步写入过程中很常见,其中主要包括读指针take,指向队列中的最后一条数据;写指针put,指向队列中下一个数据将存放的位置,并且支持原子读、写数据。take和put指针朝一个时钟方向移动,当生产数据的速度超过消费速度时,会出现put指针“追上”take指针的情况(套圈),此时根据不同的策略可以丢弃即将写入的数据或将老数据覆盖。

Skywalking


Skywalking在实现上有所区别,采用分区的QueueBuffer存储Trace数据,多个消费线程通过Driver平均分配到各个QueueBuffer上进行数据消费:

QueueBuffer有两种实现,除了基于JDK的阻塞队列外,还有一种普通数组+原子下标的方式。Skywalking对于这两种实现有不同的使用场景:基于JDK阻塞队列的实现用在服务端,而普通数组+原子下标的方式用在Agent端,因为后者更加轻量,性能更高。对于后者这里介绍一下其中比较有趣的地方。


有趣的原子下标


普通的Oject数组是无法支持并发的,但只要保证每个线程获取下标的过程是原子的,即可保证数组的线程安全。这需要保证:


1、多线程获取的下标是依次递增的,从0开始到数组容量-1;
2、当某个线程获取的下标超过数组容量,需要从0开始重新获取。

这其实并不难实现,通过一个原子数和取模操作一行代码就能完成上面的两个功能。但我们看Skywalking是如何实现这个功能的:

// 提供原子下标的类public class AtomicRangeInteger     // JDK提供的原子数组    private AtomicIntegerArray values;
    // 固定值15    private static final int VALUE_OFFSET = 15;
    // 数组开始下标,固定为0    private int startValue;
    // 数组最后一个元素的下标,固定为数组的最大长度-1    private int endValue;
    public AtomicRangeInteger(int startValue, int maxValue)         // 创建一个长度为31的原子数组        this.values = new AtomicIntegerArray(31);        // 将第15位设置为初始值0        this.values.set(VALUE_OFFSET, startValue);        this.startValue = startValue;        this.endValue = maxValue - 1;    
    // 核心方法,获取数组的下一个下标    public final int getAndIncrement()         int next;        do             // 原子递增            next = this.values.incrementAndGet(VALUE_OFFSET);            // 如果超过了数组范围,CAS重制到0            if (next > endValue && this.values.compareAndSet(VALUE_OFFSET, next, startValue))                 return endValue;                     while (next > endValue);        return next - 1;    

Skywalking用了一个长度固定为31的JDK原子数组的固定第15位进行相关原子操作,JDK8中的原子数组利用Unsafe通过偏移量直接对数组中的元素进行内存操作,那为什么要这么做呢?我们先将其称为V1版本,再来看看V2版本,这是Skywalking早期版本使用的代码:

public class AtomicRangeInteger 
    private AtomicInteger value;    private int startValue;    private int endValue;
    public AtomicRangeInteger(int startValue, int maxValue)         this.value = new AtomicInteger(startValue);        this.startValue = startValue;        this.endValue = maxValue - 1;    
    public final int getAndIncrement()         int current;        int next;        do             // 获取当前下标            current = this.value.get();            // 如果超过最大范围则从0开始            next = current >= this.endValue ? this.startValue : current + 1;            // CAS更新下标,失败则循环重试         while (!this.value.compareAndSet(current, next));
        return current;        

肉眼可见这段V2版本的代码逻辑不如V1版本,因为在V2中获取当前值和CAS更新这两个步骤是分开的,并不具备原子性,因此并发冲突的可能性更高,从而导致循环次数增加;而使用JDK提供的incrementAndGet方法效率更高。再看下V3版本:

public class AtomicRangeInteger extends Number implements Serializable     // 用原子整型替代V1版本的原子数组    private AtomicInteger value;    private int startValue;    private int endValue;
    public AtomicRangeInteger(int startValue, int maxValue)         this.value = new AtomicInteger(startValue);        this.startValue = startValue;        this.endValue = maxValue - 1;    
    public final int getAndIncrement()         int next;        do             next = this.value.incrementAndGet();            if (next > endValue && this.value.compareAndSet(next, startValue))                 return endValue;                            while (next > endValue);        return next - 1;    

这个版本唯一的区别就是使用AtomicInteger代替原来的AtomicIntegerArray的第15位。还有最后一个最简单的V4版本,通过一个原子数和取模操作完成:

public class AtomicRangeInteger 
    private AtomicLong value;    private int mask;
    public AtomicRangeInteger(int startValue, int maxValue)         this.value = new AtomicLong(startValue);        this.mask = maxValue - 1;    
    public final int getAndIncrement()         return (int)(value.incrementAndGet() % mask);    

通过Benchmark压测数据来看看这几个版本的性能有什么差别,固定128线程,3轮预热、5轮正式,每轮10s。

  • Skywalking官方数据(数组大小100):

版本

得分

描述

V1

45832615.061 ± 2987464.163  ops/s

原子数组第15位操作

V2

13496720.554 ±  240134.803  ops/s

老版本

V3

39201251.850 ± 1005866.969  ops/s

原子整数代替原子数组第15位

  • 自己在mac上测试的数据(数组大小100):

版本

得分

描述

V1

37368086.272 ± 2702764.084  ops/s

原子数组第15位操作

V2

8066661.954 ± 1165851.129  ops/s

老版本

V3

26124150.437 ±  684039.516  ops/s

原子整数代替原子数组第15位

V4

51063216.834 ± 7775168.064 ops/s

原子数取模

  • 自己在mac上测试的数据(数组大小128):

版本

得分

描述

V1

29452469.035 ± 1853738.513  ops/s

原子数组第15位操作

V2

7998178.059 ±  148894.535  ops/s

老版本

V3

39011356.081 ± 3603737.004  ops/s

原子整数代替原子数组第15位

V4

61012525.493 ± 6054137.447  ops/s

原子数取模

Skywalking官方显示通过原子数组的固定第15位操作的V1版本表现最好,而在我自己本机环境测试中V3版本通过原子整数代替的方式和V1版本有高有低,而原子数取模的性能是最高的。个人猜测Skywalking通过原子数组的固定第15位操作是为了进行缓存填充,测试结果和环境有比较大的关系;而不使用原子数取模的原因是原子数的大小会无限递增。

传输

最后一步就是数据的传输,如下图所示:

Skywalking提供了GRPC和Kafka两种数据传输方式,而鹰眼则先将数据存入本地日志中,再通过agent将数据采集到服务端。和Skywalking相比,用户可以直接在机器上查看trace日志,而Skywalking提供了日志插件以提供可插拔的本地trace存储功能。

从整体上来看,Skywalking采取了埋点和中间件代码分离的方式,在某种意义上实现了应用级透明,但是在后期维护的过程中中间件版本的升级需要配合插件版本的升级,在维护方面带来了一些问题。而Eagleeye编码方式的埋点由中间件团队维护,对于上层的应用也是透明的,更加适合阿里集团内部的环境。

 

pinpoint全链路监控环境搭建(代码片段)

...formanceManagement/应用性能管理)工具,用于基于java的大规模分布式系统,基于GoogleDapper论文。仿照GoogleDapper,Pinpoint通过跟踪分布式应用之间的调用来提供解决方案,以帮助分析系统的总体结构和内部模块之间如何相互联系。如今的... 查看详情

分布式链路监控与追踪系统(代码片段)

1.分布式链路监控与追踪产生背景2.SpringCloudSleuth+Zipkin3.分布式服务追踪实现原理4.搭建Zipkin服务追踪系统5.搭建Zipkin集成RabbitMQ异步传输6.SpringCloud2.x新知识介绍7.发布SpringCloud2.0x百级完整超清视频教程含源码 分布式链路监控与... 查看详情

一文看懂分布式链路监控系统(代码片段)

背景传统的大型单体系统随着业务体量的增大已经很难满足市场对技术的需求,通过对将整块业务系统拆分为多个互联依赖的子系统并针对子系统进行独立优化,能够有效提升整个系统的吞吐量。在进行系统拆分之后ÿ... 查看详情

「开源摘星计划」jaeger实现harbor的链路监控(代码片段)

前言1.1Harbor新功能-分布式跟踪  Harbor在2.4.0版本后新增一个功能:向Harbor添加跟踪功能,以增强故障排除、识别性能瓶颈等。目前Harbor支持Jaeger和OpenTelemetry(简称otel)两种实现方式。1.2Jaeger简单介绍  Jaeger是一个... 查看详情

分布式链路监控系统(代码片段)

...据模型、数据埋点以及数据存储三个方面介绍分布式链路监控系统的实现细节,其中将重点介绍Skywalking字节码增强的实现方案。背景传统的大型单体系统随着业务体量的增大已经很难满足市场对技术的需求,通过对将整... 查看详情

聊聊分布式链路追踪(代码片段)

...p://lidawn.github.io/2018/12/26/distribute-tracing/起因最近一直在做分布式链路追踪的调研和实践,整理一下其中的知识点。什么是链路追踪分布式系统变得日趋复杂,越来越多的组件开始走向分布式化,如微服务、分布式数据库、分布式... 查看详情

rabbitmq+sleuth+zinkip分布式链路追踪(代码片段)

...务架构系统中,几乎每一个前端请求都会形成一个复杂的分布式服务调用链路,在每条链路中任何一个依赖服务出现延迟超时或者错误都有可能引起整个请求最后的失败。当业务流程足够复杂时,一个完整的HTTP请求调用链一般... 查看详情

分布式链路追踪(sleuthzipkin)(代码片段)

...就需要解决一个问题,如何快速定位服务故障点,于是,分布式系统调用链追踪技术就此诞生了。ZipKinZipkin是一个由Twitter公司提供并开放源代码分布式的跟踪系统,它可以帮助收集服务的时间数据,以解决微服务架构中的延迟... 查看详情

全链路参数透传(代码片段)

简单分布式架构图示例:背景现在的系统大多是基于SOA/MSA的分布式应用。跨服务调用的时候,存在线程上线文里的重要信息如traceId,租户id无法传递给下游应用。要解决的问题内部系统通过RocketMQ交互时,traceId如何透... 查看详情

全链路参数透传(代码片段)

简单分布式架构图示例:背景现在的系统大多是基于SOA/MSA的分布式应用。跨服务调用的时候,存在线程上线文里的重要信息如traceId,租户id无法传递给下游应用。要解决的问题内部系统通过RocketMQ交互时,traceId如何透... 查看详情

谁说cat不能做链路跟踪的,给我站出来(代码片段)

...skywalking,jaeger等。基本上都是根据谷歌的《Dapper大规模分布式系统的跟踪系统》这篇论文发展出来的。今天讲下Cat里的链路跟踪要如何来实现,没用过Cat的同学可以查看我的这篇文章《熬夜之作:一文带你了解Cat分布式监控》... 查看详情

使用skywalking+elasticsearch实现全链路监控(代码片段)

...以便发生故障的时候,能够快速定位和解决问题。这时候分布式追踪系统就该闪亮登场了。一、分布式追踪系统skywalking1、什么是分布式追踪?上图是常见的微服务的框架,4个实例,2个MySQL、1个Redis。实际上它有两次完全不同的... 查看详情

快速了解分布式跟踪系统zipkin(代码片段)

...述FROMhttps://zipkin.io/Zipkinisadistributedtracingsystem.Zipkin是一个分布式链路追踪系统。Ithelpsgathertimingdataneededtotroubleshootlatencyproblemsinservicearchitectures.它可以收集分布式场景下的调用链路数据,用于解决服务之间延迟问题。Featuresinclud... 查看详情

全链路参数透传(代码片段)

背景现在的系统大多是基于SOA的分布式应用。跨服务调用的时候,存在线程上线文里的重要信息如traceId,租户id无法传递给下一个应用。要解决的问题内部系统通过RocketMQ交互时,traceId如何透传到消息消费应用内部系统通... 查看详情

sleuth+zipkin服务链路追踪(代码片段)

...-cloud-sleuth.html一、为什么要用链路追踪微服务架构是一个分布式架构,它按业务划分服务单元,一个分布式系统往往有很多个服务单元。由于服务单元数量众多& 查看详情

sleuth+zipkin服务链路追踪(代码片段)

...-cloud-sleuth.html一、为什么要用链路追踪微服务架构是一个分布式架构,它按业务划分服务单元,一个分布式系统往往有很多个服务单元。由于服务单元数量众多& 查看详情

利用asp.netcore中的标头传播实现分布式链路跟踪(代码片段)

在此之前,我曾写过一篇博客,《Envoy集成Jaeger实现分布式链路跟踪》,主要分享了ASP.NETCore应用如何结合Envoy和Jeager来实现分布式链路跟踪,其核心思想是生成一个全局唯一的x-request-id,并在不同的微服务或者子系统中传播该信... 查看详情

利用asp.netcore中的标头传播实现分布式链路跟踪(代码片段)

在此之前,我曾写过一篇博客,《Envoy集成Jaeger实现分布式链路跟踪》,主要分享了ASP.NETCore应用如何结合Envoy和Jeager来实现分布式链路跟踪,其核心思想是生成一个全局唯一的x-request-id,并在不同的微服务或者子系统中传播该信... 查看详情