0
点赞
收藏
分享

微信扫一扫

封装 WebSocket,添加重连、心跳检测机制

/**
 * @file websocket封装
 */

/** Maps an event name (the `Name` field of an incoming message) to its subscribers. */
interface SubscribeList {
  [index: string]: Function[];
}

/** A subscriber callback, optionally tagged by `subscribe` for one-shot delivery. */
type OnceAwareCallback = Function & { isOnce?: boolean };

/**
 * Singleton wrapper around the browser `WebSocket` that adds:
 *  - automatic reconnection (up to `maxReconnectTimes` attempts, 2s apart),
 *  - a heartbeat ("ping" every 10s, re-armed whenever a message arrives),
 *  - a shared response timeout started by `sendJSON`,
 *  - a small pub/sub layer keyed by the `Name` field of incoming JSON messages.
 *
 * `new Socket(...)` returns the existing instance when one is alive; in that case
 * the passed callbacks are only queued and fire on the next (re)connect result.
 * NOTE(review): a caller that constructs the singleton *after* it is already
 * connected will therefore not get `onConnected` until a reconnect happens —
 * confirm whether that is intended.
 */
export class Socket {
  private static instance: Socket | null = null;
  /** `true` = open, `false` = last attempt failed, `null` = closed / never connected. */
  private static _isConnected: boolean | null = null;

  private readonly cachedUrl: string = "";
  private readonly cachedOnConnected: Function | undefined;

  private socket!: WebSocket;

  /** Subscribers of the catch-all "all" event. */
  private cbs: Function[] = [];
  /**
   * Per-event subscribers. Created without a prototype so that event names coming
   * from the server (e.g. "constructor", "toString") can never resolve to an
   * inherited `Object.prototype` member.
   */
  private subscribeList: SubscribeList = Object.create(null);
  /** Subscribers of the "error" event (connection errors, timeouts, send-while-closed). */
  private errCbs: Function[] = [];

  /** Number of reconnect attempts since the last received message. */
  private reconnectTimes = 0;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private maxReconnectTimes = 10;

  /** Response timeout in milliseconds (60s). */
  private timeout = 60 * 1000;
  private timeoutTimer: ReturnType<typeof setTimeout> | null = null;
  private heartTimer: ReturnType<typeof setInterval> | null = null;

  /** Callbacks queued while a connection attempt is in flight. */
  private cachedConnectedCb: Function[] = [];
  private cachedConnectErrCb: Function[] = [];

  /** Whether a connection attempt is currently in flight. */
  private isConnecting = false;
  /** Set by `destroy()`; stops reconnects and stale events from touching shared state. */
  private destroyed = false;

  /**
   * @param url WebSocket endpoint.
   * @param onConnected Called with the `open` event arguments once connected.
   * @param onConnectedError Called when the connection attempt fails.
   */
  constructor(
    url: string,
    onConnected?: Function,
    onConnectedError?: Function
  ) {
    if (Socket.instance) {
      if (onConnected) {
        Socket.instance.cachedConnectedCb.push(onConnected);
      }
      if (onConnectedError) {
        Socket.instance.cachedConnectErrCb.push(onConnectedError);
      }

      return Socket.instance;
    }
    Socket.instance = this;

    this.cachedUrl = url;
    this.cachedOnConnected = onConnected;
    this.socket = this.connect(url, onConnected, onConnectedError);
  }

  /** Shared connection state: `true` open, `false` failed, `null` closed / unknown. */
  static get isConnected(): boolean | null {
    return Socket._isConnected;
  }

  /** The underlying `WebSocket` currently in use. */
  get socketInstance(): WebSocket {
    return this.socket;
  }

  /** Stops the heartbeat interval, if one is running. */
  private stopHeartbeat() {
    if (this.heartTimer) {
      clearInterval(this.heartTimer);
      this.heartTimer = null;
    }
  }

  /** (Re)starts the heartbeat: sends "ping" every 10s while the socket is open. */
  private heartCheck() {
    this.stopHeartbeat();
    this.heartTimer = setInterval(() => {
      // send() throws while CONNECTING and is pointless once closing/closed.
      if (this.socket?.readyState === WebSocket.OPEN) {
        this.socket.send("ping");
      }
    }, 10000);
  }

