flume简介与使用——thriftsource采集数据

WOTGL WOTGL     2022-08-01     718

关键词:

Flume简介与使用(二)——Thrift Source采集数据

  继上一篇安装Flume后,本篇将介绍如何使用Thrift Source采集数据。

  Thrift是Google开发的用于跨语言RPC通信,它拥有功能强大的软件堆栈和代码生成引擎,允许定义一个简单的IDL文件来生成不同语言的代码,服务器端和客户端通过共享这个IDL文件来构建来完成通信。

  Flume的Thrift Source是其实现的众多Source中的一个,Flume已经实现了服务器端,因此我们可以用任意自己熟悉的语言编写自己的Thrift Source客户端来采集数据,然后发送给Thrift Source服务器端。

  [一]、生成C++代码

  下载源码版的Flume,在apache-flume-1.6.0-src\flume-ng-sdk\src\main\thrift目录下有Flume定义好的flume.thrift文件,现在只要用这个文件来生成我们需要的代码就行了。

  flume.thrift文件内容如下:

 1 namespace java org.apache.flume.thrift
 2 
 3 struct ThriftFlumeEvent {
 4   1: required map <string, string> headers,
 5   2: required binary body,
 6 }
 7 
 8 enum Status {
 9   OK,
10   FAILED,
11   ERROR,
12   UNKNOWN
13 }
14 
15 service ThriftSourceProtocol {
16   Status append(1: ThriftFlumeEvent event),
17   Status appendBatch(1: list<ThriftFlumeEvent> events),
18 }

  1、定义了一个ThriftFlumeEvent结构体,用来封装发送的数据;

  2、定义了一个service类ThriftSourceProtocol,服务器端具体实现ThriftSourceProtocol里面的两个方法,再由客户端调用这些方法把数据传给Thrift Source服务器端。

  3、运行下面的命令:thrift --gen cpp flume.thrift,会在当前目录生成gen-cpp目录,里面是Thrift自动生成c++头文件和代码。(在这之前要先安装Thrift)

  [二]、下面是编写自己的客户端代码,我这里是接收远程传过来的数据,然后发送给Flume的Thrift Source服务器。

  1 #include <arpa/inet.h>
  2 #include <sys/types.h>
  3 #include <sys/socket.h>
  4 #include <pthread.h>
  5 #include <unistd.h>
  6 #include <stdlib.h>
  7 #include "include/MESA_prof_load.h"
  8 #include "include/MESA_handle_logger.h"
  9 
 10 #include <string>
 11 #include <iostream>
 12 #include "gen-cpp/flume_constants.h"
 13 #include "gen-cpp/flume_types.h"
 14 #include "gen-cpp/ThriftSourceProtocol.h"
 15 #include <thrift/protocol/TBinaryProtocol.h>
 16 #include <thrift/protocol/TCompactProtocol.h>
 17 #include <thrift/transport/TSocket.h>
 18 #include <thrift/transport/TTransportUtils.h>
 19 using namespace std;
 20 using namespace apache::thrift;
 21 using namespace apache::thrift::protocol;
 22 using namespace apache::thrift::transport;
 23 
 24 #define LOG_PATH "/home/zjf/DFcode/trafficlog/traffic_source.log"
 25 #define DATA_BUFFER 2048    //send buffer data length
 26 #define BUFLEN   2048       //received buffer data length
 27 #define BATCH_SIZE 1000     //send event num to flume once
 28 
 29 //defined my C++ object
 30 class ThriftClient{
 31     public:
 32         // Thrift protocol needings...
 33         boost::shared_ptr<TTransport> socket;
 34         boost::shared_ptr<TTransport> transport;
 35         boost::shared_ptr<TProtocol> protocol;
 36         ThriftSourceProtocolClient* pClient;
 37 
 38     public:
 39         ThriftClient();
 40 };
 41 //cconstruction function, init the thrift source server ip and port
 42 ThriftClient::ThriftClient():
 43     socket(new TSocket("10.208.129.12",5497)),
 44     transport(new TFramedTransport(socket)),
 45     protocol(new TCompactProtocol(transport))
 46 {
 47     pClient = new ThriftSourceProtocolClient(protocol);
 48 }
 49 
 50 //log
 51 struct log_info_t{
 52     char *path;
 53     int log_level;
 54     void * handle;
 55 };
 56 struct log_info_t log_info;
 57 const char *module = "zjf_traffic_data_collector";
 58 
 59 //类的对象
 60 ThriftClient *client = new ThriftClient();
 61 std::map<std::string, std::string>  headers;
 62 std::vector<ThriftFlumeEvent> eventbatch;
 63 unsigned long long pkt_num_tgl = 0;
 64 
 65 int RecvAndSendUDP(){
 66     MESA_handle_runtime_log(log_info.handle, RLOG_LV_INFO, module, "RecvUDP be called");
 67     int listen_socket;          //socket id
 68     struct sockaddr_in    local;    //client IP, where to recevied data
 69     struct sockaddr_in    from;      //server IP(local host)
 70     char server_addr[16] = "10.208.129.12";    //received traffic IP
 71     int server_port = 6789;                    //received traffic port
 72     char send_buf[DATA_BUFFER] = {0};          //data send to flume
 73     char Buf[BUFLEN] = {0};
 74     int fromlen;
 75     int len;
 76 
 77     //init socket
 78 reconnect:
 79     memset(&local, 0, sizeof(local));
 80     local.sin_family = AF_INET;
 81     local.sin_addr.s_addr = inet_addr(server_addr);
 82     local.sin_port = htons(server_port);
 83     listen_socket = socket(AF_INET, SOCK_DGRAM, 0); // UDP socket
 84     if(listen_socket < 0) {
 85         printf("error udp socket\n");
 86     }else{
 87         printf("listen_socket create OK\n");
 88     }
 89     if(bind(listen_socket, (struct sockaddr *)&local, sizeof(local)) < 0) {
 90         printf("error udp bind\n");
 91         return -1;
 92     }else{
 93         printf("socket bind OK\n");
 94     }
 95 
 96     while(1){
 97         char sip[16] = {0};
 98         char dip[16] = {0};
 99         char srcport[6] = {0};
100         char destport[6] = {0};
101         char url[BUFLEN] = {0};
102         memset(Buf,0,BUFLEN);
103         fromlen = sizeof(from);
104         len = recvfrom(listen_socket, (void *)Buf, (size_t)BUFLEN, 0, (struct sockaddr *)&from,(socklen_t *)&fromlen);
105         if(len == -1) {
106             printf("error udp recvfrom\n");
107             close(listen_socket);
108             goto reconnect;
109         }
110         //parse received buf, transform to key-value
111         int i;
112         int sip_loc = 0;
113         int sport_loc = 0;
114         int dip_loc = 0;
115         int dport_loc = 0;
116         int dotcount = 0;
117         for(i=0;Buf[i] != '\0';i++){
118             if(Buf[i] == '.'){
119                 dotcount++;
120                 if(dotcount == 4){
121                     sip_loc = i;
122                     memcpy(sip,Buf,i);
123                 }
124                 else if(dotcount == 8){
125                     dip_loc = i;
126                     memcpy(dip,Buf+sport_loc+1,dip_loc-sport_loc-1);
127                 }
128                 else if(dotcount == 9){
129                     dport_loc = i;
130                     memcpy(destport,Buf+dip_loc+1,dport_loc-dip_loc-1);
131                     break;
132                 }
133                 else{}
134             }
135             if(Buf[i] == '>'){
136                 sport_loc = i;
137                 memcpy(srcport,Buf+sip_loc+1,sport_loc-sip_loc-1);
138             }
139         }
140         memcpy(url,Buf+dport_loc+1,strlen(Buf)-dport_loc);
141         unsigned long src_ip = inet_addr(sip);
142         unsigned long dst_ip = inet_addr(dip);
143         sprintf(send_buf,"SrcIP=%u SrcPort=%s DestIP=%u DestPort=%s",ntohl(src_ip),srcport,ntohl(dst_ip),destport);
144         //construct an event and append to send
145         if(0 != strlen(send_buf) ){
146             pkt_num_tgl++;
147             string sBody(send_buf);
148             ThriftFlumeEvent tfEvent;
149             tfEvent.__set_headers(headers);
150             tfEvent.__set_body(sBody);
151             eventbatch.push_back(tfEvent);
152             if(eventbatch.size() >= BATCH_SIZE){
153                 if(!client->transport->isOpen())
154                     client->transport->open();
155                 Status::type res = client->pClient->appendBatch(eventbatch);
156                 if(res != Status::OK){
157                     MESA_handle_runtime_log(log_info.handle, RLOG_LV_FATAL, module, "WARNING: send event via thrift failed, return code:%d",res);
158                 }else{
159                     //printf("sended %lld event data to flume successful\n", pkt_num_tgl);
160                 }
161                 eventbatch.clear();
162             }
163         }
164         bzero(send_buf,DATA_BUFFER);
165     }
166 }
167 
168 
169 int main()
170 {
171     //create――logger
172     log_info.path = (char *)LOG_PATH;
173     log_info.log_level = 10;
174     log_info.handle = MESA_create_runtime_log_handle(log_info.path, log_info.log_level);
175     //open thrift connection
176     if(!client->transport->isOpen()){
177         client->transport->open();
178     }
179     eventbatch.clear();
180     RecvAndSendUDP();
181     return 0;
182 }

 [三]、编译并运行

  g++ -g -DHAVE_NETINET_IN_H -I. -I/usr/local/include/thrift -L/usr/local/lib rec_send_traffic_thrift.cpp gen-cpp/flume_constants.cpp gen-cpp/flume_types.cpp gen-cpp/ThriftSourceProtocol.cpp  -o  rec_send_traffic_thrift  -lthrift   -lpcap -L/usr/lib64 -lMESA_htable -lpthread -lMESA_handle_logger

  用守护进程启动程序:

 1 #!/bin/sh
 2 
 3 while [ 1 ]; do
 4     ulimit -c unlimited
 5     #./jz
 6     #cgexec -g cpu,memory:/MESA/jz ./jz >> jz.log
 7     ./rec_send_traffic_thrift
 8     #./jz
 9     echo program crashed, restart at `date +"%w %Y/%m/%d, %H:%M:%S"` >> RESTART.log
