修复[BUG]国标级联选订阅下级平台后,上级平台接收到 设备目录通知处理触发redis 消息后没有后续处理 问题.详见 https://github.com/648540858/wvp-GB28181-pro/issues/967

pull/968/head
che_shuai 2023-07-26 16:22:53 +08:00
parent 59d8f2f915
commit 06143eef4a
2 changed files with 232 additions and 38 deletions

View File

@ -16,61 +16,63 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer;
* @description:Redis使spring-data-redisapplication.ymlredis * @description:Redis使spring-data-redisapplication.ymlredis
* @author: swwheihei * @author: swwheihei
* @date: 2019530 10:58:25 * @date: 2019530 10:58:25
*
*/ */
@Configuration @Configuration
@Order(value=1) @Order(value = 1)
public class RedisMsgListenConfig { public class RedisMsgListenConfig {
@Autowired @Autowired
private RedisGpsMsgListener redisGPSMsgListener; private RedisGpsMsgListener redisGPSMsgListener;
@Autowired @Autowired
private RedisAlarmMsgListener redisAlarmMsgListener; private RedisAlarmMsgListener redisAlarmMsgListener;
@Autowired @Autowired
private RedisStreamMsgListener redisStreamMsgListener; private RedisStreamMsgListener redisStreamMsgListener;
@Autowired @Autowired
private RedisGbPlayMsgListener redisGbPlayMsgListener; private RedisGbPlayMsgListener redisGbPlayMsgListener;
@Autowired @Autowired
private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
@Autowired @Autowired
private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener; private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener;
@Autowired @Autowired
private RedisPushStreamResponseListener redisPushStreamResponseListener; private RedisPushStreamResponseListener redisPushStreamResponseListener;
@Autowired @Autowired
private RedisCloseStreamMsgListener redisCloseStreamMsgListener; private RedisCloseStreamMsgListener redisCloseStreamMsgListener;
@Autowired @Autowired
private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener; private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener;
@Autowired
private RedisDeviceStatusMsgListener redisDeviceStatusMsgListener;
/** /**
* redis redis * redis redis
* *
* *
* @param connectionFactory * @param connectionFactory
* @return * @return
*/ */
@Bean @Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer(); RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory); container.setConnectionFactory(connectionFactory);
container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS));
container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
container.addMessageListener(redisDeviceStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_DEVICE_STATUS));
return container; return container;
} }
} }

View File

@ -0,0 +1,192 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* redis
* @author davidche
*/
@Component
public class RedisDeviceStatusMsgListener implements MessageListener {
private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class);
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IDeviceChannelService deviceChannelService;
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Autowired
private IVideoManagerStorage storager;
@Autowired
private IDeviceService deviceService;
@Autowired
private UserSetting userSetting;
/**
* redis
*
* @param message message must not be {@literal null}.
* @param pattern pattern matching the channel (if specified) - can be {@literal null}.
*/
@Override
public void onMessage(Message message, byte[] pattern) {
boolean isEmpty = taskQueue.isEmpty();
taskQueue.offer(message);
if (isEmpty) {
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) {
Message msg = taskQueue.poll();
try {
DeviceStatusMsg deviceStatusMsg = praseMessage(new String(msg.getBody()));
// Device device = redisCatchStorage.getDevice(deviceStatusMsg.getDeviceId());
Device device = storager.queryVideoDevice(deviceStatusMsg.getDeviceId());
DeviceChannel channel = new DeviceChannel();
channel.setDeviceId(deviceStatusMsg.getDeviceId());
channel.setChannelId(deviceStatusMsg.getChannelId());
switch (deviceStatusMsg.getCmd()) {
case CatalogEvent.ON:
// 上线
logger.info("[收到REDIS的DEVICE通道上线通知] 来自设备: {}, 通道 {}", deviceStatusMsg.getDeviceId(), deviceStatusMsg.getChannelId());
storager.deviceChannelOnline(deviceStatusMsg.getDeviceId(), deviceStatusMsg.getChannelId());
break;
case CatalogEvent.OFF:
// 离线
logger.info("[收到REDIS的DEVICE通道离线通知] 来自设备: {}, 通道 {}", deviceStatusMsg.getDeviceId(), deviceStatusMsg.getChannelId());
if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
storager.deviceChannelOffline(deviceStatusMsg.getDeviceId(), deviceStatusMsg.getChannelId());
} else {
logger.info("[收到REDIS的DEVICE通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", deviceStatusMsg.getDeviceId(), deviceStatusMsg.getChannelId());
}
break;
case CatalogEvent.ADD:
// 增加
logger.info("[收到REDIS的DEVICE增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), deviceStatusMsg.getChannelId());
deviceChannelService.updateChannel(deviceStatusMsg.getDeviceId(), channel);
deviceService.sync(device);
break;
case "DELETE":
// 删除
logger.info("[收到REDIS的DEVICE删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), deviceStatusMsg.getChannelId());
storager.delChannel(deviceStatusMsg.getDeviceId(), deviceStatusMsg.getChannelId());
break;
case CatalogEvent.UPDATE:
// 更新
logger.info("[收到REDIS的DEVICE更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), deviceStatusMsg.getChannelId());
deviceChannelService.updateChannel(deviceStatusMsg.getDeviceId(), channel);
deviceService.sync(device);
break; case CatalogEvent.VLOST:
// 视频丢失
logger.info("[收到REDIS的DEVICE通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
if (userSetting.getRefuseChannelStatusChannelFormNotify()) {
storager.deviceChannelOffline(deviceStatusMsg.getDeviceId(), channel.getChannelId());
}else {
logger.info("[收到REDIS的DEVICE通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
}
break;
case CatalogEvent.DEFECT:
// 故障
logger.info("[收到REDIS的DEVICE通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
break;
default:
logger.warn("[ NotifyCatalog ] event not found {}", msg.getBody());
}
} catch (Exception e) {
logger.warn("[REDIS的DEVICE通知] 发现未处理的异常, \r\n{}", msg.getBody());
logger.error("[REDIS的DEVICE通知] 异常内容: ", e);
}
}
});
}
}
/**
* redis
*
* @param message
* @return
*/
private DeviceStatusMsg praseMessage(String message) {
DeviceStatusMsg deviceStatusMsg = new DeviceStatusMsg();
if(message.split(" ")[0].split(":").length>1){
deviceStatusMsg.setChannelId(message.split(" ")[0].split(":")[1]);
}
deviceStatusMsg.setDeviceId(message.split(" ")[0].split(":")[0]);
deviceStatusMsg.setCmd(message.split(" ")[1]);
return deviceStatusMsg;
}
/**
* redis
*/
private class DeviceStatusMsg {
private String deviceId;
private String channelId;
private String cmd;
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getChannelId() {
return channelId;
}
public void setChannelId(String channelId) {
this.channelId = channelId;
}
public String getCmd() {
return cmd;
}
public void setCmd(String cmd) {
this.cmd = cmd;
}
}
}