SessionTrackerImpl
SessionImpl
// 会话id,超时时间
SessionImpl(long sessionId, int timeout)
{
this.sessionId = sessionId;
this.timeout = timeout;
isClosing = false;
}
// 会话id
final long sessionId;
// 超时时间
final int timeout;
// 关闭标志
boolean isClosing;
// 拥有者
Object owner;
public long getSessionId()
{
return sessionId;
}
public int getTimeout()
{
return timeout;
}
public boolean isClosing()
{
return isClosing;
}
public String toString()
{
return "0x" + Long.toHexString(sessionId);
}
SessionTrackerImpl
// 基于当前时间&传入id构造新的会话id
public static long initializeNextSessionId(long id)
{
long nextSid;
nextSid = (Time.currentElapsedTime() << 24) >>> 8;
nextSid = nextSid | (id << 56);
if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER)
{
++nextSid;
}
return nextSid;
}
public SessionTrackerImpl(
SessionExpirer expirer,
ConcurrentMap<Long, Integer> sessionsWithTimeout,
int tickTime,
long serverId,
ZooKeeperServerListener listener)
{
super("SessionTracker", listener);
this.expirer = expirer;
this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
this.sessionsWithTimeout = sessionsWithTimeout;
this.nextSessionId.set(initializeNextSessionId(serverId));
for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet())
{
trackSession(e.getKey(), e.getValue());
}
EphemeralType.validateServerId(serverId);
}
volatile boolean running = true;
public void dumpSessions(PrintWriter pwriter)
{
pwriter.print("Session ");
sessionExpiryQueue.dump(pwriter);
}
// 时间点--在此时间点会超期的会话id集合
public synchronized Map<Long, Set<Long>> getSessionExpiryMap()
{
Map<Long, Set<SessionImpl>> expiryMap = sessionExpiryQueue.getExpiryMap();
Map<Long, Set<Long>> sessionExpiryMap = new TreeMap<Long, Set<Long>>();
for (Entry<Long, Set<SessionImpl>> e : expiryMap.entrySet())
{
Set<Long> ids = new HashSet<Long>();
sessionExpiryMap.put(e.getKey(), ids);
for (SessionImpl s : e.getValue())
{
ids.add(s.sessionId);
}
}
return sessionExpiryMap;
}
@Override
public String toString()
{
StringWriter sw = new StringWriter();
PrintWriter pwriter = new PrintWriter(sw);
dumpSessions(pwriter);
pwriter.flush();
pwriter.close();
return sw.toString();
}
@Override
public void run()
{
try
{
while (running)
{
long waitTime = sessionExpiryQueue.getWaitTime();
if (waitTime > 0)
{
Thread.sleep(waitTime);
continue;
}
for (SessionImpl s : sessionExpiryQueue.poll())
{
ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
setSessionClosing(s.sessionId);
expirer.expire(s);
}
}
}
catch (InterruptedException e)
{
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
public synchronized boolean touchSession(long sessionId, int timeout)
{
SessionImpl s = sessionsById.get(sessionId);
if (s == null)
{
logTraceTouchInvalidSession(sessionId, timeout);
return false;
}
if (s.isClosing())
{
logTraceTouchClosingSession(sessionId, timeout);
return false;
}
updateSessionExpiry(s, timeout);
return true;
}
private void updateSessionExpiry(SessionImpl s, int timeout)
{
logTraceTouchSession(s.sessionId, timeout, "");
sessionExpiryQueue.update(s, timeout);
}
private void logTraceTouchSession(long sessionId, int timeout, String sessionStatus)
{
if (LOG.isTraceEnabled())
{
String msg = MessageFormat.format(
"SessionTrackerImpl --- Touch {0}session: 0x{1} with timeout {2}", sessionStatus,
Long.toHexString(sessionId), Integer.toString(timeout));
ZooTrace.logTraceMessage(LOG, ZooTrace.CLIENT_PING_TRACE_MASK, msg);
}
}
private void logTraceTouchInvalidSession(long sessionId, int timeout)
{
logTraceTouchSession(sessionId, timeout, "invalid ");
}
private void logTraceTouchClosingSession(long sessionId, int timeout)
{
logTraceTouchSession(sessionId, timeout, "closing ");
}
public int getSessionTimeout(long sessionId)
{
return sessionsWithTimeout.get(sessionId);
}
public synchronized void setSessionClosing(long sessionId)
{
if (LOG.isTraceEnabled())
{
LOG.trace("Session closing: 0x{}", Long.toHexString(sessionId));
}
SessionImpl s = sessionsById.get(sessionId);
if (s == null)
{
return;
}
s.isClosing = true;
}
public synchronized void removeSession(long sessionId)
{
LOG.debug("Removing session 0x{}", Long.toHexString(sessionId));
SessionImpl s = sessionsById.remove(sessionId);
sessionsWithTimeout.remove(sessionId);
if (LOG.isTraceEnabled())
{
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- Removing session 0x" +
Long.toHexString(sessionId));
}
if (s != null)
{
sessionExpiryQueue.remove(s);
}
}
public void shutdown()
{
LOG.info("Shutting down");
running = false;
if (LOG.isTraceEnabled())
{
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "Shutdown SessionTrackerImpl!");
}
}
public long createSession(int sessionTimeout)
{
long sessionId = nextSessionId.getAndIncrement();
trackSession(sessionId, sessionTimeout);
return sessionId;
}
@Override
public synchronized boolean trackSession(long id, int sessionTimeout)
{
boolean added = false;
SessionImpl session = sessionsById.get(id);
if (session == null)
{
session = new SessionImpl(id, sessionTimeout);
}
// 会话容器1
SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
if (existedSession != null)
{
session = existedSession;
}
else
{
added = true;
LOG.debug("Adding session 0x{}", Long.toHexString(id));
}
if (LOG.isTraceEnabled())
{
String actionStr = added ? "Adding" : "Existing";
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- "
+ actionStr + " session 0x" + Long.toHexString(id) + " " + sessionTimeout);
}
updateSessionExpiry(session, sessionTimeout);
return added;
}
public synchronized boolean commitSession(long id, int sessionTimeout)
{
// 会话容器2
return sessionsWithTimeout.put(id, sessionTimeout) == null;
}
public boolean isTrackingSession(long sessionId)
{
return sessionsById.containsKey(sessionId);
}
// todo
public synchronized void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException, KeeperException.UnknownSessionException
{
LOG.debug("Checking session 0x{}", Long.toHexString(sessionId));
SessionImpl session = sessionsById.get(sessionId);
if (session == null)
{
throw new KeeperException.UnknownSessionException();
}
if (session.isClosing())
{
throw new KeeperException.SessionExpiredException();
}
if (session.owner == null)
{
session.owner = owner;
}
else if (session.owner != owner)
{
throw new KeeperException.SessionMovedException();
}
}
public synchronized void setOwner(long id, Object owner) throws SessionExpiredException
{
SessionImpl session = sessionsById.get(id);
if (session == null || session.isClosing())
{
throw new KeeperException.SessionExpiredException();
}
session.owner = owner;
}
public void checkGlobalSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException
{
try
{
checkSession(sessionId, owner);
}
catch (KeeperException.UnknownSessionException e)
{
throw new KeeperException.SessionExpiredException();
}
}
public long getLocalSessionCount()
{
return 0;
}
@Override
public boolean isLocalSessionsEnabled()
{
return false;
}
public Set<Long> globalSessions()
{
return sessionsById.keySet();
}
public Set<Long> localSessions()
{
return Collections.emptySet();
}