博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop生态圈-Flume的组件之自定义拦截器(interceptor)
阅读量:6983 次
发布时间:2019-06-27

本文共 4785 字,大约阅读时间需要 15 分钟。

                Hadoop生态圈-Flume的组件之自定义拦截器(interceptor)

                                              作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

 

   本篇博客只是举例了一个自定义拦截器的方法,测试字节传输速度。

 

1>.自定义interceptor方法

1 /*  2 @author :yinzhengjie  3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E7%94%9F%E6%80%81%E5%9C%88/  4 EMAIL:y1053419035@qq.com  5 */  6 package cn.org.yinzhengjie.interceptor;  7   8 import org.apache.flume.Context;  9 import org.apache.flume.Event; 10 import org.apache.flume.interceptor.Interceptor; 11  12 import java.util.List; 13  14 /** 15  * 设置限速拦截器 16  * 

17 * 当 字节/时间,即同一时刻,如果进入的字节过多 18 * 则休眠一会 19 */ 20 public class MyInterceptor implements Interceptor { 21 22 private int speed; 23 24 25 //构造 26 private MyInterceptor(int speed) { 27 this.speed = speed; 28 } 29 30 31 //do nothing 32 public void initialize() { 33 34 } 35 36 /** 37 * 1、拿出上一个event的时间,和当前时间进行相减,得出上一个event的时间间隔 38 * 2、得到上一个event的body字节数 39 * 3、相除得到上一个event的速度,并在此event中先进行停留,再返回event 40 * 41 * @param event 42 * @return 43 */ 44 45 long lastTime = -1; 46 long lastBodySize = 0; 47 48 public Event intercept(Event event) { 49 50 51 byte[] body = event.getBody(); 52 int len = body.length; 53 54 55 long current = System.nanoTime(); 56 57 //第一个event 58 if (lastTime == -1) { 59 lastTime = current; 60 lastBodySize = len; 61 } 62 63 //非第一个event 64 else { 65 //计算上一个event停留的时间 66 long interval = current - lastTime; 67 System.out.println("=========================" + current + "/" + lastTime + "/" + interval + "========================="); 68 //上一个event的速度 69 int now_speed = (int) ((double) lastBodySize / interval * 1000); 70 if (now_speed > speed) { 71 System.out.println("=========================" + now_speed + "========================="); 72 //计算需要停留多少秒 线程休眠,时间 = shouldTime - interval 73 try { 74 Thread.sleep((lastBodySize / speed) * 1000 - interval); 75 } catch (InterruptedException e) { 76 e.printStackTrace(); 77 } 78 } 79 lastBodySize = len; 80 lastTime = System.currentTimeMillis(); 81 82 } 83 return event; 84 85 } 86 87 //迭代List

,将所有Event交给intercept(Event)进行处理 88 public List
intercept(List
events) { 89 for (Event event : events) { 90 intercept(event); 91 } 92 return events; 93 } 94 95 //do nothing 96 public void close() { 97 98 } 99 100 public static class Builder implements Interceptor.Builder {101 102 private int speed;103 104 public void configure(Context context) {105 speed = context.getInteger(Constants.SPEED, Constants.DEFAULT_SPEED);106 107 }108 109 public Interceptor build() {110 return new MyInterceptor(speed);111 }112 }113 114 public static class Constants {115 public static String SPEED = "speed";116 public static int DEFAULT_SPEED = 1;117 118 }119 }

2>.打包并将其发送到 /soft/flume/lib下

[yinzhengjie@s101 ~]$ cd /soft/flume/lib/[yinzhengjie@s101 lib]$ [yinzhengjie@s101 lib]$ ll | grep MyFlume-rw-r--r--  1 yinzhengjie yinzhengjie    5231 Jun 20 18:53 MyFlume-1.0-SNAPSHOT.jar[yinzhengjie@s101 lib]$ [yinzhengjie@s101 lib]$ rm -rf MyFlume-1.0-SNAPSHOT.jar [yinzhengjie@s101 lib]$ [yinzhengjie@s101 lib]$ rz[yinzhengjie@s101 lib]$ [yinzhengjie@s101 lib]$ ll | grep MyFlume-rw-r--r--  1 yinzhengjie yinzhengjie    8667 Jun 20 21:02 MyFlume-1.0-SNAPSHOT.jar[yinzhengjie@s101 lib]$ [yinzhengjie@s101 lib]$

3>.编写agent的配置文件

[yinzhengjie@s101 ~]$ more /soft/flume/conf/yinzhengjie_myInterceptor.conf # Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# 定义源: seqa1.sources.r1.type = seq# 定义一次RPC产生的批次数量a1.sources.r1.batchSize = 1024# 指定添加拦截器a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = cn.org.yinzhengjie.interceptor.MyInterceptor$Buildera1.sources.r1.interceptors.i1.speed = 1# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 100000a1.channels.c1.transactionCapacity = 10000# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1[yinzhengjie@s101 ~]$

4>.启动flume并测试

[yinzhengjie@s101 ~]$ flume-ng agent -f /soft/flume/conf/yinzhengjie_myInterceptor.conf -n a1

  下图是运行agent部分的输出内容 

 

转载于:https://www.cnblogs.com/yinzhengjie/p/9208268.html

你可能感兴趣的文章
掉电引起的ORA-1172错误解决过程(二)
查看>>
在网站建设过程中主要在哪几个方面为后期的网站优打好根基?
查看>>
【MOS】RAC 环境中最常见的 5 个数据库和/或实例性能问题 (文档 ID 1602076.1)
查看>>
新年图书整理和相关的产品
查看>>
Struts2的核心文件
查看>>
Spring Boot集成Jasypt安全框架
查看>>
GIS基础软件及操作(十)
查看>>
HDOJ 2041 超级楼梯
查看>>
1108File Space Bitmap Block损坏能修复吗2
查看>>
遭遇DBD::mysql::dr::imp_data_size unexpectedly
查看>>
人人都会设计模式:03-策略模式--Strategy
查看>>
被忽视但很实用的那部分SQL
查看>>
解读阿里云oss-android/ios-sdk 断点续传(多线程)
查看>>
ML之监督学习算法之分类算法一 ——— 决策树算法
查看>>
骡夫电商地址
查看>>
亚信安全火力全开猎捕“坏兔子”,全歼详解
查看>>
智能家居——IoT零基础入门篇
查看>>
《Linux From Scratch》第一部分:介绍 第一章:介绍-1.3. 更新日志
查看>>
阿里将在雄安新区设3家子公司:涉AI、蚂蚁金服和菜鸟;北航设立全国首个人工智能专业,与百度合作办学...
查看>>
Powershell指令集_2
查看>>