elasticsearch源码更新性能分析(代码片段)

衣舞晨风 衣舞晨风     2023-03-14     410

关键词:

带着疑问学源码,第三篇:Elasticsearch 更新性能
代码分析基于:https://github.com/jiankunking/elasticsearch
Elasticsearch 7.10.2+

目的

在看源码之前先梳理一下,自己对于更新疑惑的点:
为什么Elasticsearch更新与写入的性能会有比较大的差异?

源码分析

建议先看一下:【Elasticsearch源码】 写入分析

【Elasticsearch源码】 写入分析中可以看到bulk请求最终在TransportShardBulkAction doRun()中执行的时候,还是通过一个循环,一个一个处理的,并没有什么神奇之处。

下面看一下具体执行的代码executeBulkItemRequest doRun()

     /**
     * Executes bulk item requests and handles request execution exceptions.
     * @return @code true if request completed on this thread and the listener was invoked, @code false if the request triggered
     *                      a mapping update that will finish and invoke the listener on a different thread
     */
    static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
                                       MappingUpdatePerformer mappingUpdater, Consumer<ActionListener<Void>> waitForMappingUpdate,
                                       ActionListener<Void> itemDoneListener) throws Exception 
        final DocWriteRequest.OpType opType = context.getCurrent().opType();

        final UpdateHelper.Result updateResult;
        if (opType == DocWriteRequest.OpType.UPDATE) 
            final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
            try 
                // 
                updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
             catch (Exception failure) 
                // we may fail translating a update to index or delete operation
                // we use index result to communicate failure while translating update request
                final Engine.Result result =
                    new Engine.IndexResult(failure, updateRequest.version());
                context.setRequestToExecute(updateRequest);
                context.markOperationAsExecuted(result);
                context.markAsCompleted(context.getExecutionResult());
                return true;
            
            // execute translated update request
            switch (updateResult.getResponseResult()) 
                case CREATED:
                case UPDATED:
                    IndexRequest indexRequest = updateResult.action();
                    IndexMetadata metadata = context.getPrimary().indexSettings().getIndexMetadata();
                    MappingMetadata mappingMd = metadata.mapping();
                    indexRequest.process(metadata.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
                    context.setRequestToExecute(indexRequest);
                    break;
                case DELETED:
                    context.setRequestToExecute(updateResult.action());
                    break;
                case NOOP:
                    context.markOperationAsNoOp(updateResult.action());
                    context.markAsCompleted(context.getExecutionResult());
                    return true;
                default:
                    throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
            
         else 
            context.setRequestToExecute(context.getCurrent());
            updateResult = null;
        

        assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state

        final IndexShard primary = context.getPrimary();
        final long version = context.getRequestToExecute().version();
        final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE;
        final Engine.Result result;
        if (isDelete) 
            final DeleteRequest request = context.getRequestToExecute();
            result = primary.applyDeleteOperationOnPrimary(version, request.id(), request.versionType(),
                request.ifSeqNo(), request.ifPrimaryTerm());
         else 
            final IndexRequest request = context.getRequestToExecute();
            result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse(
                    request.index(), request.id(), request.source(), request.getContentType(), request.routing()),
                    request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
        
        if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) 

            try 
                primary.mapperService().merge(MapperService.SINGLE_MAPPING_NAME,
                    new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS),
                    MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);
             catch (Exception e) 
                logger.info(() -> new ParameterizedMessage(" mapping update rejected by primary", primary.shardId()), e);
                onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
                return true;
            

            mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
                new ActionListener<>() 
                    @Override
                    public void onResponse(Void v) 
                        context.markAsRequiringMappingUpdate();
                        waitForMappingUpdate.accept(
                            ActionListener.runAfter(new ActionListener<>() 
                                @Override
                                public void onResponse(Void v) 
                                    assert context.requiresWaitingForMappingUpdate();
                                    context.resetForExecutionForRetry();
                                

                                @Override
                                public void onFailure(Exception e) 
                                    context.failOnMappingUpdate(e);
                                
                            , () -> itemDoneListener.onResponse(null))
                        );
                    

                    @Override
                    public void onFailure(Exception e) 
                        onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
                        // Requesting mapping update failed, so we don't have to wait for a cluster state update
                        assert context.isInitial();
                        itemDoneListener.onResponse(null);
                    
                );
            return false;
         else 
            onComplete(result, context, updateResult);
        
        return true;
    

    /**
     * Prepares an update request by converting it into an index or delete request or an update response (no action).
     */
    public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) 
        // 这里是实时获取
        // 获取结果最终会到InternalEngine 
        // get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper)
        // 后面会附上 代码
        final GetResult getResult = indexShard.getService().getForUpdate(
            request.id(), request.ifSeqNo(), request.ifPrimaryTerm());
        return prepare(indexShard.shardId(), request, getResult, nowInMillis);
    

    public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) 
        // realtime是true
        return get(id, new String[]RoutingFieldMapper.NAME, true,
            Versions.MATCH_ANY, VersionType.INTERNAL, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE);
    

    private GetResult get(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                          long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) 
        currentMetric.inc();
        try 
            long now = System.nanoTime();
            GetResult getResult =
                innerGet(id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext);

            if (getResult.isExists()) 
                existsMetric.inc(System.nanoTime() - now);
             else 
                missingMetric.inc(System.nanoTime() - now);
            
            return getResult;
         finally 
            currentMetric.dec();
        
    

    private GetResult innerGet(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                               long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) 
        fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);

        Engine.GetResult get = indexShard.get(new Engine.Get(realtime, realtime, id)
            .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
        assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";
        if (get.exists() == false) 
            get.close();
        

        if (get == null || get.exists() == false) 
            return new GetResult(shardId.getIndexName(), id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
        

        try 
            // break between having loaded it from translog (so we only have _source), and having a document to load
            return innerGetLoadFromStoredFields(id, gFields, fetchSourceContext, get, mapperService);
         finally 
            get.close();
        
    

    public Engine.GetResult get(Engine.Get get) 
        readAllowed();
        DocumentMapper mapper = mapperService.documentMapper();
        if (mapper == null) 
            return GetResult.NOT_EXISTS;
        
        return getEngine().get(get, mapper, this::wrapSearcher);
    

     /**
     * Prepares an update request by converting it into an index or delete request or an update response (no action, in the event of a
     * noop).
     */
    protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult, LongSupplier nowInMillis) 
        if (getResult.isExists() == false) 
            // If the document didn't exist, execute the update request as an upsert
            return prepareUpsert(shardId, request, getResult, nowInMillis);
         else if (getResult.internalSourceRef() == null) 
            // no source, we can't do anything, throw a failure...
            throw new DocumentSourceMissingException(shardId, request.id());
         else if (request.script() == null && request.doc() != null) 
            // The request has no script, it is a new doc that should be merged with the old document
            return prepareUpdateIndexRequest(shardId, request, getResult, request.detectNoop());
         else 
            // The request has a script (or empty script), execute the script and prepare a new index request
            return prepareUpdateScriptRequest(shardId, request, getResult, nowInMillis);
        
    

其中,prepare在org/elasticsearch/action/update/UpdateHelper.java 中。

从代码中可以看到更新逻辑分两步:

  • 获取待更新文档的数据
  • 执行更新文档的操作

第1步最终会调用InternalEngine中的get方法。代码如下:

    @Override
    public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) 
        assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
        try (ReleasableLock ignored = readLock.acquire()) 
            ensureOpen();
            // 是否实时获取
            if (get.realtime()) 
                final VersionValue versionValue;
                try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) 
                    // we need to lock here to access the version map to do this truly in RT
                    versionValue = getVersionFromMap(get.uid().bytes());
                
                if (versionValue != null) 
                    if (versionValue.isDelete()) 
                        return GetResult.NOT_EXISTS;
                    
                    if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) 
                        throw new VersionConflictEngineException(shardId, get.id(),
                            get.versionType().explainConflictForReads(versionValue.version, get.version()));
                    
                    if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
                        get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term
                        )) 
                        throw new VersionConflictEngineException(shardId, get.id(),
                            get.getIfSeqNo(), get.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
                    
                    // 是否从Translog获取
                    if (get.isReadFromTranslog()) 
                        // this is only used for updates - API _GET calls will always read form a reader for consistency
                        // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                        if (versionValue.getLocation() != null) 
                            try 
                                final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                                if (operation != null) 
                                    return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper);
                                
                             catch (IOException e) 
                                maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                                throw new EngineException(shardId, "failed to read operation from translog", e);
                            
                         else 
                            trackTranslogLocation.set(true);
                        
                    
                    assert versionValue.seqNo >= 0 : versionValue;
                    refreshIfNeeded("realtime_get", versionValue.seqNo);
                
                return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));
             else 
                // we expose what has been externally expose in a point in time snapshot via an explicit refresh
                return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
            
        
    

