大数据技术之dataxdatax之opentsdbwriter插件开发(代码片段)

脚丫先生 脚丫先生     2022-12-01     710

关键词:

大家好,我是脚丫先生 (o^^o)

大数据项目之数据集成模块,按照项目需求需要集成时序数据库OpenTSDB。于是着手进行调研,https://github.com/alibaba/DataX
发现关于该时序数据库的插件只有单一的读插件,而阿里自研的TSDB读写插件都齐全。为了彻底的分离,同时为了完全适配OpenTSDB数据库,于是进行了OpenTSDB的写插件开发。


文章目录

一、OpenTSDB时序数据库

官方描述:OpenTSDB is a distributed, scalable Time Series Database (TSDB) written on top of HBase;翻译过来就是,基于Hbase的分布式的,可伸缩的时间序列数据库。
主要用途:常常用于集成到监控系统,譬如收集大规模集群(包括网络设备、操作系统、应用程序)的监控数据并进行存储,查询。
详细: OpenTSDB数据库,数据都是以metric为单位的进行存储。可以这样对metric进行理解,metric就是一个需要监控的指标或者项,譬如服务器的话,会有CPU使用率、内存使用率,磁盘使用率等。OpenTSDB是基于HBase作为存储底层,因此对数据存储支持到秒级别,并且支持数据永久存储,不会主动删除。
OpenTSDB存储的一些核心概念: 我们以一个实际的场景进行理解:例如,我们采集1个服务器的CPU使用率,发现该服务器在21:00的时候,CPU使用率达到99%。

  • Metric:监控项。譬如上面的CPU使用率。

  • Tags:就是一些标签,在OpenTSDB里面,Tags由tagk和tagv组成,即tagk:takv。标签是用来描述Metric的。

  • Value:一个Value表示一个metric的实际数值,譬如上面的99%

  • Timestamp:即时间戳,用来描述Value是什么时候的;譬如上面的21:00

  • Data Point:即某个Metric在某个时间点的数值。

     Data Point包括以下部分:Metric、Tags、Value、Timestamp
    
     上面描述的服务器在21:00时候的cpu使用率,就是1个DataPoint
    

保存到OpenTSDB的数据,就是无数个DataPoint。

例如一个DataPoint


    "metric":"temperature", 
    "timestamp":1567675709879, 
    "value":20.5, 
    "tags":
        "host":"192.168.239.128"
    


指标名称:temperature
指标标签:host=192.168.239.128
指标值:20.5
时间戳:1567675709879

二、API接口

官方地址:https://www.w3cschool.cn/doc_opentsdb/opentsdb-api_http-put.html?lang=en

1、保存

2、查询

3、删除

三、opentsdbwriter初步开发

1、根据前文进行add插件模块

2、opentsdb数据库的java工具类

如今,卷又卷的时代,我们必须要站在前人的肩膀上探索,不需要重复的去造轮子。这里利用github上开源的opentsdb工具类进行插件的开发。地址如下所示:

https://github.com/fangpanpan/opentsdb-java-sdk

3、插件的开发

对于阿里datax的opentsdbwriter插件的开发,主要逻辑OpenTSDBWriter去实现。代码如下所示:

package com.alibaba.datax.plugin.writer.opentsdbwriter;


import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSONArray;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.nio.reactor.IOReactorException;
import org.opentsdb.client.OpenTSDBClient;
import org.opentsdb.client.OpenTSDBClientFactory;
import org.opentsdb.client.OpenTSDBConfig;
import org.opentsdb.client.bean.request.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
 * @date : 12:11 2022/1/15
 */
