博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flume简介与使用(二)——Thrift Source采集数据
阅读量:4631 次
发布时间:2019-06-09

本文共 7283 字,大约阅读时间需要 24 分钟。

Flume简介与使用(二)——Thrift Source采集数据

  继上一篇安装Flume后,本篇将介绍如何使用Thrift Source采集数据。

  Thrift是Google开发的用于跨语言RPC通信,它拥有功能强大的软件堆栈和代码生成引擎,允许定义一个简单的IDL文件来生成不同语言的代码,服务器端和客户端通过共享这个IDL文件来构建来完成通信。

  Flume的Thrift Source是其实现的众多Source中的一个,Flume已经实现了服务器端,因此我们可以用任意自己熟悉的语言编写自己的Thrift Source客户端来采集数据,然后发送给Thrift Source服务器端。

  [一]、生成C++代码

  下载源码版的Flume,在apache-flume-1.6.0-src\flume-ng-sdk\src\main\thrift目录下有Flume定义好的flume.thrift文件,现在只要用这个文件来生成我们需要的代码就行了。

  flume.thrift文件内容如下:

1 namespace java org.apache.flume.thrift 2  3 struct ThriftFlumeEvent { 4   1: required map 
headers, 5 2: required binary body, 6 } 7 8 enum Status { 9 OK,10 FAILED,11 ERROR,12 UNKNOWN13 }14 15 service ThriftSourceProtocol {16 Status append(1: ThriftFlumeEvent event),17 Status appendBatch(1: list
events),18 }

  1、定义了一个ThriftFlumeEvent结构体,用来封装发送的数据;

  2、定义了一个service类ThriftSourceProtocol,服务器端具体实现ThriftSourceProtocol里面的两个方法,再由客户端调用这些方法把数据传给Thrift Source服务器端。

  3、运行下面的命令:thrift --gen cpp flume.thrift,会在当前目录生成gen-cpp目录,里面是Thrift自动生成c++头文件和代码。(在这之前要先安装Thrift)

  [二]、下面是编写自己的客户端代码,我这里是接收远程传过来的数据,然后发送给Flume的Thrift Source服务器。

