Merge branch 'wvp-28181-2.0' into 结构优化
# Conflicts: # src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java结构优化
commit
9b1c6c46a8
|
@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
|
||||||
import com.genersoft.iot.vmp.service.ICommonGbChannelService;
|
import com.genersoft.iot.vmp.service.ICommonGbChannelService;
|
||||||
import com.genersoft.iot.vmp.service.IDeviceChannelService;
|
import com.genersoft.iot.vmp.service.IDeviceChannelService;
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
|
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||||
import org.dom4j.DocumentException;
|
import org.dom4j.DocumentException;
|
||||||
import org.dom4j.Element;
|
import org.dom4j.Element;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -189,6 +190,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
|
||||||
// 判断此通道是否存在
|
// 判断此通道是否存在
|
||||||
DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
|
DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId());
|
||||||
if (deviceChannel != null) {
|
if (deviceChannel != null) {
|
||||||
|
logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
|
||||||
channel.setId(deviceChannel.getId());
|
channel.setId(deviceChannel.getId());
|
||||||
updateChannelMap.put(channel.getChannelId(), channel);
|
updateChannelMap.put(channel.getChannelId(), channel);
|
||||||
if (updateChannelMap.keySet().size() > 300) {
|
if (updateChannelMap.keySet().size() > 300) {
|
||||||
|
@ -226,6 +228,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
|
||||||
DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
|
DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId());
|
||||||
if (deviceChannelForUpdate != null) {
|
if (deviceChannelForUpdate != null) {
|
||||||
channel.setId(deviceChannelForUpdate.getId());
|
channel.setId(deviceChannelForUpdate.getId());
|
||||||
|
channel.setUpdateTime(DateUtil.getNow());
|
||||||
updateChannelMap.put(channel.getChannelId(), channel);
|
updateChannelMap.put(channel.getChannelId(), channel);
|
||||||
if (updateChannelMap.keySet().size() > 300) {
|
if (updateChannelMap.keySet().size() > 300) {
|
||||||
executeSaveForUpdate(device);
|
executeSaveForUpdate(device);
|
||||||
|
@ -246,11 +249,11 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (updateChannelMap.keySet().size() > 0
|
if (!updateChannelMap.keySet().isEmpty()
|
||||||
|| addChannelMap.keySet().size() > 0
|
|| !addChannelMap.keySet().isEmpty()
|
||||||
|| updateChannelOnlineList.size() > 0
|
|| !updateChannelOnlineList.isEmpty()
|
||||||
|| updateChannelOfflineList.size() > 0
|
|| !updateChannelOfflineList.isEmpty()
|
||||||
|| deleteChannelList.size() > 0) {
|
|| !deleteChannelList.isEmpty()) {
|
||||||
|
|
||||||
if (!dynamicTask.contains(talkKey)) {
|
if (!dynamicTask.contains(talkKey)) {
|
||||||
dynamicTask.startDelay(talkKey, ()-> executeSave(device), 1000);
|
dynamicTask.startDelay(talkKey, ()-> executeSave(device), 1000);
|
||||||
|
@ -264,11 +267,31 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
|
||||||
}
|
}
|
||||||
|
|
||||||
private void executeSave(Device device){
|
private void executeSave(Device device){
|
||||||
|
try {
|
||||||
executeSaveForAdd(device);
|
executeSaveForAdd(device);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("[存储收到的增加通道] 异常: ", e );
|
||||||
|
}
|
||||||
|
try {
|
||||||
executeSaveForUpdate(device);
|
executeSaveForUpdate(device);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("[存储收到的更新通道] 异常: ", e );
|
||||||
|
}
|
||||||
|
try {
|
||||||
executeSaveForDelete(device);
|
executeSaveForDelete(device);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("[存储收到的删除通道] 异常: ", e );
|
||||||
|
}
|
||||||
|
try {
|
||||||
executeSaveForOnline(device);
|
executeSaveForOnline(device);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("[存储收到的通道上线] 异常: ", e );
|
||||||
|
}
|
||||||
|
try {
|
||||||
executeSaveForOffline(device);
|
executeSaveForOffline(device);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("[存储收到的通道离线] 异常: ", e );
|
||||||
|
}
|
||||||
dynamicTask.stop(talkKey);
|
dynamicTask.stop(talkKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,11 +58,12 @@ public class RedisGpsMsgListener implements MessageListener {
|
||||||
Message msg = taskQueue.poll();
|
Message msg = taskQueue.poll();
|
||||||
try {
|
try {
|
||||||
GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
|
GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
|
||||||
|
logger.info("[REDIS的位置变化通知], {}", JSON.toJSONString(gpsMsgInfo));
|
||||||
// 只是放入redis缓存起来
|
// 只是放入redis缓存起来
|
||||||
redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
|
redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
|
||||||
}catch (Exception e) {
|
}catch (Exception e) {
|
||||||
logger.warn("[REDIS的ALARM通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
|
logger.warn("[REDIS的位置变化通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
|
||||||
logger.error("[REDIS的ALARM通知] 异常内容: ", e);
|
logger.error("[REDIS的位置变化通知] 异常内容: ", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue