import java.util.{Properties, UUID}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.log4j.{Level, Logger}
object KafkaProducer2{
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
val brokers = "node1:9092, node2:9092, node3:9092"
val mytopic = "mykafka1"
val kafkaProps = new Properties()
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// 上面的参数必须指定,以下为可选参数
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "1")
kafkaProps.put(ProducerConfig.RETRIES_CONFIG, "3")
// 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。16K
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384")
// 默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。
// KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, "100")
// 设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。32M
kafkaProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")
val producer = new KafkaProducer[String, String](kafkaProps)
// 缺省异步发送
for (i <- 1 to 100) {
val record = new ProducerRecord(topic, i.toString, s"$i")
producer.send(record)
}
// 同步发送
for (i <- 1 to 88) {
val record = new ProducerRecord[String, String](mytopic, s"$i, ${UUID.randomUUID}")
// 程序阻塞,直到该条消息发送成功返回元数据信息或者报错
val metadata: RecordMetadata = producer.send(record).get
println(s"(topic, partition, offset) : (${metadata.topic()}, ${metadata.partition()}, ${metadata.offset})")
}
producer.close()
}
}
// 检查topic中的数据
// kafka-run-class.sh kafka.tools.GetOffsetShell --topic mykafka1 --broker-list node1:9092