From 35fd5eee63c368e72b7dfa84f5bb9ab3daed14df Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 21 Dec 2023 13:16:03 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=A2=84=E7=BD=AE=E4=BD=8D?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/PresetData.java | 11 +- .../iot/vmp/gb28181/bean/PresetItem.java | 2 + .../vmp/gb28181/session/PresetDataCatch.java | 33 ++++- .../PresetQueryResponseMessageHandler.java | 139 ++++++++++-------- .../vmanager/gb28181/ptz/PtzController.java | 7 +- .../vmp/web/gb28181/ApiDeviceController.java | 4 +- 6 files changed, 121 insertions(+), 75 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PresetData.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PresetData.java index 2b96b1f7..3a22576b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PresetData.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PresetData.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.bean; import java.time.Instant; import java.util.List; +import java.util.Map; /** * 预置位数据 @@ -13,7 +14,7 @@ public class PresetData { */ private int sn; private int total; - private List presetItemList; + private Map presetItems; private Instant lastTime; private String errorMsg; @@ -36,12 +37,12 @@ public class PresetData { this.total = total; } - public List getPresetItemList() { - return presetItemList; + public Map getPresetItems() { + return presetItems; } - public void setPresetItemList(List presetItemList) { - this.presetItemList = presetItemList; + public void setPresetItems(Map presetItems) { + this.presetItems = presetItems; } public Instant getLastTime() { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PresetItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PresetItem.java index 10218f76..60acf668 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PresetItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PresetItem.java @@ -20,4 +20,6 @@ public class PresetItem { public void setPresetName(String presetName) { this.presetName = presetName; } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/PresetDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/PresetDataCatch.java index ab096856..fe8839df 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/PresetDataCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/PresetDataCatch.java @@ -3,7 +3,10 @@ package com.genersoft.iot.vmp.gb28181.session; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.service.IDeviceChannelService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -19,6 +22,8 @@ import java.util.concurrent.TimeUnit; @Component public class PresetDataCatch { + private final Logger logger = LoggerFactory.getLogger(PresetDataCatch.class); + public static Map data = new ConcurrentHashMap<>(); @@ -30,7 +35,7 @@ public class PresetDataCatch { PresetData presetData = data.get(sn); if (presetData == null || presetData.getStatus().equals(DataStatus.end)) { presetData = new PresetData(); - presetData.setPresetItemList(Collections.synchronizedList(new ArrayList<>())); + presetData.setPresetItems(new ConcurrentHashMap<>()); presetData.setSn(sn); presetData.setStatus(DataStatus.ready); presetData.setLastTime(Instant.now()); @@ -44,7 +49,7 @@ public class PresetDataCatch { presetData = new PresetData(); presetData.setSn(sn); presetData.setTotal(total); - presetData.setPresetItemList(presetItemList); + presetData.setPresetItems(new ConcurrentHashMap<>()); presetData.setStatus(DataStatus.runIng); presetData.setLastTime(Instant.now()); data.put(sn, presetData); @@ -55,9 +60,22 @@ public class PresetDataCatch { } presetData.setTotal(total); presetData.setStatus(DataStatus.runIng); - presetData.getPresetItemList().addAll(presetItemList); presetData.setLastTime(Instant.now()); } + if (!presetItemList.isEmpty()) { + for (PresetItem presetItem : presetItemList) { + presetData.getPresetItems().put(presetItem.getPresetID(), presetItem); + } + } +// presetData.getPresetItems().sort((a, b) ->{ +// if (a.getPresetID() > b.getPresetID()) { +// return 1; +// }else if (a.getPresetID() < b.getPresetID()) { +// return -1; +// }else { +// return 0; +// } +// }); } public List get(int sn) { @@ -65,7 +83,7 @@ public class PresetDataCatch { if (presetData == null) { return null; } - return presetData.getPresetItemList(); + return new ArrayList<>(presetData.getPresetItems().values()); } public int getTotal(int sn) { @@ -82,7 +100,7 @@ public class PresetDataCatch { return null; } SyncStatus syncStatus = new SyncStatus(); - syncStatus.setCurrent(presetData.getPresetItemList().size()); + syncStatus.setCurrent(presetData.getPresetItems().size()); syncStatus.setTotal(presetData.getTotal()); syncStatus.setErrorMsg(presetData.getErrorMsg()); if (presetData.getStatus().equals(DataStatus.end)) { @@ -112,14 +130,15 @@ public class PresetDataCatch { PresetData presetData = data.get(sn); String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + sn; if ( presetData.getLastTime().isBefore(instantBefore5S)) { + logger.info("[预置位接收等待超时] 直接返回已经收到的数据, {}/{}", presetData.getPresetItems().size(), presetData.getTotal()); // 超过五秒收不到消息任务超时, 只更新这一部分数据, 收到数据与声明的总数一致,则重置通道数据,数据不全则只对收到的数据做更新操作 if (presetData.getStatus().equals(DataStatus.runIng)) { RequestMessage requestMessage = new RequestMessage(); requestMessage.setKey(key); - requestMessage.setData(presetData.getPresetItemList()); + requestMessage.setData(presetData.getPresetItems().values()); deferredResultHolder.invokeAllResult(requestMessage); - String errorMsg = "更新成功,共" + presetData.getTotal() + "条,已更新" + presetData.getPresetItemList().size() + "条"; + String errorMsg = "更新成功,共" + presetData.getTotal() + "条,已更新" + presetData.getPresetItems().size() + "条"; presetData.setErrorMsg(errorMsg); }else if (presetData.getStatus().equals(DataStatus.ready)) { String errorMsg = "同步失败,等待回复超时"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java index 0917f26f..6d0430d2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java @@ -16,6 +16,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; @@ -26,6 +28,8 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @@ -47,6 +51,12 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent @Autowired private PresetDataCatch presetDataCatch; + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private AtomicBoolean processing = new AtomicBoolean(false); @Override @@ -56,73 +66,80 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent @Override public void handForDevice(RequestEvent evt, Device device, Element element) { - SIPRequest request = (SIPRequest) evt.getRequest(); - String deviceId = SipUtils.getUserIdFromFromHeader(request); + taskQueue.offer(new HandlerCatchData(evt, device, element)); try { - Element rootElement = getRootElement(evt, device.getCharset()); + responseAck(request, Response.OK); + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage()); + } + if (processing.compareAndSet(false, true)) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + HandlerCatchData take = taskQueue.poll(); + Element rootElement = null; + try { + rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset()); + } catch (DocumentException e) { + logger.error("[设备预置位查询回复] xml解析 失败: ", e); + continue; + } + if (rootElement == null) { + logger.warn("[ 设备预置位查询回复 ] content cannot be null, {}", evt.getRequest()); + continue; + } + Element presetListNumElement = rootElement.element("PresetList"); + if (presetListNumElement == null) { + logger.warn("[ 设备预置位查询回复 ] PresetList cannot be null, {}", evt.getRequest()); + return; + } + String snStr = getText(rootElement, "SN"); - if (rootElement == null) { - logger.warn("[ 设备预置位查询应答 ] content cannot be null, {}", evt.getRequest()); - try { - responseAck(request, Response.BAD_REQUEST); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage()); - } - return; - } - Element presetListNumElement = rootElement.element("PresetList"); - String snStr = getText(rootElement, "SN"); - //该字段可能为通道或则设备的id - String channelId = getText(rootElement, "DeviceID"); - if (channelId == null) { - - } - String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId + channelId; - if (snStr == null || presetListNumElement == null) { - try { - responseAck(request, Response.BAD_REQUEST, "xml error"); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage()); - } - return; - } - int sumNum = Integer.parseInt(presetListNumElement.attributeValue("Num")); - int sn = Integer.parseInt(snStr); - List presetItems = new ArrayList<>(); - if (sumNum == 0) { - presetDataCatch.setChannelSyncEnd(sn, null ); - }else { - for (Iterator presetIterator = presetListNumElement.elementIterator(); presetIterator.hasNext(); ) { - Element itemListElement = presetIterator.next(); - PresetItem presetItem = new PresetItem(); - for (Iterator itemListIterator = itemListElement.elementIterator(); itemListIterator.hasNext(); ) { - // 遍历item - Element itemOne = itemListIterator.next(); - String name = itemOne.getName(); - String textTrim = itemOne.getTextTrim(); - if ("PresetID".equalsIgnoreCase(name)) { - presetItem.setPresetID(Integer.parseInt(textTrim)); - } else { - presetItem.setPresetName(textTrim); + if (snStr == null ) { + logger.warn("[ 设备预置位查询回复 ] sn cannot be null, {}", evt.getRequest()); + return; + } + String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + snStr; + String totalStr = getText(rootElement, "SumNum"); + int sn = Integer.parseInt(snStr); + int sumNum = Integer.parseInt(totalStr); + List presetItems = new ArrayList<>(); + if (sumNum == 0) { + presetDataCatch.setChannelSyncEnd(sn, null ); + }else { + int i = 0,j = 0; + for (Iterator presetIterator = presetListNumElement.elementIterator(); presetIterator.hasNext(); ) { + Element itemListElement = presetIterator.next(); + PresetItem presetItem = new PresetItem(); + i++; + for (Iterator itemListIterator = itemListElement.elementIterator(); itemListIterator.hasNext(); ) { + // 遍历item + Element itemOne = itemListIterator.next(); + String name = itemOne.getName(); + String textTrim = itemOne.getTextTrim(); + if ("PresetID".equalsIgnoreCase(name)) { + presetItem.setPresetID(Integer.parseInt(textTrim)); + } else { + presetItem.setPresetName(textTrim); + } + presetItems.add(presetItem); + j++; + } + } + presetDataCatch.put(sn, sumNum, presetItems); + if (presetDataCatch.get(sn).size() == sumNum) { + logger.warn("[ 设备预置位查询成功 ] 共{}条", presetDataCatch.get(sn).size()); + RequestMessage requestMessage = new RequestMessage(); + requestMessage.setKey(key); + requestMessage.setId(sn + ""); + requestMessage.setData(presetDataCatch.get(sn)); + deferredResultHolder.invokeResult(requestMessage); + presetDataCatch.setChannelSyncEnd(sn, null); } } } - presetDataCatch.put(sn, sumNum, presetItems); - if (sumNum == presetDataCatch.get(sn).size()) { - RequestMessage requestMessage = new RequestMessage(); - requestMessage.setKey(key); - requestMessage.setData(presetDataCatch.get(sn)); - deferredResultHolder.invokeAllResult(requestMessage); - } - } - try { - responseAck(request, Response.OK); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage()); - } - } catch (DocumentException e) { - logger.error("[解析xml]失败: ", e); + processing.set(false); + }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java index 07906edc..4717146d 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java @@ -4,6 +4,7 @@ package com.genersoft.iot.vmp.vmanager.gb28181.ptz; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.PresetItem; +import com.genersoft.iot.vmp.gb28181.session.PresetDataCatch; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; @@ -44,6 +45,9 @@ public class PtzController { @Autowired private DeferredResultHolder resultHolder; + @Autowired + private PresetDataCatch presetDataCatch; + /*** * 云台控制 * @param deviceId 设备id @@ -153,7 +157,7 @@ public class PtzController { int sn = SipUtils.getNewSn(); String msgId = sn + ""; String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + sn; - DeferredResult> result = new DeferredResult<> (3 * 1000L); + DeferredResult> result = new DeferredResult<> (30 * 1000L); result.onTimeout(()->{ logger.warn(String.format("获取设备预置位超时")); // 释放rtpserver @@ -169,6 +173,7 @@ public class PtzController { } resultHolder.put(key, msgId, result); try { + presetDataCatch.addReady(sn); cmder.presetQuery(device, channelId, sn, event -> { RequestMessage msg = new RequestMessage(); msg.setId(msgId); diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java index 31f81f45..495b0c49 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.PresetQuerySipReq; +import com.genersoft.iot.vmp.gb28181.session.PresetDataCatch; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; @@ -46,7 +47,7 @@ public class ApiDeviceController { @Autowired private SIPCommander cmder; @Autowired - private IDeviceService deviceService; + private PresetDataCatch presetDataCatch; @Autowired private DeferredResultHolder resultHolder; @@ -233,6 +234,7 @@ public class ApiDeviceController { resultHolder.put(key, msgId, deferredResultEx); try { + presetDataCatch.addReady(sn); cmder.presetQuery(device, code, sn, event -> { RequestMessage msg = new RequestMessage(); msg.setId(msgId);