1 #include 
2 #include
3 #include
4 #include
5 #include
6 #include
7 #include "include/MESA_prof_load.h" 8 #include "include/MESA_handle_logger.h" 9 10 #include
11 #include
12 #include "gen-cpp/flume_constants.h" 13 #include "gen-cpp/flume_types.h" 14 #include "gen-cpp/ThriftSourceProtocol.h" 15 #include
16 #include
17 #include
18 #include
19 using namespace std; 20 using namespace apache::thrift; 21 using namespace apache::thrift::protocol; 22 using namespace apache::thrift::transport; 23 24 #define LOG_PATH "/home/zjf/DFcode/trafficlog/traffic_source.log" 25 #define DATA_BUFFER 2048 //send buffer data length 26 #define BUFLEN 2048 //received buffer data length 27 #define BATCH_SIZE 1000 //send event num to flume once 28 29 //defined my C++ object 30 class ThriftClient{ 31 public: 32 // Thrift protocol needings... 33 boost::shared_ptr
socket; 34 boost::shared_ptr
transport; 35 boost::shared_ptr
protocol; 36 ThriftSourceProtocolClient* pClient; 37 38 public: 39 ThriftClient(); 40 }; 41 //cconstruction function, init the thrift source server ip and port 42 ThriftClient::ThriftClient(): 43 socket(new TSocket("10.208.129.12",5497)), 44 transport(new TFramedTransport(socket)), 45 protocol(new TCompactProtocol(transport)) 46 { 47 pClient = new ThriftSourceProtocolClient(protocol); 48 } 49 50 //log 51 struct log_info_t{ 52 char *path; 53 int log_level; 54 void * handle; 55 }; 56 struct log_info_t log_info; 57 const char *module = "zjf_traffic_data_collector"; 58 59 //类的对象 60 ThriftClient *client = new ThriftClient(); 61 std::map
headers; 62 std::vector
eventbatch; 63 unsigned long long pkt_num_tgl = 0; 64 65 int RecvAndSendUDP(){ 66 MESA_handle_runtime_log(log_info.handle, RLOG_LV_INFO, module, "RecvUDP be called"); 67 int listen_socket;          //socket id 68 struct sockaddr_in local;    //client IP, where to recevied data 69 struct sockaddr_in from;   //server IP(local host) 70 char server_addr[16] = "10.208.129.12"; //received traffic IP 71 int server_port = 6789; //received traffic port 72 char send_buf[DATA_BUFFER] = { 0}; //data send to flume 73 char Buf[BUFLEN] = { 0}; 74 int fromlen; 75 int len; 76 77 //init socket 78 reconnect: 79 memset(&local, 0, sizeof(local)); 80 local.sin_family = AF_INET; 81 local.sin_addr.s_addr = inet_addr(server_addr); 82 local.sin_port = htons(server_port); 83 listen_socket = socket(AF_INET, SOCK_DGRAM, 0); // UDP socket 84 if(listen_socket < 0) { 85 printf("error udp socket\n"); 86 }else{ 87 printf("listen_socket create OK\n"); 88 } 89 if(bind(listen_socket, (struct sockaddr *)&local, sizeof(local)) < 0) { 90 printf("error udp bind\n"); 91 return -1; 92 }else{ 93 printf("socket bind OK\n"); 94 } 95 96 while(1){ 97 char sip[16] = { 0}; 98 char dip[16] = { 0}; 99 char srcport[6] = { 0};100 char destport[6] = { 0};101 char url[BUFLEN] = { 0};102 memset(Buf,0,BUFLEN);103 fromlen = sizeof(from);104 len = recvfrom(listen_socket, (void *)Buf, (size_t)BUFLEN, 0, (struct sockaddr *)&from,(socklen_t *)&fromlen);105 if(len == -1) {106 printf("error udp recvfrom\n");107 close(listen_socket);108 goto reconnect;109 }110 //parse received buf, transform to key-value111 int i;112 int sip_loc = 0;113 int sport_loc = 0;114 int dip_loc = 0;115 int dport_loc = 0;116 int dotcount = 0;117 for(i=0;Buf[i] != '\0';i++){118 if(Buf[i] == '.'){119 dotcount++;120 if(dotcount == 4){121 sip_loc = i;122 memcpy(sip,Buf,i);123 }124 else if(dotcount == 8){125 dip_loc = i;126 memcpy(dip,Buf+sport_loc+1,dip_loc-sport_loc-1);127 }128 else if(dotcount == 9){129 dport_loc = i;130 memcpy(destport,Buf+dip_loc+1,dport_loc-dip_loc-1);131 break;132 }133 else{}134 }135 if(Buf[i] == '>'){136 sport_loc = i;137 memcpy(srcport,Buf+sip_loc+1,sport_loc-sip_loc-1);138 }139 }140 memcpy(url,Buf+dport_loc+1,strlen(Buf)-dport_loc);141 unsigned long src_ip = inet_addr(sip);142 unsigned long dst_ip = inet_addr(dip);143 sprintf(send_buf,"SrcIP=%u SrcPort=%s DestIP=%u DestPort=%s",ntohl(src_ip),srcport,ntohl(dst_ip),destport);144 //construct an event and append to send145 if(0 != strlen(send_buf) ){146 pkt_num_tgl++;147 string sBody(send_buf);148 ThriftFlumeEvent tfEvent;149 tfEvent.__set_headers(headers);150 tfEvent.__set_body(sBody);151 eventbatch.push_back(tfEvent);152 if(eventbatch.size() >= BATCH_SIZE){153 if(!client->transport->isOpen())154 client->transport->open();155 Status::type res = client->pClient->appendBatch(eventbatch);156 if(res != Status::OK){157 MESA_handle_runtime_log(log_info.handle, RLOG_LV_FATAL, module, "WARNING: send event via thrift failed, return code:%d",res);158 }else{159 //printf("sended %lld event data to flume successful\n", pkt_num_tgl);160 }161 eventbatch.clear();162 }163 }164 bzero(send_buf,DATA_BUFFER);165 }166 }167 168 169 int main()170 {171 //create――logger172 log_info.path = (char *)LOG_PATH;173 log_info.log_level = 10;174 log_info.handle = MESA_create_runtime_log_handle(log_info.path, log_info.log_level);175 //open thrift connection176 if(!client->transport->isOpen()){177 client->transport->open();178 }179 eventbatch.clear();180 RecvAndSendUDP();181 return 0;182 }

 [三]、编译并运行

  g++ -g -DHAVE_NETINET_IN_H -I. -I/usr/local/include/thrift -L/usr/local/lib rec_send_traffic_thrift.cpp gen-cpp/flume_constants.cpp gen-cpp/flume_types.cpp gen-cpp/ThriftSourceProtocol.cpp  -o  rec_send_traffic_thrift  -lthrift   -lpcap -L/usr/lib64 -lMESA_htable -lpthread -lMESA_handle_logger

  用守护进程启动程序:

1 #!/bin/sh 2  3 while [ 1 ]; do 4     ulimit -c unlimited 5     #./jz 6     #cgexec -g cpu,memory:/MESA/jz ./jz >> jz.log 7     ./rec_send_traffic_thrift 8     #./jz 9     echo program crashed, restart at `date +"%w %Y/%m/%d, %H:%M:%S"` >> RESTART.log10     sleep 1011 done

 


推荐博文:【1】http://www.micmiu.com/soa/rpc/thrift-sample/

     【2】http://www.mamicode.com/info-detail-869223.html

     【3】http://blog.csdn.net/yuzx2008/article/details/50179033

     【4】http://shiyanjun.cn/archives/456.html

     【5】http://flume.apache.org/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift

转载请注明,谢谢

转载于:https://www.cnblogs.com/vincent-vg/p/5813505.html

你可能感兴趣的文章
测试用例设计方法基础理论知识
查看>>
基于visual Studio2013解决面试题之0804复杂链表
查看>>
find_in_set
查看>>
【转帖】SQLServer登录连接失败(error:40-无法打开到SQLServer的连接)的解决方案...
查看>>
ibatis的there is no statement named xxx in this SqlMap
查看>>
系统启动时,spring配置文件解析失败,报”cvc-elt.1: 找不到元素 'beans' 的声明“异常...
查看>>
《Python学习手册》读书笔记
查看>>
简单数据结构(队列 栈 树 堆 )
查看>>
洛谷P2380 狗哥采矿
查看>>
learning to openstack concept
查看>>
Kindeditor学习中的那些坑
查看>>
Servlet
查看>>
一篇价值百万的文章:我为什么在22岁辞去年薪150万的工作?
查看>>
信息安全系统设计基础期末总结
查看>>
leetcode 203 Remove Linked List Elements
查看>>
TCP/IP 笔记 1.3 IP:网际协议
查看>>
HDU 1061 Rightmost Digit
查看>>
八种简易健康减肥瘦身法
查看>>
win7旗舰版下配置IIS服务器
查看>>
web开发基础
查看>>