0
点赞
收藏
分享

微信扫一扫

【Flink Kafka】Flink程序连接Kafka没输出也不报错


Flink程序连接Kafka没输出也不报错

本人最近在使用​​Kafka​​作为数据源输出数据到​​Flink​​时遇到一个问题,那就是既没有结果输出,也没有报错

代码如下

package Source

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._
import java.util.Properties

object SourceFromKafka {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

//配置项
val properties = new Properties()
properties.setProperty("bootstrap.servers", "master:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

//这边是要读取Kafka的数据,所以算是一个消费者
/**
* 第一个泛型:读取过来的数据类型
* 第一个参数:Kafka名
* 第二个参数:序列化
* 第三个参数:配置项
*/
val stream =
env.addSource(new FlinkKafkaConsumer011[String]
("first", new SimpleStringSchema(), properties))

stream.print()
env.execute()
}
}

结果就和下图相似,不报错,也没有输出

【Flink Kafka】Flink程序连接Kafka没输出也不报错_flink

这样的结果让人很困惑,于是我们准备看看更加详细的信息,我们在​​pom​​文件中添加下面的依赖

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>

在​​src/main/resources​​文件夹下创建一个名为​​log4j.properties​​的文件,在该文件里添加如下内容

log4j.rootLogger=info,console  

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=INFO
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n

这个时候在运行,我们就可以看到下面几行信息

[INFO ] 2022-03-23 10:34:33,892(26157) --> [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (5/6)] org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:607): Discovered coordinator slave2:9092 (id: 2147483645 rack: null) for group consumer-group.  
[INFO ] 2022-03-23 10:34:33,892(26157) --> [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (5/6)] org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:652): Marking the coordinator slave2:9092 (id: 2147483645 rack: null) dead for group consumer-group

我们看到信息中出现了​​slave2​​这个正是我们安装​​Kafka​​集群的虚拟机的主机名,然后我们想到是否是​​windows​​上运行的​​kafka​​拿到的​​host​​是机器名而不是​​IP​​地址,因此我们赶紧去修改​​host​​文件,文件地址:​​C:\Windows\System32\drivers\etc\hosts​

【Flink Kafka】Flink程序连接Kafka没输出也不报错_apache_02

此时再运行程序,问题解决【Flink Kafka】Flink程序连接Kafka没输出也不报错_Flink_03



举报

相关推荐

0 条评论