0
点赞
收藏
分享

微信扫一扫

消息队列: pulsar安装部署

看山远兮 2022-02-17 阅读 69

hostname均已混淆

1、前置条件

安装java1.8, zookeeper独立安装, 这里就不再详细安装了

2、下载

apache-pulsar-2.9.1-bin.tar.gz

https://pulsar.apache.org/en/download/

3、初始化创建一个cluster

Cluster metadata initialization
tip:  you only need to write these metadata once.
bin/pulsar initialize-cluster-metadata \
  --cluster pulsar-cluster \
  --zookeeper baoding.domain.com:8181 \
  --configuration-store baoding.domain.com:8181 \
  --web-service-url http://suzhou-bigdata01.domain.com:8180/ \
  --web-service-url-tls https://suzhou-bigdata01.domain.com:8443/ \
  --broker-service-url pulsar://suzhou-bigdata01.domain.com:8650/ \
  --broker-service-url-tls pulsar+ssl://suzhou-bigdata01.domain.com:8651/

4、部署bookkeeper

Deploy BookKeeper
1)、Configure bookies
 vim bookeeper.conf 
# zkServers=zk1:2181,zk2:2181,zk3:2181
zkServers=baoding-bigdata01.domain.com:8181,baoding-bigdata02.domain.com:8181,baoding-bigdata03.domain.com:8181

journalDirectory=/home/disk2/pulsar/data/bookkeeper/journal

ledgerDirectories=/home/disk2/pulsar/data/bookkeeper/ledgers

prometheusStatsHttpPort=8100

 mkdir -p /home/disk2/pulsar/data/bookkeeper/{journal,ledgers}
2) Start bookies
 two ways: in the foreground or as a background daemon
 background:
 bin/pulsar-daemon start bookie

3) 验证
可以通过运行BookKeeper shell上的bookiesanity命令验证 bookie 是否正常工作:
bin/bookkeeper shell bookiesanity

FAQ:
1) is not matching with 
解决: 删除zookeeper里 
zkCli.sh 登录

delete /ledgers/cookies/192.168.1.1:3181


5、Deploy brokers

1) Broker configuration
vim conf/broker.conf
# Zookeeper quorum connection string
zookeeperServers=baoding-bigdata01.domain.com:8181,baoding-bigdata02.domain.com:8181,baoding-bigdata03.domain.com:8181

# Configuration Store connection string
configurationStoreServers=baoding-bigdata01.domain.com:8181,baoding-bigdata02.domain.com:8181,baoding-bigdata03.domain.com:8181

# Broker data port
brokerServicePort=8650

# Broker data port for TLS - By default TLS is disabled
brokerServicePortTls=8651

# Port to use to server HTTP request
webServicePort=8180

# Port to use to server HTTPS request - By default TLS is disabled
webServicePortTls=8443

# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0

# Name of the cluster to which this broker belongs to
clusterName=pulsar-cluster

### --- Functions --- ###
# Enable Functions Worker Service in Broker
functionsWorkerEnabled=true

2) Start the broker service
background: [*]
bin/pulsar-daemon start broker
foreground:
bin/pulsar broker

6、client 
注意:集群各个实例的该配置的机器名字要不同, 要改
vim conf/client.conf

# webServiceUrl=https://localhost:8443/
webServiceUrl=http://suzhou-bigdata01.domain.com:8180/

# URL for Pulsar Binary Protocol (for produce and consume operations)
# For TLS:
# brokerServiceUrl=pulsar+ssl://localhost:6651/
brokerServiceUrl=pulsar://suzhou-bigdata01.domain.com:8650/

7、查看集群下broker列表
scp到其它节点, 并启动bookie和broker,查看整个集群broker信息:
bin/pulsar-admin brokers list pulsar-cluster
"suzhou-bigdata01.domain.com:8180"
"suzhou-bigdata02.domain.com:8180"
"suzhou-bigdata03.domain.com:8180"

8、列出cluster
官方文档: https://pulsar.apache.org/docs/en/pulsar-admin/
1) 列表
bin/pulsar-admin clusters list
"pulsar-cluster"
2) 查询集群配置
 bin/pulsar-admin clusters get pulsar-cluster
{
  "serviceUrl" : "http://suzhou-bigdata01.domain.com:8180/",
  "serviceUrlTls" : "https://suzhou-bigdata01.domain.com:8443/",
  "brokerServiceUrl" : "pulsar://suzhou-bigdata01.domain.com:8650/",
  "brokerServiceUrlTls" : "pulsar+ssl://suzhou-bigdata01.domain.com:8651/",
  "brokerClientTlsEnabled" : false,
  "tlsAllowInsecureConnection" : false,
  "brokerClientTlsEnabledWithKeyStore" : false,
  "brokerClientTlsTrustStoreType" : "JKS"
}

9、列出在租户/命名空间下创建的所有主题:
默认情况下,主题创建为“public”租户/“default”命名空间下的单个分区持久主题。可以使用以下命令列出在下创建的所有主题:
bin/pulsar-admin topics list public/default

让我们创建一个新的分区主题:
$ ./bin/pulsar-admin topics create-partitioned-topic --partitions 3 my-partitioned-topic

要列出分区主题,必须使用以下命令:
./bin/pulsar-admin topics list-partitioned-topics public/default

列出主题的所有订阅:
$ ./bin/pulsar-admin topics subscriptions persistent://public/default/my-first-topic

获取有关某个主题的统计信息
./bin/pulsar-admin topics stats persistent://public/default/my-first-topic

10、验证pub/sub
Eg1: 
1) 模拟生产者发送消息
bin/pulsar-client produce persistent://public/default/test -n 1 -m "Hello Pulsar"
相当于
bin/pulsar-client produce test -n 1 -m "Hello Pulsar"
2) 监听消费者接收到的消息
bin/pulsar-client consume persistent://public/default/test -n 100 -s "consumer-test" -t "Exclusive"

Eg2:
bin/pulsar-client consume my-topic -s "first-subscription"
表示从“my-topic”这个topic上消费消息,并且指定订阅名称为“first-subscription”. (my-topic会在persistent://public/default下创建持久化主题)

bin/pulsar-client produce my-topic --messages "hello pulsar"
表示发送消息到my-topic这个topic上。

11、 Function测试
function是一个极有前途的功能,可以把一个topic中喷出的消息,实时接收并处理后,再把处理结果发到另一个topic,相当于轻量级的流式计算。
./examples目录下有一个api-examples.jar包,里面自带了一些Function示例。

1)部署
部署的过程,其实就是把带处理逻辑的jar包,放到集群上,命令如下:
bin/pulsar-admin functions create \
--jar examples/api-examples.jar \
--className org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/exclamation-input \
--output persistent://public/default/exclamation-output \
--name exclamation

大致是创建一个function,来源是examples/api-examples.jar这个文件,并指定了具体的类名(因为一个jar包中,可以写多个function,必须指定具体的className), 然后这个function的入参是exclamation-input这个topic,处理完的结果,将输出到exclamation-output,最后这个function在pulsar中的名字是exclamation - 注:如果上述命令执行失败,可以尝试把className,换成classname. (不同版本的pulsar这个参数的大小写略有不同)

附:ExclamationFunction的java源码如下,逻辑很简单,只是在输入参数后加一个!
package org.apache.pulsar.functions.api.examples;
 
import java.util.function.Function;
 
public class ExclamationFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        return String.format("%s!", input);
    }
}

2) 查看已部署的function列表
bin/pulsar-admin functions list \
--tenant public \
--namespace default

3) 启动消费者,查看实时处理结果   
bin/pulsar-client consume persistent://public/default/exclamation-output \
--subscription-name my-subscription \
--num-messages 0 

4) 启动生产者,产生实时处理所需的素材
bin/pulsar-client produce persistent://public/default/exclamation-input \
--num-produce 1 \
--messages "Hello world"

12、测试代码

1) pom.xml:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <!--<version>2.9.1</version>-->
    <version>2.8.1</version>
</dependency>

producer:

package com.baidu.matrix.pulsar.demo;

import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.shade.org.apache.commons.codec.digest.DigestUtils;
import org.apache.pulsar.shade.org.apache.commons.codec.digest.PureJavaCrc32;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author leh
 * @version 1.0
 * @desc:
 * @date 2022/2/16 2:24 PM
 * <p>
 * <p>
 * 参考: http://javakk.com/2121.html
 */

/**
 * producer 配置项:
 *
 * “topicName” : “persistent://public/pulsar-cluster/default/my-topic”, //topicName 由四部分组成 [topic类型://租户名/命名空间/主题名]
 * “producerName” : “my-producer”, //生产者名称
 * “sendTimeoutMs” : 30000, //发送超时时间,默认 30s
 * “blockIfQueueFull” : false, //消息队列已满时是否阻止发送操作 默认false,当消息队列满,发送操作将立即失败
 * “maxPendingMessages” : 1000,//设置等待接收来自broker的确认消息的队列的最大大小,队列满试,blockIfQueueFull=true才有效
 * “maxPendingMessagesAcrossPartitions” : 50000,//设置所有分区的最大挂起消息数
 * “messageRoutingMode” : “CustomPartition”, //消息分发路由模式 CustomPartition;RoundRobinPartition 环形遍历分区;SinglePartition 随机选择一个分区 //参考http://pulsar.apache.org/docs/zh-CN/2.2.0/cookbooks-partitioned/
 * “hashingScheme” : “JavaStringHash”,//更改用于选择在何处发布特定消息的分区的哈希方案
 * “cryptoFailureAction” : “FAIL”,//为失效的生产者指定一个默认的特定值
 * “batchingMaxPublishDelayMicros” : 1000,//设置发送的消息将被成批处理的时间段默认值:如果启用了成批消息,则为1毫秒。
 * “batchingMaxMessages” : 1000, //设置批处理中允许的最大消息数
 * “batchingEnabled” : true, //控制是否为生产者启用消息的自动批处理。
 * “compressionType” : “NONE”, //设置生产者的压缩类型 (eg: CompressionType.SNAPPY)
 * “initialSequenceId” : null, //为生产者发布的消息设置序列ID的基础值
 * “properties” : { } //为生产者设置属性
 */

public class PulsarProducerDemo {

    public static void main(String[] args) {
        PulsarClient client = null;
        Producer<String> producer = null;

        //  bin/pulsar-client consume my-first-topic -n 10 -s "consumer-test" -t "Exclusive"
        try {
            client = PulsarClient.builder()
                    .serviceUrl("pulsar://suzhou-bigdata01.domain.com:8650,pulsar://suzhou-bigdata02.domain.com:8650,pulsar://suzhou-bigdata03.domain.com:8650")
                    .build();


            // 1、单条字节同步发送(send) byte[]
            /*
            Producer<byte[]> producer1 = client.newProducer()
                    .topic("my-first-topic")
                    .create();

            producer1.send("Hello byte Streams Word!".getBytes());
            */

            // 2、单条字符串同步发送(send) String
            /*
            producer = client.newProducer(Schema.STRING)
                    .topic("my-first-topic")
                    .create();

            producer.send("Hello Streams Word!");
            */


            // 3、移步发送(sendAsync)
            /*
            producer = client.newProducer(Schema.STRING)
                    .topic("my-first-topic")
                    .create();

            CompletableFuture<MessageId> future = producer.sendAsync("sendAsync streams processing");
            future.thenAccept(msgId -> {
                System.out.printf("Message with ID %s successfully sent asynchronously\n", msgId);
            });

            // 消费端:
            // ----- got message -----
            // key:[null], properties:[], content:sendAsync streams processing
            */

            // 4、还可以使用给定的键和属性构建消息:
            /*
            producer = client.newProducer(Schema.STRING)
                    .topic("my-first-topic")
                    .create();

            TypedMessageBuilder<String> message = producer.newMessage()
                    .key("my-key")
                    .property("application", "pulsar-java-quickstart")
                    .property("pulsar.client.version", "2.4.1")
                    .value("this message content");
            message.send();
            */

            // 消费端:
            // ----- got message -----
            // key:[my-key], properties:[application=pulsar-java-quickstart, pulsar.client.version=2.4.1], content:value-message

            // 5、出于性能方面的考虑,通常最好发送成批消息,以便根据吞吐量节省一些网络带宽。在创建producer客户端时可以启用b缓存。
            producer = client.newProducer(Schema.STRING)
                    .producerName("my-producer") //生产者名称
                    .topic("my-first-topic") //topicName 由四部分组成 [topic类型://租户名/命名空间/主题名]
                    .compressionType(CompressionType.SNAPPY)
                    .enableBatching(true)
                    .blockIfQueueFull(true)
                    .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
                    .batchingMaxMessages(10)
                    .maxPendingMessages(512)
                    // 设置消息发送超时时间
                    .sendTimeout(86400, TimeUnit.SECONDS)  //发送超时时间,默认 30s
                    //设置集群路由策略(该信息存到那个分区里)
                    .messageRoutingMode(MessageRoutingMode.CustomPartition).messageRouter(
                            new MessageRouter() {
                                @Override
                                public int choosePartition(Message<?> message, TopicMetadata metadata) {
                                    return new String(message.getData()).trim().charAt(0) % metadata.numPartitions();
                                }
                            }
                    )
                    // 为生产者设置属性
                    .property("author", "leh")
                    .create();

            for (int i = 0; i < 100; i++) {
                producer.send("message_" + i);
            }


            System.out.println("send ok!");

            TimeUnit.SECONDS.sleep(5);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (producer != null) {
                    producer.close();
                }
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }

            try {
                if (client != null) {
                    client.close();
                }
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }


            // 关闭操作也可以是异步的:
            if (producer != null) {
                producer.closeAsync()
                        .thenRun(() -> System.out.println("Producer closed"))
                        .exceptionally((ex) -> {
                            System.err.println("Failed to close producer: " + ex);
                            return null;
                        });
            }

        }
    }
}

consumer:

package com.baidu.matrix.pulsar.demo;

import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.TimeUnit;

/**
 * @author leh
 * @version 1.0
 * @desc:
 * @date 2022/2/16 5:51 PM
 */

/**
 * 参考链接
 * http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/PulsarClient.html
 * http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientBuilder.html
 *
 * client 配置项:
 *
 * “serviceUrl” : “pulsar://localhost:6650”, //broker集群地址, 地址和多个,逗号分割
 * “operationTimeoutMs” : 30000, //操作超时设置
 * “statsIntervalSeconds” : 60, //设置每个统计信息之间的间隔(默认值:60秒)统计信息将以正值激活状态间隔秒数应设置为至少1秒
 * “numIoThreads” : 1,//设置用于处理与broker的连接的线程数(默认值:1个线程)
 * “numListenerThreads” : 1,// 设置要用于消息侦听器的线程数(默认值:1个线程)
 * “connectionsPerBroker” : 1, //设置客户端库将向单个broker打开的最大连接数。
 * “enableTcpNoDelay” : true, //配置是否在连接上使用无延迟tcp,默认为true。无延迟功能确保数据包尽快发送到网络上,实现低延迟发布至关重要。另一方面,发送大量的小数据包可能会限制整体吞吐量。
 * “useTls” : false, // 启用ssl,在serviceurl中使用“pulsar+ssl://”启用
 * “tlsTrustCertsFilePath” : “”,//设置受信任的TLS证书文件的路径
 * “tlsAllowInsecureConnection” : false, //配置pulsar客户端是否接受来自broker的不受信任的TLS证书(默认值:false)
 * “tlsHostnameVerificationEnable” : false,//它允许在客户端通过TLS连接到代理时验证主机名验证
 * “concurrentLookupRequest” : 5000,//允许在每个broker连接上发送的并发查找请求数,以防止代理过载。
 * “maxLookupRequest” : 50000,//为防止broker过载,每个broker连接上允许的最大查找请求数。
 * “maxNumberOfRejectedRequestPerConnection” : 50,//设置在特定时间段(30秒)内被拒绝的broker请求的最大数目,在此时间段后,当前连接将关闭,客户端将创建一个新连接,以便有机会连接其他broker(默认值:50)
 * “keepAliveIntervalSeconds” : 30 //为每个客户端broker连接设置以秒为单位的心跳检测时间
 */


/**
 * 参考链接
 * //http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder
 *
 * consumer配置项:
 *
 * “topicNames” : [ ], //消费者订阅的主题
 * “topicsPattern” : null, //指定此使用者将订阅的主题的模式。它接受正则表达式,并将在内部编译为模式。例如:“persistent://prop/use/ns abc/pattern topic-.*”
 * “subscriptionName” : “my-subscription”, //消费者的订阅名
 * “subscriptionType” : “Exclusive”,//选择订阅主题时要使用的订阅类型。 Exclusive 独占;Failover 故障转移 ;Shared 共享
 *                      独家订阅:-一次只有一个消费者可以通过订阅阅读主题
 *                      共享订阅:-竞争消费者可以通过同一订阅同时阅读主题。
 *                      故障转移订阅:-用户的活动/备份模式。如果活动consumer消费者死亡,则备份将接管。但从来没有两个活跃的消费者同时存在。
 * “receiverQueueSize” : 3,//设置消费者接收队列的大小。
 * “acknowledgementsGroupTimeMicros” : 100000, //按指定时间对消费者分组
 * “maxTotalReceiverQueueSizeAcrossPartitions” : 10, //设置跨分区的最大总接收器队列大小
 * “consumerName” : “my-consumer”, //消费者的名字
 * “ackTimeoutMillis” : 10000,//设置未确认消息的超时
 * “priorityLevel” : 0, //为共享订阅使用者设置优先级级别,broker 在调度消息时向其提供更高的优先级。
 * “cryptoFailureAction” : “FAIL”,//为失效的消费者指定一个默认的特定值
 * “properties” : { }, //设置属性值
 * “readCompacted” : false, //如果启用,消费者将从压缩的主题中读取消息,而不是读取主题的完整消息积压。
 * “subscriptionInitialPosition” : “Latest”, //设置消费者的订阅初始位置 Earliest 从最早的位置,即第一条消息。 Latest 从最后的位置,即最后一条消息。
 * “patternAutoDiscoveryPeriod” : 1, //为主题消费者使用模式时设置主题自动发现周期。
 * “subscriptionTopicsMode” : “PERSISTENT”,//确定此消费者应订阅哪些主题-持久性主题、非持久性主题或两者都应订阅。
 * “deadLetterPolicy” : null //死信策略 为消费者设置死信策略,某些消息将尽可能多次重新传递。通过使用死信机制,消息将具有最大重新传递计数,当消息超过最大重新传递数时,消息将发送到死信主题并自动确认。您可以通过设置死信策略来启用死信机制。
 */
public class PulsarConsumerDemo {

    /**
     Consumer支持:
         同步接收消息:阻塞用户线程等待消息
         异步接收消息:异步等待消息(通过Future返回消息)
         通过MessageListener返回消息:接收消息后回调用户的MessageListener
     */
    public static void main(String[] args) {
        PulsarClient client = null;
        Consumer<String> consumer = null;

        //  bin/pulsar-client consume my-first-topic -n 10 -s "consumer-test" -t "Exclusive"
        try {

            client = PulsarClient.builder()
                    .serviceUrl("pulsar://suzhou-bigdata01.domain.com:8650,pulsar://suzhou-bigdata02.domain.com:8650,pulsar://suzhou-bigdata03.domain.com:8650")
                    .enableTcpNoDelay(true)
                    .build();


            // 1、single record
            /*
            consumer = client.newConsumer(Schema.STRING)
                    .topic("my-first-topic")
                    .subscriptionName("my-first-subscription")
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();

             while (true) {
                // blocks until a message is available
                Message<String> messageObj = consumer.receive();

                try {

                    // Do something with the message
                    System.out.printf("Message1 received: %s\n", messageObj.getValue());
                    System.out.printf("Message2 received: %s\n", new String(messageObj.getData()));

                    // Acknowledge the message so that it can be deleted by the message broker
                    consumer.acknowledge(messageObj);
                } catch (Exception e) {
                    // Message failed to process, redeliver later
                    consumer.negativeAcknowledge(messageObj);
                }
                System.out.println("next...");
            }
            */


            // 2、batch
            consumer = client.newConsumer(Schema.STRING)
                    .topic("my-first-topic")
                    .subscriptionName("my-first-subscription")
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Latest) // SubscriptionInitialPosition.Earliest
                    .batchReceivePolicy(
                        BatchReceivePolicy.builder()
                        .maxNumMessages(50)
                        .maxNumBytes(5 * 1024 * 1024)
                        .timeout(100, TimeUnit.MILLISECONDS)
                        .build()
                    )
                    .subscribe();

            while (true) {
                // blocks until a message is available
                Messages<String> messages = consumer.batchReceive();

                try {
                    // Do something with the message
                    messages.forEach(messageObj -> {
                        System.out.printf("Message1 received: %s\n", messageObj.getValue());
                        // System.out.printf("Message2 received: %s\n", new String(messageObj.getData()));
                    });

                    // Acknowledge the message so that it can be deleted by the message broker
                    consumer.acknowledge(messages);
                } catch (Exception e) {
                    // Message failed to process, redeliver later
                    consumer.negativeAcknowledge(messages);
                }
                System.out.println("next...");
            }

            // 3、listen message
            // If you don't want to block your main thread and rather listen constantly for new messages, consider using a MessageListener
            /*
            consumer = client.newConsumer(Schema.STRING)
                    .topic("my-first-topic")
                    .subscriptionName("my-first-subscription")
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) // SubscriptionInitialPosition.Earliest
                    .messageListener(new MessageListener<String>() {
                        @Override
                        public void received(Consumer<String> consumer, Message<String> msg) {
                            try {
                                System.out.println("Message received: " + new String(msg.getData()));
                                consumer.acknowledge(msg);
                            } catch (PulsarClientException e) {
                                e.printStackTrace();
                            }
                        }
                    })
                    .subscribe();
                    */

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }

            try {
                if (client != null) {
                    client.close();
                }
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }

            // 关闭操作也可以是异步的:
            /*if (consumer != null) {
                consumer.closeAsync()
                        .thenRun(() -> System.out.println("Producer closed"))
                        .exceptionally((ex) -> {
                            System.err.println("Failed to close producer: " + ex);
                            return null;
                        });
            }*/
        }
    }
}

13、常用命令
https://pulsar.apache.org/docs/en/pulsar-admin/
1、clusters
1) 查看: bin/pulsar-admin clusters list
"pulsar-cluster"
2) 查看配置: bin/pulsar-admin clusters get pulsar-cluster
3) 创建: bin/pulsar-admin clusters create : Provisions a new cluster. This operation requires Pulsar
4) 更新: bin/pulsar-admin clusters update : Update the configuration for a cluster
5) 删除: bin/pulsar-admin clusters delete [-a] [clusterName] : Deletes an existing cluster
6) 其它: bin/pulsar-admin clusters get-peer-clusters pulsar-cluster

2、brokers
1) bin/pulsar-admin brokers version
2) 查看集群broker实例: bin/pulsar-admin brokers list [clusterName]
eg:
bin/pulsar-admin brokers list pulsar-cluster
"suzhou-bigdata02.domain.com:8180"
"suzhou-bigdata01.domain.com:8180"
"suzhou-bigdata03.domain.com:8180"
3) 查看broker的leader: bin/pulsar-admin brokers leader-broker 
{
  "serviceUrl" : "http://suzhou-bigdata02.domain.com:8180"
}

3、 topics
1) 查tenant/namespace下的topic列表: 
bin/pulsar-admin topics list public/default : Get the list of topics under a namespace.

"persistent://public/default/exclamation-output"
"persistent://public/default/exclamation-input"
"persistent://public/default/my-partitioned-topic-partition-0"
"persistent://public/default/test"
"persistent://public/default/my-topic"
"persistent://public/default/my-first-topic"
"persistent://public/default/my-partitioned-topic-partition-1"
"persistent://public/default/my-partitioned-topic-partition-2"
2) 查partition topic: 
bin/pulsar-admin topics list-partitioned-topics public/default :  Get the list of partitioned topics under a namespace
3) 删除主题: 
bin/pulsar-admin topics delete -d -f persistent://tenant/namespace/topic : Delete a topic. The topic cannot be deleted if there's any 
eg: bin/pulsar-admin topics delete -d -f persistent://public/default/test

举报

相关推荐

0 条评论