D:\dev\env\java\jdk1.8.0_321\bin\java.exe "-javaagent:D:\dev\env\idea\IntelliJ IDEA 2021.3.3\lib\idea_rt.jar=64768:D:\dev\env\idea\IntelliJ IDEA 2021.3.3\bin" -Dfile.encoding=UTF-8 -classpath D:\dev\env\java\jdk1.8.0_321\jre\lib\charsets.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\deploy.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\ext\access-bridge-64.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\ext\cldrdata.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\ext\dnsns.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\ext\jaccess.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\ext\jfxrt.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\ext\localedata.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\ext\nashorn.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\ext\sunec.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\ext\sunjce_provider.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\ext\sunmscapi.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\ext\sunpkcs11.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\ext\zipfs.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\javaws.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\jce.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\jfr.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\jfxswt.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\jsse.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\management-agent.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\plugin.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\resources.jar;D:\dev\env\java\jdk1.8.0_321\jre\lib\rt.jar;D:\dev\CODE\space\idea\BigData\flink14\flinkStarter\target\classes;D:\dev\env\maven\rep\org\apache\flink\flink-java\1.14.3\flink-java-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-core\1.14.3\flink-core-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-annotations\1.14.3\flink-annotations-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-metrics-core\1.14.3\flink-metrics-core-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-shaded-asm-7\7.1-14.0\flink-shaded-asm-7-7.1-14.0.jar;D:\dev\env\maven\rep\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;D:\dev\env\maven\rep\com\esotericsoftware\
minlog\minlog\1.2\minlog-1.2.jar;D:\dev\env\maven\rep\org\objenesis\objenesis\2.1\objenesis-2.1.jar;D:\dev\env\maven\rep\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;D:\dev\env\maven\rep\org\apache\commons\commons-compress\1.21\commons-compress-1.21.jar;D:\dev\env\maven\rep\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;D:\dev\env\maven\rep\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;D:\dev\env\maven\rep\org\slf4j\slf4j-api\1.7.15\slf4j-api-1.7.15.jar;D:\dev\env\maven\rep\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;D:\dev\env\maven\rep\org\apache\flink\flink-shaded-force-shading\14.0\flink-shaded-force-shading-14.0.jar;D:\dev\env\maven\rep\org\apache\flink\flink-streaming-java_2.11\1.14.3\flink-streaming-java_2.11-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-file-sink-common\1.14.3\flink-file-sink-common-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-runtime\1.14.3\flink-runtime-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-rpc-core\1.14.3\flink-rpc-core-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-rpc-akka-loader\1.14.3\flink-rpc-akka-loader-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-queryable-state-client-java\1.14.3\flink-queryable-state-client-java-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-hadoop-fs\1.14.3\flink-hadoop-fs-1.14.3.jar;D:\dev\env\maven\rep\commons-io\commons-io\2.8.0\commons-io-2.8.0.jar;D:\dev\env\maven\rep\org\apache\flink\flink-shaded-netty\4.1.65.Final-14.0\flink-shaded-netty-4.1.65.Final-14.0.jar;D:\dev\env\maven\rep\org\apache\flink\flink-shaded-jackson\2.12.4-14.0\flink-shaded-jackson-2.12.4-14.0.jar;D:\dev\env\maven\rep\org\apache\flink\flink-shaded-zookeeper-3\3.4.14-14.0\flink-shaded-zookeeper-3-3.4.14-14.0.jar;D:\dev\env\maven\rep\org\javassist\javassist\3.24.0-GA\javassist-3.24.0-GA.jar;D:\dev\env\maven\rep\org\xerial\snappy\snappy-java\1.1.8.3\snappy-java-1.1.8.3.jar;D:\dev\env\maven\rep\org\lz4\lz4-jav
a\1.8.0\lz4-java-1.8.0.jar;D:\dev\env\maven\rep\org\apache\flink\flink-scala_2.11\1.14.3\flink-scala_2.11-1.14.3.jar;D:\dev\env\maven\rep\com\twitter\chill_2.11\0.7.6\chill_2.11-0.7.6.jar;D:\dev\env\maven\rep\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;D:\dev\env\maven\rep\org\apache\flink\flink-shaded-guava\30.1.1-jre-14.0\flink-shaded-guava-30.1.1-jre-14.0.jar;D:\dev\env\maven\rep\org\apache\flink\flink-table-planner_2.11\1.14.3\flink-table-planner_2.11-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-table-common\1.14.3\flink-table-common-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-connector-files\1.14.3\flink-connector-files-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-connector-base\1.14.3\flink-connector-base-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-table-runtime_2.11\1.14.3\flink-table-runtime_2.11-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-table-code-splitter\1.14.3\flink-table-code-splitter-1.14.3.jar;D:\dev\env\maven\rep\org\codehaus\janino\janino\3.0.11\janino-3.0.11.jar;D:\dev\env\maven\rep\org\codehaus\janino\commons-compiler\3.0.11\commons-compiler-3.0.11.jar;D:\dev\env\maven\rep\org\apache\calcite\avatica\avatica-core\1.17.0\avatica-core-1.17.0.jar;D:\dev\env\maven\rep\org\apache\flink\flink-connector-kafka-0.11_2.11\1.11.6\flink-connector-kafka-0.11_2.11-1.11.6.jar;D:\dev\env\maven\rep\org\apache\flink\flink-connector-kafka-0.10_2.11\1.11.6\flink-connector-kafka-0.10_2.11-1.11.6.jar;D:\dev\env\maven\rep\org\apache\flink\flink-connector-kafka-base_2.11\1.11.6\flink-connector-kafka-base_2.11-1.11.6.jar;D:\dev\env\maven\rep\org\apache\kafka\kafka-clients\0.11.0.2\kafka-clients-0.11.0.2.jar;D:\dev\env\maven\rep\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;D:\dev\env\maven\rep\org\apache\flink\force-shading\1.11.6\force-shading-1.11.6.jar;D:\dev\env\maven\rep\org\apache\flink\flink-table-api-java\1.14.3\flink-table-api-java-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-table-api-java-bridge_2.
11\1.14.3\flink-table-api-java-bridge_2.11-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-clients_2.11\1.14.3\flink-clients_2.11-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-optimizer\1.14.3\flink-optimizer-1.14.3.jar;D:\dev\env\maven\rep\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;D:\dev\env\maven\rep\org\apache\flink\flink-table-api-scala_2.11\1.14.3\flink-table-api-scala_2.11-1.14.3.jar;D:\dev\env\maven\rep\org\scala-lang\scala-reflect\2.11.12\scala-reflect-2.11.12.jar;D:\dev\env\maven\rep\org\scala-lang\scala-library\2.11.12\scala-library-2.11.12.jar;D:\dev\env\maven\rep\org\scala-lang\scala-compiler\2.11.12\scala-compiler-2.11.12.jar;D:\dev\env\maven\rep\org\scala-lang\modules\scala-xml_2.11\1.0.5\scala-xml_2.11-1.0.5.jar;D:\dev\env\maven\rep\org\scala-lang\modules\scala-parser-combinators_2.11\1.0.4\scala-parser-combinators_2.11-1.0.4.jar;D:\dev\env\maven\rep\org\apache\flink\flink-table-api-scala-bridge_2.11\1.14.3\flink-table-api-scala-bridge_2.11-1.14.3.jar;D:\dev\env\maven\rep\org\apache\flink\flink-streaming-scala_2.11\1.14.3\flink-streaming-scala_2.11-1.14.3.jar;D:\dev\env\maven\rep\org\apache\logging\log4j\log4j-slf4j-impl\2.17.1\log4j-slf4j-impl-2.17.1.jar;D:\dev\env\maven\rep\org\apache\logging\log4j\log4j-api\2.17.1\log4j-api-2.17.1.jar;D:\dev\env\maven\rep\org\apache\logging\log4j\log4j-core\2.17.1\log4j-core-2.17.1.jar;D:\dev\env\maven\rep\org\projectlombok\lombok\1.18.22\lombok-1.18.22.jar;D:\dev\env\maven\rep\com\fasterxml\jackson\core\jackson-databind\2.13.2.2\jackson-databind-2.13.2.2.jar;D:\dev\env\maven\rep\com\fasterxml\jackson\core\jackson-annotations\2.13.2\jackson-annotations-2.13.2.jar;D:\dev\env\maven\rep\com\fasterxml\jackson\core\jackson-core\2.13.2\jackson-core-2.13.2.jar cn.imdada.flink.java.kafka.StreamAPIDemo04
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
... 4 more
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()Lorg/apache/flink/metrics/MetricGroup;
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:762)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
Process finished with exit code 1
依赖的 jar 包(pom.xml 依赖配置片段):
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.11.6</version>
</dependency>
<!-- ... -->
</dependencies>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.3</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.1</log4j.version>
<kafka.version>1.11.6</kafka.version> <!-- 注意:这里实际是 Flink Kafka 连接器的版本号,并非 Kafka 本身的版本 -->
</properties>
package xxx.flink.java.kafka;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
/**
 * Failing example: consumes a Kafka topic with the legacy
 * {@code FlinkKafkaConsumer011} source and prints each record.
 * Running this against a Flink 1.14 classpath triggers the
 * {@code NoSuchMethodError} shown above, because the 0.11 connector
 * was compiled against an older {@code RuntimeContext} API.
 */
