Flume Architecture and Usage Examples

Soy丶sauce · 2022-03-31


Flume Architecture
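A Flume agent wires Sources into Channels, which buffer events until Sinks drain them. As a minimal sketch of that wiring (the agent name a1, the netcat source, the memory channel, and the logger sink here are illustrative choices, not taken from the examples below), a complete single-agent configuration looks like this:

# Name the components of agent a1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: read newline-separated text from a TCP port
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

# Channel: buffer events in memory
a1.channels.c1.type = memory

# Sink: write events to the log at INFO level
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

Note that a source is attached to channels (plural, it can fan out to several), while a sink reads from exactly one channel. Such a file is typically started with bin/flume-ng agent --conf conf --conf-file example.conf --name a1.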

Source

Exec Source
| Property Name | Default | Description |
| --- | --- | --- |
| channels | - | |
| type | - | exec |
| command | - | Command to execute, e.g. tail -F /log |
| shell | - | Shell used to run the command, e.g. /bin/sh -c runs the command through sh |
| logStdErr | false | Whether to log the command's standard error output |
| batchSize | 20 | Maximum number of events sent to the channel in one batch |
| batchTimeout | 3000 | Milliseconds to wait before flushing a batch that has not reached batchSize |
| restartThrottle | 10000 | Milliseconds to wait before restarting the command |
| restart | false | Whether to restart the command if it dies |
| interceptors | - | Interceptors |
# Example with only the required properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
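
A sketch that also sets some of the optional properties from the table above (values are illustrative, not recommendations): run the command through a shell, batch up to 100 events, and restart the command if it dies:

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.shell = /bin/sh -c
a1.sources.r1.batchSize = 100
a1.sources.r1.batchTimeout = 3000
a1.sources.r1.restart = true
a1.sources.r1.restartThrottle = 10000
a1.sources.r1.channels = c1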
Spooling Directory Source
| Property Name | Default | Description |
| --- | --- | --- |
| channels | - | |
| type | - | spooldir |
| spoolDir | - | Directory to watch for new files |
| fileSuffix | .COMPLETED | Suffix appended to fully ingested files |
| deletePolicy | never | Whether to delete fully ingested files: never or immediate |
| fileHeader | false | Whether to add a header with the file's absolute path to each event |
| fileHeaderKey | file | Header key used when fileHeader is enabled |
| batchSize | - | Same as above |
| trackingPolicy | - | Tracking policy: rename or tracker_dir |
| trackerDir | .flumespool | Directory where tracking metadata is stored |
a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool

a1.sources.src-1.fileHeader = true
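
A sketch adding some of the optional properties from the table (values illustrative; sourceFile is an arbitrary key name): delete files once they are fully ingested instead of renaming them, and change the header key that carries the file path:

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.deletePolicy = immediate
a1.sources.src-1.fileHeader = true
a1.sources.src-1.fileHeaderKey = sourceFile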
Taildir Source
| Property Name | Default | Description |
| --- | --- | --- |
| channels | - | |
| type | - | TAILDIR |
| filegroups | - | Space-separated list of file groups; each group is a set of files to be tailed |
| filegroups.<filegroupName> | - | Absolute path of the file group; a regular expression is typically used to match the file names |
# Example with only the required properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
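
TAILDIR records the read offset of every tailed file in a JSON position file so it can resume where it left off after a restart. The location is controlled by the positionFile property (not listed in the table above; it defaults to ~/.flume/taildir_position.json). A sketch with an explicit location:

a1.sources.r1.positionFile = /var/log/flume/taildir_position.json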
Kafka Source
| Property Name | Default | Description |
| --- | --- | --- |
| channels | - | |
| type | - | org.apache.flume.source.kafka.KafkaSource |
| kafka.bootstrap.servers | - | Broker addresses of the Kafka cluster |
| kafka.consumer.group.id | flume | Consumer group id; give several sources the same id so they consume as one consumer group |
| kafka.topics | - | Comma-separated list of topics to consume |
| kafka.topics.regex | - | Regex matching the topics to consume; overrides kafka.topics if present |
# Subscribe to a fixed list of topics
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2

# Subscribe to all topics matching a regex
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
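
When several sources (or several agents) should share the load of the same topics, give them the same kafka.consumer.group.id from the table above so Kafka treats them as one consumer group; a sketch with an illustrative group name:

tier1.sources.source1.kafka.consumer.group.id = flume-log-collectors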
Event Deserializers
| Property Name | Default | Description |
| --- | --- | --- |
| deserializer.maxLineLength | 2048 | Maximum number of characters per line; longer lines are truncated |
| deserializer.outputCharset | UTF-8 | Charset used to encode events sent to the channel |
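
These deserializer.* properties are set on the source that reads the files, for example the Spooling Directory Source above. A sketch (values illustrative) that raises the line-length limit and keeps UTF-8 output:

a1.sources.src-1.deserializer.maxLineLength = 8192
a1.sources.src-1.deserializer.outputCharset = UTF-8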
NetCat TCP Source
| Property Name | Default | Description |
| --- | --- | --- |
| channels | - | |
| type | - | The component type name, needs to be netcat |
| bind | - | Host name or IP address to bind to |
| port | - | Port number to bind to |
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

Channel

Memory Channel
| Property Name | Default | Description |
| --- | --- | --- |
| type | - | memory |
| capacity | 100 | Maximum number of events stored in the channel |
| transactionCapacity | 100 | Maximum number of events the channel takes from a source or gives to a sink per transaction |
| keep-alive | 3 | Timeout in seconds for adding or removing an event |
| byteCapacity | 80% of JVM -Xmx | Maximum total size in bytes of all events in the channel; defaults to 80% of the JVM heap |
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacity = 800000
Kafka Channel
| Property Name | Default | Description |
| --- | --- | --- |
| type | - | org.apache.flume.channel.kafka.KafkaChannel |
| kafka.bootstrap.servers | - | Brokers of the Kafka cluster |
| kafka.topic | flume-channel | Kafka topic the channel stores events in |
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
File Channel
| Property Name | Default | Description |
| --- | --- | --- |
| type | - | file |
| checkpointDir | ~/.flume/file-channel/checkpoint | Directory where checkpoints are stored |
| dataDirs | ~/.flume/file-channel/data | Comma-separated list of directories where the channel's log data is stored (used in the example below) |
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

Sink

HDFS Sink
| Alias | Description |
| --- | --- |
| %{host} | Value of the event header named host (substitute any header key) |
| %t | Unix time in milliseconds |
| %a | Short weekday name (Mon, Tue, ...) |
| %A | Full weekday name (Monday, Tuesday, ...) |
| %b | Short month name (Jan, Feb, ...) |
| %B | Full month name (January, February, ...) |
| %c | Date and time (Thu Mar 3 23:05:25 2005) |
| %d | Day of month (01) |
| %e | Day of month, unpadded (1) |
| %D | Date, same as %m/%d/%y |
| %H | Hour (00..23) |
| %I | Hour (01..12) |
| %j | Day of year (001..366) |
| %k | Hour, unpadded (0..23) |
| %m | Month (01..12) |
| %n | Month, unpadded (1..12) |
| %M | Minute (00..59) |
| %p | am or pm |
| %s | Seconds since 1970-01-01 00:00:00 UTC |
| %S | Second (00..59) |
| %y | Last two digits of the year (00..99) |
| %Y | Year (2010) |
| %z | Numeric timezone offset (for example, -0400) |
| %[localhost] | Hostname of the host |
| %[IP] | IP address of the host |
| %[FQDN] | Canonical hostname of the host |
| Name | Default | Description |
| --- | --- | --- |
| channel | - | |
| type | - | hdfs |
| hdfs.path | - | HDFS directory path; may contain the escape sequences above |
| hdfs.rollInterval | 30 | Seconds to wait before rolling the current file |
| hdfs.rollSize | 1024 | File size, in bytes, that triggers a roll |
| hdfs.rollCount | 10 | Number of events written before the file is rolled |
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
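
The round/roundValue/roundUnit settings above only affect how the timestamp in hdfs.path is rounded; when an individual file is closed is governed by the roll properties from the table. A sketch (values illustrative, not recommendations) that rolls every 10 minutes or at 128 MB, whichever comes first, and disables count-based rolling (setting a roll property to 0 disables that trigger):

a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0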
File Roll Sink
| Property Name | Default | Description |
| --- | --- | --- |
| channel | - | |
| type | - | file_roll |
| sink.directory | - | Directory where files are stored |
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
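
Besides sink.directory, the file roll sink rotates its output file on a timer via sink.rollInterval (not listed in the table above; 30 seconds by default, and 0 disables rolling so everything is written to a single file). A sketch that rolls once an hour:

a1.sinks.k1.sink.rollInterval = 3600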

Related Links

https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sources
