0
点赞
收藏
分享

微信扫一扫

zookeeper源码解析--会话管理--UpgradeableSessionTracker

一只1994 2022-01-27 阅读 78
// 日志对象
private static final Logger LOG = LoggerFactory.getLogger(UpgradeableSessionTracker.class);
// 容器1--记录本地会话
private ConcurrentMap<Long, Integer> localSessionsWithTimeouts;
// 容器2--记录升级中会话
private ConcurrentMap<Long, Integer> upgradingSessions;
// 本地会话追踪对象
protected LocalSessionTracker localSessionTracker;
// 本地会话支持
protected boolean localSessionsEnabled;
public void start() 
{
}

public void createLocalSessionTracker(
	SessionExpirer expirer, int tickTime, long id, ZooKeeperServerListener listener) 
{
    this.localSessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
    this.localSessionTracker 
    	= new LocalSessionTracker(expirer, this.localSessionsWithTimeouts, tickTime, id, listener);
    this.upgradingSessions = new ConcurrentHashMap<Long, Integer>();
}

// 本地会话或全局会话
public boolean isTrackingSession(long sessionId) 
{
    return isLocalSession(sessionId) || isGlobalSession(sessionId);
}

// 本地会话追踪对象存在&存在于追踪对象中
public boolean isLocalSession(long sessionId) 
{
    return localSessionTracker != null && localSessionTracker.isTrackingSession(sessionId);
}

@Override
public boolean isLocalSessionsEnabled() 
{
    return localSessionsEnabled;
}

// 升级容器存在&存在与该容器
public boolean isUpgradingSession(long sessionId) 
{
    return upgradingSessions != null && upgradingSessions.containsKey(sessionId);
}

// 完成升级,从升级中容器移除
public void finishedUpgrading(long sessionId) 
{
    if (upgradingSessions != null) 
    {
        upgradingSessions.remove(sessionId);
    }
}

// 全局会话检查
public abstract boolean isGlobalSession(long sessionId);
// 升级
public int upgradeSession(long sessionId) 
{
    if (localSessionsWithTimeouts == null) 
    {
        return -1;
    }
    
    // 本地会话容器移除
    Integer timeout = localSessionsWithTimeouts.remove(sessionId);
    if (timeout != null) 
    {
        LOG.info("Upgrading session 0x{}", Long.toHexString(sessionId));
        // 未实现
        trackSession(sessionId, timeout);
        // 升级中容器放入
        upgradingSessions.put(sessionId, timeout);
        // 内部3个容器都会移除
        localSessionTracker.removeSession(sessionId);
        return timeout;
    }
    
    return -1;
}

// 本地会话追踪中移除
protected void removeLocalSession(long sessionId) 
{
    if (localSessionTracker == null) 
    {
        return;
    }
    
    localSessionTracker.removeSession(sessionId);
}

// 未支持操作
public void checkGlobalSession(
	long sessionId, Object owner) 
	throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException 
{
    throw new UnsupportedOperationException();
}

// 容器1中元素个数
public long getLocalSessionCount() 
{
    if (localSessionsWithTimeouts == null) 
    {
        return 0;
    }
    
    return localSessionsWithTimeouts.size();
}

// 本地会话最终对象中提交处理的会话集合
public Set<Long> localSessions() 
{
    return (localSessionTracker == null) ? 
    	Collections.<Long>emptySet() : localSessionTracker.localSessions();
}
举报

相关推荐

0 条评论