猫er聆听没落的旋律 04-16 06:30

【Background】

I want to start a Flume agent that uploads the data in the Kafka topic topic_log to HDFS.

Here is my Flume configuration file:

# Define the components
a1.sources=r1
a1.channels=c1
a1.sinks=k1

# Configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimestampInterceptor$Builder

# Configure the channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

# Configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

# Control the output file type
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

# Assemble: bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The Flume configuration above is fine on its own, but when I started the agent with: [atguigu@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf

it threw the following two errors:

【Error 1】

四月 2024 01:08:35,304 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158)  - Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Channel closed [channel=c1]. Due to java.io.IOException: File /opt/module/flume-1.10.1/data/behavior1/log-14 has bad version 7f000001
        at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:358) ~[flume-file-channel-1.10.1.jar:1.10.1]
        at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:123) ~[flume-ng-core-1.10.1.jar:1.10.1]
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:365) ~[flume-hdfs-sink-1.10.1.jar:1.10.1]
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:39) ~[flume-ng-core-1.10.1.jar:1.10.1]
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) ~[flume-ng-core-1.10.1.jar:1.10.1]
        at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_121]
Caused by: java.io.IOException: File /opt/module/flume-1.10.1/data/behavior1/log-14 has bad version 7f000001
        at org.apache.flume.channel.file.LogFileFactory.getRandomReader(LogFileFactory.java:100) ~

This error occurs because the data in the file channel's checkpoint and data directories is corrupted. Delete the contents of those two directories (run from the Flume home directory):

cd checkpoint/
rm -rf behavior1/*

cd ../data/
rm -rf behavior1/*

【Error 2】

四月 2024 01:39:22,929 ERROR [PollableSourceRunner-KafkaSource-r1] (org.apache.flume.source.kafka.KafkaSource.doProcess:327)  - KafkaSource EXCEPTION, {}
java.lang.IllegalStateException: Channel closed [channel=c1]. Due to java.lang.NullPointerException: null
        at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:358) ~[flume-file-channel-1.10.1.jar:1.10.1]
        at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:123) ~[flume-ng-core-1.10.1.jar:1.10.1]
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:183) ~[flume-ng-core-1.10.1.jar:1.10.1]
        at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:308) ~[flume-kafka-source-1.10.1.jar:1.10.1]
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) ~[flume-ng-core-1.10.1.jar:1.10.1]
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) ~[flume-ng-core-1.10.1.jar:1.10.1]

This error is caused by the JSONObject class used inside the custom interceptor not being found on Flume's classpath. There are two fixes: either upload the jar containing that class to Flume's lib directory on its own, or repackage the interceptor project and upload the jar-with-dependencies to the lib directory, as sketched below.
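For the second option, here is a minimal sketch of the rebuild-and-deploy steps. It assumes the interceptor lives in a Maven project that is already set up to produce a jar-with-dependencies (for example via the maven-assembly or maven-shade plugin); the artifact name flume-interceptor-1.0-SNAPSHOT and the Flume home /opt/module/flume are placeholders, so substitute your own names and paths:

# In the interceptor project's root directory, rebuild the fat jar
mvn clean package -DskipTests

# Copy the jar that bundles the dependencies (check the exact file name under target/)
# into Flume's lib directory on hadoop102
cp target/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/module/flume/lib/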

Then restart the agent and it runs fine.