public class StreamAPIDemo04 {
    public static void main(String[] args) throws Exception {
        // Streaming environment with default parallelism.
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // environment.setParallelism(1);

        // Kafka consumer client configuration.
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("bootstrap.servers", "localhost:9092");
        consumerConfig.setProperty("group.id", "flink_consumer");
        consumerConfig.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Commit offsets automatically once records are consumed.
        consumerConfig.setProperty("enable.auto.commit", "true");
        // Interval between automatic offset commits.
        consumerConfig.setProperty("auto.commit.interval.ms", "1000");
        // Session timeout after which the broker may drop this consumer.
        consumerConfig.setProperty("session.timeout.ms", "30000");
        // Where to start reading when no committed offset exists.
        consumerConfig.setProperty("auto.offset.reset", "earliest");
        // consumerConfig.setProperty("auto.offset.reset", "latest");

        FlinkKafkaConsumer011<String> kafkaConsumer =
                new FlinkKafkaConsumer011<>("tp_sensor", new SimpleStringSchema(), consumerConfig);

        // Two parallel source subtasks reading the topic; print to stdout.
        DataStreamSource<String> sensorStream = environment.addSource(kafkaConsumer).setParallelism(2);
        sensorStream.print();

        environment.execute();
    }
}
换什么版本都报上面的错误。
解决办法:换连接器新版本
参见: Flink 1.14.0 全新的 Kafka Connector(JasonLee 的 CSDN 博客)
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>1.14.4</version>
</dependency>
package xxx.flink.java.kafka;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
/**
 * Working example: consumes a Kafka topic with the unified
 * {@link KafkaSource} API introduced for Flink 1.14 and prints each
 * record. This replaces the legacy {@code FlinkKafkaConsumer011}
 * source that fails with {@code NoSuchMethodError} on Flink 1.14.
 */
