0
点赞
收藏
分享

微信扫一扫

kafka生产者(同步/异步发送数据)

import java.util.{Properties, UUID}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.log4j.{Level, Logger}

object KafkaProducer2{
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val brokers = "node1:9092, node2:9092, node3:9092"
    val mytopic = "mykafka1"
    val kafkaProps = new Properties()
    kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,   "org.apache.kafka.common.serialization.StringSerializer")
    // 上面的参数必须指定,以下为可选参数
    kafkaProps.put(ProducerConfig.ACKS_CONFIG, "1")
    kafkaProps.put(ProducerConfig.RETRIES_CONFIG, "3")
    // 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。16K
    kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384")
    // 默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。
    // KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。
    kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, "100")
    // 设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。32M
    kafkaProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")

    val producer = new KafkaProducer[String, String](kafkaProps)

     // 缺省异步发送
    for (i <- 1 to 100) {
      val record = new ProducerRecord(topic, i.toString, s"$i")
      producer.send(record)
    }

    // 同步发送
    for (i <- 1 to 88) {
      val record = new ProducerRecord[String, String](mytopic, s"$i, ${UUID.randomUUID}")
      // 程序阻塞,直到该条消息发送成功返回元数据信息或者报错
      val metadata: RecordMetadata = producer.send(record).get
      println(s"(topic, partition, offset) : (${metadata.topic()}, ${metadata.partition()}, ${metadata.offset})")
    }

    producer.close()
  }
}
// 检查topic中的数据
// kafka-run-class.sh kafka.tools.GetOffsetShell --topic mykafka1 --broker-list node1:9092

举报

相关推荐

0 条评论