1. Producing Kafka data in Scala
(1) Import the dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.12</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-compiler</artifactId>
    <version>2.11.12</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>2.11.12</version>
</dependency>
(2) Implementation
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Demo01KafkaProduct {
  def main(args: Array[String]): Unit = {
    /**
     * Create the Kafka connection
     */
    // Specify the address of the Kafka broker
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    // Serializers for the key and the value
    // Kafka stores data as key-value pairs, similar to records in a file
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Producer
    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](properties)
    // Topic name and the value to send
    val producerRecord = new ProducerRecord[String, String]("test1", "java")
    // Produce the data (asynchronously, into the producer's buffer)
    producer.send(producerRecord)
    // Flush the buffer to Kafka; otherwise the producer batches records,
    // and a single record is not enough to fill a batch
    producer.flush()
    // Close the connection
    producer.close()
  }
}
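Note that send() is asynchronous: it only appends the record to the producer's buffer, and batching is controlled by the linger.ms and batch.size configs. To get a delivery confirmation (or the failure), send() also accepts a Callback. A minimal sketch under the same setup as above; the object name Demo01KafkaCallback and the batching values are illustrative, not tuned:
import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object Demo01KafkaCallback {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Batching knobs (illustrative values): a batch is sent once it reaches
    // batch.size bytes or after linger.ms milliseconds, whichever comes first
    properties.setProperty("linger.ms", "5")
    properties.setProperty("batch.size", "16384")
    val producer = new KafkaProducer[String, String](properties)
    producer.send(new ProducerRecord[String, String]("test1", "java"), new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null) {
          exception.printStackTrace() // the send failed
        } else {
          // metadata tells us where the record landed
          println(s"sent to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")
        }
      }
    })
    producer.close() // close() flushes any buffered records first
  }
}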
2. Producing Kafka data with explicit partitioning (Scala)
Hash a field of each record to route it to a specific partition; an alternative using a custom Partitioner is sketched after this example.
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.io.Source

object Demo02ToKafka {
  def main(args: Array[String]): Unit = {
    /**
     * Create the Kafka connection
     */
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Producer
    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](properties)
    // Read the data file line by line
    Source.fromFile("kafkaproject/data/students.txt")
      .getLines()
      .foreach(line => {
        // Route records with different genders to different partitions
        // (assumes the students5 topic has 2 partitions)
        val gender: String = line.split(",")(4)
        val parti: Int = Math.abs(gender.hashCode) % 2
        // Constructor arguments: (topic, partition, key, value); the key is null here
        val record = new ProducerRecord[String, String]("students5", parti, null, line)
        producer.send(record)
        // Flushing after every record defeats batching; acceptable for a demo
        producer.flush()
      })
    producer.close()
  }
}
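Hard-coding the partition number works, but the same routing can also be packaged as a custom Partitioner and registered via the partitioner.class config, which keeps the send path free of partitioning logic. A minimal sketch; the class name GenderPartitioner is hypothetical, and it assumes the gender is sent as the (non-null) record key:
import java.util

import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

// Hypothetical partitioner: routes by the hash of the record key,
// e.g. when the gender is sent as the key instead of being left null
class GenderPartitioner extends Partitioner {
  override def partition(topic: String, key: Any, keyBytes: Array[Byte],
                         value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    // Ask the cluster metadata how many partitions the topic actually has,
    // instead of hard-coding 2; assumes key is non-null
    val numPartitions: Int = cluster.partitionCountForTopic(topic)
    Math.abs(key.toString.hashCode) % numPartitions
  }
  override def close(): Unit = {}
  override def configure(configs: util.Map[String, _]): Unit = {}
}
It would be registered with properties.setProperty("partitioner.class", classOf[GenderPartitioner].getName), after which records can be created as new ProducerRecord[String, String]("students5", gender, line).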
3. Consuming Kafka data (with partition info) in Scala
import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}

object Demo03Coutomer {
  def main(args: Array[String]): Unit = {
    /**
     * Create the Kafka connection
     */
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "f")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    /**
     * earliest
     *   If a partition has a committed offset, consume from that offset;
     *   if not, consume from the beginning.
     * latest
     *   If a partition has a committed offset, consume from that offset;
     *   if not, consume only data newly produced to that partition.
     * none
     *   If every partition of the topic has a committed offset, consume from
     *   those offsets; if any partition lacks one, throw an exception.
     */
    // Read from the earliest available data
    properties.put("auto.offset.reset", "earliest")
    // Consumer
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](properties)
    // Subscribe the consumer to a topic
    val list = new util.ArrayList[String]()
    list.add("students5")
    consumer.subscribe(list)
    while (true) {
      // Poll for data, with a timeout in milliseconds
      val records: ConsumerRecords[String, String] = consumer.poll(1000)
      // Iterate over all records returned by this poll
      val iterators: util.Iterator[ConsumerRecord[String, String]] = records.iterator()
      while (iterators.hasNext) {
        // One record
        val line: ConsumerRecord[String, String] = iterators.next()
        val topic = line.topic()
        val partition = line.partition()
        val offset = line.offset()
        val key = line.key() // null by default (no key was sent)
        val value = line.value()
        val time = line.timestamp() // defaults to the producer's send time
        println(s"$topic\t$partition\t$offset\t$key\t$value\t$time")
      }
    }
    // Never reached because of the infinite loop above; a real consumer
    // would break out of the loop before closing
    consumer.close()
  }
}
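By default the consumer commits offsets automatically in the background (enable.auto.commit=true). For at-least-once processing you can disable that and commit only after a batch has been handled. A minimal sketch of the same poll loop with a manual commit; the object name Demo04ManualCommit is illustrative, the config keys are standard Kafka consumer settings:
import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConverters._

object Demo04ManualCommit {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "f")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Turn off periodic background commits; we commit ourselves below
    properties.setProperty("enable.auto.commit", "false")
    val consumer = new KafkaConsumer[String, String](properties)
    consumer.subscribe(util.Arrays.asList("students5"))
    while (true) {
      val records = consumer.poll(1000)
      records.asScala.foreach(record => println(record.value()))
      // Commit this batch's offsets only after it has been processed,
      // giving at-least-once semantics (records may be re-read after a crash)
      consumer.commitSync()
    }
  }
}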