From e6ee7fe7477b485676ce506b4e971c9a50dfa588 Mon Sep 17 00:00:00 2001 From: 648540858 <456panlinlin> Date: Thu, 14 Apr 2022 16:52:48 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BA=A7=E8=81=94=E7=A7=BB?= =?UTF-8?q?=E5=8A=A8=E4=BD=8D=E7=BD=AE=E8=AE=A2=E9=98=85=E4=BD=8D=E7=BD=AE?= =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/DynamicTask.java | 55 ++++++++++++++++--- .../iot/vmp/gb28181/bean/SubscribeHolder.java | 51 ++++++++++++++++- .../iot/vmp/gb28181/bean/SubscribeInfo.java | 24 ++++++++ .../SubscribeListenerForPlatform.java | 50 ----------------- .../subscribe/catalog/CatalogEventLister.java | 2 - .../task/impl/CatalogSubscribeTask.java | 1 - .../MobilePositionSubscribeHandlerTask.java | 30 +++++----- .../impl/MobilePositionSubscribeTask.java | 26 +++++++-- .../cmd/impl/SIPCommanderFroPlatform.java | 6 +- .../impl/SubscribeRequestProcessor.java | 32 +++++------ .../cmd/CatalogResponseMessageHandler.java | 3 - .../vmp/service/impl/DeviceServiceImpl.java | 26 ++++----- .../vmp/vmanager/server/ServerController.java | 42 +++++++++++++- 13 files changed, 227 insertions(+), 121 deletions(-) delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index b8423829..bd10317c 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -1,6 +1,9 @@ package com.genersoft.iot.vmp.conf; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -18,6 +21,8 @@ import java.util.concurrent.ScheduledFuture; @Component public class DynamicTask { + private Logger logger = LoggerFactory.getLogger(DynamicTask.class); + @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; @@ -26,7 +31,12 @@ public class DynamicTask { @Bean public ThreadPoolTaskScheduler threadPoolTaskScheduler() { - return new ThreadPoolTaskScheduler(); + ThreadPoolTaskScheduler schedulerPool = new ThreadPoolTaskScheduler(); + schedulerPool.setPoolSize(300); + schedulerPool.setWaitForTasksToCompleteOnShutdown(true); + schedulerPool.setAwaitTerminationSeconds(10); + return schedulerPool; + } /** @@ -37,11 +47,24 @@ public class DynamicTask { * @return */ public void startCron(String key, Runnable task, int cycleForCatalog) { - stop(key); + ScheduledFuture future = futureMap.get(key); + if (future != null) { + if (future.isCancelled()) { + logger.info("任务【{}】已存在但是关闭状态!!!", key); + } else { + logger.info("任务【{}】已存在且已启动!!!", key); + return; + } + } // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔 - ScheduledFuture future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog * 1000L); - futureMap.put(key, future); - runnableMap.put(key, task); + future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog * 1000L); + if (future != null){ + futureMap.put(key, future); + runnableMap.put(key, task); + logger.info("任务【{}】启动成功!!!", key); + }else { + logger.info("任务【{}】启动失败!!!", key); + } } /** @@ -53,13 +76,31 @@ public class DynamicTask { */ public void startDelay(String key, Runnable task, int delay) { stop(key); + System.out.println("定时任务开始了"); Date starTime = new Date(System.currentTimeMillis() + delay); + + ScheduledFuture future = futureMap.get(key); + if (future != null) { + if (future.isCancelled()) { + logger.info("任务【{}】已存在但是关闭状态!!!", key); + } else { + logger.info("任务【{}】已存在且已启动!!!", key); + return; + } + } // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔 - ScheduledFuture future = threadPoolTaskScheduler.schedule(task, starTime); - futureMap.put(key, future); + future = threadPoolTaskScheduler.schedule(task, starTime); + if (future != null){ + futureMap.put(key, future); + runnableMap.put(key, task); + logger.info("任务【{}】启动成功!!!", key); + }else { + logger.info("任务【{}】启动失败!!!", key); + } } public void stop(String key) { + System.out.println("定时任务结束了"); if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) { futureMap.get(key).cancel(true); Runnable runnable = runnableMap.get(key); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index 62a45d5a..981fe1ec 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -1,5 +1,12 @@ package com.genersoft.iot.vmp.gb28181.bean; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.ArrayList; @@ -9,12 +16,32 @@ import java.util.concurrent.ConcurrentHashMap; @Component public class SubscribeHolder { + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private ISIPCommanderForPlatform sipCommanderForPlatform; + + @Autowired + private IVideoManagerStorage storager; + + private final String taskOverduePrefix = "subscribe_overdue_"; + private static ConcurrentHashMap catalogMap = new ConcurrentHashMap<>(); private static ConcurrentHashMap mobilePositionMap = new ConcurrentHashMap<>(); public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) { catalogMap.put(platformId, subscribeInfo); + // 添加订阅到期 + String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; + dynamicTask.stop(taskOverdueKey); + // 添加任务处理订阅过期 + dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()), + subscribeInfo.getExpires() * 1000); } public SubscribeInfo getCatalogSubscribe(String platformId) { @@ -23,10 +50,24 @@ public class SubscribeHolder { public void removeCatalogSubscribe(String platformId) { catalogMap.remove(platformId); + String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; + // 添加任务处理订阅过期 + dynamicTask.stop(taskOverdueKey); } public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo) { mobilePositionMap.put(platformId, subscribeInfo); + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + "MobilePosition_" + platformId; + // 添加任务处理GPS定时推送 + dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, subscribeInfo.getSn(), key, this), subscribeInfo.getGpsInterval()); + String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId; + dynamicTask.stop(taskOverdueKey); + // 添加任务处理订阅过期 + dynamicTask.startDelay(taskOverdueKey, () -> { + System.out.println("订阅过期"); + removeMobilePositionSubscribe(subscribeInfo.getId()); + }, + subscribeInfo.getExpires() * 1000); } public SubscribeInfo getMobilePositionSubscribe(String platformId) { @@ -35,6 +76,12 @@ public class SubscribeHolder { public void removeMobilePositionSubscribe(String platformId) { mobilePositionMap.remove(platformId); + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + "MobilePosition_" + platformId; + // 结束任务处理GPS定时推送 + dynamicTask.stop(key); + String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId; + // 添加任务处理订阅过期 + dynamicTask.stop(taskOverdueKey); } public List getAllCatalogSubscribePlatform() { @@ -48,7 +95,7 @@ public class SubscribeHolder { } public void removeAllSubscribe(String platformId) { - mobilePositionMap.remove(platformId); - catalogMap.remove(platformId); + removeMobilePositionSubscribe(platformId); + removeCatalogSubscribe(platformId); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java index 434a639a..feb6a724 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java @@ -33,6 +33,14 @@ public class SubscribeInfo { private ServerTransaction transaction; private Dialog dialog; + /** + * 以下为可选字段 + * @return + */ + private String sn; + private int gpsInterval; + + public String getId() { return id; } @@ -88,4 +96,20 @@ public class SubscribeInfo { public void setDialog(Dialog dialog) { this.dialog = dialog; } + + public String getSn() { + return sn; + } + + public void setSn(String sn) { + this.sn = sn; + } + + public int getGpsInterval() { + return gpsInterval; + } + + public void setGpsInterval(int gpsInterval) { + this.gpsInterval = gpsInterval; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java deleted file mode 100644 index 898e51d6..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.subscribe; - -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener; -import com.genersoft.iot.vmp.conf.UserSetting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; -import org.springframework.stereotype.Component; - -/** - * 平台订阅到期事件 - */ -@Component -public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessageListener { - - private Logger logger = LoggerFactory.getLogger(SubscribeListenerForPlatform.class); - - @Autowired - private UserSetting userSetting; - - @Autowired - private DynamicTask dynamicTask; - - public SubscribeListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) { - super(listenerContainer, userSetting); - } - - - /** - * 监听失效的key - * @param message - * @param pattern - */ - @Override - public void onMessage(Message message, byte[] pattern) { - // 获取失效的key - String expiredKey = message.toString(); - logger.debug(expiredKey); - // 订阅到期 - String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_"; - if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { - // 取消定时任务 - dynamicTask.stop(expiredKey); - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index f9546f0c..f9ef10cd 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -61,8 +61,6 @@ public class CatalogEventLister implements ApplicationListener { if (event.getPlatformId() != null) { parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId()); if (parentPlatform != null && !parentPlatform.isStatus())return; - String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_Catalog_" + event.getPlatformId(); -// subscribe = redisCatchStorage.getSubscribe(key); subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId()); if (subscribe == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java index 433eb3b1..bee5fba6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java @@ -46,7 +46,6 @@ public class CatalogSubscribeTask implements ISubscribeTask { }); } - @Async @Override public void stop() { /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java index 569a9b7e..f20baf90 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java @@ -24,52 +24,50 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { private IVideoManagerStorage storager; private ISIPCommanderForPlatform sipCommanderForPlatform; private SubscribeHolder subscribeHolder; - private String platformId; + private ParentPlatform platform; private String sn; private String key; public MobilePositionSubscribeHandlerTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorage storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) { + System.out.println("MobilePositionSubscribeHandlerTask 初始化"); this.redisCatchStorage = redisCatchStorage; this.storager = storager; - this.platformId = platformId; + this.platform = storager.queryParentPlatByServerGBId(platformId); this.sn = sn; this.key = key; this.sipCommanderForPlatform = sipCommanderForPlatform; this.subscribeHolder = subscribeInfo; } - @Async @Override public void run() { logger.info("执行MobilePositionSubscribeHandlerTask"); - SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId); + if (platform == null) return; + SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId()); if (subscribe != null) { - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); - if (parentPlatform == null ) { - logger.info("发送订阅时未找到平台信息:{}", platformId); - return; - } - if (!parentPlatform.isStatus()) { - logger.info("发送订阅时发现平台已经离线:{}", platformId); - return; - } + +// if (!parentPlatform.isStatus()) { +// logger.info("发送订阅时发现平台已经离线:{}", platformId); +// return; +// } // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持 - List gbStreams = storager.queryGbStreamListInPlatform(platformId); + List gbStreams = storager.queryGbStreamListInPlatform(platform.getServerGBId()); if (gbStreams.size() == 0) { - logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platformId); + logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platform.getServerGBId()); return; } for (GbStream gbStream : gbStreams) { String gbId = gbStream.getGbId(); GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); if (gpsMsgInfo != null) { // 无最新位置不发送 + logger.info("无最新位置不发送"); // 经纬度都为0不发送 if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) { continue; } // 发送GPS消息 - sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe); + sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java index 7203ee2d..884f0401 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java @@ -11,6 +11,8 @@ import org.springframework.scheduling.annotation.Async; import javax.sip.Dialog; import javax.sip.DialogState; import javax.sip.ResponseEvent; +import java.util.Timer; +import java.util.TimerTask; /** * 移动位置订阅的定时更新 @@ -21,18 +23,23 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { private ISIPCommander sipCommander; private Dialog dialog; + private Timer timer ; + public MobilePositionSubscribeTask(Device device, ISIPCommander sipCommander) { this.device = device; this.sipCommander = sipCommander; } - @Async @Override public void run() { + if (timer != null ) { + timer.cancel(); + timer = null; + } sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> { - if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) { - dialog = eventResult.dialog; - } +// if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) { +// dialog = eventResult.dialog; +// } ResponseEvent event = (ResponseEvent) eventResult.event; if (event.getResponse().getRawContent() != null) { // 成功 @@ -45,6 +52,13 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { dialog = null; // 失败 logger.warn("[移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); + timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + MobilePositionSubscribeTask.this.run(); + } + }, 2000); }); } @@ -58,6 +72,10 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { * COMPLETED-> Completed Dialog状态-已完成 * TERMINATED-> Terminated Dialog状态-终止 */ + if (timer != null ) { + timer.cancel(); + timer = null; + } if (dialog != null && dialog.getState().equals(DialogState.CONFIRMED)) { logger.info("取消移动订阅时dialog状态为{}", dialog.getState()); device.setSubscribeCycleForMobilePosition(0); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 7768ed42..c5cdae00 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -385,7 +385,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { if (parentPlatform == null) { return false; } - + logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat()); try { String characterSet = parentPlatform.getCharacterSet(); StringBuffer deviceStatusXml = new StringBuffer(600); @@ -405,7 +405,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); callIdHeader.setCallId(subscribeInfo.getCallId()); - logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat()); + sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> { logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg); }, null); @@ -459,7 +459,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { // 设置编码, 防止中文乱码 messageFactory.setDefaultContentEncodingCharset(characterSet); Dialog dialog = subscribeInfo.getDialog(); - if (dialog == null) return; + if (dialog == null || !dialog.getState().equals(DialogState.CONFIRMED)) return; SIPRequest notifyRequest = (SIPRequest)dialog.createRequest(Request.NOTIFY); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); notifyRequest.setContent(catalogXmlContent, contentTypeHeader); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index 979849ed..da1088a2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -149,7 +149,6 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme subscribeInfo.setDialog(dialog); } String sn = XmlUtil.getText(rootElement, "SN"); - String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_MobilePosition_" + platformId; logger.info("[回复 移动位置订阅]: {}", platformId); StringBuilder resultXml = new StringBuilder(200); resultXml.append("\r\n") @@ -161,23 +160,25 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme .append("\r\n"); if (subscribeInfo.getExpires() > 0) { - - if (subscribeHolder.getMobilePositionSubscribe(platformId) == null ) { - String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 - subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); - dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval)); + String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 + if (interval == null) { + subscribeInfo.setGpsInterval(5); }else { - if (subscribeHolder.getMobilePositionSubscribe(platformId).getDialog() != null - && subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState() != null - && !subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState().equals(DialogState.CONFIRMED)) { - dynamicTask.stop(key); - String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 - subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); - dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval)); - } + subscribeInfo.setGpsInterval(Integer.parseInt(interval)); } + + subscribeInfo.setSn(sn); + subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); +// if (subscribeHolder.getMobilePositionSubscribe(platformId) == null ) { +// subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); +// }else { +// if (subscribeHolder.getMobilePositionSubscribe(platformId).getDialog() != null +// && subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState() != null +// && !subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState().equals(DialogState.CONFIRMED)) { +// subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); +// } +// } }else if (subscribeInfo.getExpires() == 0) { - dynamicTask.stop(key); subscribeHolder.removeMobilePositionSubscribe(platformId); } @@ -211,7 +212,6 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme subscribeInfo.setDialog(dialog); } String sn = XmlUtil.getText(rootElement, "SN"); - String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_Catalog_" + platformId; logger.info("[回复 目录订阅]: {}/{}", platformId, deviceID); StringBuilder resultXml = new StringBuilder(200); resultXml.append("\r\n") diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 1bafb59d..dbc25fc6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -145,9 +145,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } // 回复200 OK responseAck(evt, Response.OK); - if (offLineDetector.isOnline(device.getDeviceId())) { - publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE); - } } } catch (DocumentException e) { e.printStackTrace(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 8cd2c77b..d3432865 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -14,6 +14,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.sip.DialogState; + /** * 设备业务(目录订阅) */ @@ -39,19 +41,17 @@ public class DeviceServiceImpl implements IDeviceService { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } - if (dynamicTask.contains(device.getDeviceId() + "catalog")) { - // 存在则停止现有的,开启新的 - dynamicTask.stop(device.getDeviceId() + "catalog"); + CatalogSubscribeTask task = (CatalogSubscribeTask)dynamicTask.get(device.getDeviceId() + "catalog"); + if (task != null && task.getDialogState() != null && task.getDialogState().equals(DialogState.CONFIRMED)) { // 已存在不需要再次添加 + return true; } logger.info("[添加目录订阅] 设备{}", device.getDeviceId()); // 添加目录订阅 CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander); - catalogSubscribeTask.run(); // 提前开始刷新订阅 - int subscribeCycleForCatalog = device.getSubscribeCycleForCatalog(); + int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30); // 设置最小值为30 - subscribeCycleForCatalog = Math.max(subscribeCycleForCatalog, 30); - dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, subscribeCycleForCatalog); + dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, subscribeCycleForCatalog -1); return true; } @@ -70,18 +70,16 @@ public class DeviceServiceImpl implements IDeviceService { if (device == null || device.getSubscribeCycleForMobilePosition() < 0) { return false; } - if (dynamicTask.contains(device.getDeviceId() + "mobile_position")) { - // 存在则停止现有的,开启新的 - dynamicTask.stop(device.getDeviceId() + "mobile_position"); - } logger.info("[添加移动位置订阅] 设备{}", device.getDeviceId()); + MobilePositionSubscribeTask task = (MobilePositionSubscribeTask)dynamicTask.get(device.getDeviceId() + "mobile_position"); + if (task != null && task.getDialogState() != null && task.getDialogState().equals(DialogState.CONFIRMED)) { // 已存在不需要再次添加 + return true; + } // 添加目录订阅 MobilePositionSubscribeTask mobilePositionSubscribeTask = new MobilePositionSubscribeTask(device, sipCommander); - mobilePositionSubscribeTask.run(); // 提前开始刷新订阅 - int subscribeCycleForCatalog = device.getSubscribeCycleForCatalog(); // 设置最小值为30 - subscribeCycleForCatalog = Math.max(subscribeCycleForCatalog, 30); + int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30); dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, subscribeCycleForCatalog -1 ); return true; } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java index a0e7a737..faed2c82 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.VManageBootstrap; import com.genersoft.iot.vmp.common.VersionPo; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.VersionInfo; @@ -27,6 +28,7 @@ import javax.sip.ObjectInUseException; import javax.sip.SipProvider; import java.util.Iterator; import java.util.List; +import java.util.Set; @SuppressWarnings("rawtypes") @Api(tags = "服务控制") @@ -42,13 +44,16 @@ public class ServerController { private IMediaServerService mediaServerService; @Autowired - VersionInfo versionInfo; + private VersionInfo versionInfo; @Autowired - SipConfig sipConfig; + private SipConfig sipConfig; @Autowired - UserSetting userSetting; + private UserSetting userSetting; + + @Autowired + private DynamicTask dynamicTask; @Value("${server.port}") private int serverPort; @@ -248,4 +253,35 @@ public class ServerController { result.setData(jsonObject); return result; } + +// @ApiOperation("当前进行中的动态任务") +// @GetMapping(value = "/dynamicTask") +// @ResponseBody +// public WVPResult getDynamicTask(){ +// WVPResult result = new WVPResult<>(); +// result.setCode(0); +// result.setMsg("success"); +// +// JSONObject jsonObject = new JSONObject(); +// +// Set allKeys = dynamicTask.getAllKeys(); +// jsonObject.put("server.port", serverPort); +// if (StringUtils.isEmpty(type)) { +// jsonObject.put("sip", JSON.toJSON(sipConfig)); +// jsonObject.put("base", JSON.toJSON(userSetting)); +// }else { +// switch (type){ +// case "sip": +// jsonObject.put("sip", sipConfig); +// break; +// case "base": +// jsonObject.put("base", userSetting); +// break; +// default: +// break; +// } +// } +// result.setData(jsonObject); +// return result; +// } }