实时监控:基于流计算oceanus(flink)实现系统和应用级实时监控(代码片段)

腾讯技术工程 腾讯技术工程     2023-01-14     272

关键词:


作者:吴云涛,腾讯 CSIG 高级工程师

本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其 App 应用的 CPU和内存等资源消耗数据,以短信、电话、微信消息等方式实时反馈监控告警信息,高效地保障系统稳健运行。运用云化的 Kafka、Flink、ES 等组件,大大减少了开发运维人员的投入。

一、解决方案描述

(一)概述

本方案结合腾讯云 CKafka、流计算 Oceanus (Flink)、 Elasticsearch、Prometheus 等,通过 Filebeat 实时采集系统和应用监控数据,并传输到 CKafka,再将 CKafka 数据接入流计算 Oceanus (Flink),经过简单的业务逻辑处理输出到 Elasticsearch,最后通过 Kibana 页面查询结果。方案中利用 Promethus 监控系统指标,如流计算 Oceanus 作业运行状况,利用云 Grafana 监控 CVM 或业务应用指标。

(二)方案架构 

二、前置准备

在实现本方案前,请确保已创建并配置了相应的大数据组件。

(一)创建私有网络 VPC

私有网络(VPC)是一块您在腾讯云上自定义的逻辑隔离网络空间,在构建 CKafka、流计算 Oceanus,Elasticsearch 集群等服务时选择建议同一个 VPC。具体创建步骤请参考 帮助文档 (https://cloud.tencent.com/document/product/215/36515)。

(二)创建 CKafka 实例

Kafka 建议选择最新的 2.4.1 版本,和 Filebeat 采集工具兼容性较好。

购买完成后,再创建 Kafka topic: topic-app-info

(三)创建流计算 Oceanus 集群

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

在流计算 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,具体步骤请参考帮助文档(https://cloud.tencent.com/document/product/849/48298)。

(四)创建 Elasticsearch 实例

在 Elasticsearch 控制台,点击左上角【新建】创建集群,具体步骤请参考帮助文档(https://cloud.tencent.com/document/product/845/19536)  。

(五)创建云监控 Prometheus 实例

为了展示自定义系统指标,需购买 Promethus 服务。只需要自定业务指标的同学可以省略此步骤。

进入云监控控制台,点击左侧 【Prometheus 监控】,新建 Promethus 实例,具体的步骤请参考帮助文档 (https://cloud.tencent.com/document/product/1416/55982)。

(六)创建独立 Grafana 资源

独立的 Grafana 在灰度发布中,需在 Grafana 管理页面(https://console.cloud.tencent.com/monitor/grafana)进行单独购买,以实现业务监控指标的展示。

(七)安装配置 Filebeat

Filebeat 是一款轻量级日志数据采集的工具,通过监控指定位置的文件收集信息。在该 VPC 下给需要监控主机信息和应用信息的云服务器上安装 Filebeat。安装方式一:下载 Filebeat 并安装。下载地址(https://www.elastic.co/cn/downloads/beats/filebeat);方式二:采用【Elasticsearch 管理页面】-->【beats 管理】中提供的 Filebeat。本示例中采用了方式一。下载到 CVM 中并配置 Filebeat,在 filebeat.yml 文件中添加如下配置项: 

# 监控日志文件配置
- type: log
enabled: true
paths:
   - /tmp/test.log
   #- c:\\programdata\\elasticsearch\\logs\\*
# 监控数据输出项配置
output.kafka:
version: 2.0.0                         # kafka版本号
hosts: ["xx.xx.xx.xx:xxxx"]       # 请填写实际的IP地址+端口
topic: 'topic-app-info'           # 请填写实际的topic

请根据实际业务需求配置相对应的 Filebeat.yml 文件,参考 Filebeat 官方文档(https://www.elastic.co/guide/en/beats/filebeat/current/configuring-howto-filebeat.html)。

注:示例选用2.4.1的 CKafka 版本,这里配置 version: 2.0.0。版本对应不上可能出现“ERROR   [kafka] kafka/client.go:341     Kafka (topic=topic-app-info): dropping invalid message”错误

三、方案实现

接下来通过案例介绍如何通过流计算 Oceanus 实现个性化监控。

(一)Filebeat 采集数据

1、进入到 Filebeat 根目录下,并启动 Filebeat 进行数据采集。示例中采集了 top 命令中显示的 CPU、内存等信息,也可以采集 jar 应用的日志、JVM 使用情况、监听端口等,详情参考 Filebeat 官网

(https://www.elastic.co/guide/en/beats/filebeat/current/configuration-filebeat-options.html)。

# filebeat启动
./filebeat -e -c filebeat.yml

# 监控系统信息写入test.log文件
top -d 10 >>/tmp/test.log

2、进入 CKafka 页面,点击左侧【消息查询】,查询对应 topic 消息,验证是否采集到数据。

Filebeat 采集到的 Kafka 的数据格式:


  "@timestamp": "2021-08-30T10:22:52.888Z",
  "@metadata": 
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.14.0"
  ,
  "input": 
    "type": "log"
  ,
  "host": 
    "ip": ["xx.xx.xx.xx", "xx::xx:xx:xx:xx"],
    "mac": ["xx:xx:xx:xx:xx:xx"],
    "hostname": "xx.xx.xx.xx",
    "architecture": "x86_64",
    "os": 
      "type": "linux",
      "platform": "centos",
      "version": "7(Core)",
      "family": "redhat",
      "name": "CentOSLinux",
      "kernel": "3.10.0-1062.9.1.el7.x86_64",
      "codename": "Core"
    ,
    "id": "0ea734564f9a4e2881b866b82d679dfc",
    "name": "xx.xx.xx.xx",
    "containerized": false
  ,
  "agent": 
    "name": "xx.xx.xx.xx",
    "type": "filebeat",
    "version": "7.14.0",
    "hostname": "xx.xx.xx.xx",
    "ephemeral_id": "6c0922a6-17af-4474-9e88-1fc3b1c3b1a9",
    "id": "6b23463c-0654-4f8b-83a9-84ec75721311"
  ,
  "ecs": 
    "version": "1.10.0"
  ,
  "log": 
    "offset": 2449931,
    "file": 
      "path": "/tmp/test.log"
    
  ,
  "message": "(B[m16root0-20000S0.00.00:00.00kworker/1:0H(B[m[39;49m[K"

(二)创建 Flink SQL 作业

使用流计算 Oceanus 对 CKafka 接入的数据进行加工处理,并存入 Elasticsearch。

1、定义 Source

按照 Filebeat 中 json 消息的格式,构造 Flink Table Source。

CREATE TABLE DataInput (
    `@timestamp` VARCHAR,
    `host`       ROW<id VARCHAR,ip ARRAY<VARCHAR>>,
    `log`       ROW<`offset` INTEGER,file ROW<path VARCHAR>>,
    `message`    VARCHAR
) WITH (
    'connector' = 'kafka',   -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置 Connector
    'topic' = 'topic-app-info',  -- 替换为您要消费的 Topic
    'scan.startup.mode' = 'earliest-offset', 
    'properties.bootstrap.servers' = '10.0.0.29:9092',  
    'properties.group.id' = 'oceanus_group2',  -- 必选参数, 一定要指定 Group ID
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',     -- 忽略 JSON 结构解析异常
    'json.fail-on-missing-field' = 'false'   -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
);

2、定义 Sink

CREATE TABLE es_output (
  `id` VARCHAR,
  `ip` ARRAY<VARCHAR>,
  `path` VARCHAR,
  `num` INTEGER,
  `message` VARCHAR,
  `createTime` VARCHAR
) WITH (
   'connector.type' = 'elasticsearch', 
   'connector.version' = '6', 
   'connector.hosts' = 'http://10.0.0.175:9200',  
   'connector.index' = 'oceanus_test2',  
   'connector.document-type' = '_doc',  
   'connector.username' = 'elastic',  
   'connector.password' = 'yourpassword',
   'update-mode' = 'upsert',  -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式    
   'connector.key-null-literal' = 'n/a',  -- 主键为 null 时的替代字符串,默认是 'null'                                               
   'format.type' = 'json'        -- 输出数据格式, 目前只支持 'json'
);

3、加工业务数据

INSERT INTO es_output
SELECT
    host.id as `id`,
    host.ip as `ip`,
    log.file.path as `path`,
    log.`offset` as `num`,
    message,
    `@timestamp` as `createTime`
from DataInput;

4、配置作业参数

【内置 connector】选择flink-connector-elasticsearch6flink-connector-kafka

注: 根据实际版本选择

5、查询 ES 数据

在 ES 控制台的 Kibana 页面查询数据,或者进入某台相同子网的 CVM 下,使用以下命令进行查询:

# 查询索引 username:password请替换为实际账号密码
curl -XGET -u username:password http://xx.xx.xx.xx:xxxx/oceanus_test2/_search -H 'Content-Type: application/json' -d'

   "query":  "match_all": ,
   "size":  10

'

更多访问方式请参考 访问 ES 集群(https://cloud.tencent.com/document/product/845/42868)。

(三)系统指标监控

本章节主要实现系统信息监控,对 Flink 作业运行状况进行监控告警。

Prometheus 是一个非常灵活的时序数据库,通常用于监控数据的存储、计算和告警。流计算 Oceanus 建议用户使用腾讯云监控提供的 Prometheus 服务,以免去部署、运维开销;同时它还支持腾讯云的通知模板,可以通过短信、电话、邮件、企业微信机器人等方式,将告警信息轻松触达不同的接收方。

监控配置  

流计算 Oceanus 作业监控

除了流计算 Oceanus 控制台自带的监控信息,还可以配置目前已经支持了任务级细粒度监控、作业级监控和集群 Flink 作业列表监控。

1、流计算 Oceanus 作业详情页面,点击【作业参数】,在【高级参数】处添加如下配置:

pipeline.max-parallelism: 2048
metrics.reporters: promgateway
metrics.reporter.promgateway.host: xx.xx.xx.xx           # Prometheus实例地址
metrics.reporter.promgateway.port: 9090                     # Prometheus实例端口
metrics.reporter.promgateway.needBasicAuth: true
metrics.reporter.promgateway.password: xxxxxxxxxxx # Prometheus实例密码
metrics.reporter.promgateway.interval: 10 SECONDS

2、在任一流计算 Oceanus 作业中,点击【云监控】进入云 Prometheus 实例,点击链接进入Grafana(灰度中的 Grafana 不能由此进入),导入 json 文件,详情请参见 接入 Prometheus 自定义监控

(https://cloud.tencent.com/document/product/849/55239)。 

3、展现出来的 Flink 任务监控效果如下,用户也可以点击【Edit】设置不同 Panel 来优化展现效果。 

告警配置

1、进入腾讯云监控界面,点击左侧【Prometheus 监控】,点击已购买的实例进入服务管理页面,点击左侧【告警策略】,点击【新建】,配置相关信息。具体操作参考 接入 Prometheus 自定义监控

(https://cloud.tencent.com/document/product/849/55239)。

2、设置告警通知。选择【选择模版】或【新建】,设置通知模版。

3、短信通知消息

(四)业务指标监控

通过 Filebeat 采集到应用业务数据,经过流计算 Oceanus 服务进行数据的加工处理并存入 ES,利用 ES + Grafana 来实现业务数据的监控。

1、Grafana 配置 ES 数据源。进入灰度发布中的 Grafana 控制台

(https://console.cloud.tencent.com/monitor/grafana),进入刚刚创建的 Grafana 服务,找到外网地址打开并登录,Grafana 账号为 admin,登录后点击【Configuration】,点击【Add Source】,搜索elasticsearch,填写相关 ES 实例信息,添加数据源。

2、点击左侧【Dashboards】,点击【Manage】,点击右上角【New Dashboard】,新建面板,编辑面板。

3、展现效果如下:  

  • 总数据量写入实时监控:对写入数据源的总数据量进行监控; 

  • 数据来源实时监控:对来源于某个特定 log 的数据写入量进行监控; 

  • 字段平均值监控:对某个字段的平均值进行监控; 

  • num字段最大值监控:对 num 字段的最大值进行监控;

注:本处只做示例,无实际业务

四、总结

本方案中对系统监控指标和业务监控指标2种监控方案都进行尝试。若只需要对业务指标进行监控,可省略 Promethus 相关操作。

此外,需要注意的是:

  • CKafka 的版本和开源版本 Kafka 并没有严格对应,方案中 CKafka2.4.1和开源 Filebeat-1.14.1版本能够调试成功。

  • 云监控中的 Promethus 服务已经嵌入了 Grafana 监控服务。但不支持自定义数据源,该嵌入的 Grafana 只能接入 Promethus,需使用独立灰度发布的 Grafana 才能完成ES数据接入 Grafana。

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓

点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~

腾讯云大数据

长按二维码
关注我们

基于流计算oceanus和elasticsearchservice构建百亿级实时监控系统

为什么要构建监控系统作者:龙逸尘,腾讯CSIG高级工程师在后移动互联网时代,良好的用户体验是增长的基础,稳定的使用体验就是用户体验的基础。大型的互联网公司,特别是面向C端客户的公司,对业... 查看详情

袋鼠云:基于flink构建实时计算平台的总体架构和关键技术点

...一的数据同步工具,既可以采集静态的数据,也可以采集实时变化的数据,是全域、异构、批流一体的数据同步引擎。大家喜欢的话请给我们点个star!star!star!github开源项目:https://github.com/DTStack/flinkxgitee 查看详情

flink基于1.15.2的java开发-实时流计算商品销售热榜(代码片段)

需求每一个商品被卖出去一条就以以下格式通过kafka发送过来,只对status=101的productId进行统计:#格式:productId,statusCodea1001,101a1001,102a1001,101a1003,101a1002,101假设每过60s有上述内容被发送过来,那么flink应该会形成... 查看详情

基于flink建设流批一体实时数仓

...;却还没搞过 Flink?每年双十一,阿里都在 Flink 实时计算技术的驱动下全程保持了“如丝般顺滑”,基于 Flink 的阿里巴巴实时计算平台简直强·无敌。最恐怖的是,阿里几乎每年的实时计算峰值都达到了破纪录... 查看详情

流计算oceanus|flinkjvm内存超限的分析方法总结(代码片段)

等常见的需要使用Native内存且容易造成内存泄漏的第三方库,而且从GC日志来看,堆内各个区域远远没有用满,说明余量还是比较充足的。类来描述的。首先Flink的ResourceManager会调用工具类,从用户和系统的各项配置Configuration中... 查看详情

flink视频教程_基于flink流处理的动态实时电商实时分析系统

Flink视频教程_基于Flink流处理的动态实时电商实时分析系统课程分享地址链接:https://pan.baidu.com/s/1cX7O-45y6yUPT4B-ACfliA密码:jqmk在开始学习前给大家说下什么是Flink?1.Flink是一个针对流数据和批数据的分布式处理引擎,主要用Java代码... 查看详情

基于kafka的实时计算引擎:flink能否替代spark?

...欺诈检测、出租车预订、患者监控等场景处理时,需要对实时数据进行实时处理,以便做出快速可行的决策。目前业界有开源不少实时计算引擎,以Apache基金会的两款开源实时计算引擎最受欢迎,它们分 查看详情

基于flink构建全场景多维度实时计算数仓

...f0c;却还没搞过 Flink?去年双十一,阿里在 Flink 实时计算技术的驱动下全程保持了“如丝般顺滑”,基于 Flink 的阿里巴巴实时计算平台简直强·无敌。最恐怖的是,阿里当时的实时计算峰值达到了破纪录的每秒... 查看详情

flink和spark对比

...个个批次,通过分布式数据集RDD进行批量处理,是一种伪实时。而Flink是基于事件驱动,它是一个面向流的处理框架,Flink基于每个事件一行一行地流式处理,是真正的流式计算。1、技术理念不同:Spark的技术理念是使用微批来... 查看详情

作业帮基于flink的实时计算平台实践

更多Flink相关技术问题,可扫码加入社区钉钉交流群~   戳我,查看原文视频~ 查看详情

flink基础入门(含案例)(代码片段)

...08;tez)--->Spark流批处理框架,内存计算(伪实时)-->flink流批处理,内存计算(真正的实时计算)flinkvsspark什么是flinkflink是一个分布式,高性能,随时可用的以及准确的流处理计算框架࿰... 查看详情

flink学习笔记概述

...分布式流处理框架,它能够在大规模的数据流上进行实时计算和批处理。Flink支持丰富的API,包括DataStreamAPI和DataSetAPI,可以在多种计算场景中使用,例如实时数据处理、批处理、图形计算和机器学习等。Flink还具... 查看详情

美团基于flink的实时数仓平台建设新进展

...k系统性学习笔记1.平台建设现状美团于2018年首次引入Flink实时计算引擎,当时的实时数仓概念还不太普及,平台只提供了FlinkJar任务的生命周期管理和监控报警。2019年,我们注意到实时计算的主要应用场景是解决离线... 查看详情

flink+kafka实现wordcount实时计算(代码片段)

1.FlinkFlink介绍:Flink是一个针对流数据和批数据的分布式处理引擎。它主要是由Java代码实现。目前主要还是依靠开源社区的贡献而发展。对Flink而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。... 查看详情

flink流处理的动态实时亿级全端用户画像系统视频课程分享

基于Flink流处理的动态实时亿级全端用户画像系统课程下载:https://pan.baidu.com/s/1YtMs-XG5-PsTFV9_7-AlfA提取码:639m项目中采用到的算法包含LogisticRegression、Kmeans、TF-IDF等,Flink暂时支持的算法比较少,对于以上算法,本课程将手把手带大... 查看详情

基于实时计算flink版的场景解决方案demo

简介:通过两个demo分享技术实时计算flink版的解决方案本文整理自阿里云智能行业解决方案专家GIN的直播分享直播链接:https://developer.aliyun.com/learning/course/839本文主要分享两个基于Flink制作的实时大数据的应用。为了更好... 查看详情

快手基于flink构建实时数仓场景化实践

简介: 一文了解快手基于Flink构建的实时数仓架构,以及一些难题的解决方案。本文整理自快手数据技术专家李天朔在5月22日北京站FlinkMeetup分享的议题《快手基于Flink构建实时数仓场景化实践》,内容包括:快... 查看详情

聊聊批计算、流计算、hadoop、spark、storm、flink等等

...。单个处理数据量大,处理速度比流慢。流:处理在线,实时产生的数据。单次处理的数据量小,但处理速度更快。Spark是UCBerkeleyAMPlab所开源的类HadoopMapReduce的通用并行框架。Spark,拥有HadoopMapReduce所具有的优点;但不同于MapRedu... 查看详情