10     sleep 10
11 done

 


推荐博文:【1】http://www.micmiu.com/soa/rpc/thrift-sample/

     【2】http://www.mamicode.com/info-detail-869223.html

     【3】http://blog.csdn.net/yuzx2008/article/details/50179033

     【4】http://shiyanjun.cn/archives/456.html

     【5】http://flume.apache.org/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift

转载请注明原文出处,谢谢

flume简介与使用

Flume简介与使用(一)Flume简介  Flume是一个分布式的、可靠的、实用的服务——从不同的数据源高效的采集、整合、移动海量数据。    分布式:可以多台机器同时运行采集数据,不同Agent的之前通过网络传输数据... 查看详情

flume案例支持

...ro可以发送一个给定的文件给Flume,Avro源使用AVRORPC机制。ThriftSource:ThriftSource与AvroSource基本一致。只要把source的类型改成thrift即可,例如a1.sources.r1.type= 查看详情

日志抽取框架flume简介与安装配置(代码片段)

一:flume简介与功能二:flume安装与配置与简单测试一:flume的简介与功能架构1.1flume的简介:1.1.1Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发... 查看详情

flume安装与使用

 1.flume简介Flume是Cloudera提供的日志收集系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume是一个分布式、可靠、和高可用... 查看详情

flume简介及使用(代码片段)

一、Flume概述1)官网地址http://flume.apache.org/2)日志采集工具  Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合和移动大量日志数据。它具有基于流数据流的简单灵活的架构。它具有可靠的可靠性机制和许多故障... 查看详情

flume简介

Kafka在实际的开发之中的确可以处理千万级别的数据,但是现在有一个问题,这些数据从哪里来呢?Kafka产生的初衷是进行数据的收集以及合理的消费,但是这些实际之中的数据我们应该如何获取,我们该用什么样的方式来获取... 查看详情

flume安装和使用(代码片段)

概览1-flume简介2-系统要求3-安装和配置4-启动和测试 一、flume的简介官网地址: http://flume.apache.org/1-概述Flume是一种分布式,可靠且可用的服务,用于高效地收集,汇总和移动大量日志数据。它具有基于流式数据流的简单... 查看详情

(01)flume简介

  1、Flume简单介绍  ApacheFlume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到... 查看详情

flume的安装与使用

Flume下载后,解压,新增一个配置文件,写入配置即可我将配置文件写在conf下,取名为flume-conf-spooldir.propertiesFlume运行命令:bin/flume-ngagent--confconf--conf-fileconf/flume-conf-spooldir.properties--nameLogAgent-Dflume.root.logger=DEBUG,con 查看详情

flume简介

绪论:  本文的内容包括flume的背景、数据流模型、常见的数据流操作、flumeagent启动和flumeagent简单实例。参考文档为flume官网的flume1.8.0FlumeUserGuide。一、背景  flume是由cloudera软件公司产出的可分布式日志收集系统,2009年被... 查看详情

理解flumeng的batchsize和transactioncapacity参数和传输事务的原理

基于ThriftSource,MemoryChannel,HdfsSink三个组件,对Flume数据传输的事务进行分析,如果使用的是其他组件,Flume事务具体的处理方式将会不同。Flume的事务处理原理: Flume在对Channel进行Put和Take操作的时候,必须要用事物包住,比如... 查看详情

flume1.7安装与使用

Flume安装系统要求: 需安装JDK1.7及以上版本1、下载二进制包 下载页面:http://flume.apache.org/download.html 1.7.0下载地址:http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz2、解压$cp~/Downloads 查看详情

flume简介

摘自:https://flume.apache.org/FlumeUserGuide.html是什么:Flume是一个用来收集聚合海量多来源日志数据并转移到一个数据存储中心的分布式,可依赖,高可用,高性能服务框架。他基于流数据提供简单灵活的架构。具有健壮性,容错性... 查看详情

flume原理分析与使用案例(代码片段)

1、flume的特点:  flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS... 查看详情

flume原理

1.flume简介   flume作为cloudera开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume初始的发行版本目前被统称为FlumeOG(originalgeneration),属于cloudera。但随着FLume功能的扩展,FlumeOG代码工程臃肿、核心组件设... 查看详情

使用flume采集日志数据到hdfs中(代码片段)

文章目录1.简介1.1.Source组件1.2.Channel组件1.3.Sink组件2.安装Flume3.采集数据测试4.日志汇总到HDFS中4.1.日志收集服务配置4.2.日志汇总服务配置4.3.运行服务测试1.简介Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量... 查看详情

flume与kafka区别

   今天开会讨论日志处理为什么要同时使用Flume和Kafka,是否可以只用Kafka不使用Flume?当时想到的就只用Flume的接口多,不管是输入接口(socket和文件)以及输出接口(Kafka/HDFS/HBase等)。   考虑单一应用场景,从简化系统的角... 查看详情

flume基础知识01简介+基本架构+核心概念+架构模式+agent内部原理+配置格式(一篇就可入门flume)(代码片段)

1简介ApacheFlume是一个分布式,高可用的数据收集系统。它可以从不同的数据源收集数据,经过聚合后发送到存储系统中,通常用于日志数据的收集。Flume分为NG和OG(1.0之前)两个版本,NG在OG的基础上进行了完全的重... 查看详情