From d2fc2df77b5634baa4cd7ce055d82c48079c9c50 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 22 Oct 2024 17:41:42 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=80=9A=E9=81=93=E5=88=B7?= =?UTF-8?q?=E6=96=B0=E7=9A=84=E6=97=B6=E6=8E=A5=E6=94=B6=E4=B8=8B=E7=BA=A7?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E7=9A=84=E6=95=88=E7=8E=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/conf/MediaStatusTimerTask.java | 2 +- .../genersoft/iot/vmp/conf/WVPTimerTask.java | 2 +- .../vmp/gb28181/session/CatalogDataCatch.java | 2 +- .../NotifyRequestForCatalogProcessor.java | 2 +- ...tifyRequestForMobilePositionProcessor.java | 2 +- .../cmd/CatalogResponseMessageHandler.java | 193 ++++++++++-------- .../impl/MobilePositionServiceImpl.java | 2 +- .../service/redisMsg/RedisGpsMsgListener.java | 2 +- 8 files changed, 111 insertions(+), 96 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/MediaStatusTimerTask.java b/src/main/java/com/genersoft/iot/vmp/conf/MediaStatusTimerTask.java index 56573fe5..761250e0 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/MediaStatusTimerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/MediaStatusTimerTask.java @@ -8,7 +8,7 @@ import org.springframework.scheduling.annotation.Scheduled; public class MediaStatusTimerTask { - @Scheduled(fixedRate = 2 * 1000) //每3秒执行一次 +// @Scheduled(fixedRate = 2 * 1000) //每3秒执行一次 public void execute(){ } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/WVPTimerTask.java b/src/main/java/com/genersoft/iot/vmp/conf/WVPTimerTask.java index 229aeabb..6da0caf3 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/WVPTimerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/WVPTimerTask.java @@ -19,7 +19,7 @@ public class WVPTimerTask { @Autowired private SipConfig sipConfig; - @Scheduled(fixedRate = 2 * 1000) //每3秒执行一次 + @Scheduled(fixedDelay = 2 * 1000) //每3秒执行一次 public void execute(){ JSONObject jsonObject = new JSONObject(); jsonObject.put("ip", sipConfig.getShowIp()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java index baa9e2f2..34a8eceb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java @@ -135,7 +135,7 @@ public class CatalogDataCatch { return !catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end); } - @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 + @Scheduled(fixedDelay = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 private void timerTask(){ Set keys = data.keySet(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index 00f7fd25..968980a9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -62,7 +62,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent taskQueue.offer(new HandlerCatchData(evt, null, null)); } - @Scheduled(fixedRate = 400) //每400毫秒执行一次 + @Scheduled(fixedDelay = 400) //每400毫秒执行一次 public void executeTaskQueue(){ if (taskQueue.isEmpty()) { return; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index c4124cad..ae589d3c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -61,7 +61,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor taskQueue.offer(new HandlerCatchData(evt, null, null)); } - @Scheduled(fixedRate = 200) //每200毫秒执行一次 + @Scheduled(fixedDelay = 200) //每200毫秒执行一次 public void executeTaskQueue() { if (taskQueue.isEmpty()) { return; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index b0fe7144..38774fb6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -2,13 +2,13 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IGroupService; import com.genersoft.iot.vmp.gb28181.service.IRegionService; import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; -import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import gov.nist.javax.sip.message.SIPRequest; import lombok.extern.slf4j.Slf4j; import org.dom4j.DocumentException; @@ -16,6 +16,7 @@ import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -80,100 +81,114 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 目录查询回复: {}", e.getMessage()); } - // 已经开启消息处理则跳过 - if (processing.compareAndSet(false, true)) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - // 全局异常捕获,保证下一条可以得到处理 - try { - HandlerCatchData take = taskQueue.poll(); - Element rootElement = null; - try { - rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset()); - } catch (DocumentException e) { - log.error("[xml解析] 失败: ", e); - continue; - } - if (rootElement == null) { - log.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest()); - continue; - } - Element deviceListElement = rootElement.element("DeviceList"); - Element sumNumElement = rootElement.element("SumNum"); - Element snElement = rootElement.element("SN"); - int sumNum = Integer.parseInt(sumNumElement.getText()); + } - if (sumNum == 0) { - log.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId()); - // 数据已经完整接收 - deviceChannelService.cleanChannelsForDevice(take.getDevice().getId()); - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); - } else { - Iterator deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - List channelList = new ArrayList<>(); - List regionList = new ArrayList<>(); - List groupList = new ArrayList<>(); - // 遍历DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; - } - DeviceChannel channel = DeviceChannel.decode(itemDevice); - if (channel.getDeviceId() == null) { - log.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); - continue; - } - channel.setDeviceDbId(device.getId()); - if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { - channel.setParentId(null); - } - // 解析通道类型 - if (channel.getDeviceId().length() <= 8) { - // 行政区划 - Region region = Region.getInstance(channel); - regionList.add(region); - channel.setChannelType(1); - }else if (channel.getDeviceId().length() == 20){ - // 业务分组/虚拟组织 - Group group = Group.getInstance(channel); - if (group != null) { - channel.setParental(1); - channel.setChannelType(2); - groupList.add(group); - } - } - channelList.add(channel); - } - int sn = Integer.parseInt(snElement.getText()); - catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), - channelList, regionList, groupList); - log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()) == null ? 0 : catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size(), sumNum); - if (catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() == sumNum) { - // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, - // 目前支持设备通道上线通知时和设备上线时向上级通知 - boolean resetChannelsResult = saveData(device); - if (!resetChannelsResult) { - String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() + "条"; - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); - } else { - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); - } + @Scheduled(fixedDelay = 200) + public void executeTaskQueue(){ + if (taskQueue.isEmpty()) { + return; + } + List handlerCatchDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + HandlerCatchData poll = taskQueue.poll(); + if (poll != null) { + handlerCatchDataList.add(poll); + } + } + if (handlerCatchDataList.isEmpty()) { + return; + } + for (HandlerCatchData take : handlerCatchDataList) { + if (take == null) { + continue; + } + RequestEvent evt = take.getEvt(); + // 全局异常捕获,保证下一条可以得到处理 + try { + Element rootElement = null; + try { + rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset()); + } catch (DocumentException e) { + log.error("[xml解析] 失败: ", e); + continue; + } + if (rootElement == null) { + log.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest()); + continue; + } + Element deviceListElement = rootElement.element("DeviceList"); + Element sumNumElement = rootElement.element("SumNum"); + Element snElement = rootElement.element("SN"); + int sumNum = Integer.parseInt(sumNumElement.getText()); + + if (sumNum == 0) { + log.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId()); + // 数据已经完整接收 + deviceChannelService.cleanChannelsForDevice(take.getDevice().getId()); + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); + } else { + Iterator deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { + List channelList = new ArrayList<>(); + List regionList = new ArrayList<>(); + List groupList = new ArrayList<>(); + // 遍历DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element channelDeviceElement = itemDevice.element("DeviceID"); + if (channelDeviceElement == null) { + continue; + } + DeviceChannel channel = DeviceChannel.decode(itemDevice); + if (channel.getDeviceId() == null) { + log.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); + continue; + } + channel.setDeviceDbId(take.getDevice().getId()); + if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { + channel.setParentId(null); + } + // 解析通道类型 + if (channel.getDeviceId().length() <= 8) { + // 行政区划 + Region region = Region.getInstance(channel); + regionList.add(region); + channel.setChannelType(1); + }else if (channel.getDeviceId().length() == 20){ + // 业务分组/虚拟组织 + Group group = Group.getInstance(channel); + if (group != null) { + channel.setParental(1); + channel.setChannelType(2); + groupList.add(group); } } - + channelList.add(channel); + } + int sn = Integer.parseInt(snElement.getText()); + catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), + channelList, regionList, groupList); + log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()) == null ? 0 : catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size(), sumNum); + if (catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() == sumNum) { + // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, + // 目前支持设备通道上线通知时和设备上线时向上级通知 + boolean resetChannelsResult = saveData(take.getDevice()); + if (!resetChannelsResult) { + String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() + "条"; + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); + } else { + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); + } } - } catch (Exception e) { - log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest()); - log.error("[收到通道] 异常内容: ", e); } - } - processing.set(false); - }); - } + } + } catch (Exception e) { + log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest()); + log.error("[收到通道] 异常内容: ", e); + } + } } @Transactional diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java index 97f26a10..0241f69a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java @@ -99,7 +99,7 @@ public class MobilePositionServiceImpl implements IMobilePositionService { channelMapper.updateStreamGPS(gpsMsgInfoList); } - @Scheduled(fixedRate = 1000) + @Scheduled(fixedDelay = 1000) @Transactional public void executeTaskQueue() { int countLimit = 3000; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java index 32dddddf..8fe40485 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java @@ -65,7 +65,7 @@ public class RedisGpsMsgListener implements MessageListener { /** * 定时将经纬度更新到数据库 */ - @Scheduled(fixedRate = 2 * 1000) //每2秒执行一次 + @Scheduled(fixedDelay = 2 * 1000) //每2秒执行一次 public void execute(){ List gpsMsgInfoList = redisCatchStorage.getAllGpsMsgInfo(); if (!gpsMsgInfoList.isEmpty()) {