17.3 消费者群例子
消费群是多线程或多机器接收KafkaTopic。
17.3.1 消费者群
Ø 消费者可以通过使用相同的“group.id”来加入组。
Ø 组的最大并行数目是组中消费者数<=分区数。
Ø Kafka将Topic分区分配给组中的消费者,以便每个分区都由组中的一个消费者使用。
Ø Kafka保证消息只能被组中的一个消费者读取。
Ø 消费者可以按照消息存储在日志中的顺序查看消息。
17.3.2消费者重现平衡
添加更多进程/线程将导致Kafka重新平衡。如果任何消费者或broker无法向ZooKeeper发送心跳,则可以通过Kafka集群重新配置。在重新平衡期间,Kafka将分配可用分区到可用线程,可能将分区移动到另一个进程:
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
if(args.length < 2)
{
System.out.println("Usage: consumer <topic> <groupname>");
return;
}
String topic = args[0].toString();
String group = args[1].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
}
编译:应用程序用下面的命令进行编译。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” ConsumerGroup.java
执行:用下面的命令进行执行。
java -cp“/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup<topic-name> my-group
java -cp"/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.ConsumerGroup <topic-name> my-group
输入:打开生产者CLI,发送像下面的信息
Test consumer group 01
Test consumer group 02
第一个进程输出:
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumergroup 01
第二个进程输出:
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumergroup 02