移除Session中的subject、snapshot业务属性、attributes支持EnumMap类型;

移除MessageManager,发送同步消息逻辑移至Session
master
剑器近 2021-06-11 11:46:41 +08:00
parent a15da0d583
commit 75fb5938a5
3 changed files with 118 additions and 183 deletions

View File

@ -1,126 +0,0 @@
package io.github.yezhihao.netmc.session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.github.yezhihao.netmc.core.model.Header;
import io.github.yezhihao.netmc.core.model.Message;
import io.github.yezhihao.netmc.core.model.Response;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* @author yezhihao
* home https://gitee.com/yezhihao/jt808-server
*/
public class MessageManager {
private static final Logger log = LoggerFactory.getLogger(MessageManager.class.getSimpleName());
private Map<String, SynchronousQueue> topicSubscribers = new ConcurrentHashMap<>();
private SessionManager sessionManager;
public MessageManager(SessionManager sessionManager) {
this.sessionManager = sessionManager;
}
/**
*
*/
public boolean notify(Message<? extends Header> message) {
Header header = message.getHeader();
Object clientId = header.getClientId();
Session session = sessionManager.get(clientId);
if (session == null) {
log.info("<<<<<<<<<<消息发送失败,未注册,{}", message);
return false;
}
header.setSerialNo(session.nextSerialNo());
session.writeObject(message);
return true;
}
/**
*
* 20
*/
public <T> T request(Message<? extends Header> request, Class<T> responseClass) {
return request(request, responseClass, 20000);
}
public <T> T request(Message<? extends Header> request, Class<T> responseClass, long timeout) {
Header header = request.getHeader();
Object clientId = header.getClientId();
Session session = sessionManager.get(clientId);
if (session == null) {
log.info("<<<<<<<<<<消息发送失败,未注册,{}", request);
return null;
}
header.setSerialNo(session.nextSerialNo());
String key = requestKey(header, responseClass);
SynchronousQueue syncQueue = this.subscribe(key);
if (syncQueue == null) {
log.info("<<<<<<<<<<请勿重复发送,{}", request);
}
try {
session.writeObject(request);
return (T) syncQueue.poll(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("<<<<<<<<<<等待响应超时" + session, e);
} finally {
this.unsubscribe(key);
}
return null;
}
/**
*
*/
public boolean response(Message message) {
SynchronousQueue queue = topicSubscribers.get(responseKey(message));
if (queue != null)
return queue.offer(message);
return false;
}
private SynchronousQueue subscribe(String key) {
SynchronousQueue queue = null;
if (!topicSubscribers.containsKey(key))
topicSubscribers.put(key, queue = new SynchronousQueue());
return queue;
}
private void unsubscribe(String key) {
topicSubscribers.remove(key);
}
private static String requestKey(Header header, Class responseClass) {
StringBuilder key = new StringBuilder(13 + 1 + 27 + 1 + 5);
key.append(header.getClientId()).append('/').append(responseClass.getName());
if (Response.class.isAssignableFrom(responseClass))
key.append('/').append(header.getSerialNo());
return key.toString();
}
private static String responseKey(Message response) {
Class<? extends Message> responseClass = response.getClass();
Header header = response.getHeader();
StringBuilder key = new StringBuilder(13 + 1 + 27 + 1 + 5);
key.append(header.getClientId()).append('/').append(responseClass.getName());
if (response instanceof Response)
key.append('/').append(((Response) response).getSerialNo());
return key.toString();
}
}

View File

@ -1,14 +1,17 @@
package io.github.yezhihao.netmc.session;
import io.github.yezhihao.netmc.core.model.Header;
import io.github.yezhihao.netmc.core.model.Message;
import io.github.yezhihao.netmc.core.model.Response;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;
import java.util.*;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -29,24 +32,24 @@ public class Session {
private final long creationTime;
private volatile long lastAccessedTime;
private Map<String, Object> attributes;
private Object subject;
private Object snapshot;
private Map<Object, Object> attributes;
private Integer protocolVersion;
private SessionManager sessionManager;
protected Session(Channel channel, SessionManager sessionManager) {
this(null, channel, sessionManager);
}
protected Session(Class<? extends Enum> sessionKeyClass, Channel channel, SessionManager sessionManager) {
this.channel = channel;
this.sessionManager = sessionManager;
this.creationTime = System.currentTimeMillis();
this.lastAccessedTime = creationTime;
this.attributes = new TreeMap<>();
}
public void writeObject(Object message) {
log.info("<<<<<<<<<<消息下发{},{}", this, message);
channel.writeAndFlush(message);
if (sessionKeyClass != null)
this.attributes = new EnumMap(sessionKeyClass);
else
this.attributes = new TreeMap<>();
}
public int getId() {
@ -70,33 +73,16 @@ public class Session {
/**
* SessionManager
*/
public void register(Header header) {
this.register(header, null);
}
public void register(Header header, Object subject) {
this.clientId = header.getClientId();
this.registered = true;
this.subject = subject;
sessionManager.put(clientId, this);
}
public void register(Object clientId) {
this.register(clientId, null);
}
public void register(Object clientId, Object subject) {
this.clientId = clientId;
this.registered = true;
this.subject = subject;
sessionManager.put(clientId, this);
this.clientId = clientId;
this.sessionManager.put(this.clientId, this);
}
public Object getClientId() {
return clientId;
}
public long getCreationTime() {
return creationTime;
}
@ -110,38 +96,22 @@ public class Session {
return lastAccessedTime;
}
public Collection<String> getAttributeNames() {
public Collection<Object> getAttributeNames() {
return attributes.keySet();
}
public Object getAttribute(String name) {
public Object getAttribute(Object name) {
return attributes.get(name);
}
public void setAttribute(String name, Object value) {
public void setAttribute(Object name, Object value) {
attributes.put(name, value);
}
public Object removeAttribute(String name) {
public Object removeAttribute(Object name) {
return attributes.remove(name);
}
public Object getSubject() {
return subject;
}
public void setSubject(Object subject) {
this.subject = subject;
}
public Object getSnapshot() {
return snapshot;
}
public void setSnapshot(Object snapshot) {
this.snapshot = snapshot;
}
public Integer getProtocolVersion() {
return protocolVersion;
}
@ -188,4 +158,84 @@ public class Session {
sb.append(']');
return sb.toString();
}
private transient Map<String, SynchronousQueue> topicSubscribers = new HashMap<>();
private static final ChannelFutureListener ERROR_LOG_LISTENER = future -> {
Throwable t = future.cause();
if (t != null)
log.error("<<<<<<<<<<消息下发失败", t);
};
/**
*
*/
public void notify(Object message) {
log.info("<<<<<<<<<<消息通知{},{}", this, message);
channel.writeAndFlush(message).addListener(ERROR_LOG_LISTENER);
}
/**
*
* 20
*/
public <T> T request(Message request, Class<T> responseClass) {
return request(request, responseClass, 20000);
}
public <T> T request(Message request, Class<T> responseClass, long timeout) {
String key = requestKey(request, responseClass);
SynchronousQueue syncQueue = this.subscribe(key);
if (syncQueue == null) {
log.info("<<<<<<<<<<请勿重复发送,{}", request);
}
T result = null;
try {
log.info("<<<<<<<<<<消息请求{},{}", this, request);
ChannelFuture channelFuture = channel.writeAndFlush(request).addListener(ERROR_LOG_LISTENER);
if (channelFuture.awaitUninterruptibly().isSuccess())
result = (T) syncQueue.poll(timeout, TimeUnit.MILLISECONDS);
} catch (Throwable e) {
log.warn("<<<<<<<<<<等待响应超时" + this, e);
} finally {
this.unsubscribe(key);
}
return result;
}
/**
*
*/
public boolean response(Message message) {
SynchronousQueue queue = topicSubscribers.get(responseKey(message));
if (queue != null)
return queue.offer(message);
return false;
}
private SynchronousQueue subscribe(String key) {
SynchronousQueue queue = null;
synchronized (this) {
if (!topicSubscribers.containsKey(key))
topicSubscribers.put(key, queue = new SynchronousQueue());
}
return queue;
}
private void unsubscribe(String key) {
topicSubscribers.remove(key);
}
private static String requestKey(Message request, Class responseClass) {
if (Response.class.isAssignableFrom(responseClass))
return Integer.toString(request.getSerialNo());
return responseClass.getName();
}
private static String responseKey(Message response) {
if (response instanceof Response)
return Integer.toString(((Response) response).getResponseSerialNo());
return response.getClass().getName();
}
}

View File

@ -24,24 +24,35 @@ public class SessionManager {
private SessionListener sessionListener;
private Class<? extends Enum> sessionKeyClass;
public SessionManager() {
}
public SessionManager(Class<? extends Enum> sessionKeyClass) {
this(sessionKeyClass, null);
}
public SessionManager(SessionListener sessionListener) {
this(null, sessionListener);
}
public SessionManager(Class<? extends Enum> sessionKeyClass, SessionListener sessionListener) {
this.sessionMap = new ConcurrentHashMap<>();
this.versionCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
this.sessionKeyClass = sessionKeyClass;
this.remover = future -> {
Session session = future.channel().attr(Session.KEY).get();
if (session != null) {
sessionMap.remove(session.getClientId(), session);
}
};
}
public SessionManager(SessionListener sessionListener) {
this();
this.sessionListener = sessionListener;
}
public Session newSession(Channel channel) {
Session session = new Session(channel, this);
Session session = new Session(sessionKeyClass, channel, this);
callSessionCreatedListener(session);
return session;
}