public class StreamAPIDemo05 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connection settings, defined once and reused below (the original
        // code configured bootstrap.servers and group.id twice — once in the
        // Properties and once through duplicate string literals).
        String brokers = "localhost:9092";
        String topic = "tp_sensor";
        String consumergroup = "flink_consumer";

        // Extra Kafka client properties passed through to the consumer.
        // Intentionally omitted compared to the legacy example:
        //  - key.deserializer / value.deserializer: KafkaSource installs its
        //    own deserializers; user-supplied ones are overridden.
        //  - auto.offset.reset: superseded by setStartingOffsets(...) below.
        Properties properties = new Properties();
        // 如果value合法,则自动提交偏移量
        properties.setProperty("enable.auto.commit", "true");
        // 设置多久一次更新被消费消息的偏移量
        properties.setProperty("auto.commit.interval.ms", "1000");
        // 设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
        properties.setProperty("session.timeout.ms", "30000");

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics(topic)
                .setGroupId(consumergroup)
                .setStartingOffsets(OffsetsInitializer.earliest()) // 自动重置offset
                // .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperties(properties)
                .build();

        // No event-time watermarks needed for a simple print pipeline.
        DataStreamSource<String> kafka_source =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        kafka_source.print();
        // kafka_source.flatMap(...).print();

        env.execute();
    }
}
爽歪歪。