关键词:
在我之前的文章中,我使用了 Kibana 所提供的 UI 来对所感兴趣的数据进行 transform:
在今天的文章中,我将使用 API 的形式来展示如何使用 transform 来对数据进行转换。
- 将现有文档转换为汇总文档(pivot transform)或
- 找到具有特定唯一键(latest transform)的最新文档。
这种以实体(entity)为中心的视图对于由多个文档(如用户行为或会话)组成的各种数据很有帮助。 例如,分布式系统中的会话或请求的持续时间是常见的场景。 以下帖子基于 StackOverflow 问题,该问题以微小的变化反复出现 - 将其用作蓝图。
样本数据
共有三个不同的实体,其唯一 ID A、B 和 C。每个实体都可以通过 eventStart.timestamp 或 eventStop.timestamp 进行多个状态更新:
POST test/_bulk
"index" : "_id" : "1"
"uniqueID" : "A", "eventStart": "timestamp": "2020-07-01T13:50:55.000Z"
"index" : "_id" : "2"
"uniqueID" : "A", "eventStop": "timestamp": "2020-07-01T13:51:00.000Z"
"index" : "_id" : "3"
"uniqueID" : "B", "eventStart": "timestamp": "2020-07-01T13:52:25.000Z"
"index" : "_id" : "4"
"uniqueID" : "B", "eventStop": "timestamp": "2020-07-01T13:53:00.000Z"
"index" : "_id" : "5"
"uniqueID" : "A", "eventStop": "timestamp": "2020-07-01T13:54:55.000Z"
"index" : "_id" : "6"
"uniqueID" : "C", "eventStart": "timestamp": "2020-07-01T13:54:55.000Z"
我们运行上面的命令把数据导入到 Elasticsearch 中去。
依靠默认映射,两个日期和关键字字段是相关的,用于计算不同的持续时间:
# Request
GET test/_mapping
# Response
"test" :
"mappings" :
"properties" :
"eventStart" :
"properties" :
"timestamp" :
"type" : "date"
,
"eventStop" :
"properties" :
"timestamp" :
"type" : "date"
,
"uniqueID" :
"type" : "text",
"fields" :
"keyword" :
"type" : "keyword",
"ignore_above" : 256
Transforms API
计算方法如下:
- 按 uniqueID 分组。
- 获取第一个 eventStart 和最后一个 eventStop 时间戳。
- 计算时差(以秒为单位)。
虽然 Kibana 在 Elasticsearch Transform API 之上提供了一个 UI 来完成 transform,但此示例坚持使用 Elasticsearch API,它更易于遵循和重现。 一个方便的 API 是使用 POST _transform/_preview 进行 preview。
从分组的第一步开始,由于聚合部分是强制性的,计算状态更新的数量:
# Request
POST _transform/_preview
"source":
"index": "test"
,
"dest":
"index": "test_transformed"
,
"pivot":
"group_by":
"id":
"terms":
"field": "uniqueID.keyword"
,
"aggregations":
"event_count":
"value_count":
"field": "_id"
# Response
"preview" : [
"event_count" : 3,
"id" : "A"
,
"event_count" : 2,
"id" : "B"
,
"event_count" : 1,
"id" : "C"
],
"generated_dest_index" :
"mappings" :
"_meta" :
"_transform" :
"transform" : "transform-preview",
"version" :
"created" : "7.14.0"
,
"creation_date_in_millis" : 1632892282869
,
"created_by" : "transform"
,
"properties" :
"event_count" :
"type" : "long"
,
"id" :
"type" : "keyword"
,
"settings" :
"index" :
"number_of_shards" : "1",
"auto_expand_replicas" : "0-1"
,
"aliases" :
对于最终结果,它 “只是” 缺少正确的聚合:bucket 脚本聚合听起来很有希望。
使用 Bucket 脚本聚合进行转换
继续之前的转换,这个添加最早的开始时间戳,最晚的结束时间戳,以及两者之间的持续时间:
POST _transform/_preview
"source":
"index": "test"
,
"dest":
"index": "test_transformed"
,
"pivot":
"group_by":
"id":
"terms":
"field": "uniqueID.keyword"
,
"aggregations":
"event_count":
"value_count":
"field": "_id"
,
"start":
"min":
"field": "eventStart.timestamp"
,
"stop":
"max":
"field": "eventStop.timestamp"
,
"duration":
"bucket_script":
"buckets_path":
"start": "start.value",
"stop": "stop.value"
,
"script": """
return (params.stop - params.start)/1000;
"""
上面请求的响应为:
"preview" : [
"duration" : 240.0,
"stop" : "2020-07-01T13:54:55.000Z",
"event_count" : 3,
"start" : "2020-07-01T13:50:55.000Z",
"id" : "A"
,
"duration" : 35.0,
"stop" : "2020-07-01T13:53:00.000Z",
"event_count" : 2,
"start" : "2020-07-01T13:52:25.000Z",
"id" : "B"
,
"stop" : null,
"event_count" : 1,
"start" : "2020-07-01T13:54:55.000Z",
"id" : "C"
],
"generated_dest_index" :
"mappings" :
"_meta" :
"_transform" :
"transform" : "transform-preview",
"version" :
"created" : "7.14.0"
,
"creation_date_in_millis" : 1632905272319
,
"created_by" : "transform"
,
"properties" :
"stop" :
"type" : "date"
,
"event_count" :
"type" : "long"
,
"start" :
"type" : "date"
,
"id" :
"type" : "keyword"
,
"settings" :
"index" :
"number_of_shards" : "1",
"auto_expand_replicas" : "0-1"
,
"aliases" :
Painless 中的计算出奇的简单:(params.stop - params.start)/1000:
- 不需要更复杂的 datetime API。 Elasticsearch 中的每个日期都存储为自 epoch 以来的长时间(以毫秒为单位),因此简单的差异就足够了。
- 除以 1,000 就得到了以秒为单位。
- 自动处理缺少的结束时间。
要创建 transform 作业而不仅仅是预览它,你需要将请求调整为以下内容:
PUT _transform/test_duration
"description": "Calculate the duration of an event from multiple status updates (based on its uniqueID)",
"frequency": "1m",
"source":
"index": "test"
,
"dest":
"index": "test_transformed"
,
"pivot":
"group_by":
"id":
"terms":
"field": "uniqueID.keyword"
,
"aggregations":
"event_count":
"value_count":
"field": "_id"
,
"start":
"min":
"field": "eventStart.timestamp"
,
"stop":
"max":
"field": "eventStop.timestamp"
,
"duration":
"bucket_script":
"buckets_path":
"start": "start.value",
"stop": "stop.value"
,
"script": """
return (params.stop - params.start)/1000;
"""
使用 GET _transform/test_duration 你可以看到 transform 作业。 并且你必须明确地用 POST _transform/test_duration/_start 启动它,否则它什么也做不了。我们执行如下的命令:
POST _transform/test_duration/_start
最后,stats API 非常适合查看工作正在进行或已经完成的工作:
GET _transform/test_duration/_stats
上面命令的结果为:
"count" : 1,
"transforms" : [
"id" : "test_duration",
"state" : "stopped",
"stats" :
"pages_processed" : 2,
"documents_processed" : 6,
"documents_indexed" : 3,
"documents_deleted" : 0,
"trigger_count" : 1,
"index_time_in_ms" : 753,
"index_total" : 1,
"index_failures" : 0,
"search_time_in_ms" : 7,
"search_total" : 2,
"search_failures" : 0,
"processing_time_in_ms" : 8,
"processing_total" : 2,
"delete_time_in_ms" : 0,
"exponential_avg_checkpoint_duration_ms" : 1012.0,
"exponential_avg_documents_indexed" : 3.0,
"exponential_avg_documents_processed" : 6.0
,
"checkpointing" :
"last" :
"checkpoint" : 1,
"timestamp_millis" : 1632907175535
,
"changes_last_detected_at" : 1632907175535
]
从上面我们可以看出来状态为 stopped,它表示 transform 已经完成。我们打开 Kibana,并查看状态:
最后但并非最不重要的是,这些是生成的文档:
GET test_transformed/_search
上面命令显示的结果为:
"took" : 1,
"timed_out" : false,
"_shards" :
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
,
"hits" :
"total" :
"value" : 3,
"relation" : "eq"
,
"max_score" : 1.0,
"hits" : [
"_index" : "test_transformed",
"_type" : "_doc",
"_id" : "QRRx52klPRvG45a5oLgZ95sAAAAAAAAA",
"_score" : 1.0,
"_source" :
"duration" : 240.0,
"stop" : "2020-07-01T13:54:55.000Z",
"event_count" : 3,
"start" : "2020-07-01T13:50:55.000Z",
"id" : "A"
,
"_index" : "test_transformed",
"_type" : "_doc",
"_id" : "Qq7col5MOHvjTNMiAGonnqAAAAAAAAAA",
"_score" : 1.0,
"_source" :
"duration" : 35.0,
"stop" : "2020-07-01T13:53:00.000Z",
"event_count" : 2,
"start" : "2020-07-01T13:52:25.000Z",
"id" : "B"
,
"_index" : "test_transformed",
"_type" : "_doc",
"_id" : "Q-N5zMGevsgbxCl0WsHH6CIAAAAAAAAA",
"_score" : 1.0,
"_source" :
"stop" : null,
"event_count" : 1,
"start" : "2020-07-01T13:54:55.000Z",
"id" : "C"
]
这就是计算持续时间。
没有 transform 的聚合
你需要转换来获得这个结果吗? 不。
通过一些小的修改,您可以通过常规聚合获得相同的结果:
POST test/_search
"size": 0,
"aggregations":
"group_by":
"terms":
"field": "uniqueID.keyword"
,
"aggregations":
"start":
"min":
"field": "eventStart.timestamp"
,
"stop":
"max":
"field": "eventStop.timestamp"
,
"duration":
"bucket_script":
"buckets_path":
"start": "start.value",
"stop": "stop.value"
,
"script": """
return (params.stop - params.start)/1000;
"""
上面命令返回的结果为:
"took" : 9,
"timed_out" : false,
"_shards" :
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
,
"hits" :
"total" :
"value" : 6,
"relation" : "eq"
,
"max_score" : null,
"hits" : [ ]
,
"aggregations" :
"group_by" :
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
"key" : "A",
"doc_count" : 3,
"stop" :
"value" : 1.593611695E12,
"value_as_string" : "2020-07-01T13:54:55.000Z"
,
"start" :
"value" : 1.593611455E12,
"value_as_string" : "2020-07-01T13:50:55.000Z"
,
"duration" :
"value" : 240.0
,
"key" : "B",
"doc_count" : 2,
"stop" :
"value" : 1.59361158E12,
"value_as_string" : "2020-07-01T13:53:00.000Z"
,
"start" :
"value" : 1.593611545E12,
"value_as_string" : "2020-07-01T13:52:25.000Z"
,
"duration" :
"value" : 35.0
,
"key" : "C",
"doc_count" : 1,
"stop" :
"value" : null
,
"start" :
"value" : 1.593611695E12,
"value_as_string" : "2020-07-01T13:54:55.000Z"
]
虽然结果的结构不同,但结果是相同的。一些附加说明:
- 设置“size”: 0 这样无需返回任何文档。
- 在术语聚合(terms aggregation)中,运行其它子聚合(sub aggregation)运行。
- doc_count 中会自动计算涉及多少状态更新,因此不需要 value_count。
- bucket_script 是相同的。
结论
希望这是 transform 或等效 aggregation 的有用蓝图。 现在你知道所有部分如何组合在一起以及要避免哪些陷阱。
该文档还描述了何时(不)使用转换,这导致了经典的 “取决于” 讨论:
译文:Elasticsearch Transforms: Calculate the Total Duration from Multiple Status Updates
获取整个索引中的总词频(Elasticsearch)
】获取整个索引中的总词频(Elasticsearch)【英文标题】:Gettingtotaltermfrequencythroughoutentireindex(Elasticsearch)【发布时间】:2017-06-0208:27:43【问题描述】:我正在尝试计算特定术语在整个索引中出现的总次数(术语收集频率)。我试... 查看详情
使用 AFNetworking 计算下载多个文件的总进度
】使用AFNetworking计算下载多个文件的总进度【英文标题】:calculatingtotalprogressofdownloadingmultiplefilewithAFNetworking【发布时间】:2014-09-0713:57:16【问题描述】:我想下载多个文件,然后向用户显示总进度。但问题就在这里,我不知道... 查看详情
如何计算多个 android webview 所花费的总时间?
】如何计算多个androidwebview所花费的总时间?【英文标题】:howtocalculatetotaltimetakenformultipleandroidwebview?【发布时间】:2021-11-0622:08:31【问题描述】:我已经在android中加载了多个webview,我知道webview加载所用的时间是“onPageStarted”... 查看详情
java示例代码_计算多个线程完成执行所需的总时间
java示例代码_计算多个线程完成执行所需的总时间 查看详情
计算多个选项卡/窗口中的总 XMPP 会话
】计算多个选项卡/窗口中的总XMPP会话【英文标题】:CounttotalXMPPsessioninmultipletabs/window【发布时间】:2015-10-0913:38:31【问题描述】:我也有与here提到的相同的问题。但是,我继续采用随机化我的资源的方法,以便在多个选项卡/... 查看详情
MySQL - PHP:计算一天中多个事件之间的总小时数
】MySQL-PHP:计算一天中多个事件之间的总小时数【英文标题】:MySQL-PHP:Calculatetotalhoursinadaybetweenmultipleevents【发布时间】:2016-11-0219:44:16【问题描述】:我有一张名为time_track的表:+----+--------+---------------------+---------+|id|emplid|ctime... 查看详情
没有脚本编译速率限制的多个文档上的 Python ElasticSearch 更新字段
】没有脚本编译速率限制的多个文档上的PythonElasticSearch更新字段【英文标题】:PythonElasticSearchupdatingfieldonmultipledocumentswithoutscriptcompilationratelimit【发布时间】:2022-01-1300:06:16【问题描述】:我正在使用fromelasticsearchimportElasticsearc... 查看详情
计算多个文件夹的文件夹大小[关闭]
】计算多个文件夹的文件夹大小[关闭]【英文标题】:Calculatefoldersizeformultiplefolders[closed]【发布时间】:2019-01-0615:42:36【问题描述】:我想构建一个脚本:计算特定路径的总文件夹大小;以MB为单位显示每个路径的文件夹大小(... 查看详情
C# WPF 加速(线程)来自多个文件的总 FileInfo.Length
...加速由一个路径递归给出的所有文件夹中所有文件的总和计算。假设我选择“E:\\”作为文件夹。我现在将以毫秒为单位通过“SafeFileEnumerator”将条目递归文件 查看详情
创建计算每个新插入员工的总工资的行级触发器
】创建计算每个新插入员工的总工资的行级触发器【英文标题】:Createrowleveltriggerthatcomputesthetotalsalaryforeachnewlyinsertedemployee【发布时间】:2015-12-0319:45:54【问题描述】:我需要创建一个行级触发器来计算每个新插入的员工的总薪... 查看详情
扒一扒react计算状态的原理(代码片段)
...依次处理其中的update,这是处理更新的大致过程,也就是计算组件新状态的本质。在React中,类组件与根组件使用一类update对象, 查看详情
Elasticsearch“_cat/indices”api更新延迟到搜索?
】Elasticsearch“_cat/indices”api更新延迟到搜索?【英文标题】:Elasticsearch"_cat/indices"apiupdatedelayeduntilsearch?【发布时间】:2021-02-0322:49:13【问题描述】:Elasticsearch(v7.9.2)获得了一个api_cat/indices来显示索引状态,对docs.count所... 查看详情
在 React Native 中更新多个状态
】在ReactNative中更新多个状态【英文标题】:UpdatingmultiplestatesinReactNative【发布时间】:2022-01-2203:20:52【问题描述】:我正在尝试跟踪多个状态(本例中的错误处理),但由于现阶段未知的原因(我认为缺乏理解)我似乎只能保... 查看详情
React Hooks - 等待多个状态更新完成
】ReactHooks-等待多个状态更新完成【英文标题】:ReactHooks-Waitformultiplestateupdatestofinish【发布时间】:2021-06-1806:43:38【问题描述】:我有以下状态const[humans,setHumans]=useState([]);const[animales,setAnimals]=useState([]);还有下面的函数constf=()=>... 查看详情
PHP 和 GPS 坐标,获取由多个点组成的路径的总距离
...度、纬度和高度组成(我现在完全不担心高度,我知道我计算镜头的功能不支持高 查看详情
python一次更新多个状态变量(代码片段)
flink状态编程
参考技术A1.流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。有状态的计算则会基于多... 查看详情
使用状态变量更新计算属性
】使用状态变量更新计算属性【英文标题】:Updatecomputedpropertywithstatevariable【发布时间】:2019-09-2616:55:51【问题描述】:我有以下情况:我正在创建一个Form,用户可以在其中输入一个TextField中的金额。此外,用户可以通过Picker... 查看详情