For the Kafka client, the Java version of the API is recommended (the client code was rewritten in Java in Kafka 0.9). Add it with the following configuration (this pulls in only the Java client jar):
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>
1) Kafka has four core APIs:
- The Producer API allows an application to publish a stream of records to one or more Kafka topics.
- The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
- The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
- The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
Here we cover the two most basic ones: the Producer and the Consumer.
2) Note:
The Kafka client exists in an old and a new version. As of Kafka 0.9 the client API was completely rewritten in Java; the old client API was implemented in Scala. Comparing the two:
// old (Scala) client
kafka.javaapi.producer.Producer<K,V>
// new (Java) client
org.apache.kafka.clients.producer.KafkaProducer<K,V>
1. The new producer API:
A Producer is the Kafka client used to publish records to a Kafka cluster. The producer is thread-safe, and sharing a single producer instance across threads is generally faster than creating one per thread.
The producer consists of a buffer pool and a background I/O thread; the new producer API sends messages asynchronously, in batches. The buffer pool holds records that have not yet been transmitted to the server, while the I/O thread turns those records into requests and transmits them to the broker cluster. Close the producer when you are done with it; on close it checks the buffer and sends any remaining messages to the brokers.
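The buffer-pool-plus-I/O-thread design can be sketched in plain Java. This is a toy analogy, not Kafka's actual implementation: a `BlockingQueue` stands in for the record accumulator and a background thread for the sender, and the `close()` method drains the buffer just as the explanation above describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the producer's design: callers enqueue records into a buffer,
// and a background I/O thread drains them in batches.
public class ToyProducer {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final AtomicInteger sent = new AtomicInteger();
    private final Thread ioThread;
    private volatile boolean closed = false;

    public ToyProducer(int batchSize) {
        ioThread = new Thread(() -> {
            List<String> batch = new ArrayList<>();
            while (!closed || !buffer.isEmpty()) {
                try {
                    String r = buffer.poll(10, TimeUnit.MILLISECONDS);
                    if (r != null) batch.add(r);
                    if (batch.size() >= batchSize) flush(batch);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            flush(batch); // drain whatever is left, like KafkaProducer.close()
        });
        ioThread.start();
    }

    private void flush(List<String> batch) {
        if (batch.isEmpty()) return;
        System.out.println("sending batch: " + batch); // stand-in for one network request
        sent.addAndGet(batch.size());
        batch.clear();
    }

    public void send(String record) { buffer.add(record); } // returns immediately

    public int sentCount() { return sent.get(); }

    public void close() throws InterruptedException { // blocks until the buffer is drained
        closed = true;
        ioThread.join();
    }

    public static void main(String[] args) throws InterruptedException {
        ToyProducer p = new ToyProducer(3);
        for (int i = 0; i < 7; i++) p.send("msg-" + i); // send() never blocks on I/O
        p.close(); // flushes the last partial batch
        System.out.println("total sent: " + p.sentCount());
    }
}
```

The real producer's `batch.size` and `linger.ms` settings control the same trade-off this sketch hard-codes: how long records sit in the buffer before the I/O thread ships them.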
1.1) Producer send modes:
1) Asynchronous (fire-and-forget):
Properties props = new Properties();
props.put("bootstrap.servers", "10.39.16.34:9092");
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 5);
props.put("linger.ms", 10_000);
//props.put("buffer.memory", 1024 * 1024 * 16);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)); // build the record
    try {
        producer.send(record); // returned Future is ignored: fire-and-forget
    } catch (Exception e) {
        logger.error("error...", e);
    }
}
producer.close();
Fire-and-forget is the most common mode: Kafka's own high availability plus the client's automatic retries usually guarantee that messages get through. But this mode can lose messages, because exceptions that occur in flight cannot be caught!
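The reason exceptions get lost is general to asynchronous APIs, not specific to Kafka: `send()` hands back a Future, and a failure during the asynchronous work lives inside that Future until someone looks at it. A plain `java.util.concurrent` sketch (no Kafka involved) makes the loss, and the callback remedy, visible:

```java
import java.util.concurrent.CompletableFuture;

public class FireAndForgetDemo {
    public static void main(String[] args) throws Exception {
        // Simulated asynchronous "send" that fails in flight.
        CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
            throw new RuntimeException("broker unreachable");
        });

        // Fire-and-forget: we never inspect f, so this line prints normally
        // and the failure above is silently swallowed -- the message is lost.
        System.out.println("send() returned, no exception seen");

        // Attaching a callback (the analogue of Kafka's Callback) recovers the
        // error without blocking the sending thread:
        f.whenComplete((v, ex) -> {
            if (ex != null) System.out.println("callback saw: " + ex.getMessage());
        });

        Thread.sleep(100); // crude wait so the demo doesn't exit before the callback runs
    }
}
```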
2) Asynchronous with a callback:
Properties props = new Properties();
props.put("bootstrap.servers", "10.39.16.34:9092");
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 2);
props.put("linger.ms", 10_000);
//props.put("buffer.memory", 1024 * 1024 * 16);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
    try {
        producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)),
                new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) { // invoked when the send completes
                        if (exception == null) { // success: metadata describes where the record landed
                            System.out.println("Received new metadata." +
                                    "Topic: " + metadata.topic() + "," +
                                    "Partition: " + metadata.partition() + "," +
                                    "Offset: " + metadata.offset() + "," +
                                    "Timestamp: " + metadata.timestamp());
                        } else {
                            logger.error("onCompletion error...", exception);
                        }
                    }
                });
    } catch (Exception e) { // catches pre-send failures only (e.g. serialization)
        logger.error("send error...", e);
    }
}
producer.close(); // flushes buffered records before exiting
3) Synchronous send:
Properties props = new Properties();
props.put("bootstrap.servers", "10.39.16.34:9092");
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 5);
props.put("linger.ms", 10_000);
props.put("buffer.memory", 1024 * 1024 * 16);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i));
    try {
        RecordMetadata metadata = producer.send(record).get(); // get() blocks until the send completes or fails
        System.out.println("Received new metadata." +
                "Topic: " + metadata.topic() + "," +
                "Partition: " + metadata.partition() + "," +
                "Offset: " + metadata.offset() + "," +
                "Timestamp: " + metadata.timestamp());
    } catch (Exception e) {
        logger.error("send error", e); // in-flight failures surface here as ExecutionException
    }
}
producer.close(); // flushes buffered records before exiting
Note: the send() method itself never throws in-flight exceptions; it only throws exceptions that occur before the send (for example, serialization failure). In-flight failures can only be caught via the synchronous return value (Future.get()) or via the exception passed to the callback.
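The two exception surfaces can be demonstrated without Kafka. This is a hedged sketch: `send()` here is a hypothetical stand-in whose "serialization" runs synchronously in the caller's thread, while the "network I/O" runs asynchronously, mirroring the note above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ExceptionSurfaceDemo {
    // Stand-in for a producer send(): the null check plays the role of
    // synchronous serialization; the async stage plays the network I/O.
    static CompletableFuture<Void> send(String value, boolean failInFlight) {
        if (value == null) {
            // pre-send failure: thrown directly, so try/catch around send() sees it
            throw new IllegalArgumentException("cannot serialize null");
        }
        return CompletableFuture.runAsync(() -> {
            if (failInFlight) throw new RuntimeException("in-flight failure");
        });
    }

    public static void main(String[] args) throws Exception {
        // 1) Pre-send exception: caught by the try/catch around send().
        try {
            send(null, false);
        } catch (IllegalArgumentException e) {
            System.out.println("caught synchronously: " + e.getMessage());
        }

        // 2) In-flight exception: send() returns normally; only get() reveals it.
        CompletableFuture<Void> f = send("ok", true);
        try {
            f.get();
        } catch (ExecutionException e) {
            System.out.println("caught via get(): " + e.getCause().getMessage());
        }
    }
}
```

Note how the in-flight failure arrives wrapped in an `ExecutionException`; the original cause is available via `getCause()`, which is also how `producer.send(record).get()` reports broker-side errors.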
2. Getting the message back inside the Callback:
Kafka does not echo the sent message back in the producer callback: the caller already has the record, so returning it would waste network bandwidth. As the examples above show, the callback receives only the metadata and an Exception. So when a send fails, how does the callback get at the original message for further handling?
Implement the Callback interface, carry the record as a field, and pass an instance of your own callback class to send().
1) Implementing the callback:
class RecordCallback implements Callback {
    private static final Logger logger = LoggerFactory.getLogger(RecordCallback.class);
    private final ProducerRecord<String, String> msg;

    public RecordCallback(ProducerRecord<String, String> msg) {
        this.msg = msg;
    }

    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) { // success
            System.out.println("Received new metadata." +
                    "Topic: " + metadata.topic() + "," +
                    "Partition: " + metadata.partition() + "," +
                    "Offset: " + metadata.offset() + "," +
                    "Timestamp: " + metadata.timestamp());
        } else {
            // record the failed message for later handling
            logger.info("save error msg,key:{},value:{}", msg.key(), msg.value());
            logger.error("RecordCallback error...", exception);
        }
    }
}
2) Sending with the callback:
Properties props = new Properties();
props.put("bootstrap.servers", "10.39.16.34:9092");
props.put("acks", "1");
props.put("retries", 1);
props.put("batch.size", 2);
props.put("linger.ms", 10_000);
//props.put("buffer.memory", 1024 * 1024 * 16);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i));
    try {
        producer.send(record, new RecordCallback(record)); // the callback carries the record
    } catch (Exception e) { // pre-send failures (e.g. serialization)
        logger.error("send error...", e);
    }
}
producer.close();
Summary: the callback mode is recommended.
- With fire-and-forget, Future.get() is never called, so some exceptions cannot be caught and messages can be lost;
- The synchronous mode performs poorly (one blocking round trip per record), so it is rarely used.