0
点赞
收藏
分享

微信扫一扫

zookeeper源码解析--会话管理--SessionTrackerImpl

霍华德 2022-01-25 阅读 54

SessionTrackerImpl

SessionImpl

/**
 * Creates a session entry for the given id and timeout.
 * The closing flag starts out as {@code false}.
 *
 * @param sessionId session id
 * @param timeout   session timeout
 */
SessionImpl(long sessionId, int timeout)
{
    this.isClosing = false;
    this.timeout = timeout;
    this.sessionId = sessionId;
}

// Session id; assigned once in the constructor.
final long sessionId;
// Session timeout; assigned once in the constructor.
final int timeout;
// Closing flag; false at construction, set to true by the tracker's setSessionClosing.
boolean isClosing;
// Owner of the session; assigned by the tracker's checkSession / setOwner.
Object owner;
/**
 * @return the id of this session
 */
public long getSessionId()
{
    return this.sessionId;
}

/**
 * @return the timeout this session was created with
 */
public int getTimeout()
{
    return this.timeout;
}

/**
 * @return {@code true} once the session has been marked as closing
 */
public boolean isClosing()
{
    return this.isClosing;
}

/**
 * Renders the session as its id in hexadecimal, prefixed with {@code 0x}.
 *
 * @return {@code "0x"} followed by {@link Long#toHexString(long)} of the session id
 */
@Override
public String toString()
{
    return "0x" + Long.toHexString(sessionId);
}

SessionTrackerImpl

/**
 * Builds the initial session id from the current elapsed time and the supplied id.
 *
 * @param id value whose low 8 bits end up in the top byte of the result
 * @return the starting session id; bumped by one if it collides with
 *         {@code EphemeralType.CONTAINER_EPHEMERAL_OWNER}
 */
public static long initializeNextSessionId(long id)
{
    // Shift left by 24 then unsigned-right by 8: the low 40 bits of the elapsed time
    // land in bits 16-55, leaving the top byte and the low 16 bits zero.
    final long timeBits = (Time.currentElapsedTime() << 24) >>> 8;
    // The low 8 bits of id occupy bits 56-63.
    final long candidate = timeBits | (id << 56);

    // Avoid handing out the reserved container-owner value.
    return (candidate == EphemeralType.CONTAINER_EPHEMERAL_OWNER) ? candidate + 1 : candidate;
}

public SessionTrackerImpl(
	SessionExpirer expirer, 
	ConcurrentMap<Long, Integer> sessionsWithTimeout, 
	int tickTime, 
	long serverId, 
	ZooKeeperServerListener listener) 
{
    super("SessionTracker", listener);
    this.expirer = expirer;
    this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
    this.sessionsWithTimeout = sessionsWithTimeout;
    this.nextSessionId.set(initializeNextSessionId(serverId));
    for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) 
    {
        trackSession(e.getKey(), e.getValue());
    }

    EphemeralType.validateServerId(serverId);
}

// Loop flag: run() keeps iterating while this is true; shutdown() sets it to false.
volatile boolean running = true;
/**
 * Writes the label "Session " followed by the expiry queue's own dump to the writer.
 *
 * @param pwriter destination writer; neither flushed nor closed here
 */
public void dumpSessions(PrintWriter pwriter) 
{
    pwriter.print("Session ");
    sessionExpiryQueue.dump(pwriter);
}

/**
 * Returns a snapshot mapping each expiry time point to the ids of the sessions
 * that will expire at that time point.
 *
 * @return a new {@link TreeMap} keyed by the expiry queue's keys, each value a new
 *         {@link HashSet} of session ids
 */
public synchronized Map<Long, Set<Long>> getSessionExpiryMap()
{
    final Map<Long, Set<SessionImpl>> buckets = sessionExpiryQueue.getExpiryMap();
    final Map<Long, Set<Long>> idsByExpiry = new TreeMap<Long, Set<Long>>();

    for (Entry<Long, Set<SessionImpl>> bucket : buckets.entrySet())
    {
        // Reduce each bucket of sessions to just their ids.
        final Set<Long> bucketIds = new HashSet<Long>();
        for (SessionImpl session : bucket.getValue())
        {
            bucketIds.add(session.sessionId);
        }
        idsByExpiry.put(bucket.getKey(), bucketIds);
    }

    return idsByExpiry;
}

/**
 * Returns the same text that {@code dumpSessions} writes, captured into a string.
 *
 * @return the session dump as a string
 */
@Override
public String toString()
{
    final StringWriter buffer = new StringWriter();
    final PrintWriter out = new PrintWriter(buffer);

    dumpSessions(out);
    out.flush();
    out.close();

    return buffer.toString();
}

/**
 * Expiry loop. While {@code running} is true it asks the expiry queue how long to
 * wait. A positive wait is slept off and the loop re-checks; otherwise the queue is
 * polled and every returned session is counted in the stale-session metric, marked
 * as closing and handed to the expirer.
 *
 * An {@link InterruptedException} ends the loop and is passed to
 * {@code handleException}. The exit message is logged afterwards in either case.
 */
