0
点赞
收藏
分享

微信扫一扫

go 连接stomp协议

IT程序员 2022-04-14 阅读 61
golang

代码


//解决第一个问题的代码
var activeConn *stomp.Conn

func Init(mqAddr []string) {
	var options = []func(*stomp.Conn) error{
		//设置读写超时	stomp.ConnOpt.HeartBeat(time.Duration(svc.Config.MQOption.HeartBeatSendTimeOut)*time.Second, time.Duration(svc.Config.MQOption.HeartBeatRecvTimeOut)*time.Second),
		stomp.ConnOpt.HeartBeatError(time.Duration(svc.Config.MQOption.HeartBeatErrorTimeOut) * time.Second),
	}
	for _, v := range mqAddr {
		conn, err := stomp.Dial("tcp", v, options...)
		if err != nil {
			continue
		}
		activeConn = conn
		break
	}
	if activeConn == nil {
		logx.Errorf("无法连接mqtt服务")
	}
}

func GetActiveMQConn(mqAddr []string) *stomp.Conn {
	Init(mqAddr)
	return activeConn
}

//解决第二个问题
func CreateSupplierOrderQueryCuConsumer(svcCtx *ServiceContext, wg *sync.WaitGroup) {
	addr := strings.Split(svcCtx.Config.OA.MQHost, ",")
	svc = svcCtx
	// 手动消息修改审批
	if svcCtx.Config.OA.MQOpen == 1 {
		conn := GetActiveMQConn(addr)
		if conn == nil {
			//os.Exit(1)
			return
		}
		defer conn.Disconnect()
		logx.Info("启动订单查询的消费者成功.....")
		orderQuerySub, err := conn.Subscribe(svcCtx.Config.OA.MQQueue, stomp.AckClient)
		if err != nil {
			logx.Errorf("conn Subscribe", err.Error())
			return
		}
		for {
			select {
			case v := <-orderQuerySub.C:
				if !orderQuerySub.Active() {
					wg.Done()
					return
				}
				msgBody := v.Body
				logx.Info("消费者正在处理订单查询: " + string(msgBody))
				var oaMsg = new(OAMsg)
				err := jsoniter.Unmarshal(msgBody, &oaMsg)
				if err != nil {
					logx.Errorf("解析OA消息错误 err:%s", err.Error())
				}
				err = AcceptMsg(svcCtx, oaMsg)
				if err != nil {
					logx.Errorf("处理OA消息错误 err:%s", err.Error())
				}
				//应答,重要
				err = conn.Ack(v)
				if err != nil {
					logx.Error("消息应答失败!")
				}
				// todo 记录日志
				logIn := &workflowmanager.FlowMqLogData{
					Destination: v.Destination,
					MsgBody:     string(msgBody),
				}
				if err != nil {
					logIn.ErrMsg = err.Error()
				}
				if v.Conn != nil {
					logIn.ServerAddr = v.Conn.Server()
				}
				_, err = svc.Workflowmanager.AddFlowMqLog(context.Background(), logIn)
				if err != nil {
					logx.Error("mq消息日志添加失败 err:%s", err.Error())
				}
			}
		}
	}
}

遇到的问题

1、发现没有消息的时候,到了超时时间消费就会断开,解决办法,判断是否关闭:
在这里插入图片描述
然后,在开启的地方加入队列代码,如下:

orderWg := &sync.WaitGroup{}
	// 启动stoMQ监听
	go func() {
		for {
			orderWg.Add(1)
			svc.CreateSupplierOrderQueryCuConsumer(ctx, orderWg)
			orderWg.Wait()
			fmt.Println("重新启动,开始下轮消费")
		}
	}()
举报

相关推荐

0 条评论