diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 4ce30a2a..3dd8fcbe 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -116,14 +116,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements logger.info("接收到消息:" + cmd); } } catch (DocumentException e) { - throw new RuntimeException(e); + logger.error("处理NOTIFY消息时错误", e); + } finally { + taskQueueHandlerRun = false; } } - taskQueueHandlerRun = false; }); } } catch (SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); + } finally { + taskQueueHandlerRun = false; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index ff71a922..a278c082 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -35,6 +35,9 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; +/** + * 目录查询的回复 + */ @Component public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { @@ -85,82 +88,83 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp // 回复200 OK try { responseAck(evt, Response.OK); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(()-> { + while (!taskQueue.isEmpty()) { + HandlerCatchData take = taskQueue.poll(); + try { + Element rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset()); + Element deviceListElement = rootElement.element("DeviceList"); + Element sumNumElement = rootElement.element("SumNum"); + Element snElement = rootElement.element("SN"); + if (snElement == null || sumNumElement == null || deviceListElement == null) { + responseAck(take.getEvt(), Response.BAD_REQUEST, "xml error"); + return; + } + int sumNum = Integer.parseInt(sumNumElement.getText()); + + if (sumNum == 0) { + logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId()); + // 数据已经完整接收 + storager.cleanChannelsForDevice(take.getDevice().getDeviceId()); + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); + }else { + Iterator deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { + List channelList = new ArrayList<>(); + // 遍历DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element channelDeviceElement = itemDevice.element("DeviceID"); + if (channelDeviceElement == null) { + continue; + } + DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice, device, null); + deviceChannel.setDeviceId(take.getDevice().getDeviceId()); + + channelList.add(deviceChannel); + } + int sn = Integer.parseInt(snElement.getText()); + catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList); + logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum); + if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) { + // 数据已经完整接收 + boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId())); + if (!resetChannelsResult) { + String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条"; + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); + }else { + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); + } + } + } + + } + } catch (DocumentException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } catch (SipException e) { + e.printStackTrace(); + } finally { + taskQueueHandlerRun = false; + } + } + }); + } } catch (SipException e) { throw new RuntimeException(e); } catch (InvalidArgumentException e) { throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); + } finally { + taskQueueHandlerRun = false; } - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; - taskExecutor.execute(()-> { - while (!taskQueue.isEmpty()) { - HandlerCatchData take = taskQueue.poll(); - String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + take.getDevice().getDeviceId(); - Element rootElement = null; - try { - rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset()); - Element deviceListElement = rootElement.element("DeviceList"); - Element sumNumElement = rootElement.element("SumNum"); - Element snElement = rootElement.element("SN"); - if (snElement == null || sumNumElement == null || deviceListElement == null) { - responseAck(take.getEvt(), Response.BAD_REQUEST, "xml error"); - return; - } - int sumNum = Integer.parseInt(sumNumElement.getText()); - if (sumNum == 0) { - logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId()); - // 数据已经完整接收 - storager.cleanChannelsForDevice(take.getDevice().getDeviceId()); - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); - }else { - Iterator deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - List channelList = new ArrayList<>(); - // 遍历DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; - } - DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice, device, null); - deviceChannel.setDeviceId(take.getDevice().getDeviceId()); - - channelList.add(deviceChannel); - } - int sn = Integer.parseInt(snElement.getText()); - catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList); - logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum); - if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) { - // 数据已经完整接收 - boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId())); - if (!resetChannelsResult) { - String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条"; - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); - }else { - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); - } - } - } - - } - } catch (DocumentException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } catch (SipException e) { - e.printStackTrace(); - } - } - taskQueueHandlerRun = false; - }); - - } } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 57e80454..79153198 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -76,8 +76,8 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; taskExecutor.execute(()->{ - try { - while (!taskQueue.isEmpty()) { + while (!taskQueue.isEmpty()) { + try { HandlerCatchData take = taskQueue.poll(); Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset()); String sn = getText(rootElementForCharset, "SN"); @@ -141,10 +141,11 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent releaseRequest(take.getDevice().getDeviceId(), sn); } } + } catch (DocumentException e) { + throw new RuntimeException(e); + } finally { + taskQueueHandlerRun = false; } - taskQueueHandlerRun = false; - }catch (DocumentException e) { - throw new RuntimeException(e); } }); } @@ -155,6 +156,8 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); + }finally { + taskQueueHandlerRun = false; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 66f678a2..1d54b5cf 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -105,6 +105,7 @@ public class DeviceServiceImpl implements IDeviceService { redisCatchStorage.updateDevice(device); commander.deviceInfoQuery(device); sync(device); + // TODO 如果设备下的通道级联到了其他平台,那么需要发送事件或者notify给上级平台 }else { deviceMapper.update(device); redisCatchStorage.updateDevice(device);