elasticsearch:计算多个状态更新的总持续时间-transform应用案例(代码片段)

中国社区官方博客 中国社区官方博客     2023-01-09     178

关键词:

在我之前的文章中,我使用了 Kibana 所提供的 UI 来对所感兴趣的数据进行 transform:

在今天的文章中,我将使用 API 的形式来展示如何使用 transform 来对数据进行转换。

Elasticsearch Transforms 让你

这种以实体(entity)为中心的视图对于由多个文档(如用户行为或会话)组成的各种数据很有帮助。 例如,分布式系统中的会话或请求的持续时间是常见的场景。 以下帖子基于 StackOverflow 问题,该问题以微小的变化反复出现 - 将其用作蓝图。

样本数据

共有三个不同的实体,其唯一 ID A、B 和 C。每个实体都可以通过 eventStart.timestamp 或 eventStop.timestamp 进行多个状态更新:

POST test/_bulk
 "index" :  "_id" : "1"  
 "uniqueID" : "A",   "eventStart": "timestamp": "2020-07-01T13:50:55.000Z" 
 "index" :  "_id" : "2"  
 "uniqueID" : "A",   "eventStop": "timestamp": "2020-07-01T13:51:00.000Z" 
 "index" :  "_id" : "3"  
 "uniqueID" : "B",   "eventStart": "timestamp": "2020-07-01T13:52:25.000Z" 
 "index" :  "_id" : "4"  
 "uniqueID" : "B",   "eventStop": "timestamp": "2020-07-01T13:53:00.000Z" 
 "index" :  "_id" : "5"  
 "uniqueID" : "A",   "eventStop": "timestamp": "2020-07-01T13:54:55.000Z" 
 "index" :  "_id" : "6"  
 "uniqueID" : "C",   "eventStart": "timestamp": "2020-07-01T13:54:55.000Z" 

我们运行上面的命令把数据导入到 Elasticsearch 中去。

依靠默认映射,两个日期和关键字字段是相关的,用于计算不同的持续时间:

# Request
GET test/_mapping

# Response

  "test" : 
    "mappings" : 
      "properties" : 
        "eventStart" : 
          "properties" : 
            "timestamp" : 
              "type" : "date"
            
          
        ,
        "eventStop" : 
          "properties" : 
            "timestamp" : 
              "type" : "date"
            
          
        ,
        "uniqueID" : 
          "type" : "text",
          "fields" : 
            "keyword" : 
              "type" : "keyword",
              "ignore_above" : 256
            
          
        
      
    
  

Transforms API

计算方法如下:

  1. 按 uniqueID 分组。
  2. 获取第一个 eventStart 和最后一个 eventStop 时间戳。
  3. 计算时差(以秒为单位)。

虽然 Kibana 在 Elasticsearch Transform API 之上提供了一个 UI 来完成 transform,但此示例坚持使用 Elasticsearch API,它更易于遵循和重现。 一个方便的 API 是使用 POST _transform/_preview 进行 preview

从分组的第一步开始,由于聚合部分是强制性的,计算状态更新的数量:

# Request
POST _transform/_preview

  "source": 
    "index": "test"
  ,
  "dest": 
    "index": "test_transformed"
  ,
  "pivot": 
    "group_by": 
      "id": 
        "terms": 
          "field": "uniqueID.keyword"
        
      
    ,
    "aggregations": 
      "event_count": 
        "value_count": 
          "field": "_id"
        
      
    
  


# Response

  "preview" : [
    
      "event_count" : 3,
      "id" : "A"
    ,
    
      "event_count" : 2,
      "id" : "B"
    ,
    
      "event_count" : 1,
      "id" : "C"
    
  ],
  "generated_dest_index" : 
    "mappings" : 
      "_meta" : 
        "_transform" : 
          "transform" : "transform-preview",
          "version" : 
            "created" : "7.14.0"
          ,
          "creation_date_in_millis" : 1632892282869
        ,
        "created_by" : "transform"
      ,
      "properties" : 
        "event_count" : 
          "type" : "long"
        ,
        "id" : 
          "type" : "keyword"
        
      
    ,
    "settings" : 
      "index" : 
        "number_of_shards" : "1",
        "auto_expand_replicas" : "0-1"
      
    ,
    "aliases" :  
  

对于最终结果,它 “只是” 缺少正确的聚合:bucket 脚本聚合听起来很有希望。

使用 Bucket 脚本聚合进行转换

继续之前的转换,这个添加最早的开始时间戳,最晚的结束时间戳,以及两者之间的持续时间:

