Introduction to the Kafka Messaging System




Kafka is a good fit for distributed log collection and system monitoring services, and it is worth using in the right situations. Deploying Kafka involves a ZooKeeper environment and a Kafka environment, plus a number of configuration steps. This article walks through how to set it up and use it.

    We will build a ZooKeeper ensemble from 3 instances and a Kafka cluster from 2 brokers.

    Versions used: Kafka 0.8, ZooKeeper 3.4.5.

 

1. Building the ZooKeeper Cluster

    We run 3 ZooKeeper instances: zk-0, zk-1 and zk-2. For pure testing, a single instance is sufficient.

1) zk-0

Adjust the configuration file (conf/zoo.cfg):


  





    clientPort=2181
    server.0=127.0.0.1:2888:3888
    server.1=127.0.0.1:2889:3889
    server.2=127.0.0.1:2890:3890
    ## Only the settings above need to change; keep the defaults for everything else.
    ## In cluster mode, each instance's dataDir must also contain a "myid" file whose
    ## content is that instance's server number (here 0, 1 or 2, matching server.N).



    Start ZooKeeper:


      





    ./zkServer.sh start



2) zk-1

Adjust the configuration file (all other settings are identical to zk-0):


      





    clientPort=2182
    ## Only this setting changes; keep everything else as in zk-0 (myid is 1 here).



    Start ZooKeeper:

     


      





    ./zkServer.sh start



3) zk-2

Adjust the configuration file (all other settings are identical to zk-0):


      





    clientPort=2183
    ## Only this setting changes; keep everything else as in zk-0 (myid is 2 here).



    Start ZooKeeper:

     


      





    ./zkServer.sh start



      

2. Building the Kafka Cluster

    Because the broker configuration file references the ZooKeeper ensemble, we present the broker configuration first. We use 2 Kafka brokers, kafka-0 and kafka-1, to build the cluster.

1) kafka-0

Edit the configuration file in the config directory (config/server.properties) as follows:


      




    broker.id=0
    port=9092
    num.network.threads=2
    num.io.threads=2
    socket.send.buffer.bytes=1048576
    socket.receive.buffer.bytes=1048576
    socket.request.max.bytes=104857600
    log.dir=./logs
    num.partitions=2
    log.flush.interval.messages=10000
    log.flush.interval.ms=1000
    log.retention.hours=168
    #log.retention.bytes=1073741824
    log.segment.bytes=536870912
    num.replica.fetchers=2
    log.cleanup.interval.mins=10
    zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
    zookeeper.connection.timeout.ms=1000000
    kafka.metrics.polling.interval.secs=5
    kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
    kafka.csv.metrics.dir=/tmp/kafka_metrics
    kafka.csv.metrics.reporter.enabled=false



    Because Kafka is written in Scala, the Scala build environment has to be prepared before running it:


      





    > cd kafka-0
    > ./sbt update
    > ./sbt package
    > ./sbt assembly-package-dependency



    The last command may fail with an exception; this can be ignored for now. Start the Kafka broker:


      





    > JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &



    Since the ZooKeeper ensemble is already running, there is no need to let Kafka launch its bundled ZooKeeper. If you deploy multiple Kafka brokers on one machine, you must give each its own JMX_PORT.
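    Once a broker is up, you can check that it registered itself with the ensemble: Kafka 0.8 creates an ephemeral node under /brokers/ids for every live broker. Below is a minimal verification sketch using the zkclient library (the same com.101tec dependency that appears in the pom later); the class name BrokerCheck is our own:

    import java.util.List;

    import org.I0Itec.zkclient.ZkClient;

    public class BrokerCheck {
        public static void main(String[] args) {
            // connect to the same ensemble configured in zookeeper.connect
            ZkClient zk = new ZkClient("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 10000);
            try {
                // every live broker registers an ephemeral node /brokers/ids/<broker.id>
                List<String> ids = zk.getChildren("/brokers/ids");
                System.out.println("registered broker ids: " + ids);
            } finally {
                zk.close();
            }
        }
    }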

     2) kafka-1


      




    broker.id=1
    port=9093
    ## All other settings are the same as for kafka-0.



    Then run the same build commands as for kafka-0 and start this broker:


      





    > JMX_PORT=9998 bin/kafka-server-start.sh config/server.properties &



    The environment is now ready, so let's move on to the programming examples.

     

3. Project Setup

    The project is built with Maven. It has to be said that the Kafka Java client is a real pain to set up, and you will run into plenty of build problems. The pom.xml below is a working reference; the versions of the individual dependencies must be kept consistent with one another. If the Kafka client version does not match the Kafka server version, you will see many errors such as "broker id not exists": the client-server protocol changed when Kafka moved from 0.7 to 0.8 (the artifact now being published as kafka_2.8.0, after its Scala version).


      





    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.test</groupId>
        <artifactId>test-kafka</artifactId>
        <packaging>jar</packaging>

        <name>test-kafka</name>
        <url>http://maven.apache.org</url>
        <version>1.0.0</version>
        <dependencies>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.14</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.8.0</artifactId>
                <version>0.8.0-beta1</version>
                <exclusions>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.8.1</version>
            </dependency>
            <dependency>
                <groupId>com.yammer.metrics</groupId>
                <artifactId>metrics-core</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
                <version>0.3</version>
            </dependency>
        </dependencies>
        <build>
            <finalName>test-kafka-1.0</finalName>
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <filtering>true</filtering>
                </resource>
            </resources>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.3.2</version>
                    <configuration>
                        <source>1.5</source>
                        <target>1.5</target>
                        <encoding>gb2312</encoding>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>2.2</version>
                    <configuration>
                        <encoding>gbk</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>



       

4. Producer Code

1) producer.properties: place this file under the /resources directory:


        





    #partitioner.class=
    metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
    ##,127.0.0.1:9093
    producer.type=sync
    compression.codec=0
    serializer.class=kafka.serializer.StringEncoder
    ## effective only when producer.type=async
    #batch.num.messages=100



2) LogProducer.java sample:


          





    package com.test.kafka;

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class LogProducer {

        private Producer<String,String> inner;

        public LogProducer() throws Exception{
            // load producer settings from producer.properties on the classpath
            Properties properties = new Properties();
            properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
            ProducerConfig config = new ProducerConfig(properties);
            inner = new Producer<String, String>(config);
        }

        // send a single message to the given topic
        public void send(String topicName,String message) {
            if(topicName == null || message == null){
                return;
            }
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
            inner.send(km);
        }

        // send a batch of messages to the given topic
        public void send(String topicName,Collection<String> messages) {
            if(topicName == null || messages == null){
                return;
            }
            if(messages.isEmpty()){
                return;
            }
            List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
            for(String entry : messages){
                KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
                kms.add(km);
            }
            inner.send(kms);
        }

        public void close(){
            inner.close();
        }

        /**
         * @param args
         */
        public static void main(String[] args) {
            LogProducer producer = null;
            try{
                producer = new LogProducer();
                int i = 0;
                while(true){
                    producer.send("test-topic", "this is a sample" + i);
                    i++;
                    Thread.sleep(2000);
                }
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                if(producer != null){
                    producer.close();
                }
            }
        }
    }
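    The sample above uses producer.type=sync, so every send() blocks until the broker acknowledges the message. Switching to asynchronous batched sending is purely a configuration change; the sketch below sets the same 0.8 producer properties in code instead of in producer.properties (the class name and the batch size of 100 are illustrative):

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class AsyncProducerDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "127.0.0.1:9092,127.0.0.1:9093");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // queue messages and send them from a background thread
            props.put("producer.type", "async");
            // flush a batch once 100 messages have accumulated (async mode only)
            props.put("batch.num.messages", "100");
            Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("test-topic", "async sample"));
            producer.close(); // close() flushes any messages still queued
        }
    }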



           

5. Consumer Code

1) consumer.properties: place this file under the /resources directory:


            





    zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
    ##,127.0.0.1:2182,127.0.0.1:2183
    # timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=1000000
    # consumer group id
    group.id=test-group
    # consumer timeout
    #consumer.timeout.ms=5000
    auto.commit.enable=true
    auto.commit.interval.ms=60000



2) LogConsumer.java sample:


            





    package com.test.kafka;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class LogConsumer {

        private ConsumerConfig config;
        private String topic;
        private int partitionsNum;
        private MessageExecutor executor;
        private ConsumerConnector connector;
        private ExecutorService threadPool;

        public LogConsumer(String topic,int partitionsNum,MessageExecutor executor) throws Exception{
            // load consumer settings from consumer.properties on the classpath
            Properties properties = new Properties();
            properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
            config = new ConsumerConfig(properties);
            this.topic = topic;
            this.partitionsNum = partitionsNum;
            this.executor = executor;
        }

        public void start() throws Exception{
            connector = Consumer.createJavaConsumerConnector(config);
            Map<String,Integer> topics = new HashMap<String,Integer>();
            topics.put(topic, partitionsNum);
            Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
            List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
            // one consuming thread per stream/partition
            threadPool = Executors.newFixedThreadPool(partitionsNum);
            for(KafkaStream<byte[], byte[]> partition : partitions){
                threadPool.execute(new MessageRunner(partition));
            }
        }

        public void close(){
            try{
                threadPool.shutdownNow();
            }catch(Exception e){
                //
            }finally{
                connector.shutdown();
            }
        }

        class MessageRunner implements Runnable{
            private KafkaStream<byte[], byte[]> partition;

            MessageRunner(KafkaStream<byte[], byte[]> partition) {
                this.partition = partition;
            }

            public void run(){
                ConsumerIterator<byte[], byte[]> it = partition.iterator();
                while(it.hasNext()){
                    //connector.commitOffsets(); // manual offset commit; use when auto.commit.enable=false
                    MessageAndMetadata<byte[],byte[]> item = it.next();
                    System.out.println("partition:" + item.partition());
                    System.out.println("offset:" + item.offset());
                    executor.execute(new String(item.message()));// UTF-8; beware of charset exceptions
                }
            }
        }

        interface MessageExecutor {

            public void execute(String message);
        }

        /**
         * @param args
         */
        public static void main(String[] args) {
            LogConsumer consumer = null;
            try{
                MessageExecutor executor = new MessageExecutor() {

                    public void execute(String message) {
                        System.out.println(message);
                    }
                };
                consumer = new LogConsumer("test-topic", 2, executor);
                consumer.start();
            }catch(Exception e){
                e.printStackTrace();
            }finally{
    //          if(consumer != null){
    //              consumer.close();
    //          }
            }
        }
    }



    A reminder: the LogConsumer class above pays little attention to error handling; in particular, you must deal with the case where MessageExecutor.execute() throws an exception.
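    An exception escaping execute() propagates out of run() and silently kills that partition's consuming thread. One way to harden MessageRunner, shown here as a sketch of our own rather than part of the original sample, is to catch per-message failures and, with auto.commit.enable=false, commit offsets only after successful processing:

    public void run(){
        ConsumerIterator<byte[], byte[]> it = partition.iterator();
        while(it.hasNext()){
            MessageAndMetadata<byte[],byte[]> item = it.next();
            try{
                executor.execute(new String(item.message(), "UTF-8"));
            }catch(Exception e){
                // one bad message must not kill the whole partition thread;
                // log it and keep consuming (or hand it to a retry queue)
                e.printStackTrace();
            }
            // with auto.commit.enable=false, commit only after successful handling:
            // connector.commitOffsets();
        }
    }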

    When testing, start the consumer first and then the producer, so you can watch the newest messages arrive in real time.

           