@Override
public void run()
{
    try
    {
        while (running)
        {
            final long sleepMillis = sessionExpiryQueue.getWaitTime();
            if (sleepMillis <= 0)
            {
                // Nothing to wait for: expire whatever the queue hands back.
                for (SessionImpl expired : sessionExpiryQueue.poll())
                {
                    ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
                    setSessionClosing(expired.sessionId);
                    expirer.expire(expired);
                }
            }
            else
            {
                Thread.sleep(sleepMillis);
            }
        }
    }
    catch (InterruptedException e)
    {
        handleException(this.getName(), e);
    }

    LOG.info("SessionTrackerImpl exited loop!");
}

/**
 * Refreshes the expiry of a tracked session that is not closing.
 *
 * @param sessionId id of the session to touch
 * @param timeout   timeout passed on to the expiry update
 * @return {@code true} if the expiry was updated; {@code false} if the session is
 *         unknown or already closing (both cases are only trace-logged)
 */
public synchronized boolean touchSession(long sessionId, int timeout)
{
    final SessionImpl session = sessionsById.get(sessionId);

    if (session != null && !session.isClosing())
    {
        updateSessionExpiry(session, timeout);
        return true;
    }

    if (session == null)
    {
        logTraceTouchInvalidSession(sessionId, timeout);
    }
    else
    {
        logTraceTouchClosingSession(sessionId, timeout);
    }
    return false;
}

/**
 * Trace-logs the touch (with an empty status) and updates the session's entry in
 * the expiry queue.
 *
 * @param session session whose expiry is updated
 * @param timeout timeout handed to the expiry queue
 */
private void updateSessionExpiry(SessionImpl session, int timeout)
{
    logTraceTouchSession(session.sessionId, timeout, "");
    sessionExpiryQueue.update(session, timeout);
}

/**
 * Emits a trace message about a session touch; does nothing unless trace logging
 * is enabled.
 *
 * @param sessionId     id of the touched session, rendered in hexadecimal
 * @param timeout       timeout included in the message
 * @param sessionStatus text inserted before the word "session" in the message
 */
private void logTraceTouchSession(long sessionId, int timeout, String sessionStatus)
{
    if (!LOG.isTraceEnabled())
    {
        return;
    }

    final String hexId = Long.toHexString(sessionId);
    final String timeoutText = Integer.toString(timeout);
    final String msg = MessageFormat.format(
        "SessionTrackerImpl --- Touch {0}session: 0x{1} with timeout {2}",
        sessionStatus, hexId, timeoutText);
    ZooTrace.logTraceMessage(LOG, ZooTrace.CLIENT_PING_TRACE_MASK, msg);
}

/**
 * Trace-logs a touch of a session id that is not tracked, using the status text "invalid ".
 */
private void logTraceTouchInvalidSession(long sessionId, int timeout) 
{
    logTraceTouchSession(sessionId, timeout, "invalid ");
}

/**
 * Trace-logs a touch of a session that is already closing, using the status text "closing ".
 */
private void logTraceTouchClosingSession(long sessionId, int timeout) 
{
    logTraceTouchSession(sessionId, timeout, "closing ");
}

/**
 * Looks up the timeout recorded for a session in {@code sessionsWithTimeout}.
 *
 * @param sessionId id of the session
 * @return the recorded timeout
 * @throws NullPointerException if no timeout is recorded for {@code sessionId}
 *         (the lookup yields {@code null}, which cannot be unboxed to {@code int})
 */
public int getSessionTimeout(long sessionId)
{
    final Integer recordedTimeout = sessionsWithTimeout.get(sessionId);
    // Auto-unboxing happens on return.
    return recordedTimeout;
}

/**
 * Marks a tracked session as closing. Unknown session ids are ignored.
 *
 * @param sessionId id of the session to mark
 */
public synchronized void setSessionClosing(long sessionId)
{
    if (LOG.isTraceEnabled())
    {
        LOG.trace("Session closing: 0x{}", Long.toHexString(sessionId));
    }

    final SessionImpl session = sessionsById.get(sessionId);
    if (session != null)
    {
        session.isClosing = true;
    }
}

/**
 * Removes a session from both session maps and, if it was tracked, from the
 * expiry queue.
 *
 * @param sessionId id of the session to remove
 */
public synchronized void removeSession(long sessionId)
{
    final String hexId = Long.toHexString(sessionId);
    LOG.debug("Removing session 0x{}", hexId);

    final SessionImpl removed = sessionsById.remove(sessionId);
    sessionsWithTimeout.remove(sessionId);

    if (LOG.isTraceEnabled())
    {
        final String traceMsg = "SessionTrackerImpl --- Removing session 0x" + hexId;
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, traceMsg);
    }

    // Only sessions that were actually tracked have an expiry-queue entry to drop.
    if (removed != null)
    {
        sessionExpiryQueue.remove(removed);
    }
}

/**
 * Requests the expiry loop to stop by clearing the {@code running} flag.
 * Nothing here interrupts or joins the thread; run() sees the flag the next time
 * it evaluates its loop condition.
 */
