自定义拦截器
在有些情况下,我们需要对采集来的数据进行分类。这时可以使用自定义拦截器配合 multiplexing channel selector:拦截器在 event 的 header 中添加键值对,selector 再根据该键对应的值选择相应的 channel。
自定义拦截器需要实现 Flume 官方的 Interceptor 接口及其全部方法,此外还需要一个实现 Interceptor.Builder 的静态内部类,用于构建并返回该拦截器的实例。
另外,在实现 intercept 方法时还可以丢弃数据:对不符合条件的 event 直接返回 null 即可将其抛弃,从而实现简单的过滤。
下面实现的例子是:服务器1监听数据,如果某条数据中包含“hello”字符串,就将其发送到服务器2,否则就将其发送到服务器3。
# Agent a1: tags each event via a custom interceptor and fans events out to two avro sinks by header value.
# Components: one source (r1), two sinks (k1, k2), two channels (c1, c2).
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Source r1: taildir source over file group f1; positionFile records how far each tailed file has been read.
a1.sources.r1.type = taildir
a1.sources.r1.positionFile = /opt/module/flume/position/position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/data3/data6
# Custom interceptor i1, referenced through its nested Builder class (hence the "$Builder" suffix).
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.atguigu.Interceptor.MyInterceptor$Builder
# Multiplexing channel selector: route on the "type" header; value "yes" -> c1, value "no" -> c2.
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=type
a1.sources.r1.selector.mapping.yes=c1
a1.sources.r1.selector.mapping.no=c2
# Sink k1: avro sink targeting hadoop103:4141 (the address agent a2's avro source binds to).
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=hadoop103
a1.sinks.k1.port=4141
# Sink k2: avro sink targeting hadoop104:4141 (the address agent a3's avro source binds to).
a1.sinks.k2.type = avro
a1.sinks.k2.hostname=hadoop104
a1.sinks.k2.port=4141
# Channels c1 and c2: memory channels with identical capacity settings.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Wiring: r1 writes to both channels; k1 drains c1 and k2 drains c2.
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
# Agent a2: receives avro events on hadoop103:4141 and writes them to the logger sink.
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Source r1: avro source bound to hadoop103:4141 (matches a1's sink k1 target).
a2.sources.r1.type = avro
a2.sources.r1.bind=hadoop103
a2.sources.r1.port=4141
# Sink k1: logger sink.
a2.sinks.k1.type = logger
# Channel c1: memory channel.
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Wiring: r1 -> c1 -> k1.
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
# Agent a3: receives avro events on hadoop104:4141 and writes them to the logger sink.
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Source r1: avro source bound to hadoop104:4141 (matches a1's sink k2 target).
a3.sources.r1.type = avro
a3.sources.r1.bind=hadoop104
a3.sources.r1.port=4141
# Sink k1: logger sink.
a3.sinks.k1.type = logger
# Channel c1: memory channel.
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Wiring: r1 -> c1 -> k1.
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
package com.atguigu.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
public class MyInterceptor implements Interceptor {
ArrayList<Event> list_event;
@Override
public void initialize() {
list_event = new ArrayList<>();
}
@Override
public Event intercept(Event event) {
# 获取时间封装成字符串
String string = new String(event.getBody());
# 判断数据中是否存在hello,如果存在封装键值对,type-yes,type就是header
if (string.contains("hello")) {
event.getHeaders().put("type", "yes");
} else {
event.getHeaders().put("type", "no");
}
return event;
}
# 操纵事件集合
@Override
public List<Event> intercept(List<Event> list) {
# 将上一次的集合清空
list_event.clear();
for (Event event : list) {
# 调用上面的方法,将数据加工头部,存到list中
list_event.add(intercept(event));
}
return list_event;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new MyInterceptor();
}
@Override
public void configure(Context context) {
}
}
}