0
点赞
收藏
分享

微信扫一扫

【Flume中间件】(8)channel选择器副本机制

书写经典 2023-01-17 阅读 141


channel选择器副本机制

我们现在的需求就是实时的监听一个文件的变化,然后将变化的数据上传到HDFS,另外需要在本地文件系统保存副本,所以此次的sink去向有两个,一个是HDFS,另外一个就是本地文件系统。

其实我们可以用一个flume就可以实现将第一个flume的sink分别设置为两个去向,为什么下图要多用两个flume呢?是因为有时HDFSsink端写数据较慢,生产数据较快,那就有可能生产大于消费,刚开始还可以在channel进行缓存,如果channel也满了就会崩掉,多两个flume也是为了增加缓冲。

【Flume中间件】(8)channel选择器副本机制_大数据

上图我们可以看到第一个flume从source监听数据,然后交给两个channel,这就需要两个channel收到的数据是一致的,所以channel选择器需要配置成副本,然后两个channel分别发往对应的sink,这里sink采用的是avro,它是用于两个flume之间联系的组件,如果两个flume需要联系,上一个flume的sink就要配置avro,下端的flume的source也要配置相应的avro。

avro的sink就是将数据发往指定IP的某个端口,avro的source就是从某个IP地址的指定端口进行获取数据。

Flume2配置的就是avrosource,然后sink端就是HDFS。

Flume3同样配置avro,但是sink端配置的是file_roll,用于传输到本地文件系统

Flume1

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.selector.type=replicating

a1.sources.r1.type = taildir
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /opt/module/flume/position/position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/data3/data

a1.sinks.k1.type = avro
a1.sinks.k1.hostname=hadoop102
a1.sinks.k1.port=4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname=hadoop102
a1.sinks.k2.port=4142

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Flume2

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind=hadoop102
a2.sources.r1.port=4141

a2.sinks.k1.type = hdfs
# 配置hdfs存储的路径,这里我设置的是动态路径,按照时间命名的文件夹
a2.sinks.k1.hdfs.path=hdfs://hadoop102:9000/flume/%Y%m%d/%H
# 给存储到HDFS的文件加个前缀标明该数据为日志信息
a2.sinks.k1.hdfs.filePrefix=logs-
# 是否按照时间进行滚动文件夹,对应上面配置的动态文件夹
a2.sinks.k1.hdfs.round=true
a2.sinks.k1.hdfs.roundValue=1
# 设置时间量级为秒、分钟还是小时
a2.sinks.k1.hdfs.roundUnit=hour
# 是否使用本地时间戳,我测试了一下,如果这里设置成false,HDFS目录不会发生变化,可能是hdfs配置的动态时间路径就是按照时间戳配置的
a2.sinks.k1.hdfs.useLocalTimeStamp=true
# 积攒多少个Event才会flush到HDFS
a2.sinks.k1.hdfs.batchSize=1000
# 文件类型
a2.sinks.k1.hdfs.fileType=DataStream
# 多久会生成一个新的文件,如果监听的文件没有变化,尽管到了时间也不会产生新文件,产生新文件需要事件去触发
a2.sinks.k1.hdfs.rollInterval=30
# 每个文件大小达到多少会进行滚动
a2.sinks.k1.hdfs.rollSize=134217700
# 设置文件的滚动与事件无关
a2.sinks.k1.hdfs.rollCount=0

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

Flume3

a3.sources = r1
a3.sinks = k1
a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.bind=hadoop102
a3.sources.r1.port=4142

a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /home/hadoop/data4/flume

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

这样当我们向data文件中追加数据,整个flume流程就将我们追加的数据以副本机制分别传输到HDFS和本地文件系统。


举报

相关推荐

0 条评论