public void shutdown() 
{
    LOG.info("Shutting down");
    running = false;
    if (LOG.isTraceEnabled()) 
    {
        ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "Shutdown SessionTrackerImpl!");
    }
}

/**
 * Allocates the next session id and starts tracking it.
 *
 * @param sessionTimeout timeout for the new session
 * @return the newly allocated session id
 */
public long createSession(int sessionTimeout)
{
    final long freshId = nextSessionId.getAndIncrement();
    trackSession(freshId, sessionTimeout);
    return freshId;
}

/**
 * Ensures a session with the given id is present in {@code sessionsById} and
 * refreshes its expiry.
 *
 * @param id             session id
 * @param sessionTimeout timeout used for a newly created entry and for the expiry update
 * @return {@code true} if this call inserted the session, {@code false} if it was
 *         already present
 */
@Override
public synchronized boolean trackSession(long id, int sessionTimeout)
{
    SessionImpl candidate = sessionsById.get(id);
    if (candidate == null)
    {
        candidate = new SessionImpl(id, sessionTimeout);
    }

    // Session container #1: session id -> SessionImpl.
    final SessionImpl previous = sessionsById.putIfAbsent(id, candidate);
    final boolean added = (previous == null);
    final SessionImpl tracked = added ? candidate : previous;

    if (added)
    {
        LOG.debug("Adding session 0x{}", Long.toHexString(id));
    }

    if (LOG.isTraceEnabled())
    {
        final String action = added ? "Adding" : "Existing";
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
            "SessionTrackerImpl --- " + action + " session 0x" + Long.toHexString(id) + " " + sessionTimeout);
    }

    updateSessionExpiry(tracked, sessionTimeout);
    return added;
}

/**
 * Records the session's timeout in {@code sessionsWithTimeout}.
 *
 * @param id             session id
 * @param sessionTimeout timeout to record
 * @return {@code true} if no timeout was previously recorded for this id
 */
public synchronized boolean commitSession(long id, int sessionTimeout)
{
    // Session container #2: session id -> timeout.
    final Integer previousTimeout = sessionsWithTimeout.put(id, sessionTimeout);
    return previousTimeout == null;
}

/**
 * @param sessionId id to look up
 * @return {@code true} if {@code sessionsById} contains an entry for the id
 */
public boolean isTrackingSession(long sessionId) 
{
    return sessionsById.containsKey(sessionId);
}

// TODO: carried over from the original comment, which gives no further detail.
/**
 * Verifies that a session is tracked, not closing, and owned by {@code owner}.
 * A session with no owner yet is claimed by {@code owner}.
 *
 * @param sessionId id of the session to check
 * @param owner     expected owner; compared by reference
 * @throws KeeperException.UnknownSessionException if the id is not tracked
 * @throws KeeperException.SessionExpiredException if the session is closing
 * @throws KeeperException.SessionMovedException   if a different owner is already set
 */
public synchronized void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException, KeeperException.UnknownSessionException
{
    LOG.debug("Checking session 0x{}", Long.toHexString(sessionId));

    final SessionImpl tracked = sessionsById.get(sessionId);
    if (tracked == null)
    {
        throw new KeeperException.UnknownSessionException();
    }
    if (tracked.isClosing())
    {
        throw new KeeperException.SessionExpiredException();
    }

    if (tracked.owner == null)
    {
        // First check claims the session for this owner.
        tracked.owner = owner;
        return;
    }
    if (tracked.owner != owner)
    {
        throw new KeeperException.SessionMovedException();
    }
}

/**
 * Assigns the owner of a tracked session that is not closing.
 *
 * @param id    session id
 * @param owner new owner; replaces any previous owner
 * @throws SessionExpiredException if the session is unknown or closing
 */
public synchronized void setOwner(long id, Object owner) throws SessionExpiredException
{
    final SessionImpl tracked = sessionsById.get(id);
    if (tracked != null && !tracked.isClosing())
    {
        tracked.owner = owner;
        return;
    }

    throw new KeeperException.SessionExpiredException();
}

/**
 * Same check as {@code checkSession}, except that an unknown session is reported
 * as expired.
 *
 * @param sessionId id of the session to check
 * @param owner     expected owner
 * @throws KeeperException.SessionExpiredException if the session is unknown or closing
 * @throws KeeperException.SessionMovedException   if a different owner is already set
 */
public void checkGlobalSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException
{
    try
    {
        checkSession(sessionId, owner);
    }
    catch (KeeperException.UnknownSessionException unknownSession)
    {
        // Callers of this method only see "expired", never "unknown".
        throw new KeeperException.SessionExpiredException();
    }
}

/**
 * @return always 0 in this implementation
 */
public long getLocalSessionCount() 
{
    return 0;
}

/**
 * @return always {@code false} in this implementation
 */
@Override
public boolean isLocalSessionsEnabled() 
{
    return false;
}

/**
 * @return the key set of {@code sessionsById}, returned directly without copying
 */
public Set<Long> globalSessions() 
{
    return sessionsById.keySet();
}

/**
 * @return always an empty set in this implementation
 */
public Set<Long> localSessions() 
{
    return Collections.emptySet();
}
举报

相关推荐

0 条评论