总结

update操作需要先获取原始文档,如果查询不到,会新增;如果存在,会根据原始文档更新。

虽然更新操作最终调用的方法也是InternalEngine中的index,但在更新时调用lucene softUpdateDocuments,会包含两个操作:标记删除、新增。

相对于新增而言:

  • 多了一次完整的查询(为了保证一致性,update调用GET时将realtime选项设置为true,并且不可配置。因此update操作可能会导致refresh生成新的Lucene分段。)
  • 多了一个标记删除

如果数据量比较大,操作又比较频繁的情况下,update这种操作还是要慎重。

elasticsearch源码写入分析(代码片段)

带着疑问学源码,第一篇:Elasticsearch写入代码分析基于:https://github.com/jiankunking/elasticsearchElasticsearch7.10.2+目的在看源码之前先梳理一下,自己对于写入流程疑惑的点:Elasticsearch写入是等待所有副本都写入... 查看详情

elasticsearch源码节点关闭分析(代码片段)

带着疑问学源码,第六篇:Elasticsearch节点关闭分析代码分析基于:https://github.com/jiankunking/elasticsearchElasticsearch7.10.2+目的在看源码之前先梳理一下,自己对于节点关闭流程疑惑的点:节点关闭都做了哪些检... 查看详情

elasticsearch源码检索分析(代码片段)

带着疑问学源码,第二篇:Elasticsearch搜索代码分析基于:https://github.com/jiankunking/elasticsearchElasticsearch7.10.2+目的在看源码之前先梳理一下,自己对于检索流程疑惑的点:当索引是按照日期拆分之后,在使... 查看详情

elasticsearch源码节点启动分析(代码片段)

带着疑问学源码,第五篇:Elasticsearch节点启动分析代码分析基于:https://github.com/jiankunking/elasticsearchElasticsearch7.10.2+目的在看源码之前先梳理一下,自己对于节点启动流程疑惑的点:节点启动都做了哪些检... 查看详情

elasticsearch源码get分析(代码片段)

带着疑问学源码,第四篇:ElasticsearchGET代码分析基于:https://github.com/jiankunking/elasticsearchElasticsearch7.10.2+通过前3篇的学习,可以稍微总结一下Elasticsearch:ES是一个集群,所以每个Node都需要和其他的N 查看详情

elasticsearch6源码分析cluster模块(代码片段)

1.cluser概述Oneofthemainrolesofthemasteristodecidewhichshardstoallocatetowhichnodes,andwhentomoveshardsbetweennodesinordertorebalancethecluster.2.ClusterModule模块的作用Configuresclassesandservicesthataffect 查看详情

elasticsearch启动时加载analyzer源码分析(代码片段)

ElasticSearch启动时加载Analyzer源码分析本文介绍ElasticSearch启动时如何创建、加载Analyzer,主要的参考资料是Lucene中关于Analyzer官方文档介绍、ElasticSearch6.3.2源码中相关类:AnalysisModule、AnalysisPlugin、AnalyzerProvider、各种Tokenizer类和它... 查看详情

