Example 1
This example uses the spooling directory source and the logger sink.
spool-to-logger.properties
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
# For each one of the sources, the type is defined
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /tmp/spooldir
# The channel can be defined as follows.
agent1.sources.source1.channels = channel1
# Each sink's type must be defined
agent1.sinks.sink1.type = logger
#Specify the channel the sink should use
agent1.sinks.sink1.channel = channel1
# Each channel's type is defined.
agent1.channels.channel1.type = file
Start the Flume agent:
flume-ng agent --conf-file spool-to-logger.properties --name agent1 --conf $FLUME_HOME/conf -Dflume.root.logger=INFO,console
Create a new log file:
echo "hello flume" > /tmp/spooldir/.file1.txt
mv /tmp/spooldir/.file1.txt /tmp/spooldir/file1.txt
Flume's processing result:
Looking at the spooling directory again, the source has renamed the file to file1.txt.COMPLETED, which indicates that Flume has finished processing the file and will take no further action on it.
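As a quick check, listing the spooling directory should show the renamed file (a minimal sketch, assuming the paths used above and that only this one file was dropped in):
ls /tmp/spooldir
# file1.txt.COMPLETED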
HDFS sink
This example uses the spooling directory source and the HDFS sink.
spool-to-hdfs.properties
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
# For each one of the sources, the type is defined
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /tmp/spooldir
# The channel can be defined as follows.
agent1.sources.source1.channels = channel1
# Each sink's type must be defined
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = /tmp/flume
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.fileSuffix = .log
agent1.sinks.sink1.hdfs.inUsePrefix = _
agent1.sinks.sink1.hdfs.fileType = DataStream
#Specify the channel the sink should use
agent1.sinks.sink1.channel = channel1
# Each channel's type is defined.
agent1.channels.channel1.type = file
Start the Flume agent:
flume-ng agent --conf-file spool-to-hdfs.properties --name agent1 --conf $FLUME_HOME/conf -Dflume.root.logger=INFO,console
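For the HDFS sink to have something to write, drop another file into the spooling directory in the same way as before (a sketch mirroring the earlier example; the file name and contents are arbitrary):
echo "hello flume hdfs" > /tmp/spooldir/.file2.txt
mv /tmp/spooldir/.file2.txt /tmp/spooldir/file2.txt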
Running the agent may fail with a Guava-related error (the error output is not reproduced here).
Fix: copy the guava jar that ships with Hadoop into Flume's lib directory.
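A sketch of the fix, assuming default installation layouts; the exact guava jar versions and paths depend on your Flume and Hadoop releases and are only examples here:
# If Flume's lib directory still bundles an older guava jar, it may also need to be removed to avoid a conflict (assumption; version is an example)
rm $FLUME_HOME/lib/guava-11.0.2.jar
# Copy Hadoop's guava jar into Flume's lib directory (path and version are examples)
cp $HADOOP_HOME/share/hadoop/common/lib/guava-27.0-jre.jar $FLUME_HOME/lib/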
With that resolved, the result is as follows:
This time, the events are delivered to the HDFS sink and written to a file. A file that is still being written to has the suffix ".tmp" added to its name, to indicate that processing of the file is not yet complete. In this example the hdfs.inUsePrefix property is set to an underscore (the default is the empty string), which causes files being written to also get an underscore prefix added to their names. The reason for this is that MapReduce ignores files whose names begin with an underscore. A typical in-use temporary filename is therefore "_events.1399295780136.log.tmp", where the number is a timestamp generated by the HDFS sink.
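Once the file is rolled and closed, it is renamed without the in-use prefix and suffix; one way to inspect the output (the timestamp in the file name is hypothetical, and the contents are whatever was dropped into the spooling directory):
hadoop fs -ls /tmp/flume
hadoop fs -cat /tmp/flume/events.1399295780136.log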
Partitioning and interceptors
To switch the HDFS sink configuration above to partitioned storage, only the hdfs.path property needs to change, so that it contains subdirectories defined with time-format escape sequences:
agent1.sinks.sink1.hdfs.path = /tmp/flume/%Y-%m-%d
The partition that a Flume event is written into is determined by the timestamp in the event's header. By default, events have no timestamp in their headers, but one can be added by a Flume interceptor.
Interceptors are simple plug-in components that sit between a source and its channels. Before the events received by a source are written to the channel, an interceptor can transform or drop them. Each interceptor processes only the events received by the source it is attached to, and custom interceptors can also be written. To add a timestamp interceptor to source1:
agent1.sources.source1.interceptors = interceptor1
agent1.sources.source1.interceptors.interceptor1.type = timestamp
Note:
If there are multiple tiers of Flume agents, there can be a significant gap between an event's creation time and its write time. In that case, set the HDFS sink's hdfs.useLocalTimeStamp property so that a timestamp generated by the Flume agent running the HDFS sink is used instead:
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
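With the timestamp interceptor and the date-formatted path in place, files should land under per-day subdirectories. A sketch of what the layout might look like (the date and timestamp shown are hypothetical):
hadoop fs -ls /tmp/flume
# .../tmp/flume/2014-05-05
hadoop fs -ls /tmp/flume/2014-05-05
# .../tmp/flume/2014-05-05/events.1399295780136.log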
File formats
hdfs.fileType defaults to SequenceFile; here it is set to DataStream so that the raw event stream is written, which is what the Avro event serializer used below requires.
The following shows the configuration used to write Avro files.
spool-to-hdfs-avro.properties
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
# For each one of the sources, the type is defined
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /tmp/spooldir
# The channel can be defined as follows.
agent1.sources.source1.channels = channel1
agent1.sources.source1.interceptors = interceptor1
agent1.sources.source1.interceptors.interceptor1.type = timestamp
# Each sink's type must be defined
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = /tmp/flume/%Y-%m-%d
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.fileSuffix = .avro
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.serializer = avro_event
agent1.sinks.sink1.serializer.compressionCodec = snappy
#Specify the channel the sink should use
agent1.sinks.sink1.channel = channel1
# Each channel's type is defined.
agent1.channels.channel1.type = file
Start the Flume agent:
flume-ng agent --conf-file spool-to-hdfs-avro.properties --name agent1 --conf $FLUME_HOME/conf -Dflume.root.logger=INFO,console
The result: a Snappy-compressed Avro file is written under the date-partitioned directory on HDFS.
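One way to confirm that valid Avro was written is to copy a file out of HDFS and dump it with Avro's command-line tools (a sketch; the date, timestamp, and avro-tools version in the names are assumptions):
# Copy one output file out of HDFS (path is hypothetical)
hadoop fs -get /tmp/flume/2014-05-05/events.1399295780136.avro .
# Dump its records as JSON using avro-tools (jar version is an example)
java -jar avro-tools-1.11.1.jar tojson events.1399295780136.avro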