优化notify消息中目录的处理

2.7.0
648540858 2024-04-24 16:12:42 +08:00
parent c21d973977
commit 44aa37ad6e
1 changed files with 154 additions and 22 deletions

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
@ -22,6 +23,7 @@ import org.springframework.transaction.annotation.Transactional;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.header.FromHeader; import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -37,12 +39,13 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class);
private final List<DeviceChannel> updateChannelOnlineList = new CopyOnWriteArrayList<>();
private final List<DeviceChannel> updateChannelOfflineList = new CopyOnWriteArrayList<>();
private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>(); private final Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>(); private final Map<String, DeviceChannel> addChannelMap = new ConcurrentHashMap<>();
private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>(); private final List<DeviceChannel> deleteChannelList = new CopyOnWriteArrayList<>();
@Autowired @Autowired
private UserSetting userSetting; private UserSetting userSetting;
@ -55,6 +58,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
@Autowired @Autowired
private IDeviceChannelService deviceChannelService; private IDeviceChannelService deviceChannelService;
@Autowired
private DynamicTask dynamicTask;
@Autowired @Autowired
private SipConfig sipConfig; private SipConfig sipConfig;
@ -71,7 +77,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
Device device = redisCatchStorage.getDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
if (device == null || !device.isOnLine()) { if (device == null || !device.isOnLine()) {
logger.warn("[收到目录订阅]{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" )); logger.warn("[收到目录订阅]{}, 但是设备已经离线", (device != null ? device.getDeviceId() : ""));
return; return;
} }
Element rootElement = getRootElement(evt, device.getCharset()); Element rootElement = getRootElement(evt, device.getCharset());
@ -92,9 +98,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
Element eventElement = itemDevice.element("Event"); Element eventElement = itemDevice.element("Event");
String event; String event;
if (eventElement == null) { if (eventElement == null) {
logger.warn("[收到目录订阅]{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" )); logger.warn("[收到目录订阅]{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId() : ""));
event = CatalogEvent.ADD; event = CatalogEvent.ADD;
}else { } else {
event = eventElement.getText().toUpperCase(); event = eventElement.getText().toUpperCase();
} }
DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event);
@ -106,61 +112,187 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
channel.setParentId(null); channel.setParentId(null);
} }
channel.setDeviceId(device.getDeviceId()); channel.setDeviceId(device.getDeviceId());
logger.info("[收到目录订阅]{}, {}/{}",event, device.getDeviceId(), channel.getChannelId()); logger.info("[收到目录订阅]{}/{}", device.getDeviceId(), channel.getChannelId());
switch (event) { switch (event) {
case CatalogEvent.ON: case CatalogEvent.ON:
// 上线 // 上线
deviceChannelService.online(channel); logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
updateChannelOnlineList.add(channel);
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true);
} }
break; break;
case CatalogEvent.OFF : case CatalogEvent.OFF:
case CatalogEvent.VLOST:
case CatalogEvent.DEFECT:
// 离线 // 离线
logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
if (userSetting.getRefuseChannelStatusChannelFormNotify()) { if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
logger.info("[目录订阅] 离线 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
}else { } else {
deviceChannelService.offline(channel); updateChannelOfflineList.add(channel);
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
} }
} }
break;
case CatalogEvent.VLOST:
// 视频丢失
logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
} else {
updateChannelOfflineList.add(channel);
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
}
}
break;
case CatalogEvent.DEFECT:
// 故障
logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
} else {
updateChannelOfflineList.add(channel);
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false);
}
}
break;
case CatalogEvent.ADD:
// 增加
logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
// 判断此通道是否存在
DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
if (deviceChannel != null) {
logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
channel.setId(deviceChannel.getId());
channel.setHasAudio(null);
updateChannelMap.put(channel.getChannelId(), channel);
} else {
addChannelMap.put(channel.getChannelId(), channel);
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
}
}
break; break;
case CatalogEvent.DEL: case CatalogEvent.DEL:
// 删除 // 删除
deviceChannelService.delete(channel); logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
deleteChannelList.add(channel);
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
} }
break; break;
case CatalogEvent.ADD:
case CatalogEvent.UPDATE: case CatalogEvent.UPDATE:
// 更新 // 更新
logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
// 判断此通道是否存在
DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
if (deviceChannelForUpdate != null) {
channel.setId(deviceChannelForUpdate.getId());
channel.setUpdateTime(DateUtil.getNow()); channel.setUpdateTime(DateUtil.getNow());
channel.setHasAudio(null); channel.setHasAudio(null);
deviceChannelService.updateChannel(deviceId,channel); updateChannelMap.put(channel.getChannelId(), channel);
} else {
addChannelMap.put(channel.getChannelId(), channel);
if (userSetting.getDeviceStatusNotify()) { if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息 // 发送redis消息
redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true);
} }
}
break; break;
default: default:
logger.warn("[ NotifyCatalog ] event not found {}", event ); logger.warn("[ NotifyCatalog ] event not found {}", event);
} }
// 转发变化信息 // 转发变化信息
eventPublisher.catalogEventPublish(null, channel, event); eventPublisher.catalogEventPublish(null, channel, event);
} }
} }
} catch (DocumentException e) { } catch (DocumentException e) {
logger.error("未处理的异常 ", e); logger.error("未处理的异常 ", e);
} }
} }
if (!updateChannelMap.keySet().isEmpty()
|| !addChannelMap.keySet().isEmpty()
|| !updateChannelOnlineList.isEmpty()
|| !updateChannelOfflineList.isEmpty()
|| !deleteChannelList.isEmpty()) {
executeSave();
}
}
public void executeSave(){
try {
executeSaveForAdd();
} catch (Exception e) {
logger.error("[存储收到的增加通道] 异常: ", e );
}
try {
executeSaveForOnline();
} catch (Exception e) {
logger.error("[存储收到的通道上线] 异常: ", e );
}
try {
executeSaveForOffline();
} catch (Exception e) {
logger.error("[存储收到的通道离线] 异常: ", e );
}
try {
executeSaveForUpdate();
} catch (Exception e) {
logger.error("[存储收到的更新通道] 异常: ", e );
}
try {
executeSaveForDelete();
} catch (Exception e) {
logger.error("[存储收到的删除通道] 异常: ", e );
}
}
private void executeSaveForUpdate(){
if (!updateChannelMap.values().isEmpty()) {
logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size());
ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
deviceChannelService.batchUpdateChannel(deviceChannels);
updateChannelMap.clear();
}
}
private void executeSaveForAdd(){
if (!addChannelMap.values().isEmpty()) {
ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
addChannelMap.clear();
deviceChannelService.batchAddChannel(deviceChannels);
}
}
private void executeSaveForDelete(){
if (!deleteChannelList.isEmpty()) {
deviceChannelService.deleteChannels(deleteChannelList);
deleteChannelList.clear();
}
}
private void executeSaveForOnline(){
if (!updateChannelOnlineList.isEmpty()) {
deviceChannelService.channelsOnline(updateChannelOnlineList);
updateChannelOnlineList.clear();
}
}
private void executeSaveForOffline(){
if (!updateChannelOfflineList.isEmpty()) {
deviceChannelService.channelsOffline(updateChannelOfflineList);
updateChannelOfflineList.clear();
}
} }
} }