Flume Examples

代码敲到深夜 · 2022-02-10

Example 1

This example uses the spooling directory (spooldir) source and the logger sink.

spool-to-logger.properties

agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# For each one of the sources, the type is defined
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /tmp/spooldir
# The channel can be defined as follows.
agent1.sources.source1.channels = channel1

# Each sink's type must be defined
agent1.sinks.sink1.type = logger

#Specify the channel the sink should use
agent1.sinks.sink1.channel = channel1

# Each channel's type is defined.
agent1.channels.channel1.type = file

Start the Flume agent:

flume-ng agent --conf-file spool-to-logger.properties --name agent1 --conf $FLUME_HOME/conf -Dflume.root.logger=INFO,console
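Before starting the agent, make sure the directory the spooldir source watches actually exists; the source fails at startup if it is missing:

```shell
# Create the directory configured as spoolDir above.
mkdir -p /tmp/spooldir
```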

Create a log file:

# Write to a hidden file first, then atomically rename it into place,
# so the source never reads a partially written file.
echo "hello flume" > /tmp/spooldir/.file1.txt
mv /tmp/spooldir/.file1.txt /tmp/spooldir/file1.txt

Flume output:

[screenshot: logger sink console output]

Looking at the spooling directory again, the source has renamed the original file to file1.txt.COMPLETED, which indicates that Flume has finished processing the file and will take no further action on it.

HDFS sink

This example uses the spooling directory (spooldir) source and the HDFS sink.

spool-to-hdfs.properties

agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# For each one of the sources, the type is defined
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /tmp/spooldir
# The channel can be defined as follows.
agent1.sources.source1.channels = channel1

# Each sink's type must be defined
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = /tmp/flume
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.fileSuffix = .log
agent1.sinks.sink1.hdfs.inUsePrefix = _
agent1.sinks.sink1.hdfs.fileType = DataStream
#Specify the channel the sink should use
agent1.sinks.sink1.channel = channel1

# Each channel's type is defined.
agent1.channels.channel1.type = file

Start the Flume agent:

flume-ng agent --conf-file spool-to-hdfs.properties --name agent1 --conf $FLUME_HOME/conf -Dflume.root.logger=INFO,console

Running the agent failed with the following error:

[screenshot: Flume startup error, a Guava version conflict]

Fix: copy the guava jar from Hadoop into Flume's lib directory (replacing the older guava jar that ships with Flume).

Result:

[screenshot: HDFS sink output under /tmp/flume]

This time the events are delivered to the HDFS sink and written to a file. A file that is still being written to carries a ".tmp" suffix to show that processing is not yet complete. In this example the hdfs.inUsePrefix property is set to an underscore (the default is empty), which additionally prepends an underscore to the names of in-use files; this matters because MapReduce ignores files whose names begin with an underscore. A typical in-use file name is therefore something like "_events.1399295780136.log.tmp", where the number is a millisecond timestamp generated by the HDFS sink.
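The in-use suffix is itself configurable through the HDFS sink's hdfs.inUseSuffix property; a minimal fragment, shown here with its default value:

```
# Default shown; change it if ".tmp" clashes with other tooling
# scanning the output directory.
agent1.sinks.sink1.hdfs.inUseSuffix = .tmp
```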


Partitioning and interceptors

To switch the HDFS sink configuration above to partitioned storage, only the hdfs.path property needs to change, so that it names subdirectories with time-format escape sequences:

agent1.sinks.sink1.hdfs.path = /tmp/flume/%Y-%m-%d

Which partition a Flume event is written to is determined by the timestamp in the event's header. By default, events carry no timestamp header, but one can be added with a Flume interceptor.
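As an illustration of how the escapes resolve, the millisecond counter from the earlier file name can be formatted with the same %Y-%m-%d pattern (this sketch assumes GNU date; Flume itself formats the event-header timestamp, normally in the agent's local timezone, while UTC is used here for reproducibility):

```shell
# Convert the millisecond timestamp 1399295780136 to seconds,
# then format it with the escapes used in hdfs.path.
date -u -d @1399295780 +"/tmp/flume/%Y-%m-%d"
# prints /tmp/flume/2014-05-05
```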

Interceptors are simple plug-in components that sit between a source and its channels. They can transform or drop events received by the source before the events are written to a channel, and each interceptor only processes events received by its own source. Custom interceptors can also be written. To add a timestamp header to every event, attach the built-in timestamp interceptor to the source:

agent1.sources.source1.interceptors = interceptor1
agent1.sources.source1.interceptors.interceptor1.type = timestamp

Note:

In a multi-tier Flume topology there can be a significant gap between an event's creation time and its write time. In that case, set the HDFS sink's hdfs.useLocalTimeStamp property so that the partition timestamp is generated by the Flume agent running the HDFS sink:

agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

File formats

hdfs.fileType defaults to SequenceFile; setting it to DataStream writes the event data unmodified, leaving the file format to the configured serializer.
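For compressed text output, the third supported value, CompressedStream, can be combined with a codec; a minimal fragment (note the unusual spelling of the hdfs.codeC property):

```
agent1.sinks.sink1.hdfs.fileType = CompressedStream
agent1.sinks.sink1.hdfs.codeC = gzip
```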

The configuration below writes Avro files instead.

spool-to-hdfs-avro.properties

agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# For each one of the sources, the type is defined
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /tmp/spooldir
# The channel can be defined as follows.
agent1.sources.source1.channels = channel1

agent1.sources.source1.interceptors = interceptor1
agent1.sources.source1.interceptors.interceptor1.type = timestamp

# Each sink's type must be defined
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = /tmp/flume/%Y-%m-%d
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.fileSuffix = .avro
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.serializer = avro_event
agent1.sinks.sink1.serializer.compressionCodec = snappy
#Specify the channel the sink should use
agent1.sinks.sink1.channel = channel1

# Each channel's type is defined.
agent1.channels.channel1.type = file

Start the Flume agent:

flume-ng agent --conf-file spool-to-hdfs-avro.properties --name agent1 --conf $FLUME_HOME/conf -Dflume.root.logger=INFO,console

Result:

[screenshot: Avro files written to the partitioned HDFS path]