POST _transform/_preview

  "source": 
    "index": "test"
  ,
  "dest": 
    "index": "test_transformed"
  ,
  "pivot": 
    "group_by": 
      "id": 
        "terms": 
          "field": "uniqueID.keyword"
        
      
    ,
    "aggregations": 
      "event_count": 
        "value_count": 
          "field": "_id"
        
      ,
      "start": 
        "min": 
          "field": "eventStart.timestamp"
        
      ,
      "stop": 
        "max": 
          "field": "eventStop.timestamp"
        
      ,
      "duration": 
        "bucket_script": 
          "buckets_path": 
            "start": "start.value",
            "stop": "stop.value"
          ,
          "script": """
            return (params.stop - params.start)/1000;
          """
        
      
    
  

上面请求的响应为:


  "preview" : [
    
      "duration" : 240.0,
      "stop" : "2020-07-01T13:54:55.000Z",
      "event_count" : 3,
      "start" : "2020-07-01T13:50:55.000Z",
      "id" : "A"
    ,
    
      "duration" : 35.0,
      "stop" : "2020-07-01T13:53:00.000Z",
      "event_count" : 2,
      "start" : "2020-07-01T13:52:25.000Z",
      "id" : "B"
    ,
    
      "stop" : null,
      "event_count" : 1,
      "start" : "2020-07-01T13:54:55.000Z",
      "id" : "C"
    
  ],
  "generated_dest_index" : 
    "mappings" : 
      "_meta" : 
        "_transform" : 
          "transform" : "transform-preview",
          "version" : 
            "created" : "7.14.0"
          ,
          "creation_date_in_millis" : 1632905272319
        ,
        "created_by" : "transform"
      ,
      "properties" : 
        "stop" : 
          "type" : "date"
        ,
        "event_count" : 
          "type" : "long"
        ,
        "start" : 
          "type" : "date"
        ,
        "id" : 
          "type" : "keyword"
        
      
    ,
    "settings" : 
      "index" : 
        "number_of_shards" : "1",
        "auto_expand_replicas" : "0-1"
      
    ,
    "aliases" :  
  

Painless 中的计算出奇的简单:(params.stop - params.start)/1000:

  • 不需要更复杂的 datetime API。 Elasticsearch 中的每个日期都存储为自 epoch 以来的长时间(以毫秒为单位),因此简单的差异就足够了。
  • 除以 1,000 就得到了以秒为单位。
  • 自动处理缺少的结束时间。

要创建 transform 作业而不仅仅是预览它,你需要将请求调整为以下内容:

PUT _transform/test_duration

  "description": "Calculate the duration of an event from multiple status updates (based on its uniqueID)",
  "frequency": "1m",
  "source": 
    "index": "test"
  ,
  "dest": 
    "index": "test_transformed"
  ,
  "pivot": 
    "group_by": 
      "id": 
        "terms": 
          "field": "uniqueID.keyword"
        
      
    ,
    "aggregations": 
      "event_count": 
        "value_count": 
          "field": "_id"
        
      ,
      "start": 
        "min": 
          "field": "eventStart.timestamp"
        
      ,
      "stop": 
        "max": 
          "field": "eventStop.timestamp"
        
      ,
      "duration": 
        "bucket_script": 
          "buckets_path": 
            "start": "start.value",
            "stop": "stop.value"
          ,
          "script": """
            return (params.stop - params.start)/1000;
          """
        
      
    
  

使用 GET _transform/test_duration 你可以看到 transform 作业。 并且你必须明确地用 POST _transform/test_duration/_start 启动它,否则它什么也做不了。我们执行如下的命令:

POST _transform/test_duration/_start

最后,stats API 非常适合查看工作正在进行或已经完成的工作:

GET _transform/test_duration/_stats

上面命令的结果为:


  "count" : 1,
  "transforms" : [
    
      "id" : "test_duration",
      "state" : "stopped",
      "stats" : 
        "pages_processed" : 2,
        "documents_processed" : 6,
        "documents_indexed" : 3,
        "documents_deleted" : 0,
        "trigger_count" : 1,
        "index_time_in_ms" : 753,
        "index_total" : 1,
        "index_failures" : 0,
        "search_time_in_ms" : 7,
        "search_total" : 2,
        "search_failures" : 0,
        "processing_time_in_ms" : 8,
        "processing_total" : 2,
        "delete_time_in_ms" : 0,
        "exponential_avg_checkpoint_duration_ms" : 1012.0,
        "exponential_avg_documents_indexed" : 3.0,
        "exponential_avg_documents_processed" : 6.0
      ,
      "checkpointing" : 
        "last" : 
          "checkpoint" : 1,
          "timestamp_millis" : 1632907175535
        ,
        "changes_last_detected_at" : 1632907175535
      
    
  ]

从上面我们可以看出来状态为 stopped,它表示 transform 已经完成。我们打开 Kibana,并查看状态:

 

 

最后但并非最不重要的是,这些是生成的文档:

GET test_transformed/_search

上面命令显示的结果为:


  "took" : 1,
  "timed_out" : false,
  "_shards" : 
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  ,
  "hits" : 
    "total" : 
      "value" : 3,
      "relation" : "eq"
    ,
    "max_score" : 1.0,
    "hits" : [
      
        "_index" : "test_transformed",
        "_type" : "_doc",
        "_id" : "QRRx52klPRvG45a5oLgZ95sAAAAAAAAA",
        "_score" : 1.0,
        "_source" : 
          "duration" : 240.0,
          "stop" : "2020-07-01T13:54:55.000Z",
          "event_count" : 3,
          "start" : "2020-07-01T13:50:55.000Z",
          "id" : "A"
        
      ,
      
        "_index" : "test_transformed",
        "_type" : "_doc",
        "_id" : "Qq7col5MOHvjTNMiAGonnqAAAAAAAAAA",
        "_score" : 1.0,
        "_source" : 
          "duration" : 35.0,
          "stop" : "2020-07-01T13:53:00.000Z",
          "event_count" : 2,
          "start" : "2020-07-01T13:52:25.000Z",
          "id" : "B"
        
      ,
      
        "_index" : "test_transformed",
        "_type" : "_doc",
        "_id" : "Q-N5zMGevsgbxCl0WsHH6CIAAAAAAAAA",
        "_score" : 1.0,
        "_source" : 
          "stop" : null,
          "event_count" : 1,
          "start" : "2020-07-01T13:54:55.000Z",
          "id" : "C"
        
      
    ]
  

这就是计算持续时间。

没有 transform 的聚合

你需要转换来获得这个结果吗? 不。

通过一些小的修改,您可以通过常规聚合获得相同的结果:

POST test/_search

  "size": 0,
  "aggregations": 
    "group_by": 
      "terms": 
        "field": "uniqueID.keyword"
      ,
      "aggregations": 
        "start": 
          "min": 
            "field": "eventStart.timestamp"
          
        ,
        "stop": 
          "max": 
            "field": "eventStop.timestamp"
          
        ,
        "duration": 
          "bucket_script": 
            "buckets_path": 
              "start": "start.value",
              "stop": "stop.value"
            ,
            "script": """
              return (params.stop - params.start)/1000;
            """
          
        
      
    
  

上面命令返回的结果为:


  "took" : 9,
  "timed_out" : false,
  "_shards" : 
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  ,
  "hits" : 
    "total" : 
      "value" : 6,
      "relation" : "eq"
    ,
    "max_score" : null,
    "hits" : [ ]
  ,
  "aggregations" : 
    "group_by" : 
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        
          "key" : "A",
          "doc_count" : 3,
          "stop" : 
            "value" : 1.593611695E12,
            "value_as_string" : "2020-07-01T13:54:55.000Z"
          ,
          "start" : 
            "value" : 1.593611455E12,
            "value_as_string" : "2020-07-01T13:50:55.000Z"
          ,
          "duration" : 
            "value" : 240.0
          
        ,
        
          "key" : "B",
          "doc_count" : 2,
          "stop" : 
            "value" : 1.59361158E12,
            "value_as_string" : "2020-07-01T13:53:00.000Z"
          ,
          "start" : 
            "value" : 1.593611545E12,
            "value_as_string" : "2020-07-01T13:52:25.000Z"
          ,
          "duration" : 
            "value" : 35.0
          
        ,
        
          "key" : "C",
          "doc_count" : 1,
          "stop" : 
            "value" : null
          ,
          "start" : 
            "value" : 1.593611695E12,
            "value_as_string" : "2020-07-01T13:54:55.000Z"
          
        
      ]
    
  

虽然结果的结构不同,但结果是相同的。一些附加说明:

  • 设置“size”: 0 这样无需返回任何文档。
  • 在术语聚合(terms aggregation)中,运行其它子聚合(sub aggregation)运行。
  • doc_count 中会自动计算涉及多少状态更新,因此不需要 value_count。
  • bucket_script 是相同的。

 

结论

希望这是 transform 或等效 aggregation 的有用蓝图。 现在你知道所有部分如何组合在一起以及要避免哪些陷阱。

该文档还描述了何时(不)使用转换,这导致了经典的 “取决于” 讨论:

译文:Elasticsearch Transforms: Calculate the Total Duration from Multiple Status Updates

获取整个索引中的总词频(Elasticsearch)

】获取整个索引中的总词频(Elasticsearch)【英文标题】:Gettingtotaltermfrequencythroughoutentireindex(Elasticsearch)【发布时间】:2017-06-0208:27:43【问题描述】:我正在尝试计算特定术语在整个索引中出现的总次数(术语收集频率)。我试... 查看详情

使用 AFNetworking 计算下载多个文件的总进度

】使用AFNetworking计算下载多个文件的总进度【英文标题】:calculatingtotalprogressofdownloadingmultiplefilewithAFNetworking【发布时间】:2014-09-0713:57:16【问题描述】:我想下载多个文件,然后向用户显示总进度。但问题就在这里,我不知道... 查看详情

如何计算多个 android webview 所花费的总时间?

】如何计算多个androidwebview所花费的总时间?【英文标题】:howtocalculatetotaltimetakenformultipleandroidwebview?【发布时间】:2021-11-0622:08:31【问题描述】:我已经在android中加载了多个webview,我知道webview加载所用的时间是“onPageStarted”... 查看详情

java示例代码_计算多个线程完成执行所需的总时间

java示例代码_计算多个线程完成执行所需的总时间 查看详情

计算多个选项卡/窗口中的总 XMPP 会话

】计算多个选项卡/窗口中的总XMPP会话【英文标题】:CounttotalXMPPsessioninmultipletabs/window【发布时间】:2015-10-0913:38:31【问题描述】:我也有与here提到的相同的问题。但是,我继续采用随机化我的资源的方法,以便在多个选项卡/... 查看详情

MySQL - PHP:计算一天中多个事件之间的总小时数

】MySQL-PHP:计算一天中多个事件之间的总小时数【英文标题】:MySQL-PHP:Calculatetotalhoursinadaybetweenmultipleevents【发布时间】:2016-11-0219:44:16【问题描述】:我有一张名为time_track的表:+----+--------+---------------------+---------+|id|emplid|ctime... 查看详情

没有脚本编译速率限制的多个文档上的 Python ElasticSearch 更新字段

】没有脚本编译速率限制的多个文档上的PythonElasticSearch更新字段【英文标题】:PythonElasticSearchupdatingfieldonmultipledocumentswithoutscriptcompilationratelimit【发布时间】:2022-01-1300:06:16【问题描述】:我正在使用fromelasticsearchimportElasticsearc... 查看详情

计算多个文件夹的文件夹大小[关闭]

】计算多个文件夹的文件夹大小[关闭]【英文标题】:Calculatefoldersizeformultiplefolders[closed]【发布时间】:2019-01-0615:42:36【问题描述】:我想构建一个脚本:计算特定路径的总文件夹大小;以MB为单位显示每个路径的文件夹大小(... 查看详情

C# WPF 加速(线程)来自多个文件的总 FileInfo.Length

...加速由一个路径递归给出的所有文件夹中所有文件的总和计算。假设我选择“E:\\”作为文件夹。我现在将以毫秒为单位通过“SafeFileEnumerator”将条目递归文件 查看详情

创建计算每个新插入员工的总工资的行级触发器

】创建计算每个新插入员工的总工资的行级触发器【英文标题】:Createrowleveltriggerthatcomputesthetotalsalaryforeachnewlyinsertedemployee【发布时间】:2015-12-0319:45:54【问题描述】:我需要创建一个行级触发器来计算每个新插入的员工的总薪... 查看详情

扒一扒react计算状态的原理(代码片段)

...依次处理其中的update,这是处理更新的大致过程,也就是计算组件新状态的本质。在React中,类组件与根组件使用一类update对象, 查看详情

Elasticsearch“_cat/indices”api更新延迟到搜索?

】Elasticsearch“_cat/indices”api更新延迟到搜索?【英文标题】:Elasticsearch"_cat/indices"apiupdatedelayeduntilsearch?【发布时间】:2021-02-0322:49:13【问题描述】:Elasticsearch(v7.9.2)获得了一个api_cat/indices来显示索引状态,对docs.count所... 查看详情

在 React Native 中更新多个状态

】在ReactNative中更新多个状态【英文标题】:UpdatingmultiplestatesinReactNative【发布时间】:2022-01-2203:20:52【问题描述】:我正在尝试跟踪多个状态(本例中的错误处理),但由于现阶段未知的原因(我认为缺乏理解)我似乎只能保... 查看详情

React Hooks - 等待多个状态更新完成

】ReactHooks-等待多个状态更新完成【英文标题】:ReactHooks-Waitformultiplestateupdatestofinish【发布时间】:2021-06-1806:43:38【问题描述】:我有以下状态const[humans,setHumans]=useState([]);const[animales,setAnimals]=useState([]);还有下面的函数constf=()=>... 查看详情

PHP 和 GPS 坐标,获取由多个点组成的路径的总距离

...度、纬度和高度组成(我现在完全不担心高度,我知道我计算镜头的功能不支持高 查看详情

python一次更新多个状态变量(代码片段)

查看详情

flink状态编程

参考技术A1.流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。有状态的计算则会基于多... 查看详情

使用状态变量更新计算属性

】使用状态变量更新计算属性【英文标题】:Updatecomputedpropertywithstatevariable【发布时间】:2019-09-2616:55:51【问题描述】:我有以下情况:我正在创建一个Form,用户可以在其中输入一个TextField中的金额。此外,用户可以通过Picker... 查看详情