public class OpenTSDBWriter extends Writer 

    public static class Job extends Writer.Job

        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration originalConfig;

        @Override
        public List<Configuration> split(int mandatoryNumber) 
       //按照reader 配置文件的格式  来 组织相同个数的writer配置文件

            ArrayList<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
            for (int i = 0; i < mandatoryNumber; i++) 
                configurations.add(this.originalConfig.clone());
            
            return configurations;
        

        @Override
        public void prepare() 
        

        @Override
        public void init() 
          //获取配置文件信息parameter 里面的参数
            this.originalConfig = super.getPluginJobConf();

            String address = originalConfig.getString(Key.ENDPOINT);
            if (StringUtils.isBlank(address)) 
                throw DataXException.asDataXException(
                        OpenTSDBWriterErrorCode.REQUIRED_VALUE,
                        "The parameter [" + Key.ENDPOINT + "] is not set.");
            

            List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
            if (columns == null || columns.isEmpty()) 
                throw DataXException.asDataXException(
                        OpenTSDBWriterErrorCode.REQUIRED_VALUE,
                        "The parameter [" + Key.COLUMN + "] is not set.");
            

            List<String> columnTypes = originalConfig.getList(Key.COLUMNTYPE, String.class);

            if (columnTypes == null || columnTypes.isEmpty()) 
                throw DataXException.asDataXException(
                        OpenTSDBWriterErrorCode.REQUIRED_VALUE,
                        "The parameter [" + Key.COLUMNTYPE + "] is not set.");
            

            String metric = originalConfig.getString(Key.METRIC);
            if (StringUtils.isBlank(metric)) 
                throw DataXException.asDataXException(
                        OpenTSDBWriterErrorCode.REQUIRED_VALUE,
                        "The parameter [" + Key.METRIC + "] is not set.");
            
        

        @Override
        public void destroy() 

        
    
    public static class Task extends Writer.Task 

        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
        private OpenTSDBConfig config;
        private List<String>  columnTypes;
        private List<String>  columns;
        private String metric;

        @Override
        public void init() 
            Configuration writerSliceConfig = getPluginJobConf();
            String address = writerSliceConfig.getString(Key.ENDPOINT);
            this.columnTypes = writerSliceConfig.getList(Key.COLUMNTYPE, String.class);
            this.columns = writerSliceConfig.getList(Key.COLUMN, String.class);
            this.metric = writerSliceConfig.getString(Key.METRIC);
            this.config = OpenTSDBConfig.address(address, 4242)
                    .config();
        

        @Override
        public void startWrite(RecordReceiver recordReceiver) 
            try 
                Record record;
                List<Number> values = new ArrayList<>();
                List<Map<String,String>> tags = new ArrayList<>();
                while ((record = recordReceiver.getFromReader()) != null) 
                    final int recordLength = record.getColumnNumber();
                    Map<String,String> tag = new HashMap<>();
                    for (int i = 0; i < recordLength; i++) 
                        if (columnTypes.get(i).equals("tag"))
                            tag.put(columns.get(i),record.getColumn(i).asString());
                        
                        if (columnTypes.get(i).equals("value"))
                            values.add(record.getColumn(i).asBigInteger());
                            tags.add(tag);
                        
                    
                
                OpenTSDBClient client = OpenTSDBClientFactory.connect(config);
                for (int i = 0;i < values.size();i++)
                    Point point = Point.metric(metric)
                            .tag(tags.get(i))
                            .value(System.currentTimeMillis(), values.get(i))
                            .build();
                    client.put(point);
                
                client.gracefulClose();
            catch (Exception e)
                e.printStackTrace();
            

        

        @Override
        public void post() 
        

        @Override
        public void destroy() 

        
    



四、OpenTSDBWriter 插件的job

我这里想去实现一个场景:把mysql的数据通过datax导入到opentsdb数据库。

1、在mysql数据库新建源表

需要在mysql的数据库里,新建自己想同步数据的表。