threadx内核源码分析-定时器及线程时间片调度(arm)(代码片段)

1、线程时间片介绍(tx_thread_time_slice)ThreadX内核同优先级线程之间是按时间片调度的,tx_thread_new_time_slice记录线程的时间片(一次调度的总的时间片),tx_thread_time_slice记录线程的剩余时间片(ThreadX内核每次调度线程时,并... 查看详情

elasticsearch源码检索分析(代码片段)

带着疑问学源码,第二篇:Elasticsearch搜索代码分析基于:https://github.com/jiankunking/elasticsearchElasticsearch7.10.2+目的在看源码之前先梳理一下,自己对于检索流程疑惑的点:当索引是按照日期拆分之后,在使... 查看详情

《elasticsearch源码解析与优化实战》第17章:shrink原理分析(代码片段)

...数据】一、简介官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/master/indices- 查看详情

《elasticsearch源码解析与优化实战》第17章:shrink原理分析(代码片段)

...数据】一、简介官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/master/indices- 查看详情

elasticsearch6源码分析actionmodule(代码片段)

1.ActionModule概述/***Buildsandbindsthegenericactionmap,all@linkTransportActions,and@linkActionFilters.*/1.1创建TransportActionstaticMap<String,ActionHandler<?,?>>setupActions(List<ActionPlugin>actionPlugins)//SubclassNamedRegistryforeasyregistrationclassActionRegistryextendsNa... 查看详情

elasticsearch6源码分析http和transport模块(代码片段)

1.http模块概述ThehttpmoduleallowstoexposeElasticsearchAPIsoverHTTP.Thehttpmechanismiscompletelyasynchronousinnature,meaningthatthereisnoblockingthreadwaitingforaresponse.ThebenefitofusingasynchronouscommunicationforHTTPissolvingtheC10kproblem.Whenpossible,considerusingHTTPkeepalivewhenconnecting... 查看详情

elasticsearch线程池类型分析之sizeblockingqueue(代码片段)

ElasticSearch线程池类型分析之SizeBlockingQueue尽管前面写好几篇ES线程池分析的文章(见文末参考链接),但都不太满意。但从ES的线程池中了解到了不少JAVA线程池的使用技巧,于是忍不住再写一篇(ES6.3.2版本的源码)。文中给出的... 查看详情

《elasticsearch源码解析与优化实战》第12章:allocation模型分析(代码片段)

文章目录一、简介1.1、什么是allocation1.2、触发Allocation1.3、Allocation模块结构概述1.4、Allocators二、Deciders2.1、负载均衡类2.2、并发控制类2.3、条件限制类三、核心reroute实现3.1、集群启动时reroute的触发时机3.2、流程分析3.3、GatewayAll... 查看详情

《elasticsearch源码解析与优化实战》第12章:allocation模型分析(代码片段)

文章目录一、简介1.1、什么是allocation1.2、触发Allocation1.3、Allocation模块结构概述1.4、Allocators二、Deciders2.1、负载均衡类2.2、并发控制类2.3、条件限制类三、核心reroute实现3.1、集群启动时reroute的触发时机3.2、流程分析3.3、GatewayAll... 查看详情

《elasticsearch源码解析与优化实战》第11章:gateway模块分析(代码片段)

文章目录一、gateway模块分析1.1、简介1.2、元数据1.3、元数据的持久化1.4、元数据的恢复1.5、元数据恢复流程分析1.5.1、选举集群级和索引级别的元数据1.6、触发allocation1.7、思考关注我的公众号【宝哥大数据】,更多干货一、... 查看详情

《elasticsearch源码解析与优化实战》第11章:gateway模块分析(代码片段)

文章目录一、gateway模块分析1.1、简介1.2、元数据1.3、元数据的持久化1.4、元数据的恢复1.5、元数据恢复流程分析1.5.1、选举集群级和索引级别的元数据1.6、触发allocation1.7、思考关注我的公众号【宝哥大数据】,更多干货一、... 查看详情