diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index 4d261897..a0ce17c0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -22,6 +23,7 @@ import org.springframework.transaction.annotation.Transactional; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -37,12 +39,13 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); + private final List updateChannelOnlineList = new CopyOnWriteArrayList<>(); + private final List updateChannelOfflineList = new CopyOnWriteArrayList<>(); private final Map updateChannelMap = new ConcurrentHashMap<>(); private final Map addChannelMap = new ConcurrentHashMap<>(); private final List deleteChannelList = new CopyOnWriteArrayList<>(); - @Autowired private UserSetting userSetting; @@ -55,6 +58,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent @Autowired private IDeviceChannelService deviceChannelService; + @Autowired + private DynamicTask dynamicTask; + @Autowired private SipConfig sipConfig; @@ -71,7 +77,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent Device device = redisCatchStorage.getDevice(deviceId); if (device == null || !device.isOnLine()) { - logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" )); + logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId() : "")); return; } Element rootElement = getRootElement(evt, device.getCharset()); @@ -92,9 +98,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent Element eventElement = itemDevice.element("Event"); String event; if (eventElement == null) { - logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" )); + logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId() : "")); event = CatalogEvent.ADD; - }else { + } else { event = eventElement.getText().toUpperCase(); } DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); @@ -106,61 +112,187 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent channel.setParentId(null); } channel.setDeviceId(device.getDeviceId()); - logger.info("[收到目录订阅]:{}, {}/{}",event, device.getDeviceId(), channel.getChannelId()); + logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); switch (event) { case CatalogEvent.ON: // 上线 - deviceChannelService.online(channel); + logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + updateChannelOnlineList.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); } - break; - case CatalogEvent.OFF : - case CatalogEvent.VLOST: - case CatalogEvent.DEFECT: + case CatalogEvent.OFF: // 离线 + logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[目录订阅] 离线 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - }else { - deviceChannelService.offline(channel); + logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + } else { + updateChannelOfflineList.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); } } + break; + case CatalogEvent.VLOST: + // 视频丢失 + logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + } else { + updateChannelOfflineList.add(channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.DEFECT: + // 故障 + logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + } else { + updateChannelOfflineList.add(channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.ADD: + // 增加 + logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + // 判断此通道是否存在 + DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); + if (deviceChannel != null) { + logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + channel.setId(deviceChannel.getId()); + channel.setHasAudio(null); + updateChannelMap.put(channel.getChannelId(), channel); + } else { + addChannelMap.put(channel.getChannelId(), channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } + } + break; case CatalogEvent.DEL: // 删除 - deviceChannelService.delete(channel); + logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + deleteChannelList.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); } break; - case CatalogEvent.ADD: case CatalogEvent.UPDATE: // 更新 - channel.setUpdateTime(DateUtil.getNow()); - channel.setHasAudio(null); - deviceChannelService.updateChannel(deviceId,channel); - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + // 判断此通道是否存在 + DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); + if (deviceChannelForUpdate != null) { + channel.setId(deviceChannelForUpdate.getId()); + channel.setUpdateTime(DateUtil.getNow()); + channel.setHasAudio(null); + updateChannelMap.put(channel.getChannelId(), channel); + } else { + addChannelMap.put(channel.getChannelId(), channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } } break; default: - logger.warn("[ NotifyCatalog ] event not found : {}", event ); + logger.warn("[ NotifyCatalog ] event not found : {}", event); } // 转发变化信息 eventPublisher.catalogEventPublish(null, channel, event); } } + } catch (DocumentException e) { logger.error("未处理的异常 ", e); } } + if (!updateChannelMap.keySet().isEmpty() + || !addChannelMap.keySet().isEmpty() + || !updateChannelOnlineList.isEmpty() + || !updateChannelOfflineList.isEmpty() + || !deleteChannelList.isEmpty()) { + executeSave(); + } + } + + public void executeSave(){ + try { + executeSaveForAdd(); + } catch (Exception e) { + logger.error("[存储收到的增加通道] 异常: ", e ); + } + try { + executeSaveForOnline(); + } catch (Exception e) { + logger.error("[存储收到的通道上线] 异常: ", e ); + } + try { + executeSaveForOffline(); + } catch (Exception e) { + logger.error("[存储收到的通道离线] 异常: ", e ); + } + try { + executeSaveForUpdate(); + } catch (Exception e) { + logger.error("[存储收到的更新通道] 异常: ", e ); + } + try { + executeSaveForDelete(); + } catch (Exception e) { + logger.error("[存储收到的删除通道] 异常: ", e ); + } + } + + private void executeSaveForUpdate(){ + if (!updateChannelMap.values().isEmpty()) { + logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size()); + ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); + deviceChannelService.batchUpdateChannel(deviceChannels); + updateChannelMap.clear(); + } + } + + private void executeSaveForAdd(){ + if (!addChannelMap.values().isEmpty()) { + ArrayList deviceChannels = new ArrayList<>(addChannelMap.values()); + addChannelMap.clear(); + deviceChannelService.batchAddChannel(deviceChannels); + } + } + + private void executeSaveForDelete(){ + if (!deleteChannelList.isEmpty()) { + deviceChannelService.deleteChannels(deleteChannelList); + deleteChannelList.clear(); + } + } + + private void executeSaveForOnline(){ + if (!updateChannelOnlineList.isEmpty()) { + deviceChannelService.channelsOnline(updateChannelOnlineList); + updateChannelOnlineList.clear(); + } + } + + private void executeSaveForOffline(){ + if (!updateChannelOfflineList.isEmpty()) { + deviceChannelService.channelsOffline(updateChannelOfflineList); + updateChannelOfflineList.clear(); + } } }