  /**
   * Opens a connection to `url`.
   *
   * Components mount asynchronously relative to network IO, so callers cannot know
   * the singleton's state: while an attempt is in flight the callbacks are queued;
   * if the last attempt already failed, `onConnectedError` is invoked immediately.
   *
   * @returns The new `WebSocket`, or the current one when no new attempt was started.
   */
  connect(url: string, onConnected?: Function, onConnectedError?: Function) {
    if (this.isConnecting) {
      if (onConnected) {
        this.cachedConnectedCb.push(onConnected);
      }
      if (onConnectedError) {
        this.cachedConnectErrCb.push(onConnectedError);
      }
      return this.socket;
    }
    // The previous attempt already failed: report the failure right away.
    if (Socket._isConnected === false) {
      onConnectedError?.();
      return this.socket;
    }
    this.isConnecting = true;
    const ws = new WebSocket(url);

    ws.onopen = (...args) => {
      Socket._isConnected = true;
      onConnected?.(...args);
      this.cachedConnectedCb.forEach((cb) => cb(...args));
      this.isConnecting = false;
      this.heartCheck();
    };
    this.init(ws, onConnectedError);

    return ws;
  }

  /**
   * Wires the message / error / close handlers onto `socket`.
   *
   * @param socket The WebSocket to attach handlers to.
   * @param onConnectedError Called if the error happens during the connection attempt.
   */
  init(socket: WebSocket, onConnectedError?: Function) {
    socket.onmessage = (evt: MessageEvent) => {
      // Any response proves the server is alive: cancel the pending timeout.
      if (this.timeoutTimer) {
        clearTimeout(this.timeoutTimer);
      }
      this.reconnectTimes = 0;

      // Deliver parsed JSON when possible, otherwise the raw payload.
      let payload: unknown;
      try {
        payload = JSON.parse(evt.data);
      } catch {
        payload = evt.data;
      }

      this.callCbs(this.cbs, payload);
      // `payload` may legitimately be null (the JSON text "null") or a primitive,
      // so only look up named subscribers when a Name is actually present.
      const topic =
        payload == null ? undefined : (payload as { Name?: unknown }).Name;
      if (topic !== undefined && topic !== null) {
        this.callCbs(this.subscribeList[String(topic)] || [], payload);
      }
      this.heartCheck();
    };

    socket.onerror = (...err) => {
      if (this.isConnecting) {
        onConnectedError?.(...err);
        this.cachedConnectErrCb.forEach((cb) => cb(...err));
      }
      this.isConnecting = false;
      // A destroyed instance must not overwrite the shared state or revive itself.
      if (!this.destroyed) {
        Socket._isConnected = false;
        this.reconnected();
      }
      this.errCbs.forEach((cb: Function) => cb(...err));
    };

    socket.onclose = (e) => {
      console.log(
        "websocket 断开: " + e.code + " " + e.reason + " " + e.wasClean
      );
      this.isConnecting = false;
      // Only stop the heartbeat if the socket that closed is the one in use;
      // a replacement socket may already have started its own heartbeat.
      if (socket === this.socket) {
        this.stopHeartbeat();
      }
      if (!this.destroyed) {
        Socket._isConnected = null;
      }
    };
  }

  /**
   * Invokes every callback in `cbs` with `payload`, then removes the ones that were
   * registered as one-shot. Removal is by identity after dispatch, so removing one
   * entry cannot shift another out of place.
   */
  private callCbs(cbs: Function[], payload: unknown) {
    const spent: Function[] = [];

    cbs.forEach((cb) => {
      cb(payload);
      if ((cb as OnceAwareCallback).isOnce) {
        spent.push(cb);
      }
    });

    for (const cb of spent) {
      const idx = cbs.indexOf(cb);
      if (idx !== -1) {
        cbs.splice(idx, 1);
      }
    }
  }

  /** Starts (or restarts) the response timeout; on expiry "error" subscribers are notified. */
  private startTimeOutTimer() {
    if (this.timeoutTimer) {
      clearTimeout(this.timeoutTimer);
    }
    this.timeoutTimer = setTimeout(() => {
      this.callCbs(this.errCbs, new Error("[socket]: webSocket接受消息超时!"));
    }, this.timeout);
  }

