0
点赞
收藏
分享

微信扫一扫

11.4.4、Kafka:使用Java客户端API生产、消费Kafka数据(分区、序列化)

elvinyang 2022-03-17 阅读 42

1、使用Kafka Java客户端API(Scala代码)生产Kafka数据

(1)添加Maven依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.0</version>
</dependency>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.12</version>
</dependency>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-compiler</artifactId>
    <version>2.11.12</version>
</dependency>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>2.11.12</version>
</dependency>

(2)代码实现

object Demo01KafkaProduct {

  /**
    * Sends a single string value ("java") to the `test1` topic and exits.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {

    // Connection settings: the broker address plus serializers for the record key and value.
    // Kafka stores records as key/value pairs, so both sides need a serializer even when the
    // key is left null. `group.id` is deliberately not set: it is a consumer-only setting and
    // a producer only logs it as an unknown config.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](properties)
    try {
      // Topic name and value only; no key and no explicit partition are given.
      val producerRecord = new ProducerRecord[String, String]("test1", "java")
      producer.send(producerRecord)
      // The producer buffers records into batches; a single record would not fill a batch,
      // so push it out to the broker explicitly.
      producer.flush()
    } finally {
      // Always release the producer's resources, even if send/flush throws.
      producer.close()
    }
  }

}

2、使用Kafka Java客户端API(Scala代码)生产Kafka数据(指定分区)

根据性别字段的hash值计算分区号,将不同性别的数据写入不同的分区
object Demo02ToKafka {

  /** Number of partitions the records are spread over; the topic must have at least this many. */
  private val NumPartitions: Int = 2

  /**
    * Reads student records from a local comma-separated file and produces every line to the
    * `students5` topic. The partition is chosen from the gender column (field index 4), so all
    * records with the same gender land in the same partition.
    *
    * Lines with fewer than five fields are skipped with a message on stderr instead of
    * aborting the whole run.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {

    // Connection settings: the broker address plus key/value serializers. `group.id` is a
    // consumer-only setting and is therefore not set on this producer.
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](properties)
    try {
      // Read explicitly as UTF-8 rather than the platform default charset, since the data
      // contains non-ASCII text (assumes the file is UTF-8 encoded — confirm for your data).
      val source = Source.fromFile("kafkaproject/data/students.txt", "UTF-8")
      try {
        source.getLines().foreach { line =>
          val fields = line.split(",")
          if (fields.length > 4) {
            val partition: Int = partitionFor(fields(4), NumPartitions)
            // Explicit partition, null key, whole line as the value.
            val record = new ProducerRecord[String, String]("students5", partition, null, line)
            producer.send(record)
          } else {
            System.err.println(s"skipping malformed line: $line")
          }
        }
      } finally {
        source.close()
      }
      // Flush once after all sends instead of after every record, so batching still works.
      producer.flush()
    } finally {
      producer.close()
    }
  }

  /**
    * Maps a key to a partition in `[0, numPartitions)`. `Math.floorMod` is used instead of
    * `Math.abs(h) % n`, which yields a negative result when `h == Int.MinValue` and `n` is
    * not a power of two.
    */
  private def partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

}

3、使用Kafka Java客户端API(Scala代码)消费Kafka数据(分区)

object Demo03Coutomer {

  /**
    * Subscribes to the `students5` topic in consumer group `f` and prints every record as
    * `topic partition offset key value timestamp` (tab separated) until the JVM is shut down.
    *
    * Shutdown (e.g. Ctrl+C) is handled with a shutdown hook that calls `consumer.wakeup()`.
    * This makes the blocked `poll` throw `WakeupException`, so the loop exits and the
    * consumer is closed. In the earlier version, the `close()` after `while (true)` was
    * unreachable.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {
    import scala.collection.JavaConverters._

    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "f")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    /*
     * auto.offset.reset:
     *   earliest - partitions with a committed offset resume from it; partitions without one
     *              are read from the beginning
     *   latest   - partitions with a committed offset resume from it; partitions without one
     *              only receive newly produced data
     *   none     - resume from committed offsets if every partition has one; if any partition
     *              lacks a committed offset, an exception is thrown
     */
    // Start from the beginning when the group has no committed offset yet.
    properties.put("auto.offset.reset", "earliest")

    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](properties)

    // On JVM shutdown, interrupt the blocking poll and wait for the main thread to finish
    // closing the consumer.
    val mainThread = Thread.currentThread()
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        consumer.wakeup()
        mainThread.join()
      }
    })

    try {
      consumer.subscribe(util.Collections.singletonList("students5"))

      while (true) {
        // poll(long) is used because the declared kafka 1.0.0 client has no poll(Duration).
        val records: ConsumerRecords[String, String] = consumer.poll(1000)
        records.asScala.foreach { record =>
          val topic = record.topic()
          val partition = record.partition()
          val offset = record.offset()
          val key = record.key() // null unless the producer set a key
          val value = record.value()
          val time = record.timestamp()
          println(s"$topic\t$partition\t$offset\t$key\t$value\t$time")
        }
      }
    } catch {
      // Expected during shutdown: raised by poll() after consumer.wakeup().
      case _: org.apache.kafka.common.errors.WakeupException =>
    } finally {
      consumer.close()
    }
  }

}
举报

相关推荐

0 条评论