Kafka works well as a distributed log-collection or system-monitoring service, and it is worth using where it fits. Deploying Kafka involves setting up both a ZooKeeper environment and a Kafka environment, plus some configuration work. This article walks through how to use Kafka.
We will build a ZooKeeper ensemble from 3 ZooKeeper instances and a Kafka cluster from 2 Kafka brokers.
Versions used: Kafka 0.8, ZooKeeper 3.4.5.
I. Building the ZooKeeper Cluster
We use 3 ZooKeeper instances, zk-0, zk-1, and zk-2; if you only need a test setup, a single instance is enough.
1) zk-0
Adjust the configuration file (conf/zoo.cfg):
clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
## only the settings above need to change; keep the defaults for everything else
Note that each instance also needs a myid file under its dataDir whose content is that instance's server id (0, 1, or 2 here).
Start ZooKeeper:
./zkServer.sh start
2) zk-1
Adjust the configuration file (everything else identical to zk-0):
clientPort=2182
## only this setting changes; keep the other values the same as zk-0
Start ZooKeeper:
./zkServer.sh start
3) zk-2
Adjust the configuration file (everything else identical to zk-0):
clientPort=2183
## only this setting changes; keep the other values the same as zk-0
Start ZooKeeper:
./zkServer.sh start
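Before moving on, it is worth verifying that the ensemble actually serves requests. Below is a minimal, hypothetical check using the com.101tec zkclient library (the same library the Maven project in section III depends on); the class name EnsembleCheck is ours, not part of any distribution.

import org.I0Itec.zkclient.ZkClient;

public class EnsembleCheck {
    public static void main(String[] args) {
        // connect to all three instances; 5s session timeout, 5s connect timeout
        ZkClient zk = new ZkClient(
                "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, 5000);
        try {
            // the /zookeeper node exists on every healthy server, so a
            // successful read confirms the ensemble is up
            System.out.println("/zookeeper exists: " + zk.exists("/zookeeper"));
        } finally {
            zk.close();
        }
    }
}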
II. Building the Kafka Cluster
Because the broker configuration file refers to the ZooKeeper ensemble, we present the broker configuration first. We use 2 Kafka brokers, kafka-0 and kafka-1, to build the cluster.
1) kafka-0
Edit the configuration file (server.properties) in the config directory:
broker.id=0
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
num.replica.fetchers=2
log.cleanup.interval.mins=10
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false
Because Kafka is written in Scala, you first need to prepare the Scala-related environment before running it:
> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency
The last command may fail with an error; you can ignore that for now. Start the Kafka broker:
> JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &
Since the ZooKeeper ensemble is already running, we do not need the ZooKeeper instance bundled with Kafka. If you deploy multiple Kafka brokers on a single machine, you need to give each one a distinct JMX_PORT.
2) kafka-1
broker.id=1
port=9093
## all other settings are identical to kafka-0
Then run the same build commands as for kafka-0 and start this broker:
> JMX_PORT=9998 bin/kafka-server-start.sh config/server.properties &
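As a quick sanity check, you can confirm that both brokers registered themselves in ZooKeeper; each live 0.8 broker creates an ephemeral node under /brokers/ids. A hypothetical helper, again using zkclient:

import java.util.List;

import org.I0Itec.zkclient.ZkClient;

public class BrokerCheck {
    public static void main(String[] args) {
        ZkClient zk = new ZkClient(
                "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, 5000);
        try {
            // brokers register ephemeral nodes /brokers/ids/<broker.id>;
            // with kafka-0 and kafka-1 running this should print [0, 1]
            List<String> ids = zk.getChildren("/brokers/ids");
            System.out.println("registered brokers: " + ids);
        } finally {
            zk.close();
        }
    }
}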
At this point the environment is ready, so let's turn to the programming examples.
III. Project Setup
The project is built with Maven. Frankly, the Kafka Java client is quite rough, and setting up the build brings plenty of trouble; the pom.xml below is recommended as a reference. The versions of the individual dependencies must be mutually consistent. If the Kafka client version does not match the Kafka server version, you will hit many errors, such as "broker id not exists", because the client-server protocol changed between Kafka 0.7 and 0.8 (the 0.8 artifact is published as kafka_2.8.0, after the Scala version it is built against).
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.test</groupId>
    <artifactId>test-kafka</artifactId>
    <packaging>jar</packaging>

    <name>test-kafka</name>
    <url>http://maven.apache.org</url>
    <version>1.0.0</version>
    <dependencies>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.0</artifactId>
            <version>0.8.0-beta1</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>test-kafka-1.0</finalName>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.5</source>
                    <target>1.5</target>
                    <encoding>gb2312</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <encoding>gbk</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
IV. Producer Code
1) producer.properties: put this file in the resources directory (src/main/resources)
#partitioner.class=
metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
## effective only when producer.type=async
#batch.num.messages=100
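The commented-out partitioner.class entry above is where a custom partitioner would be plugged in. Below is a minimal sketch, assuming the kafka.producer.Partitioner interface of 0.8.0-beta1 (which is generic over the key type; later 0.8.x releases drop the type parameter); the class itself is hypothetical.

package com.test.kafka;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// hypothetical example; registered via partitioner.class=com.test.kafka.HashPartitioner
public class HashPartitioner implements Partitioner<String> {

    // Kafka instantiates partitioners reflectively and passes the producer
    // properties to this constructor
    public HashPartitioner(VerifiableProperties props) {
    }

    public int partition(String key, int numPartitions) {
        // mask the sign bit so the result is non-negative; messages with the
        // same key always land in the same partition
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}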
2) Sample code: LogProducer.java
package com.test.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LogProducer {

    private Producer<String, String> inner;

    public LogProducer() throws Exception {
        Properties properties = new Properties();
        // producer.properties is loaded from the classpath (src/main/resources)
        properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, message);
        inner.send(km);
    }

    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null) {
            return;
        }
        if (messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        for (String entry : messages) {
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, entry);
            kms.add(km);
        }
        inner.send(kms);
    }

    public void close() {
        inner.close();
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        LogProducer producer = null;
        try {
            producer = new LogProducer();
            int i = 0;
            while (true) {
                producer.send("test-topic", "this is a sample" + i);
                i++;
                Thread.sleep(2000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
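LogProducer always uses the two-argument KeyedMessage constructor, so its messages carry no key and are spread across partitions by the default logic. If you want a partitioner (such as the sketch earlier in this section) to see a key, use the three-argument constructor; a minimal hypothetical helper:

package com.test.kafka;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

public class KeyedSender {

    // the three-argument KeyedMessage carries a key, which the producer hands
    // to the configured partitioner to choose a partition
    public static void send(Producer<String, String> producer,
                            String topicName, String key, String message) {
        producer.send(new KeyedMessage<String, String>(topicName, key, message));
    }
}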
V. Consumer Code
1) consumer.properties: this file also lives in the resources directory (src/main/resources)
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
# consumer group id
group.id=test-group
# consumer timeout
#consumer.timeout.ms=5000
auto.commit.enable=true
auto.commit.interval.ms=60000
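One note on the commented-out consumer.timeout.ms: when it is set, the stream iterator no longer blocks forever but throws kafka.consumer.ConsumerTimeoutException once the timeout elapses with no message. A minimal sketch of handling it (the drain helper is hypothetical):

package com.test.kafka;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;

public class TimeoutAwareReader {

    // reads until no message arrives within consumer.timeout.ms
    public static void drain(KafkaStream<byte[], byte[]> stream) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        try {
            while (it.hasNext()) {
                System.out.println(new String(it.next().message()));
            }
        } catch (ConsumerTimeoutException e) {
            // thrown once consumer.timeout.ms passes without a new message;
            // treat it as "caught up", not as a fatal error
        }
    }
}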
2) Sample code: LogConsumer.java
package com.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class LogConsumer {

    private ConsumerConfig config;
    private String topic;
    private int partitionsNum;
    private MessageExecutor executor;
    private ConsumerConnector connector;
    private ExecutorService threadPool;

    public LogConsumer(String topic, int partitionsNum, MessageExecutor executor) throws Exception {
        Properties properties = new Properties();
        // consumer.properties is loaded from the classpath (src/main/resources)
        properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
        config = new ConsumerConfig(properties);
        this.topic = topic;
        this.partitionsNum = partitionsNum;
        this.executor = executor;
    }

    public void start() throws Exception {
        connector = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(topic, partitionsNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
        // one thread per stream/partition
        threadPool = Executors.newFixedThreadPool(partitionsNum);
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.execute(new MessageRunner(partition));
        }
    }

    public void close() {
        try {
            threadPool.shutdownNow();
        } catch (Exception e) {
            //
        } finally {
            connector.shutdown();
        }
    }

    class MessageRunner implements Runnable {
        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            while (it.hasNext()) {
                // connector.commitOffsets(); // commit offsets manually when auto.commit.enable=false
                MessageAndMetadata<byte[], byte[]> item = it.next();
                System.out.println("partition:" + item.partition());
                System.out.println("offset:" + item.offset());
                executor.execute(new String(item.message())); // UTF-8; beware of exceptions
            }
        }
    }

    interface MessageExecutor {

        public void execute(String message);
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        LogConsumer consumer = null;
        try {
            MessageExecutor executor = new MessageExecutor() {

                public void execute(String message) {
                    System.out.println(message);
                }
            };
            consumer = new LogConsumer("test-topic", 2, executor);
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // if(consumer != null){
            //     consumer.close();
            // }
        }
    }
}
One caveat: the LogConsumer class above pays little attention to error handling; in particular, you must decide what should happen when MessageExecutor.execute() throws an exception. A defensive variant is sketched below.
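For example, a single malformed message must not kill the partition thread. This hypothetical variant of MessageRunner (same package and assumptions as the class above) catches per-message failures and keeps consuming:

package com.test.kafka;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

class SafeMessageRunner implements Runnable {

    private final KafkaStream<byte[], byte[]> partition;
    private final LogConsumer.MessageExecutor executor;

    SafeMessageRunner(KafkaStream<byte[], byte[]> partition,
                      LogConsumer.MessageExecutor executor) {
        this.partition = partition;
        this.executor = executor;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = partition.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> item = it.next();
            try {
                executor.execute(new String(item.message(), "UTF-8"));
            } catch (Exception e) {
                // log and move on; a real application might route the failed
                // message to a retry queue or dead-letter store instead
                e.printStackTrace();
            }
        }
    }
}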
When testing, start the consumer first and then the producer, so you can watch the latest messages arrive in real time.