  /**
   * Serializes `data` as JSON and sends it. When not connected, "error" subscribers
   * are notified instead.
   *
   * @param data Value to serialize (parameter type kept as-is for caller compatibility).
   */
  sendJSON(data: Object) {
    if (Socket._isConnected) {
      this.socket.send(JSON.stringify(data, null, 4));
      // One shared timeout for all in-flight requests: any response clears it,
      // because a response means the server is still working.
      this.startTimeOutTimer();
    } else {
      this.callCbs(this.errCbs, new Error("[socket]: webSocket未连接!"));
    }
  }

  /** Sends `data` verbatim. When not connected, "error" subscribers are notified instead. */
  sendPrimitive(data: string) {
    if (Socket._isConnected) {
      this.socket.send(data);
    } else {
      this.callCbs(this.errCbs, new Error("[socket]: webSocket未连接!"));
    }
  }

  /** Schedules one reconnect attempt in 2s, unless connected, destroyed, or out of retries. */
  private reconnected() {
    if (this.destroyed) {
      return;
    }
    if (!Socket._isConnected && this.reconnectTimes < this.maxReconnectTimes) {
      if (this.reconnectTimer) {
        clearTimeout(this.reconnectTimer);
      }
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        this.reconnectTimes++;
        this.socket = this.connect(this.cachedUrl, this.cachedOnConnected);
      }, 2000);
    }
  }

  /**
   * Closes the connection and releases the singleton slot.
   * Also cancels the heartbeat, reconnect and timeout timers so nothing keeps
   * running (or reconnecting) after the instance is gone.
   */
  destroy() {
    const wasLive = Socket.instance === this;
    this.destroyed = true;

    this.stopHeartbeat();
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.timeoutTimer) {
      clearTimeout(this.timeoutTimer);
      this.timeoutTimer = null;
    }

    if (wasLive) {
      // Reset the shared state right away so the next `new Socket()` starts clean
      // instead of seeing a stale "failed" or "connected" flag.
      Socket._isConnected = null;
    }
    this.socket?.close(1000);
    if (wasLive) {
      Socket.instance = null;
    }
  }

  /**
   * Removes `cb` from `cbList`, or empties the list when `cb` is omitted.
   * Only callbacks with a stable reference can be removed (matching is by identity).
   */
  private removeListener(cbList: Function[], cb?: Function) {
    if (!cb) {
      cbList.length = 0;
      return;
    }

    const idx = cbList.indexOf(cb);
    if (idx !== -1) {
      cbList.splice(idx, 1);
    }
  }

  /**
   * Unsubscribes from an event.
   *
   * @param eventName "error", "all", or a message `Name`.
   * @param cb The callback to remove; omit to remove every callback of that event.
   */
  unsubscribe(eventName: string, cb?: Function) {
    if (eventName === "error") {
      this.removeListener(this.errCbs, cb);
    } else if (eventName === "all") {
      this.removeListener(this.cbs, cb);
    } else if (this.subscribeList[eventName]) {
      this.removeListener(this.subscribeList[eventName], cb);
    }
  }

  /**
   * Subscribes to an event.
   *
   * @param eventName "error" (failures), "all" (every message), or a message `Name`.
   * @param cb Receives the parsed message (or the error).
   * @param once When true, `cb` is removed after its first invocation. The flag is
   *   stored on the function object itself, so it applies to every registration of `cb`.
   */
  subscribe(eventName: string, cb: Function, once?: boolean) {
    (cb as OnceAwareCallback).isOnce = once;
    if (eventName === "error") {
      this.errCbs.push(cb);
    } else if (eventName === "all") {
      this.cbs.push(cb);
    } else if (!this.subscribeList[eventName]) {
      this.subscribeList[eventName] = [cb];
    } else {
      this.subscribeList[eventName].push(cb);
    }
  }
}

使用方式:

	// Initialize a socket connection
    const initSocket = () => {
      // Create the Socket instance
      socket.value = new Socket(`socket链接地址`);
      // Subscribe formatJson to the "all" event
      socket.value.subscribe("all", formatJson);
    };

    // Close the socket connection
    const closeSocket = () => {
      // Page teardown: end the service
      // Remove the previously subscribed handler
      socket.value?.unsubscribe("all", formatJson);
      // Destroy the socket
      socket.value?.destroy();
    };
举报

相关推荐

0 条评论