0
点赞
收藏
分享

微信扫一扫

ZeroMQ--使用jzmq进行编程


一、环境搭建


wget http://download.zeromq.org/zeromq-2.1.7.tar.gz         


          tar -xzf zeromq-2.1.7.tar.gz         


          cd zeromq-2.1.7         


          ./configure         


          make         


          sudo make install         


                    


          git clone https://github.com/nathanmarz/jzmq.git         


          cd jzmq         


          ./autogen.sh         


          ./configure         


          make         


          sudo make install         


                    


          如果没有安装libtool、libuuid-devel则需要先安装,否则安装失败         


          yum install libtool         


          yum install libuuid-devel



常见问题:

出现java.lang.UnsatisfiedLinkError: /usr/local/lib/libjzmq.so.0.0.0: libzmq.so.1: cannot open shared object file: No such file or directory异常 
原因是未找到zmq动态链接库。 
解决方法1:export LD_LIBRARY_PATH=/usr/local/lib 
解决方法2:编辑/etc/ld.so.conf文件,增加一行:/usr/local/lib。再执行sudo ldconfig命令 

Exception in thread "main" java.lang.UnsatisfiedLinkError: no jzmq in java.library.path 
未设置native library 
在eclipse设置native library为/usr/local/lib 
或在jvm增加参数 
-Djava.library.path=/usr/local/lib 
或在启动脚本中增加 
java -Djava.library.path=/usr/local/lib

二、使用jzmq进行编程

    1.创建maven项目,pom.xml的内容参见pom.xml

      注意:jzmq的版本不能太高,建议使用2.1.0,目前storm也是使用这个版本的jzmq-2.1.0.jar

      否则报: java.lang.UnsatisfiedLinkError: org.zeromq.ZMQ$Socket.nativeInit()V

    2.编写Publisher.java,Subscriber.java,参见源代码

    Publisher.java  


package          com.catt.mqtest.pubsub;         


                    


                    


          import          org.slf4j.Logger;         


          import          org.slf4j.LoggerFactory;         


          import          org.zeromq.ZMQ;         


          import          org.zeromq.ZMQ.Context;         


          import          org.zeromq.ZMQ.Socket;         


                    


          public          class          Publisher {         


                    


                    // 等待10个订阅者         


                    private          static          final          int          SUBSCRIBERS_EXPECTED =           10          ;         


                    // 定义一个全局的记录器,通过LoggerFactory获取         


                    private          final          static          Logger log = LoggerFactory.getLogger(Publisher.          class          );         


                    


                    public          static          void          main(String[] args)           throws          InterruptedException{         


                    Context context = ZMQ.context(          1          );         


                    Socket publisher = context.socket(ZMQ.PUB);         


                    publisher.bind(          "tcp://*:5557"          );         


                    try          {         


                    // zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布         


                    Thread.sleep(          1000          );         


                    }           catch          (InterruptedException e) {         


                    e.printStackTrace();         


                    }         


                    


                    publisher.send(          "send start......"          .getBytes(),           0          );         


                    for          (          int          i =           0          ; i <           10          ; i++) {         


                    publisher.send((          "Hello world "          +i).getBytes(), ZMQ.NOBLOCK);         


                    }         


                    publisher.send(          "send end......"          .getBytes(),           0          );         


                    


                    publisher.close();         


                    context.term();         


                    }         


          }



Subscriber.java

package          com.catt.mqtest.pubsub;         


                    


          import          org.slf4j.Logger;         


          import          org.slf4j.LoggerFactory;         


          import          org.zeromq.ZMQ;         


          import          org.zeromq.ZMQ.Context;         


          import          org.zeromq.ZMQ.Socket;         


                    


          public          class          Subscriber {         


                    


                    // 定义一个全局的记录器,通过LoggerFactory获取         


                    private          final          static          Logger log = LoggerFactory.getLogger(Subscriber.          class          );         


                    


                    public          static          void          main(String[] args) {         


                    Context context = ZMQ.context(          1          );         


                    Socket subscriber = context.socket(ZMQ.SUB);         


                    subscriber.connect(          "tcp://192.168.230.128:5557"          );         


                    subscriber.subscribe(          ""          .getBytes());         


                    int          total =           0          ;         


                    while          (          true          ) {         


                    byte          [] stringValue = subscriber.recv(          0          );         


                    String string =           new          String(stringValue);         


                    if          (string.equals(          "send end......"          )) {         


                    break          ;         


                    }         


                    total++;         


                    System.out.println(          "Received "          + total +           " updates. :"          + string);         


                    }         


                    


                    subscriber.close();         


                    context.term();         


                    }         


          }




pom.xml

 

<          project          xmlns          =          "http://maven.apache.org/POM/4.0.0"          xmlns:xsi          =          "http://www.w3.org/2001/XMLSchema-instance"         


                    xsi:schemaLocation          =          "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"          >         


                    <          modelVersion          >4.0.0</          modelVersion          >         


                    


                    <          groupId          >com.catt</          groupId          >         


                    <          artifactId          >mqtest</          artifactId          >         


                    <          version          >0.0.1-SNAPSHOT</          version          >         


                    <          packaging          >jar</          packaging          >         


                    


                    <          name          >mqtest</          name          >         


                    <          url          >http://maven.apache.org</          url          >         


                    


                    <          properties          >         


                    <          project.build.sourceEncoding          >UTF-8</          project.build.sourceEncoding          >         


                    </          properties          >         


                    


                    <          dependencies          >         


                    <          dependency          >         


                    <          groupId          >org.zeromq</          groupId          >         


                    <          artifactId          >jzmq</          artifactId          >         


                    <          version          >2.1.0</          version          >         


                    </          dependency          >         


                    <          dependency          >         


                    <          groupId          >ch.qos.logback</          groupId          >         


                    <          artifactId          >logback-classic</          artifactId          >         


                    <          version          >1.1.1</          version          >         


                    </          dependency          >         


                    <          dependency          >         


                    <          groupId          >junit</          groupId          >         


                    <          artifactId          >junit</          artifactId          >         


                    <          version          >4.10</          version          >         


                    <          scope          >test</          scope          >         


                    </          dependency          >         


                    </          dependencies          >         


          </          project          >

举报

相关推荐

0 条评论