优化目录订阅处理,保证存储异常时不影响后续操作

pull/1242/head
648540858 2024-01-08 15:03:07 +08:00
parent f3acf33fbf
commit a74fb6ecd6
1 changed files with 38 additions and 15 deletions

View File

@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.Element; import org.dom4j.Element;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -185,6 +186,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 判断此通道是否存在 // 判断此通道是否存在
DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
if (deviceChannel != null) { if (deviceChannel != null) {
logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
channel.setId(deviceChannel.getId()); channel.setId(deviceChannel.getId());
updateChannelMap.put(channel.getChannelId(), channel); updateChannelMap.put(channel.getChannelId(), channel);
if (updateChannelMap.keySet().size() > 300) { if (updateChannelMap.keySet().size() > 300) {
@ -222,6 +224,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
if (deviceChannelForUpdate != null) { if (deviceChannelForUpdate != null) {
channel.setId(deviceChannelForUpdate.getId()); channel.setId(deviceChannelForUpdate.getId());
channel.setUpdateTime(DateUtil.getNow());
updateChannelMap.put(channel.getChannelId(), channel); updateChannelMap.put(channel.getChannelId(), channel);
if (updateChannelMap.keySet().size() > 300) { if (updateChannelMap.keySet().size() > 300) {
executeSaveForUpdate(); executeSaveForUpdate();
@ -244,11 +247,11 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
// 转发变化信息 // 转发变化信息
eventPublisher.catalogEventPublish(null, channel, event); eventPublisher.catalogEventPublish(null, channel, event);
if (updateChannelMap.keySet().size() > 0 if (!updateChannelMap.keySet().isEmpty()
|| addChannelMap.keySet().size() > 0 || !addChannelMap.keySet().isEmpty()
|| updateChannelOnlineList.size() > 0 || !updateChannelOnlineList.isEmpty()
|| updateChannelOfflineList.size() > 0 || !updateChannelOfflineList.isEmpty()
|| deleteChannelList.size() > 0) { || !deleteChannelList.isEmpty()) {
if (!dynamicTask.contains(talkKey)) { if (!dynamicTask.contains(talkKey)) {
dynamicTask.startDelay(talkKey, this::executeSave, 1000); dynamicTask.startDelay(talkKey, this::executeSave, 1000);
@ -262,16 +265,36 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
} }
private void executeSave(){ private void executeSave(){
executeSaveForAdd(); try {
executeSaveForUpdate(); executeSaveForAdd();
executeSaveForDelete(); } catch (Exception e) {
executeSaveForOnline(); logger.error("[存储收到的增加通道] 异常: ", e );
executeSaveForOffline(); }
try {
executeSaveForUpdate();
} catch (Exception e) {
logger.error("[存储收到的更新通道] 异常: ", e );
}
try {
executeSaveForDelete();
} catch (Exception e) {
logger.error("[存储收到的删除通道] 异常: ", e );
}
try {
executeSaveForOnline();
} catch (Exception e) {
logger.error("[存储收到的通道上线] 异常: ", e );
}
try {
executeSaveForOffline();
} catch (Exception e) {
logger.error("[存储收到的通道离线] 异常: ", e );
}
dynamicTask.stop(talkKey); dynamicTask.stop(talkKey);
} }
private void executeSaveForUpdate(){ private void executeSaveForUpdate(){
if (updateChannelMap.values().size() > 0) { if (!updateChannelMap.values().isEmpty()) {
ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values()); ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
updateChannelMap.clear(); updateChannelMap.clear();
deviceChannelService.batchUpdateChannel(deviceChannels); deviceChannelService.batchUpdateChannel(deviceChannels);
@ -280,7 +303,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
} }
private void executeSaveForAdd(){ private void executeSaveForAdd(){
if (addChannelMap.values().size() > 0) { if (!addChannelMap.values().isEmpty()) {
ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values()); ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
addChannelMap.clear(); addChannelMap.clear();
deviceChannelService.batchAddChannel(deviceChannels); deviceChannelService.batchAddChannel(deviceChannels);
@ -288,21 +311,21 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
} }
private void executeSaveForDelete(){ private void executeSaveForDelete(){
if (deleteChannelList.size() > 0) { if (!deleteChannelList.isEmpty()) {
deviceChannelService.deleteChannels(deleteChannelList); deviceChannelService.deleteChannels(deleteChannelList);
deleteChannelList.clear(); deleteChannelList.clear();
} }
} }
private void executeSaveForOnline(){ private void executeSaveForOnline(){
if (updateChannelOnlineList.size() > 0) { if (!updateChannelOnlineList.isEmpty()) {
deviceChannelService.channelsOnline(updateChannelOnlineList); deviceChannelService.channelsOnline(updateChannelOnlineList);
updateChannelOnlineList.clear(); updateChannelOnlineList.clear();
} }
} }
private void executeSaveForOffline(){ private void executeSaveForOffline(){
if (updateChannelOfflineList.size() > 0) { if (!updateChannelOfflineList.isEmpty()) {
deviceChannelService.channelsOffline(updateChannelOfflineList); deviceChannelService.channelsOffline(updateChannelOfflineList);
updateChannelOfflineList.clear(); updateChannelOfflineList.clear();
} }