0
点赞
收藏
分享

微信扫一扫

nsq源码(8) 生产和消费流程

绑定注册

  • 当nsqd带参数启动时,除了会处理客户端的请求,还会启动一个线程去注册nsqlookupd
graph LR
nsqd--注册-->nsqlookupd
func (n *NSQD) Main() {
    // 超时消息检索和处理任务
    n.waitGroup.Wrap(n.queueScanLoop)

    // 根据参数选择注册中心nsqlookupd
    n.waitGroup.Wrap(n.lookupLoop)
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(n.statsdLoop)
    }
}

发布生产和消费订阅

  • 通过nsqd的http接口生产消息:
curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
  • 启动消费者客户端:
nsq_to_file --topic=test --output-dir=/tmp --channel=chan --lookupd-http-address=127.0.0.1:4161
  • 生产消费流程图:
graph LR
nsq_to_file --请求-->nsqlookupd
nsqlookupd --返回分配的nsqd生产者信息-->nsq_to_file
graph LR
生产者--生产发布消息-->nsqd
nsqd --持续为消息寻找subchan--> nsq_to_file

nsq_to_file --连接并消费订阅指定topic-->nsqd
举报

相关推荐

0 条评论