CREATE TABLE `mysql_to_opentsdb` (
  `id` int(11) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  `sex` varchar(255) DEFAULT NULL,
  `price` decimal(10,2) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

2、编写job


  "job": 
    "content": [
      
        "reader": 
          "parameter": 
            "password": "cetc@2021",
            "connection": [
              
                "querySql": [
                  " SELECT id,name,sex,price FROM mysql_to_opentsdb ; "
                ],
                "jdbcUrl": [
                  "jdbc:mysql://192.168.239.128:3306/test"
                ]
              
            ],
            "splitPk": "",
            "username": "root"
          ,
          "name": "mysqlreader"
        ,
        "writer": 
          "parameter": 
            "columnType": [
              "tag",
              "tag",
              "tag",
              "value"
            ],
            "endpoint": "http://172.10.10.51",
            "metric": "dog",
            "column": [
              "id",
              "name",
              "sex",
              "price"
            ]
          ,
          "name": "opentsdbwriter"
        
      
    ],
    "setting": 
      "errorLimit": 
        "record": 0,
        "percentage": 0.02
      ,
      "speed": 
        "channel": 3
      
    
  

关于datax的job需要自己去进行编写,因此我们需要根据项目业务进行设计。
endpoint: opentsdb的地址,4242端口已经固定。
metric:指标。

最后,进行maven命令行的打包操作,生成我们需要的插件,然后进行数据同步操作,

大数据技术之hadoop入门

?第1章大数据概论1.1大数据概念 大数据概念如图2-1所示。 图2-1大数据概念 1.2大数据特点(4V) 大数据特点如图2-2,2-3,2-4,2-5所示 图2-2大数据特点之大量 图2-3大数据特点之高速 图2-4大数据特点之多样 图2-5大数据... 查看详情

大数据技术之大数据概论

大数据概论第1章大数据概念第2章大数据特点(4V)1、Volume(大量)2、Velocity(高速)3、Variety(多样)4、Value(低价值密度)第3章大数据应用场景第4章大数据发展前景第5章大数据部门间业... 查看详情

大数据技术之大数据概论

一、大数据概念   大数据(bigdata),指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的... 查看详情

大数据分析之技术框架整理

大数据离线部分HDFS1:HDFS的架构部分及工作原理NameNode:负责管理元素据,将信息保存在内存中DataNode:保存数据,以块的形式保存。启动后需要定时的向NameNode发送心跳,报告自身存储的块信息2:HDFS的上传过程3:HDFS的下载4:N... 查看详情

大数据技术之大数据概论

第1章大数据概念大数据(BigData):指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。大... 查看详情

大数据技术之dolphinscheduler

架构设计​1.1系统架构图​1.2启动流程图​1.3架构说明​MasterServer​MasterServer采用分布式无中心设计理念,MasterServer主要负责DAG任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。MasterServer服务启动时向... 查看详情

大数据技术之数据采集篇

【导读】数据采集是进行大数据分析的前提也是必要条件,在整个流程中占据重要地位。本文将介绍大数据三种采集形式:系统日志采集法、网络数据采集法以及其他数据采集法。(一)系统日志采集法系统日志是记录系统中硬... 查看详情

大数据技术之linux(上)

...了JAVASE基础这座大山之后,终于能够学习我最爱的大数据技术啦!    第一天学习的是Linux知识,那么----                 它是什么& 查看详情

大数据技术之hadoop(mapreduce)

...e核心思想WordCount案例Hadoop序列化MapReduce框架原理InputFormat数据输入MapReduce定义MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。MapReduce核心功能是将用户编写的业务逻辑代码和... 查看详情

大数据技术之hadoop(hdfs)

第1章HDFS概述1.1HDFS产出背景及定义 1.2HDFS优缺点 1.3HDFS组成架构 1.4HDFS文件块大小(面试重点) 第2章HDFS的Shell操作(开发重点)1.基本语法 bin/hadoopfs具体命令ORbin/hdfsdfs具体命令dfs是fs的实现类。2.命令大全 [[em... 查看详情

建议收藏大数据技术之hadoop(生产调优手册)(代码片段)

大数据技术之Hadoop(生产调优手册)1.HDFS—核心参数1.1NameNode内存生产配置1.2NameNode心跳并发配置1.3开启回收站配置2.HDFS—集群压测2.1测试HDFS写性能2.2测试HDFS读性能3.HDFS—多目录3.1NameNode多目录配置3.2DataNode多目录配置3.3... 查看详情

一文带你了解大数据技术之hdfs

大数据技术之Hadoop-HDFS概述1.HDFS产出背景及定义2.HDFS优缺点3.HDFS组成架构4.HDFS文件块大小1.HDFS产出背景及定义1)HDFS产生背景随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管... 查看详情

大数据技术之zookeeper(代码片段)

文章目录1Zookeeper入门1.1概述1.2Zookeeper特点1.3数据结构1.4应用场景2Zookeeper安装2.1本地模式安装部署2.2配置参数解读3Zookeeper实战(开发重点)3.1分布式安装部署3.2客户端命令行操作3.3API应用3.4监听服务器节点动态上下线案... 查看详情

大数据技术之zookeeper(代码片段)

文章目录1Zookeeper入门1.1概述1.2Zookeeper特点1.3数据结构1.4应用场景2Zookeeper安装2.1本地模式安装部署2.2配置参数解读3Zookeeper实战(开发重点)3.1分布式安装部署3.2客户端命令行操作3.3API应用3.4监听服务器节点动态上下线案... 查看详情

大数据技术之hive(代码片段)

...质1.2Hive的优缺点1.2.1优点1.2.2缺点1.3Hive架构原理1.4Hive和数据库比较1.4.1查询语言1.4.2数据更新1.4.3执行延迟1.4.4数据规模第2章Hive常用命令第3章Hive数据类型3.1基本数据类型3.2类型转化第4章DDL数据定义4.1创建数据库4.2查询数据库4.2.... 查看详情

大数据技术之hadoop——zookeeper(代码片段)

目录 一、认识Zookeeper1、概念2、特性3、集群角色二、数据模型1、数据存储结构2、Znode的类型3、Znode的属性 三、Zookeeper的Watch机制1、Watch机制的认识2、Watch机制的通知状态和时间类型四、Zookeeper的选举机制1、选举机制的认识2、... 查看详情

一文带你了解大数据技术之zookeeper(入门级)(代码片段)

大数据技术之Zookeeper入门1.Zookeeper概述2.Zookeeper特点3.数据结构4.应用场景5.下载地址1.Zookeeper概述Zookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。Zookeeper的工作机制:2.Zookeeper特点3.数据结构4.应用... 查看详情

一文带你了解大数据技术之hadoop(代码片段)

...概述5.3MapReduce架构概述5.4HDFS、YARN、MapReduce三者关系6.大数据技术生态体系7.推荐系统框架 查看详情