关键词:
Flume简介与使用(二)——Thrift Source采集数据
继上一篇安装Flume后,本篇将介绍如何使用Thrift Source采集数据。
Thrift是Google开发的用于跨语言RPC通信,它拥有功能强大的软件堆栈和代码生成引擎,允许定义一个简单的IDL文件来生成不同语言的代码,服务器端和客户端通过共享这个IDL文件来构建来完成通信。
Flume的Thrift Source是其实现的众多Source中的一个,Flume已经实现了服务器端,因此我们可以用任意自己熟悉的语言编写自己的Thrift Source客户端来采集数据,然后发送给Thrift Source服务器端。
[一]、生成C++代码
下载源码版的Flume,在apache-flume-1.6.0-src\flume-ng-sdk\src\main\thrift目录下有Flume定义好的flume.thrift文件,现在只要用这个文件来生成我们需要的代码就行了。
flume.thrift文件内容如下:
1 namespace java org.apache.flume.thrift 2 3 struct ThriftFlumeEvent { 4 1: required map <string, string> headers, 5 2: required binary body, 6 } 7 8 enum Status { 9 OK, 10 FAILED, 11 ERROR, 12 UNKNOWN 13 } 14 15 service ThriftSourceProtocol { 16 Status append(1: ThriftFlumeEvent event), 17 Status appendBatch(1: list<ThriftFlumeEvent> events), 18 }
1、定义了一个ThriftFlumeEvent结构体,用来封装发送的数据;
2、定义了一个service类ThriftSourceProtocol,服务器端具体实现ThriftSourceProtocol里面的两个方法,再由客户端调用这些方法把数据传给Thrift Source服务器端。
3、运行下面的命令:thrift --gen cpp flume.thrift,会在当前目录生成gen-cpp目录,里面是Thrift自动生成c++头文件和代码。(在这之前要先安装Thrift)
[二]、下面是编写自己的客户端代码,我这里是接收远程传过来的数据,然后发送给Flume的Thrift Source服务器。
1 #include <arpa/inet.h> 2 #include <sys/types.h> 3 #include <sys/socket.h> 4 #include <pthread.h> 5 #include <unistd.h> 6 #include <stdlib.h> 7 #include "include/MESA_prof_load.h" 8 #include "include/MESA_handle_logger.h" 9 10 #include <string> 11 #include <iostream> 12 #include "gen-cpp/flume_constants.h" 13 #include "gen-cpp/flume_types.h" 14 #include "gen-cpp/ThriftSourceProtocol.h" 15 #include <thrift/protocol/TBinaryProtocol.h> 16 #include <thrift/protocol/TCompactProtocol.h> 17 #include <thrift/transport/TSocket.h> 18 #include <thrift/transport/TTransportUtils.h> 19 using namespace std; 20 using namespace apache::thrift; 21 using namespace apache::thrift::protocol; 22 using namespace apache::thrift::transport; 23 24 #define LOG_PATH "/home/zjf/DFcode/trafficlog/traffic_source.log" 25 #define DATA_BUFFER 2048 //send buffer data length 26 #define BUFLEN 2048 //received buffer data length 27 #define BATCH_SIZE 1000 //send event num to flume once 28 29 //defined my C++ object 30 class ThriftClient{ 31 public: 32 // Thrift protocol needings... 33 boost::shared_ptr<TTransport> socket; 34 boost::shared_ptr<TTransport> transport; 35 boost::shared_ptr<TProtocol> protocol; 36 ThriftSourceProtocolClient* pClient; 37 38 public: 39 ThriftClient(); 40 }; 41 //cconstruction function, init the thrift source server ip and port 42 ThriftClient::ThriftClient(): 43 socket(new TSocket("10.208.129.12",5497)), 44 transport(new TFramedTransport(socket)), 45 protocol(new TCompactProtocol(transport)) 46 { 47 pClient = new ThriftSourceProtocolClient(protocol); 48 } 49 50 //log 51 struct log_info_t{ 52 char *path; 53 int log_level; 54 void * handle; 55 }; 56 struct log_info_t log_info; 57 const char *module = "zjf_traffic_data_collector"; 58 59 //类的对象 60 ThriftClient *client = new ThriftClient(); 61 std::map<std::string, std::string> headers; 62 std::vector<ThriftFlumeEvent> eventbatch; 63 unsigned long long pkt_num_tgl = 0; 64 65 int RecvAndSendUDP(){ 66 MESA_handle_runtime_log(log_info.handle, RLOG_LV_INFO, module, "RecvUDP be called"); 67 int listen_socket; //socket id 68 struct sockaddr_in local; //client IP, where to recevied data 69 struct sockaddr_in from; //server IP(local host) 70 char server_addr[16] = "10.208.129.12"; //received traffic IP 71 int server_port = 6789; //received traffic port 72 char send_buf[DATA_BUFFER] = {0}; //data send to flume 73 char Buf[BUFLEN] = {0}; 74 int fromlen; 75 int len; 76 77 //init socket 78 reconnect: 79 memset(&local, 0, sizeof(local)); 80 local.sin_family = AF_INET; 81 local.sin_addr.s_addr = inet_addr(server_addr); 82 local.sin_port = htons(server_port); 83 listen_socket = socket(AF_INET, SOCK_DGRAM, 0); // UDP socket 84 if(listen_socket < 0) { 85 printf("error udp socket\n"); 86 }else{ 87 printf("listen_socket create OK\n"); 88 } 89 if(bind(listen_socket, (struct sockaddr *)&local, sizeof(local)) < 0) { 90 printf("error udp bind\n"); 91 return -1; 92 }else{ 93 printf("socket bind OK\n"); 94 } 95 96 while(1){ 97 char sip[16] = {0}; 98 char dip[16] = {0}; 99 char srcport[6] = {0}; 100 char destport[6] = {0}; 101 char url[BUFLEN] = {0}; 102 memset(Buf,0,BUFLEN); 103 fromlen = sizeof(from); 104 len = recvfrom(listen_socket, (void *)Buf, (size_t)BUFLEN, 0, (struct sockaddr *)&from,(socklen_t *)&fromlen); 105 if(len == -1) { 106 printf("error udp recvfrom\n"); 107 close(listen_socket); 108 goto reconnect; 109 } 110 //parse received buf, transform to key-value 111 int i; 112 int sip_loc = 0; 113 int sport_loc = 0; 114 int dip_loc = 0; 115 int dport_loc = 0; 116 int dotcount = 0; 117 for(i=0;Buf[i] != '\0';i++){ 118 if(Buf[i] == '.'){ 119 dotcount++; 120 if(dotcount == 4){ 121 sip_loc = i; 122 memcpy(sip,Buf,i); 123 } 124 else if(dotcount == 8){ 125 dip_loc = i; 126 memcpy(dip,Buf+sport_loc+1,dip_loc-sport_loc-1); 127 } 128 else if(dotcount == 9){ 129 dport_loc = i; 130 memcpy(destport,Buf+dip_loc+1,dport_loc-dip_loc-1); 131 break; 132 } 133 else{} 134 } 135 if(Buf[i] == '>'){ 136 sport_loc = i; 137 memcpy(srcport,Buf+sip_loc+1,sport_loc-sip_loc-1); 138 } 139 } 140 memcpy(url,Buf+dport_loc+1,strlen(Buf)-dport_loc); 141 unsigned long src_ip = inet_addr(sip); 142 unsigned long dst_ip = inet_addr(dip); 143 sprintf(send_buf,"SrcIP=%u SrcPort=%s DestIP=%u DestPort=%s",ntohl(src_ip),srcport,ntohl(dst_ip),destport); 144 //construct an event and append to send 145 if(0 != strlen(send_buf) ){ 146 pkt_num_tgl++; 147 string sBody(send_buf); 148 ThriftFlumeEvent tfEvent; 149 tfEvent.__set_headers(headers); 150 tfEvent.__set_body(sBody); 151 eventbatch.push_back(tfEvent); 152 if(eventbatch.size() >= BATCH_SIZE){ 153 if(!client->transport->isOpen()) 154 client->transport->open(); 155 Status::type res = client->pClient->appendBatch(eventbatch); 156 if(res != Status::OK){ 157 MESA_handle_runtime_log(log_info.handle, RLOG_LV_FATAL, module, "WARNING: send event via thrift failed, return code:%d",res); 158 }else{ 159 //printf("sended %lld event data to flume successful\n", pkt_num_tgl); 160 } 161 eventbatch.clear(); 162 } 163 } 164 bzero(send_buf,DATA_BUFFER); 165 } 166 } 167 168 169 int main() 170 { 171 //create――logger 172 log_info.path = (char *)LOG_PATH; 173 log_info.log_level = 10; 174 log_info.handle = MESA_create_runtime_log_handle(log_info.path, log_info.log_level); 175 //open thrift connection 176 if(!client->transport->isOpen()){ 177 client->transport->open(); 178 } 179 eventbatch.clear(); 180 RecvAndSendUDP(); 181 return 0; 182 }
[三]、编译并运行
g++ -g -DHAVE_NETINET_IN_H -I. -I/usr/local/include/thrift -L/usr/local/lib rec_send_traffic_thrift.cpp gen-cpp/flume_constants.cpp gen-cpp/flume_types.cpp gen-cpp/ThriftSourceProtocol.cpp -o rec_send_traffic_thrift -lthrift -lpcap -L/usr/lib64 -lMESA_htable -lpthread -lMESA_handle_logger
用守护进程启动程序:
1 #!/bin/sh 2 3 while [ 1 ]; do 4 ulimit -c unlimited 5 #./jz 6 #cgexec -g cpu,memory:/MESA/jz ./jz >> jz.log 7 ./rec_send_traffic_thrift 8 #./jz 9 echo program crashed, restart at `date +"%w %Y/%m/%d, %H:%M:%S"` >> RESTART.log 10 sleep 10 11 done
推荐博文:【1】http://www.micmiu.com/soa/rpc/thrift-sample/
【2】http://www.mamicode.com/info-detail-869223.html
【3】http://blog.csdn.net/yuzx2008/article/details/50179033
【4】http://shiyanjun.cn/archives/456.html
【5】http://flume.apache.org/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift
转载请注明原文出处,谢谢
flume简介与使用
Flume简介与使用(一)Flume简介 Flume是一个分布式的、可靠的、实用的服务——从不同的数据源高效的采集、整合、移动海量数据。 分布式:可以多台机器同时运行采集数据,不同Agent的之前通过网络传输数据... 查看详情
flume案例支持
...ro可以发送一个给定的文件给Flume,Avro源使用AVRORPC机制。ThriftSource:ThriftSource与AvroSource基本一致。只要把source的类型改成thrift即可,例如a1.sources.r1.type= 查看详情
日志抽取框架flume简介与安装配置(代码片段)
一:flume简介与功能二:flume安装与配置与简单测试一:flume的简介与功能架构1.1flume的简介:1.1.1Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发... 查看详情
flume安装与使用
1.flume简介Flume是Cloudera提供的日志收集系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume是一个分布式、可靠、和高可用... 查看详情
flume简介及使用(代码片段)
一、Flume概述1)官网地址http://flume.apache.org/2)日志采集工具 Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合和移动大量日志数据。它具有基于流数据流的简单灵活的架构。它具有可靠的可靠性机制和许多故障... 查看详情
flume简介
Kafka在实际的开发之中的确可以处理千万级别的数据,但是现在有一个问题,这些数据从哪里来呢?Kafka产生的初衷是进行数据的收集以及合理的消费,但是这些实际之中的数据我们应该如何获取,我们该用什么样的方式来获取... 查看详情
flume安装和使用(代码片段)
概览1-flume简介2-系统要求3-安装和配置4-启动和测试 一、flume的简介官网地址: http://flume.apache.org/1-概述Flume是一种分布式,可靠且可用的服务,用于高效地收集,汇总和移动大量日志数据。它具有基于流式数据流的简单... 查看详情
(01)flume简介
1、Flume简单介绍 ApacheFlume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到... 查看详情
flume的安装与使用
Flume下载后,解压,新增一个配置文件,写入配置即可我将配置文件写在conf下,取名为flume-conf-spooldir.propertiesFlume运行命令:bin/flume-ngagent--confconf--conf-fileconf/flume-conf-spooldir.properties--nameLogAgent-Dflume.root.logger=DEBUG,con 查看详情
flume简介
绪论: 本文的内容包括flume的背景、数据流模型、常见的数据流操作、flumeagent启动和flumeagent简单实例。参考文档为flume官网的flume1.8.0FlumeUserGuide。一、背景 flume是由cloudera软件公司产出的可分布式日志收集系统,2009年被... 查看详情
理解flumeng的batchsize和transactioncapacity参数和传输事务的原理
基于ThriftSource,MemoryChannel,HdfsSink三个组件,对Flume数据传输的事务进行分析,如果使用的是其他组件,Flume事务具体的处理方式将会不同。Flume的事务处理原理: Flume在对Channel进行Put和Take操作的时候,必须要用事物包住,比如... 查看详情
flume1.7安装与使用
Flume安装系统要求: 需安装JDK1.7及以上版本1、下载二进制包 下载页面:http://flume.apache.org/download.html 1.7.0下载地址:http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz2、解压$cp~/Downloads 查看详情
flume简介
摘自:https://flume.apache.org/FlumeUserGuide.html是什么:Flume是一个用来收集聚合海量多来源日志数据并转移到一个数据存储中心的分布式,可依赖,高可用,高性能服务框架。他基于流数据提供简单灵活的架构。具有健壮性,容错性... 查看详情
flume原理分析与使用案例(代码片段)
1、flume的特点: flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS... 查看详情
flume原理
1.flume简介 flume作为cloudera开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume初始的发行版本目前被统称为FlumeOG(originalgeneration),属于cloudera。但随着FLume功能的扩展,FlumeOG代码工程臃肿、核心组件设... 查看详情
使用flume采集日志数据到hdfs中(代码片段)
文章目录1.简介1.1.Source组件1.2.Channel组件1.3.Sink组件2.安装Flume3.采集数据测试4.日志汇总到HDFS中4.1.日志收集服务配置4.2.日志汇总服务配置4.3.运行服务测试1.简介Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量... 查看详情
flume与kafka区别
今天开会讨论日志处理为什么要同时使用Flume和Kafka,是否可以只用Kafka不使用Flume?当时想到的就只用Flume的接口多,不管是输入接口(socket和文件)以及输出接口(Kafka/HDFS/HBase等)。 考虑单一应用场景,从简化系统的角... 查看详情
flume基础知识01简介+基本架构+核心概念+架构模式+agent内部原理+配置格式(一篇就可入门flume)(代码片段)
1简介ApacheFlume是一个分布式,高可用的数据收集系统。它可以从不同的数据源收集数据,经过聚合后发送到存储系统中,通常用于日志数据的收集。Flume分为NG和OG(1.0之前)两个版本,NG在OG的基础上进行了完全的重... 查看详情