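Flume is a framework built from several kinds of components. The three most commonly used are source, channel, and sink, which handle the "collection", "transfer", and "writing" of event data, respectively. Most requirements can be met with these three alone. In some special scenarios, however, we need to insert custom logic into the middle of the event flow, and that is where the interceptor component comes in.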
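Interceptors sit between the source and the channel. After the source receives events and before they are written to the channel, interceptors can transform or drop them. An interceptor only processes events received by the source it is configured on. Flume ships with a number of built-in interceptors and also supports custom ones.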
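I. Flume's built-in interceptors:
1. Timestamp interceptor:
This is one of the most frequently used Flume interceptors. It inserts a timestamp (epoch milliseconds) into the Flume event headers. Without any interceptor, Flume receives only the message itself. Timestamp interceptor configuration: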
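| Parameter | Default | Description |
| --- | --- | --- |
| type |   | Type name timestamp, or the fully qualified class name |
| preserveExisting | false | If true, an existing timestamp header in the event is not overwritten |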
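To make preserveExisting concrete, the header logic can be modeled in a few lines of plain Java. This is a simplified sketch, not Flume's source. The class TimestampSketch and its apply method are hypothetical, headers are a plain Map, and the current time is passed in as a parameter so the behavior is easy to check. The value written is the epoch time in milliseconds, as a string.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the timestamp interceptor's header logic
// (hypothetical class, not Flume's implementation)
final class TimestampSketch {
    static final String HEADER = "timestamp";

    // Writes nowMillis under "timestamp"; with preserveExisting=true an existing value is kept
    static void apply(Map<String, String> headers, boolean preserveExisting, long nowMillis) {
        if (preserveExisting && headers.containsKey(HEADER)) {
            return;
        }
        headers.put(HEADER, Long.toString(nowMillis));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put(HEADER, "1000");
        apply(headers, true, System.currentTimeMillis());
        System.out.println(headers); // prints {timestamp=1000}: the existing value was preserved
        apply(headers, false, System.currentTimeMillis());
        System.out.println(headers); // now holds the current epoch milliseconds
    }
}
```

Passing the time in, rather than reading the clock inside apply, is only a testing convenience.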
1) Configuration attaching the timestamp interceptor to a source:
a1.sources.r1.interceptors = timestamp
a1.sources.r1.interceptors.timestamp.type=timestamp
a1.sources.r1.interceptors.timestamp.preserveExisting=false
2) Reading the header in interceptor code:
public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    // the timestamp interceptor stores epoch milliseconds, as a string, under the "timestamp" key
    String timeStamp = headers.get("timestamp");
    // ... use timeStamp ...
    return event;
}
2. Host interceptor
The host interceptor inserts the server's IP address or hostname into the event headers. The header key is set by hostHeader and defaults to host. Host interceptor configuration:
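| Parameter | Default | Description |
| --- | --- | --- |
| type |   | Type name host |
| hostHeader | host | Header key to use |
| useIP | true | If false, the hostname is inserted instead of the IP address |
| preserveExisting | false | If true, an existing host header in the event is not overwritten |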
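The effect of useIP and hostHeader can be illustrated with a small plain-Java model. It is a hypothetical sketch rather than Flume's code. It assumes the value is resolved once from InetAddress.getLocalHost(), taking getHostAddress() when useIP is true and getCanonicalHostName() otherwise, and is then written under hostHeader on every event. The class name HostSketch is ours.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

// Simplified model of the host interceptor (hypothetical class, not Flume's implementation)
final class HostSketch {

    // Resolves the header value: IP address when useIP is true, otherwise the host name
    static String resolveHost(boolean useIP) throws UnknownHostException {
        InetAddress addr = InetAddress.getLocalHost();
        return useIP ? addr.getHostAddress() : addr.getCanonicalHostName();
    }

    // Writes the host value under hostHeader unless preserveExisting keeps an existing one
    static void apply(Map<String, String> headers, String hostHeader, String host,
                      boolean preserveExisting) {
        if (preserveExisting && headers.containsKey(hostHeader)) {
            return;
        }
        headers.put(hostHeader, host);
    }

    public static void main(String[] args) throws UnknownHostException {
        Map<String, String> headers = new HashMap<>();
        // equivalent of hostHeader=hostname, useIP=true
        apply(headers, "hostname", resolveHost(true), false);
        System.out.println(headers);
    }
}
```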
1) Configuration attaching the host interceptor to a source:
a1.sources.r1.interceptors = host
a1.sources.r1.interceptors.host.type=host
a1.sources.r1.interceptors.host.useIP=false
a1.sources.r1.interceptors.host.preserveExisting=true
2) Reading the header in interceptor code:
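public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    // hostHeader is not set above, so the value is under the default key "host";
    // with hostHeader=hostname (as in the later configuration) read "hostname" instead
    String hostName = headers.get("host");
    // ... use hostName ...
    return event;
}
3. Static interceptor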
The static interceptor inserts a fixed key/value pair into the event headers. Configuration:
| Parameter | Default | Description |
| --- | --- | --- |
| type |   | Type name static |
| key | key | Header key |
| value | value | Value for the key |
| preserveExisting | true | If true, an existing value for the key in the event headers is not overwritten |
Configuration attaching the static interceptor to a source:
a1.sources.r1.interceptors = static
a1.sources.r1.interceptors.static.type=static
a1.sources.r1.interceptors.static.key=logs
a1.sources.r1.interceptors.static.value=logFlume
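4. Regex filtering interceptor
During log collection some of the data may be unwanted. Adding a filtering interceptor lets you drop the unwanted log lines, or alternatively collect only the lines that match a regular expression.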
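| Parameter | Default | Description |
| --- | --- | --- |
| type |   | Type name REGEX_FILTER |
| regex | .* | Regular expression matched against the event body; the default matches any run of characters other than "\n" |
| excludeEvents | false | By default, matching events are kept. If true, matching events are dropped and non-matching events are kept |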
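The keep/drop decision can be modeled as a small plain-Java class. This is a simplification, not Flume's implementation. RegexFilterSketch is a hypothetical name, bodies are decoded as UTF-8, and the pattern is tested with Matcher.find(), so it only needs to occur somewhere in the body.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Simplified model of the regex filtering interceptor (hypothetical class, not Flume's implementation)
final class RegexFilterSketch {
    private final Pattern regex;
    private final boolean excludeEvents;

    RegexFilterSketch(String regex, boolean excludeEvents) {
        this.regex = Pattern.compile(regex);
        this.excludeEvents = excludeEvents;
    }

    // True if the event should be passed on to the channel
    boolean keep(byte[] body) {
        boolean matched = regex.matcher(new String(body, StandardCharsets.UTF_8)).find();
        // excludeEvents=false keeps matches; excludeEvents=true keeps non-matches
        return matched != excludeEvents;
    }

    // Applies keep() to a batch of log lines
    List<String> filter(List<String> lines) {
        List<String> kept = new ArrayList<>();
        for (String line : lines) {
            if (keep(line.getBytes(StandardCharsets.UTF_8))) {
                kept.add(line);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("cost=13ms; recId=[1, 2]", "heartbeat ok");
        System.out.println(new RegexFilterSketch(".*recId.*", false).filter(lines)); // prints [cost=13ms; recId=[1, 2]]
        System.out.println(new RegexFilterSketch(".*recId.*", true).filter(lines));  // prints [heartbeat ok]
    }
}
```

Under find() semantics, .*recId.* and plain recId select the same lines; the surrounding .* only matters if the match is required to span the whole body.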
Configuration attaching the regex filtering interceptor to a source:
a1.sources.r1.interceptors = regex
a1.sources.r1.interceptors.regex.type=REGEX_FILTER
a1.sources.r1.interceptors.regex.regex=.*recId.*
a1.sources.r1.interceptors.regex.excludeEvents=false
5. Multiple interceptors can be used together. They are applied in the same order as they are listed in the configuration:
#source
agent1.sources.ngrinder.type = exec
agent1.sources.ngrinder.command = tail -F /data/logs/ttbrain/ttbrain-recommend-api.log
agent1.sources.ngrinder.channels = mc1 mc2
#filter
agent1.sources.ngrinder.interceptors=filt1 filt2 filt3 filt4
agent1.sources.ngrinder.interceptors.filt1.type=regex_filter
agent1.sources.ngrinder.interceptors.filt1.regex=.*recId.*
agent1.sources.ngrinder.interceptors.filt2.type=host
agent1.sources.ngrinder.interceptors.filt2.hostHeader=hostname
agent1.sources.ngrinder.interceptors.filt2.useIP=true
agent1.sources.ngrinder.interceptors.filt3.type=timestamp
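agent1.sources.ngrinder.interceptors.filt4.type=com.abc.ttbrain.log.flume.interceptor.MyInterceptor$Builder
Here the ngrinder source has four interceptors, applied in the order filt1, filt2, filt3, filt4. The custom interceptor filt4 therefore only sees events that passed the regex filter, with the hostname and timestamp headers already set.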
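The ordering rule can be illustrated with a minimal plain-Java model of an interceptor chain. Each interceptor receives the batch produced by the previous one, and a null result drops an event. The types Event, SimpleInterceptor and ChainSketch are hypothetical stand-ins, not Flume's API. The four lambdas only imitate filt1 to filt4, and the header values are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a source applying its interceptors in configured order
// (hypothetical types, not Flume's API)
final class ChainSketch {

    static final class Event {
        final Map<String, String> headers = new HashMap<>();
        String body;
        Event(String body) { this.body = body; }
    }

    interface SimpleInterceptor {
        Event intercept(Event event); // returning null drops the event
    }

    // Mirrors a batch intercept(List): apply the single-event method and skip nulls
    static List<Event> interceptBatch(SimpleInterceptor interceptor, List<Event> events) {
        List<Event> kept = new ArrayList<>(events.size());
        for (Event e : events) {
            Event result = interceptor.intercept(e);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    // Hands the whole batch to each interceptor in turn, in list order
    static List<Event> applyChain(List<SimpleInterceptor> chain, List<Event> events) {
        List<Event> current = events;
        for (SimpleInterceptor interceptor : chain) {
            current = interceptBatch(interceptor, current);
        }
        return current;
    }

    public static void main(String[] args) {
        SimpleInterceptor filt1 = e -> e.body.contains("recId") ? e : null;                          // like regex_filter
        SimpleInterceptor filt2 = e -> { e.headers.put("hostname", "10.0.0.1"); return e; };         // like host
        SimpleInterceptor filt3 = e -> { e.headers.put("timestamp", "1501812746481"); return e; };   // like timestamp
        SimpleInterceptor filt4 = e -> {                                                             // like MyInterceptor
            e.body = e.headers.get("hostname") + "|" + e.headers.get("timestamp") + "|" + e.body;
            return e;
        };
        List<Event> out = applyChain(Arrays.asList(filt1, filt2, filt3, filt4),
                Arrays.asList(new Event("recId=[1]"), new Event("heartbeat")));
        for (Event e : out) {
            System.out.println(e.body); // prints 10.0.0.1|1501812746481|recId=[1]
        }
    }
}
```

Moving filt4 ahead of filt2 and filt3 in this model would make it read null for both headers, which is why the custom interceptor is listed last.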
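Finally, Flume interceptors can be combined with sinks to cover many business needs. Examples include generating target directories and file names from the time and host, or writing to multiple partitions together with the Kafka Sink.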
II. Custom interceptors:
1. Developing the interceptor:
1) pom.xml
 
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.abc</groupId>
    <artifactId>ttbrain-log</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  
  <groupId>com.abc</groupId>
  <artifactId>ttbrain-log-flume</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>ttbrain-log-flume</name>
  
  <properties>
    <version.flume>1.7.0</version.flume>
  </properties>
  
  
  <dependencies>
    <!-- flume -->
    <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
       <version>${version.flume}</version>
    </dependency>
    <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-configuration</artifactId>
       <version>${version.flume}</version>
    </dependency>
  </dependencies>
  
  <profiles>
    <profile>
      <id>dev</id>
      <properties>
        <profile.env.name>dev</profile.env.name>
      </properties>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
    </profile>
    <profile>
      <id>test</id>
      <properties>
        <profile.env.name>test</profile.env.name>
      </properties>
    </profile>
    <profile>
      <id>product</id>
      <properties>
        <profile.env.name>product</profile.env.name>
      </properties>
    </profile>
  </profiles>
  
  <build>
    <finalName>ttbrain-log-flume-MyInterceptor</finalName>
    <filters>
      <filter>${basedir}/filters/filter-${profile.env.name}.properties</filter><!-- location of the filter properties file -->
    </filters>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
        <filtering>true</filtering><!-- enable variable substitution -->
        <includes>
          <include>**/*.xml</include>
          <include>conf/*.properties</include>
          <include>**/*.properties</include>
          <include>**/*.json</include>
        </includes>
      </resource>
    </resources>
  <plugins>
      <!-- <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.abc.ttbrain.log.flume.interceptor.MyInterceptor</mainClass>
                        </manifest>
                        <manifestEntries>
                            <Class-Path>conf/</Class-Path>
                        </manifestEntries>
                    </archive>
                    <includes>
                        <include>**/*.class</include>
                    </includes>
                </configuration>
            </plugin> -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <!-- <descriptors>
            <descriptor>assembly/assembly.xml</descriptor>
          </descriptors> -->
          <descriptorRefs>  
            <descriptorRef>jar-with-dependencies</descriptorRef>  
          </descriptorRefs>
          <archive>
                        <manifest>
                            <mainClass>com.abc.ttbrain.log.flume.interceptor.MyInterceptor</mainClass>
                        </manifest>
                    </archive> 
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
2) Create a custom interceptor class MyInterceptor that implements the Interceptor interface:
package com.abc.ttbrain.log.flume.interceptor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.abc.ttbrain.log.common.entity.LogEntity;
/**
 * flume interceptor
 * @author kevinliu
 *
 */
public class MyInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory.getLogger(MyInterceptor.class);
    
    @Override
    public void close() {
        // called on shutdown; release any resources acquired in initialize() here
        logger.info("flume myinterceptor is close");
    }
    @Override
    public void initialize() {
        // called once before the first event is intercepted
        logger.info("flume myinterceptor is initialize");
    }
    /**
     * [08-04 10:12:26] [INFO] [com.abc.ttbrain.recommend.api.controller.PersonalRecommendController:195] personalRecommend(): 
     * cost=13ms; puid=; uId=579AEB028EA6402A5F5507FDB5A27B64; fnum=8; chId=1; usg=1; 
     * recId=[325747850570, 325825180570, 325801330570, 325401880570, 325714680570, 325750900570, 325805720570, 325823150570]; 
     * mutilFeeds={"p_7":[325747850570,325825180570,325801330570,325401880570,325714680570,325750900570,325805720570,325823150570]}; 
     * typeFeeds={"VIDEO":[325747850570,325825180570,325801330570,325401880570,325714680570,325750900570,325805720570,325823150570]}; 
     * prefMap={325805720570:"奔跑吧兄弟,陈赫,过山车",325750900570:"明星宝贝,贾静雯,妈妈是超人",325714680570:"张杰,朱亚文,佟大为",325747850570:"叶倩文,郑秀文",325801330570:"郑秀晶,郑秀妍",325401880570:"黄子韬",325825180570:"丁俊晖,吴尊,台球",325823150570:"极限挑战,罗志祥,黄宗泽"}; 
     * prior=null; reqUniqId=1501812746481177835258579AEB028EA6402A5F5507FDB5A27B64; 
     * version=; flag=per_rec; rg=0; rh=0; pg=0; ph=7; sg=0; sh=1
     */
    @Override
    public Event intercept(Event event) {
        try {
            Map<String, String> headers = event.getHeaders();
            String body = new String(event.getBody(), Charsets.UTF_8);
            String[] split = body.split("personalRecommend\\(\\):");
            
            if (split.length < 2) { // no "personalRecommend():" marker: drop the event
                return null;
            } else {
                String logStr = split[1];
                Map<String, String> fieldMap = getLongStr4Map(logStr);
                LogEntity logEntity = getLogEntityFromMap(fieldMap);
                
                String hostName = headers.get("hostname");
                String timeStamp = headers.get("timestamp");
                logEntity.setHost(hostName);
                logEntity.setTimeStamp(timeStamp);
                
                event.setBody(logEntity.toString().getBytes(Charsets.UTF_8));
                logger.info("device:{}",logEntity.getUid());
                return event;
            }
        } catch (Exception e ) {
            logger.error("intercept:",e);
        }
        return null;
    }
    
    public Map<String,String> getLongStr4Map(String str) {
        Map<String,String> map = new HashMap<>();
        String[] split = str.split(";");
        //...
        
        return map;
    }
    
    /**
     * uid|ppuid|channel|feedNum|cost|usg|prior|reqUniqId|version|rg|rh|pg|ph|sg|sh|timeStamp|host
     * |recFeedId|txt|gallery|vedio|p_1|p_2|p_3|p_4|p_5|p_6|p_7|p_8|p_9|p_10|p_11|p_12|p_13|p_14|p_15
     */
    public LogEntity getLogEntityFromMap(Map<String, String> fieldMap) {
        LogEntity logEntity = new LogEntity();
        
        //...
        
        return logEntity;
    }
    
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }
    
    public static class Builder implements Interceptor.Builder {
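        // Flume creates the interceptor through this Builder (filt4.type points at MyInterceptor$Builder)
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }
        @Override
        public void configure(Context context) {
            // MyInterceptor takes no settings; per-interceptor properties would be read from context here
        }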
    }
}
3) Packaging:
Run mvn package, which builds ttbrain-log-flume-MyInterceptor-jar-with-dependencies.jar under target/.
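Before packaging and deploying, it can help to check the body parsing outside Flume. The field extraction in MyInterceptor is elided in the original (getLongStr4Map only shows the split on ";"), so the following is a hypothetical stand-in, not the author's code. LogLineParserSketch is our name, and it assumes each field after personalRecommend(): has the form key=value, as in the sample log line in the Javadoc.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical standalone version of the parsing step in MyInterceptor.intercept();
// assumes the body looks like "... personalRecommend(): k1=v1; k2=v2; ..."
final class LogLineParserSketch {
    private static final String MARKER = "personalRecommend():";

    // Returns the key/value pairs after the marker, or null if the marker is missing
    static Map<String, String> parse(String body) {
        int start = body.indexOf(MARKER);
        if (start < 0) {
            return null; // MyInterceptor drops such events by returning null
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : body.substring(start + MARKER.length()).split(";")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                continue; // skip fragments without "="
            }
            String key = pair.substring(0, eq).trim();
            if (key.isEmpty()) {
                continue; // skip fragments without a key
            }
            // split only on the first "=" so values may themselves contain "="
            fields.put(key, pair.substring(eq + 1).trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        String line = "[08-04 10:12:26] [INFO] [PersonalRecommendController:195] personalRecommend(): "
                + "cost=13ms; puid=; uId=579AEB028EA6402A5F5507FDB5A27B64; fnum=8; chId=1";
        System.out.println(parse(line)); // prints {cost=13ms, puid=, uId=579AEB028EA6402A5F5507FDB5A27B64, fnum=8, chId=1}
    }
}
```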
2. Deployment:
1) Write the Flume configuration file:
agent1.sources = ngrinder
agent1.channels = mc1
agent1.sinks = avro-sink
#source
agent1.sources.ngrinder.type = exec
agent1.sources.ngrinder.command = tail -F /data/logs/ttbrain/ttbrain-recommend-api.log
agent1.sources.ngrinder.channels = mc1
#filter
agent1.sources.ngrinder.interceptors=filt1 filt2 filt3 filt4
agent1.sources.ngrinder.interceptors.filt1.type=regex_filter
agent1.sources.ngrinder.interceptors.filt1.regex=.*recId.*
agent1.sources.ngrinder.interceptors.filt2.type=host
agent1.sources.ngrinder.interceptors.filt2.hostHeader=hostname
agent1.sources.ngrinder.interceptors.filt2.useIP=true
agent1.sources.ngrinder.interceptors.filt3.type=timestamp
agent1.sources.ngrinder.interceptors.filt4.type=com.abc.ttbrain.log.flume.interceptor.MyInterceptor$Builder
#channel1
#agent1.channels.mc1.type = memory
#agent1.channels.mc1.capacity = 1000
#agent1.channels.mc1.keep-alive = 60
agent1.channels.mc1.type = file
agent1.channels.mc1.checkpointDir = /data/flume/ckdir/mc1_ck
agent1.channels.mc1.dataDirs = /data/flume/datadir/mc1_data
#sink1
agent1.sinks.avro-sink.type = avro
agent1.sinks.avro-sink.channel = mc1
agent1.sinks.avro-sink.hostname = 10.153.135.113
agent1.sinks.avro-sink.port = 41414
Note: agent1.sources.ngrinder.interceptors.filt4.type is the fully qualified name of the custom interceptor's Builder class (MyInterceptor$Builder).
2) Copy ttbrain-log-flume-MyInterceptor-jar-with-dependencies.jar into the lib directory under FLUME_HOME;
3) Start Flume:
nohup flume-ng agent -c /usr/local/apache-flume-1.7.0-bin/conf -f /usr/local/apache-flume-1.7.0-bin/conf/engine-api-log.conf  -n agent1 >/dev/null 2>&1 &
                










