优化Session、SessionManager
parent
8e570571ad
commit
5734eccd88
|
@ -6,6 +6,7 @@ import io.github.yezhihao.netmc.codec.MessageDecoder;
|
|||
import io.github.yezhihao.netmc.codec.MessageEncoder;
|
||||
import io.github.yezhihao.netmc.core.HandlerInterceptor;
|
||||
import io.github.yezhihao.netmc.core.HandlerMapping;
|
||||
import io.github.yezhihao.netmc.session.SessionListener;
|
||||
import io.github.yezhihao.netmc.session.SessionManager;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
|
||||
|
@ -25,6 +26,7 @@ public class NettyConfig {
|
|||
protected final HandlerMapping handlerMapping;
|
||||
protected final HandlerInterceptor handlerInterceptor;
|
||||
protected final SessionManager sessionManager;
|
||||
protected final SessionListener sessionListener;
|
||||
|
||||
private NettyConfig(int port,
|
||||
int maxFrameLength,
|
||||
|
@ -34,7 +36,8 @@ public class NettyConfig {
|
|||
MessageEncoder encoder,
|
||||
HandlerMapping handlerMapping,
|
||||
HandlerInterceptor handlerInterceptor,
|
||||
SessionManager sessionManager
|
||||
SessionManager sessionManager,
|
||||
SessionListener sessionListener
|
||||
) {
|
||||
this.port = port;
|
||||
this.maxFrameLength = maxFrameLength;
|
||||
|
@ -45,7 +48,8 @@ public class NettyConfig {
|
|||
this.handlerMapping = handlerMapping;
|
||||
this.handlerInterceptor = handlerInterceptor;
|
||||
this.sessionManager = sessionManager;
|
||||
this.adapter = new TCPServerHandler(this.handlerMapping, this.handlerInterceptor, this.sessionManager);
|
||||
this.sessionListener = sessionListener;
|
||||
this.adapter = new TCPServerHandler(this.handlerMapping, this.handlerInterceptor, this.sessionManager, this.sessionListener);
|
||||
}
|
||||
|
||||
public static NettyConfig.Builder custom() {
|
||||
|
@ -63,6 +67,7 @@ public class NettyConfig {
|
|||
private HandlerMapping handlerMapping;
|
||||
private HandlerInterceptor handlerInterceptor;
|
||||
private SessionManager sessionManager;
|
||||
private SessionListener sessionListener;
|
||||
|
||||
public Builder() {
|
||||
}
|
||||
|
@ -121,6 +126,11 @@ public class NettyConfig {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setSessionListener(SessionListener sessionListener) {
|
||||
this.sessionListener = sessionListener;
|
||||
return this;
|
||||
}
|
||||
|
||||
public NettyConfig build() {
|
||||
return new NettyConfig(
|
||||
this.port,
|
||||
|
@ -131,7 +141,8 @@ public class NettyConfig {
|
|||
this.encoder,
|
||||
this.handlerMapping,
|
||||
this.handlerInterceptor,
|
||||
this.sessionManager
|
||||
this.sessionManager,
|
||||
this.sessionListener
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import io.github.yezhihao.netmc.core.HandlerMapping;
|
|||
import io.github.yezhihao.netmc.core.handler.Handler;
|
||||
import io.github.yezhihao.netmc.core.model.Message;
|
||||
import io.github.yezhihao.netmc.session.Session;
|
||||
import io.github.yezhihao.netmc.session.SessionListener;
|
||||
import io.github.yezhihao.netmc.session.SessionManager;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
|
@ -30,10 +31,16 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter {
|
|||
|
||||
private SessionManager sessionManager;
|
||||
|
||||
public TCPServerHandler(HandlerMapping handlerMapping, HandlerInterceptor interceptor, SessionManager sessionManager) {
|
||||
private SessionListener sessionListener;
|
||||
|
||||
public TCPServerHandler(HandlerMapping handlerMapping,
|
||||
HandlerInterceptor interceptor,
|
||||
SessionManager sessionManager,
|
||||
SessionListener sessionListener) {
|
||||
this.handlerMapping = handlerMapping;
|
||||
this.interceptor = interceptor;
|
||||
this.sessionManager = sessionManager;
|
||||
this.sessionListener = sessionListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -75,7 +82,7 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter {
|
|||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
Channel channel = ctx.channel();
|
||||
Session session = sessionManager.newSession(channel);
|
||||
Session session = Session.newInstance(channel, sessionManager, sessionListener);
|
||||
channel.attr(Session.KEY).set(session);
|
||||
log.info("<<<<<终端连接{}", session);
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ public abstract class AbstractHandlerMapping implements HandlerMapping {
|
|||
/**
|
||||
* 根据消息类型获取Handler
|
||||
*/
|
||||
public Handler getHandler(Object messageType) {
|
||||
return handlerMap.get(messageType);
|
||||
public Handler getHandler(int messageId) {
|
||||
return handlerMap.get(messageId);
|
||||
}
|
||||
}
|
|
@ -9,6 +9,6 @@ import io.github.yezhihao.netmc.core.handler.Handler;
|
|||
*/
|
||||
public interface HandlerMapping {
|
||||
|
||||
Handler getHandler(Object messageType);
|
||||
Handler getHandler(int messageId);
|
||||
|
||||
}
|
|
@ -10,10 +10,10 @@ import java.io.Serializable;
|
|||
public interface Message extends Serializable {
|
||||
|
||||
/** 客户端唯一标识 */
|
||||
Serializable getClientId();
|
||||
String getClientId();
|
||||
|
||||
/** 消息类型 */
|
||||
Serializable getMessageId();
|
||||
int getMessageId();
|
||||
|
||||
/** 消息类型(日志输出) */
|
||||
String getMessageName();
|
||||
|
|
|
@ -25,35 +25,37 @@ public class Session {
|
|||
public static final AttributeKey<Session> KEY = AttributeKey.newInstance(Session.class.getName());
|
||||
|
||||
protected final Channel channel;
|
||||
|
||||
private AtomicInteger serialNo = new AtomicInteger(0);
|
||||
private boolean registered = false;
|
||||
private Object clientId;
|
||||
private final SessionManager sessionManager;
|
||||
private final SessionListener sessionListener;
|
||||
|
||||
private final long creationTime;
|
||||
private volatile long lastAccessedTime;
|
||||
private Map<Object, Object> attributes;
|
||||
private Integer protocolVersion;
|
||||
private final Map<Object, Object> attributes;
|
||||
|
||||
private SessionManager sessionManager;
|
||||
private String sessionId;
|
||||
private String clientId;
|
||||
private final AtomicInteger serialNo = new AtomicInteger(0);
|
||||
|
||||
protected Session(Channel channel, SessionManager sessionManager) {
|
||||
this(null, channel, sessionManager);
|
||||
}
|
||||
|
||||
protected Session(Class<? extends Enum> sessionKeyClass, Channel channel, SessionManager sessionManager) {
|
||||
private Session(Channel channel, SessionManager sessionManager, SessionListener sessionListener) {
|
||||
this.channel = channel;
|
||||
this.sessionManager = sessionManager;
|
||||
this.creationTime = System.currentTimeMillis();
|
||||
this.lastAccessedTime = creationTime;
|
||||
if (sessionKeyClass != null)
|
||||
this.attributes = new EnumMap(sessionKeyClass);
|
||||
this.sessionManager = sessionManager;
|
||||
this.sessionListener = sessionListener;
|
||||
|
||||
if (sessionManager != null && sessionManager.getSessionKeyClass() != null)
|
||||
this.attributes = new EnumMap(sessionManager.getSessionKeyClass());
|
||||
else
|
||||
this.attributes = new TreeMap<>();
|
||||
}
|
||||
|
||||
public int getId() {
|
||||
return channel.id().hashCode();
|
||||
public static Session newInstance(Channel channel,
|
||||
SessionManager sessionManager,
|
||||
SessionListener sessionListener) {
|
||||
Session session = new Session(channel, sessionManager, sessionListener);
|
||||
session.callSessionCreatedListener();
|
||||
return session;
|
||||
}
|
||||
|
||||
public int nextSerialNo() {
|
||||
|
@ -67,19 +69,30 @@ public class Session {
|
|||
}
|
||||
|
||||
public boolean isRegistered() {
|
||||
return registered;
|
||||
return sessionId != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册到SessionManager
|
||||
*/
|
||||
public void register(Object clientId) {
|
||||
this.registered = true;
|
||||
this.clientId = clientId;
|
||||
this.sessionManager.put(this.clientId, this);
|
||||
public void register(Message message) {
|
||||
register(message.getClientId(), message);
|
||||
}
|
||||
|
||||
public Object getClientId() {
|
||||
public void register(String sessionId, Message message) {
|
||||
if (sessionId == null)
|
||||
throw new NullPointerException("sessionId not null");
|
||||
this.sessionId = sessionId;
|
||||
this.clientId = message.getClientId();
|
||||
if (sessionManager != null)
|
||||
sessionManager.add(this);
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return sessionId;
|
||||
}
|
||||
|
||||
public String getClientId() {
|
||||
return clientId;
|
||||
}
|
||||
|
||||
|
@ -116,49 +129,38 @@ public class Session {
|
|||
return attributes.remove(name);
|
||||
}
|
||||
|
||||
public Integer getProtocolVersion() {
|
||||
return protocolVersion;
|
||||
public Object getOfflineCache(String clientId) {
|
||||
if (sessionManager != null)
|
||||
return sessionManager.getOfflineCache(clientId);
|
||||
return null;
|
||||
}
|
||||
|
||||
public void setProtocolVersion(int protocolVersion) {
|
||||
this.protocolVersion = protocolVersion;
|
||||
public void setOfflineCache(String clientId, Object value) {
|
||||
if (sessionManager != null)
|
||||
sessionManager.setOfflineCache(clientId, value);
|
||||
}
|
||||
|
||||
public Integer cachedProtocolVersion(Object clientId) {
|
||||
return this.sessionManager.getVersion(clientId);
|
||||
private void callSessionDestroyedListener() {
|
||||
if (sessionListener != null)
|
||||
sessionListener.sessionDestroyed(this);
|
||||
}
|
||||
|
||||
public void recordProtocolVersion(Object clientId, int protocolVersion) {
|
||||
this.protocolVersion = protocolVersion;
|
||||
this.sessionManager.putVersion(clientId, protocolVersion);
|
||||
private void callSessionCreatedListener() {
|
||||
if (sessionListener != null)
|
||||
sessionListener.sessionCreated(this);
|
||||
}
|
||||
|
||||
public void invalidate() {
|
||||
channel.close();
|
||||
sessionManager.callSessionDestroyedListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o)
|
||||
return true;
|
||||
if (o == null || getClass() != o.getClass())
|
||||
return false;
|
||||
Session that = (Session) o;
|
||||
return this.getId() == that.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getId();
|
||||
callSessionDestroyedListener();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder(66);
|
||||
sb.append("[ip=").append(channel.remoteAddress());
|
||||
sb.append(", sid=").append(sessionId);
|
||||
sb.append(", cid=").append(clientId);
|
||||
sb.append(", reg=").append(registered);
|
||||
sb.append(']');
|
||||
return sb.toString();
|
||||
}
|
||||
|
@ -232,14 +234,20 @@ public class Session {
|
|||
}
|
||||
|
||||
private static String requestKey(Message request, Class responseClass) {
|
||||
if (Response.class.isAssignableFrom(responseClass))
|
||||
return Integer.toString(request.getSerialNo());
|
||||
return responseClass.getName();
|
||||
String className = responseClass.getName();
|
||||
if (Response.class.isAssignableFrom(responseClass)) {
|
||||
int serialNo = request.getSerialNo();
|
||||
return new StringBuilder(34).append(className).append('.').append(serialNo).toString();
|
||||
}
|
||||
return className;
|
||||
}
|
||||
|
||||
private static String responseKey(Message response) {
|
||||
if (response instanceof Response)
|
||||
return Integer.toString(((Response) response).getResponseSerialNo());
|
||||
return response.getClass().getName();
|
||||
String className = response.getClass().getName();
|
||||
if (response instanceof Response) {
|
||||
int serialNo = ((Response) response).getResponseSerialNo();
|
||||
return new StringBuilder(34).append(className).append('.').append(serialNo).toString();
|
||||
}
|
||||
return className;
|
||||
}
|
||||
}
|
|
@ -2,7 +2,6 @@ package io.github.yezhihao.netmc.session;
|
|||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
|
||||
import java.util.Collection;
|
||||
|
@ -16,77 +15,54 @@ import java.util.concurrent.TimeUnit;
|
|||
*/
|
||||
public class SessionManager {
|
||||
|
||||
private Map<Object, Session> sessionMap;
|
||||
private Map<String, Session> sessionMap;
|
||||
|
||||
private Cache<Object, Integer> versionCache;
|
||||
private Cache<String, Object> offlineCache;
|
||||
|
||||
private ChannelFutureListener remover;
|
||||
|
||||
private SessionListener sessionListener;
|
||||
|
||||
private Class<? extends Enum> sessionKeyClass;
|
||||
|
||||
public SessionManager() {
|
||||
|
||||
this(null);
|
||||
}
|
||||
|
||||
public SessionManager(Class<? extends Enum> sessionKeyClass) {
|
||||
this(sessionKeyClass, null);
|
||||
}
|
||||
|
||||
public SessionManager(SessionListener sessionListener) {
|
||||
this(null, sessionListener);
|
||||
}
|
||||
|
||||
public SessionManager(Class<? extends Enum> sessionKeyClass, SessionListener sessionListener) {
|
||||
this.sessionMap = new ConcurrentHashMap<>();
|
||||
this.versionCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
|
||||
this.offlineCache = Caffeine.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
|
||||
this.sessionKeyClass = sessionKeyClass;
|
||||
this.remover = future -> {
|
||||
Session session = future.channel().attr(Session.KEY).get();
|
||||
if (session != null) {
|
||||
sessionMap.remove(session.getClientId(), session);
|
||||
sessionMap.remove(session.getId(), session);
|
||||
}
|
||||
};
|
||||
this.sessionListener = sessionListener;
|
||||
}
|
||||
|
||||
public Session newSession(Channel channel) {
|
||||
Session session = new Session(sessionKeyClass, channel, this);
|
||||
callSessionCreatedListener(session);
|
||||
return session;
|
||||
}
|
||||
|
||||
protected void callSessionDestroyedListener(Session session) {
|
||||
if (sessionListener != null)
|
||||
sessionListener.sessionDestroyed(session);
|
||||
}
|
||||
|
||||
protected void callSessionCreatedListener(Session session) {
|
||||
if (sessionListener != null)
|
||||
sessionListener.sessionCreated(session);
|
||||
}
|
||||
|
||||
public Session get(Object clientId) {
|
||||
return sessionMap.get(clientId);
|
||||
public Session get(String sessionId) {
|
||||
return sessionMap.get(sessionId);
|
||||
}
|
||||
|
||||
public Collection<Session> all() {
|
||||
return sessionMap.values();
|
||||
}
|
||||
|
||||
protected void put(Object clientId, Session newSession) {
|
||||
Session oldSession = sessionMap.put(clientId, newSession);
|
||||
protected void add(Session newSession) {
|
||||
Session oldSession = sessionMap.put(newSession.getId(), newSession);
|
||||
if (!newSession.equals(oldSession)) {
|
||||
newSession.channel.closeFuture().addListener(remover);
|
||||
}
|
||||
}
|
||||
|
||||
public void putVersion(Object clientId, int version) {
|
||||
versionCache.put(clientId, version);
|
||||
public void setOfflineCache(String clientId, Object value) {
|
||||
offlineCache.put(clientId, value);
|
||||
}
|
||||
|
||||
public Integer getVersion(Object clientId) {
|
||||
return versionCache.getIfPresent(clientId);
|
||||
public Object getOfflineCache(String clientId) {
|
||||
return offlineCache.getIfPresent(clientId);
|
||||
}
|
||||
|
||||
public Class<? extends Enum> getSessionKeyClass() {
|
||||
return sessionKeyClass;
|
||||
}
|
||||
}
|
|
@ -26,11 +26,11 @@ public class MyHeader {
|
|||
this.clientId = clientId;
|
||||
}
|
||||
|
||||
public Integer getType() {
|
||||
public int getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(Integer type) {
|
||||
public void setType(int type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ public class MyMessage implements Message {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Integer getMessageId() {
|
||||
public int getMessageId() {
|
||||
return header.getType();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue