0
点赞
收藏
分享

微信扫一扫

看完我忍不住鼓掌了!zookeeper服务端与客户端通信分析

云上笔记 2021-10-04 阅读 57
日记本

使用官方客户端连接 zookeeper 服务端非常简单。只需要实例化底下这个类就可以了。

// connectString 连接串,如 127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
// 会话超时时间
// 监听器,用于监听状态变化
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
    this(connectString, sessionTimeout, watcher, false);
}
public interface Watcher {
    void process(WatchedEvent event);   
}

跟踪 Zookeeper 构造器代码,最终调用到以下逻辑

public ZooKeeper(
    String connectString,
    int sessionTimeout,
    Watcher watcher,
    boolean canBeReadOnly,
    HostProvider aHostProvider,
    ZKClientConfig clientConfig) throws IOException {
    LOG.info(
        "Initiating client connection, connectString={} sessionTimeout={} watcher={}",
        connectString,
        sessionTimeout,
        watcher);

    if (clientConfig == null) {
        clientConfig = new ZKClientConfig();
    }
    this.clientConfig = clientConfig;
    // 监听管理器
    watchManager = defaultWatchManager();
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    hostProvider = aHostProvider;
    // cnxn 为 ClientCnxn
    cnxn = createConnection(
        connectStringParser.getChrootPath(),
        hostProvider,
        sessionTimeout,
        this,
        watchManager,
        getClientCnxnSocket(),
        canBeReadOnly);
    cnxn.start();
}

ClientCnxn 管理客户端到服务端的连接,在有需要时切换服务器进行连接。比如服务器A崩溃时,将自动连接到集群中其他可用服务器。只要该服务器的 zxid >= 客户端感知到的 zxid 即可。

ClientCnxn 通过 SendThread 发送消息,通过 EventThread 处理事件。

SendThread 是一个线程,启动时将跟服务端建立连接。默认情况下 通过 ClientCnxnSocketNIO 实现服务端的通信。

发起请求

后台线程发送数据

后台线程接收数据

会话连接维护

Zookeeper 中持有 ClientCnxn 实例,而且 ClientCnxn 持有 SendThread。

创建完 Zookeeper 实例后,将启动该线程。由 ClientCnxn 委托 SendThread 发起连接到 zookeeper 服务端。

客户端侧

客户端发送请求的方式是将消息放入一个阻塞队列中,之后由 SendThread 来依次消费,这样保证了消息的有序性,谁先到谁先处理。

服务端侧

在 QuorumPeerMain 实例化时,根据配置实例化了 ServerCnxnFactory。并将其绑定到 QuorumPeer 中。ServerCnxnFactory 有两种实现方式:

  • NIOServerCnxnFactory
  • NettyServerCnxnFactory

默认使用 NIOServerCnxnFactory。

这里以 NIOServerCnxnFactory 为例。

在 NIOServerCnxnFactory 创建时,通过 Java NIO 的方式创建 ServerSocketChannel 并绑定在配置文件指定的端口上(即 clientPort)。

在启动 QuorumPeer 时,连带着调用 NIOServerCnxnFactory.start 方法。

NIOServerCnxnFactory 绑定了 三种线程:

  • SelectorThread 用于处理与客户端连接绑定的 selector 的 IO 事件处理
  • AcceptThread 用于处理接入客户端连接的 selector 的 IO 事件
  • ConnectionExpirerThread 用于执行会话过期处理

其中 SelectorThread 和 AcceptThread 的关系是,AcceptThread 将客户端连接接入后,选择一个 SelectorThread 进行绑定,实现方式是将 SocketChannel 放入 SelectorThread.acceptedQueue 中。

之后 SelectorThread 从 acceptedQueue 中获取连接进行 IO 事件处理,完成网络数据的读写。

未初始化时(即收到的消息为 ConnectRequest )

在 processConnectRequest 方法中,将为该连接创建 session id、检查该链接客户端的 zxid 是否小于等于该服务端的 zxid、设置会话超时时间。

会话超时处理机制

其中会话的超时管理值得研究一番。

// elem 传入为 SessionImpl
public Long update(E elem, int timeout) {
    // 首次调用为null
    Long prevExpiryTime = elemMap.get(elem);
    long now = Time.currentElapsedTime();
    // 过期时间
    Long newExpiryTime = roundToNextInterval(now + timeout);

    if (newExpiryTime.equals(prevExpiryTime)) {
        // No change, so nothing to update
        return null;
    }

    // 获取该过期时间的所有会话集合
    // First add the elem to the new expiry time bucket in expiryMap.
    Set<E> set = expiryMap.get(newExpiryTime);
    if (set == null) {
        // Construct a ConcurrentHashSet using a ConcurrentHashMap
        set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
        // Put the new set in the map, but only if another thread
        // hasn't beaten us to it
        Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    // 将该会话放入过期时间集合中
    set.add(elem);

    // 将该会话从上一个会话超时集合中移除
    // Map the elem to the new expiry time. If a different previous
    // mapping was present, clean up the previous expiry bucket.
    prevExpiryTime = elemMap.put(elem, newExpiryTime);
    if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
        Set<E> prevSet = expiryMap.get(prevExpiryTime);
        if (prevSet != null) {
            prevSet.remove(elem);
        }
    }
    return newExpiryTime;
}

在 NIOServerCnxnFactory 中绑定了一个处理会话超时的线程,跟随着 NIOServerCnxnFactory 一起启动。

private class ConnectionExpirerThread extends ZooKeeperThread {

    ConnectionExpirerThread() {
        super("ConnnectionExpirer");
    }

    public void run() {
        try {
            while (!stopped) {
                long waitTime = cnxnExpiryQueue.getWaitTime();
                // 如果此时没有对应的过期会话集合,等待一段时间
                if (waitTime > 0) {
                    Thread.sleep(waitTime);
                    continue;
                }
                // 获取超时会话
                for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
                    ServerMetrics.getMetrics().SESSIONLESS_CONNECTIONS_EXPIRED.add(1);
                    conn.close(ServerCnxn.DisconnectReason.CONNECTION_EXPIRED);
                }
            }

        } catch (InterruptedException e) {
            LOG.info("ConnnectionExpirerThread interrupted");
        }
    }

}

// ExpiryQueue.poll
public Set<E> poll() {
    long now = Time.currentElapsedTime();
    long expirationTime = nextExpirationTime.get();
    if (now < expirationTime) {
        return Collections.emptySet();
    }

    Set<E> set = null;
    // 过期时间
    long newExpirationTime = expirationTime + expirationInterval;
    if (nextExpirationTime.compareAndSet(expirationTime, newExpirationTime)) {
        set = expiryMap.remove(expirationTime);
    }
    if (set == null) {
        return Collections.emptySet();
    }
    return set;
}

// ExpiryQueue.getWaitTime
public long getWaitTime() {
    long now = Time.currentElapsedTime();
    // 第一次执行时,返回的是 NIOServerCnxnFactory 启动时间
    long expirationTime = nextExpirationTime.get();
    return now < expirationTime ? (expirationTime - now) : 0L;
}

初始化之后(即该连接处理过 ConnectRequest 请求了)

[图片上传中...(image-643b3f-1610719138544-0)]

在 RequestThrottler 线程运行过程中,将从队列 submittedRequests 获取请求数据,并交由 ZookeeperServer.firstProcessor 处理。

ZookeeperServer 由多个子类实现,分别对应着

  • LeaderZooKeeperServer
  • FollowerZooKeeperServer
  • ObserverZooKeeperServer

每一种子类实现中都包含了一个处理请求的 RequestProcessor 责任链。

举报

相关推荐

0 条评论