关键词:
大家好,我是脚丫先生 (o^^o)
大数据项目之数据集成模块,按照项目需求需要集成时序数据库OpenTSDB。于是着手进行调研,https://github.com/alibaba/DataX
发现关于该时序数据库的插件只有单一的读插件,而阿里自研的TSDB读写插件都齐全。为了彻底的分离,同时为了完全适配OpenTSDB数据库,于是进行了OpenTSDB的写插件开发。
文章目录
一、OpenTSDB时序数据库
官方描述:OpenTSDB is a distributed, scalable Time Series Database (TSDB) written on top of HBase;翻译过来就是,基于Hbase的分布式的,可伸缩的时间序列数据库。
主要用途:常常用于集成到监控系统,譬如收集大规模集群(包括网络设备、操作系统、应用程序)的监控数据并进行存储,查询。
详细: OpenTSDB数据库,数据都是以metric为单位的进行存储。可以这样对metric进行理解,metric就是一个需要监控的指标或者项,譬如服务器的话,会有CPU使用率、内存使用率,磁盘使用率等。OpenTSDB是基于HBase作为存储底层,因此对数据存储支持到秒级别,并且支持数据永久存储,不会主动删除。
OpenTSDB存储的一些核心概念: 我们以一个实际的场景进行理解:例如,我们采集1个服务器的CPU使用率,发现该服务器在21:00的时候,CPU使用率达到99%。
-
Metric:监控项。譬如上面的CPU使用率。
-
Tags:就是一些标签,在OpenTSDB里面,Tags由tagk和tagv组成,即tagk:takv。标签是用来描述Metric的。
-
Value:一个Value表示一个metric的实际数值,譬如上面的99%
-
Timestamp:即时间戳,用来描述Value是什么时候的;譬如上面的21:00
-
Data Point:即某个Metric在某个时间点的数值。
Data Point包括以下部分:Metric、Tags、Value、Timestamp 上面描述的服务器在21:00时候的cpu使用率,就是1个DataPoint
保存到OpenTSDB的数据,就是无数个DataPoint。
例如一个DataPoint
"metric":"temperature",
"timestamp":1567675709879,
"value":20.5,
"tags":
"host":"192.168.239.128"
指标名称:temperature
指标标签:host=192.168.239.128
指标值:20.5
时间戳:1567675709879
二、API接口
官方地址:https://www.w3cschool.cn/doc_opentsdb/opentsdb-api_http-put.html?lang=en
1、保存
2、查询
3、删除
三、opentsdbwriter初步开发
1、根据前文进行add插件模块
2、opentsdb数据库的java工具类
如今,卷又卷的时代,我们必须要站在前人的肩膀上探索,不需要重复的去造轮子。这里利用github上开源的opentsdb工具类进行插件的开发。地址如下所示:
https://github.com/fangpanpan/opentsdb-java-sdk
3、插件的开发
对于阿里datax的opentsdbwriter插件的开发,主要逻辑OpenTSDBWriter去实现。代码如下所示:
package com.alibaba.datax.plugin.writer.opentsdbwriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSONArray;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.nio.reactor.IOReactorException;
import org.opentsdb.client.OpenTSDBClient;
import org.opentsdb.client.OpenTSDBClientFactory;
import org.opentsdb.client.OpenTSDBConfig;
import org.opentsdb.client.bean.request.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
* @date : 12:11 2022/1/15
*/
public class OpenTSDBWriter extends Writer
public static class Job extends Writer.Job
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig;
@Override
public List<Configuration> split(int mandatoryNumber)
//按照reader 配置文件的格式 来 组织相同个数的writer配置文件
ArrayList<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++)
configurations.add(this.originalConfig.clone());
return configurations;
@Override
public void prepare()
@Override
public void init()
//获取配置文件信息parameter 里面的参数
this.originalConfig = super.getPluginJobConf();
String address = originalConfig.getString(Key.ENDPOINT);
if (StringUtils.isBlank(address))
throw DataXException.asDataXException(
OpenTSDBWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.ENDPOINT + "] is not set.");
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
if (columns == null || columns.isEmpty())
throw DataXException.asDataXException(
OpenTSDBWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.COLUMN + "] is not set.");
List<String> columnTypes = originalConfig.getList(Key.COLUMNTYPE, String.class);
if (columnTypes == null || columnTypes.isEmpty())
throw DataXException.asDataXException(
OpenTSDBWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.COLUMNTYPE + "] is not set.");
String metric = originalConfig.getString(Key.METRIC);
if (StringUtils.isBlank(metric))
throw DataXException.asDataXException(
OpenTSDBWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.METRIC + "] is not set.");
@Override
public void destroy()
public static class Task extends Writer.Task
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private OpenTSDBConfig config;
private List<String> columnTypes;
private List<String> columns;
private String metric;
@Override
public void init()
Configuration writerSliceConfig = getPluginJobConf();
String address = writerSliceConfig.getString(Key.ENDPOINT);
this.columnTypes = writerSliceConfig.getList(Key.COLUMNTYPE, String.class);
this.columns = writerSliceConfig.getList(Key.COLUMN, String.class);
this.metric = writerSliceConfig.getString(Key.METRIC);
this.config = OpenTSDBConfig.address(address, 4242)
.config();
@Override
public void startWrite(RecordReceiver recordReceiver)
try
Record record;
List<Number> values = new ArrayList<>();
List<Map<String,String>> tags = new ArrayList<>();
while ((record = recordReceiver.getFromReader()) != null)
final int recordLength = record.getColumnNumber();
Map<String,String> tag = new HashMap<>();
for (int i = 0; i < recordLength; i++)
if (columnTypes.get(i).equals("tag"))
tag.put(columns.get(i),record.getColumn(i).asString());
if (columnTypes.get(i).equals("value"))
values.add(record.getColumn(i).asBigInteger());
tags.add(tag);
OpenTSDBClient client = OpenTSDBClientFactory.connect(config);
for (int i = 0;i < values.size();i++)
Point point = Point.metric(metric)
.tag(tags.get(i))
.value(System.currentTimeMillis(), values.get(i))
.build();
client.put(point);
client.gracefulClose();
catch (Exception e)
e.printStackTrace();
@Override
public void post()
@Override
public void destroy()
四、OpenTSDBWriter 插件的job
我这里想去实现一个场景:把mysql的数据通过datax导入到opentsdb数据库。
1、在mysql数据库新建源表
需要在mysql的数据库里,新建自己想同步数据的表。
CREATE TABLE `mysql_to_opentsdb` (
`id` int(11) DEFAULT NULL,
`name` varchar(255) DEFAULT NULL,
`sex` varchar(255) DEFAULT NULL,
`price` decimal(10,2) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2、编写job
"job":
"content": [
"reader":
"parameter":
"password": "cetc@2021",
"connection": [
"querySql": [
" SELECT id,name,sex,price FROM mysql_to_opentsdb ; "
],
"jdbcUrl": [
"jdbc:mysql://192.168.239.128:3306/test"
]
],
"splitPk": "",
"username": "root"
,
"name": "mysqlreader"
,
"writer":
"parameter":
"columnType": [
"tag",
"tag",
"tag",
"value"
],
"endpoint": "http://172.10.10.51",
"metric": "dog",
"column": [
"id",
"name",
"sex",
"price"
]
,
"name": "opentsdbwriter"
],
"setting":
"errorLimit":
"record": 0,
"percentage": 0.02
,
"speed":
"channel": 3
关于datax的job需要自己去进行编写,因此我们需要根据项目业务进行设计。
endpoint: opentsdb的地址,4242端口已经固定。
metric:指标。
最后,进行maven命令行的打包操作,生成我们需要的插件,然后进行数据同步操作,
大数据技术之hadoop入门
?第1章大数据概论1.1大数据概念 大数据概念如图2-1所示。 图2-1大数据概念 1.2大数据特点(4V) 大数据特点如图2-2,2-3,2-4,2-5所示 图2-2大数据特点之大量 图2-3大数据特点之高速 图2-4大数据特点之多样 图2-5大数据... 查看详情
大数据技术之大数据概论
大数据概论第1章大数据概念第2章大数据特点(4V)1、Volume(大量)2、Velocity(高速)3、Variety(多样)4、Value(低价值密度)第3章大数据应用场景第4章大数据发展前景第5章大数据部门间业... 查看详情
大数据技术之大数据概论
一、大数据概念 大数据(bigdata),指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的... 查看详情
大数据分析之技术框架整理
大数据离线部分HDFS1:HDFS的架构部分及工作原理NameNode:负责管理元素据,将信息保存在内存中DataNode:保存数据,以块的形式保存。启动后需要定时的向NameNode发送心跳,报告自身存储的块信息2:HDFS的上传过程3:HDFS的下载4:N... 查看详情
大数据技术之大数据概论
第1章大数据概念大数据(BigData):指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。大... 查看详情
大数据技术之dolphinscheduler
架构设计1.1系统架构图1.2启动流程图1.3架构说明MasterServerMasterServer采用分布式无中心设计理念,MasterServer主要负责DAG任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。MasterServer服务启动时向... 查看详情
大数据技术之数据采集篇
【导读】数据采集是进行大数据分析的前提也是必要条件,在整个流程中占据重要地位。本文将介绍大数据三种采集形式:系统日志采集法、网络数据采集法以及其他数据采集法。(一)系统日志采集法系统日志是记录系统中硬... 查看详情
大数据技术之linux(上)
...了JAVASE基础这座大山之后,终于能够学习我最爱的大数据技术啦! 第一天学习的是Linux知识,那么---- 它是什么& 查看详情
大数据技术之hadoop(mapreduce)
...e核心思想WordCount案例Hadoop序列化MapReduce框架原理InputFormat数据输入MapReduce定义MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。MapReduce核心功能是将用户编写的业务逻辑代码和... 查看详情
大数据技术之hadoop(hdfs)
第1章HDFS概述1.1HDFS产出背景及定义 1.2HDFS优缺点 1.3HDFS组成架构 1.4HDFS文件块大小(面试重点) 第2章HDFS的Shell操作(开发重点)1.基本语法 bin/hadoopfs具体命令ORbin/hdfsdfs具体命令dfs是fs的实现类。2.命令大全 [[em... 查看详情
建议收藏大数据技术之hadoop(生产调优手册)(代码片段)
大数据技术之Hadoop(生产调优手册)1.HDFS—核心参数1.1NameNode内存生产配置1.2NameNode心跳并发配置1.3开启回收站配置2.HDFS—集群压测2.1测试HDFS写性能2.2测试HDFS读性能3.HDFS—多目录3.1NameNode多目录配置3.2DataNode多目录配置3.3... 查看详情
一文带你了解大数据技术之hdfs
大数据技术之Hadoop-HDFS概述1.HDFS产出背景及定义2.HDFS优缺点3.HDFS组成架构4.HDFS文件块大小1.HDFS产出背景及定义1)HDFS产生背景随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系统管... 查看详情
大数据技术之zookeeper(代码片段)
文章目录1Zookeeper入门1.1概述1.2Zookeeper特点1.3数据结构1.4应用场景2Zookeeper安装2.1本地模式安装部署2.2配置参数解读3Zookeeper实战(开发重点)3.1分布式安装部署3.2客户端命令行操作3.3API应用3.4监听服务器节点动态上下线案... 查看详情
大数据技术之zookeeper(代码片段)
文章目录1Zookeeper入门1.1概述1.2Zookeeper特点1.3数据结构1.4应用场景2Zookeeper安装2.1本地模式安装部署2.2配置参数解读3Zookeeper实战(开发重点)3.1分布式安装部署3.2客户端命令行操作3.3API应用3.4监听服务器节点动态上下线案... 查看详情
大数据技术之hive(代码片段)
...质1.2Hive的优缺点1.2.1优点1.2.2缺点1.3Hive架构原理1.4Hive和数据库比较1.4.1查询语言1.4.2数据更新1.4.3执行延迟1.4.4数据规模第2章Hive常用命令第3章Hive数据类型3.1基本数据类型3.2类型转化第4章DDL数据定义4.1创建数据库4.2查询数据库4.2.... 查看详情
大数据技术之hadoop——zookeeper(代码片段)
目录 一、认识Zookeeper1、概念2、特性3、集群角色二、数据模型1、数据存储结构2、Znode的类型3、Znode的属性 三、Zookeeper的Watch机制1、Watch机制的认识2、Watch机制的通知状态和时间类型四、Zookeeper的选举机制1、选举机制的认识2、... 查看详情
一文带你了解大数据技术之zookeeper(入门级)(代码片段)
大数据技术之Zookeeper入门1.Zookeeper概述2.Zookeeper特点3.数据结构4.应用场景5.下载地址1.Zookeeper概述Zookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。Zookeeper的工作机制:2.Zookeeper特点3.数据结构4.应用... 查看详情
一文带你了解大数据技术之hadoop(代码片段)
...概述5.3MapReduce架构概述5.4HDFS、YARN、MapReduce三者关系6.大数据技术生态体系7.推荐系统框架 查看详情