Flume Interceptor & Testing the Flume-Kafka Channel


1) Create a Maven project named flume-interceptor.

2) Create the package com.atguigu.flume.interceptor.

3) Add the following configuration to pom.xml.

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

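Note:
The provided scope means the jar is used at compile time but is not bundled at packaging time. The cluster already has the Flume jars, so this dependency is only needed for local compilation.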

4) Create the JSONUtils class in the com.atguigu.flume.interceptor package.

package com.atguigu.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;

public class JSONUtils {

    // Returns true if fastjson can parse the log line, false if parsing fails.
    public static boolean isJSONValidate(String log) {
        try {
            JSON.parse(log);
            return true;
        } catch (JSONException e) {
            return false;
        }
    }
}

5) Create the ETLInterceptor class in the com.atguigu.flume.interceptor package.

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // No setup needed.
    }

    // Keeps the event if its body parses as JSON; returns null to drop it.
    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        if (JSONUtils.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    // Filters the batch in place, removing every event the single-event check rejects.
    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()) {
            Event next = iterator.next();
            if (intercept(next) == null) {
                iterator.remove();
            }
        }

        return list;
    }

    // Flume creates the interceptor through this Builder.
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {
            // No configuration parameters are read.
        }
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
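
The batch-filtering logic can be tried locally without a Flume installation. The following is only a minimal sketch under stated assumptions: BatchFilterSketch, filterInPlace, and LOOKS_LIKE_JSON_OBJECT are hypothetical names that are not part of the project, raw byte arrays stand in for Flume events, and the brace check is a crude stand-in for JSONUtils.isJSONValidate (it does not parse JSON).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

public class BatchFilterSketch {

    // Hypothetical stand-in for JSONUtils.isJSONValidate: it only checks
    // the outer braces and does NOT parse the JSON.
    public static final Predicate<String> LOOKS_LIKE_JSON_OBJECT =
            s -> s.startsWith("{") && s.endsWith("}");

    // Mirrors the shape of intercept(List<Event>): decode each body as UTF-8
    // and remove the entries that fail the check, mutating the list in place.
    public static List<byte[]> filterInPlace(List<byte[]> bodies, Predicate<String> isValid) {
        Iterator<byte[]> it = bodies.iterator();
        while (it.hasNext()) {
            String log = new String(it.next(), StandardCharsets.UTF_8);
            if (!isValid.test(log)) {
                it.remove(); // Iterator.remove is the safe way to delete while iterating
            }
        }
        return bodies;
    }

    public static void main(String[] args) {
        // The list must be mutable; Arrays.asList alone would reject remove().
        List<byte[]> batch = new ArrayList<>(Arrays.asList(
                "{\"page\":\"home\"}".getBytes(StandardCharsets.UTF_8),
                "{\"page\":\"ho".getBytes(StandardCharsets.UTF_8), // truncated line
                "{\"page\":\"cart\"}".getBytes(StandardCharsets.UTF_8)));

        List<byte[]> kept = filterInPlace(batch, LOOKS_LIKE_JSON_OBJECT);
        System.out.println(kept.size() + " of 3 events kept");
    }
}
```

Because the filter mutates and returns the same list, it relies on the caller passing a list that supports removal, which is why the sketch wraps the sample data in an ArrayList.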

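6) Package the project. The assembly plugin configured above builds a jar-with-dependencies artifact during the package phase.

7) Before going further, copy the packaged jar into the /opt/module/flume/lib directory on hadoop102.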

[atguigu@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

8) Distribute Flume to hadoop103 and hadoop104.

[atguigu@hadoop102 module]$ xsync flume/

9) Start Flume on hadoop102 and on hadoop103.

[atguigu@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

[atguigu@hadoop103 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
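
The contents of conf/file-flume-kafka.conf are not shown in this post. Flume configurations conventionally reference a custom interceptor by the class name of its nested Builder written with a `$` separator (for this project that would presumably be com.atguigu.flume.interceptor.ETLInterceptor$Builder). The sketch below illustrates why the `$` form matters when a class is loaded by name; BuilderNameSketch, BuilderContract, EtlStandIn, and loadBuilder are hypothetical stand-ins, not Flume classes.

```java
public class BuilderNameSketch {

    /** Hypothetical stand-in for Flume's Interceptor.Builder. */
    public interface BuilderContract {
        String build();
    }

    /** Hypothetical stand-in for ETLInterceptor with its nested Builder. */
    public static class EtlStandIn {
        public static class Builder implements BuilderContract {
            @Override
            public String build() {
                return "EtlStandIn instance";
            }
        }
    }

    // Loads a builder by name and instantiates it via its no-arg constructor.
    public static BuilderContract loadBuilder(String binaryName)
            throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(binaryName);
        return (BuilderContract) clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        // getName() yields the binary name, which uses '$' for nested classes.
        String binaryName = EtlStandIn.Builder.class.getName();
        System.out.println(binaryName);
        System.out.println(loadBuilder(binaryName).build());

        // The dotted (canonical) form cannot be loaded by Class.forName.
        String dotted = EtlStandIn.Builder.class.getCanonicalName();
        try {
            loadBuilder(dotted);
        } catch (ClassNotFoundException e) {
            System.out.println("not loadable: " + dotted);
        }
    }
}
```

If the agent fails at startup with a class-loading error for the interceptor, the `$` separator and the exact class name in the configuration are worth checking first.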

Testing the Flume-Kafka Channel

(1) Generate logs.

[atguigu@hadoop102 ~]$ lg.sh

(2) Consume the Kafka data and check whether records appear on the console.

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic topic_log

Note:
If no data arrives, first check that Kafka, Flume, and Zookeeper have all started correctly.
Then check that the Flume interceptor code behaves as expected.

