diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CommonSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CommonSessionManager.java new file mode 100755 index 00000000..2d8c7e17 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CommonSessionManager.java @@ -0,0 +1,86 @@ +package com.genersoft.iot.vmp.gb28181.session; + +import com.genersoft.iot.vmp.common.CommonCallback; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.Calendar; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 通用回调管理 + */ +@Component +public class CommonSessionManager { + + public static Map callbackMap = new ConcurrentHashMap<>(); + + /** + * 存储回调相关的信息 + */ + class CommonSession{ + public String session; + public long createTime; + public int timeout; + + public CommonCallback callback; + public CommonCallback timeoutCallback; + } + + /** + * 添加回调 + * @param sessionId 唯一标识 + * @param callback 回调 + * @param timeout 超时时间, 单位分钟 + */ + public void add(String sessionId, CommonCallback callback, CommonCallback timeoutCallback, + Integer timeout) { + CommonSession commonSession = new CommonSession(); + commonSession.session = sessionId; + commonSession.callback = callback; + commonSession.createTime = System.currentTimeMillis(); + if (timeoutCallback != null) { + commonSession.timeoutCallback = timeoutCallback; + } + if (timeout != null) { + commonSession.timeout = timeout; + } + callbackMap.put(sessionId, commonSession); + } + + public void add(String sessionId, CommonCallback callback) { + add(sessionId, callback, null, 1); + } + + public CommonCallback get(String sessionId, boolean destroy) { + CommonSession commonSession = callbackMap.get(sessionId); + if (destroy) { + callbackMap.remove(sessionId); + } + return commonSession.callback; + } + + public CommonCallback get(String sessionId) { + return get(sessionId, false); + } + + public void delete(String sessionID) { + callbackMap.remove(sessionID); + } + + @Scheduled(fixedRate= 60) //每分钟执行一次 + public void execute(){ + Calendar cal = Calendar.getInstance(); + cal.add(Calendar.MINUTE, -1); + for (String session : callbackMap.keySet()) { + if (callbackMap.get(session).createTime < cal.getTimeInMillis()) { + // 超时 + if (callbackMap.get(session).timeoutCallback != null) { + callbackMap.get(session).timeoutCallback.run("timeout"); + } + callbackMap.remove(session); + } + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index d35c6a63..435f35f4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; @@ -38,7 +37,6 @@ import javax.sip.SipException; import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; -import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -222,7 +220,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.setReportSource("Mobile Position"); - // 更新device channel 的经纬度 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); @@ -242,6 +239,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } storager.updateChannelPosition(deviceChannel); + // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 + // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject();