From 06143eef4ab4e91080ad8f51c0aa4a3fdaf9f47e Mon Sep 17 00:00:00 2001 From: che_shuai Date: Wed, 26 Jul 2023 16:22:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D[BUG]=E5=9B=BD=E6=A0=87?= =?UTF-8?q?=E7=BA=A7=E8=81=94=E9=80=89=E8=AE=A2=E9=98=85=E4=B8=8B=E7=BA=A7?= =?UTF-8?q?=E5=B9=B3=E5=8F=B0=E5=90=8E=EF=BC=8C=E4=B8=8A=E7=BA=A7=E5=B9=B3?= =?UTF-8?q?=E5=8F=B0=E6=8E=A5=E6=94=B6=E5=88=B0=20=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E7=9B=AE=E5=BD=95=E9=80=9A=E7=9F=A5=E5=A4=84=E7=90=86=E8=A7=A6?= =?UTF-8?q?=E5=8F=91redis=20=E6=B6=88=E6=81=AF=E5=90=8E=E6=B2=A1=E6=9C=89?= =?UTF-8?q?=E5=90=8E=E7=BB=AD=E5=A4=84=E7=90=86=20=E9=97=AE=E9=A2=98.?= =?UTF-8?q?=E8=AF=A6=E8=A7=81=20https://github.com/648540858/wvp-GB28181-p?= =?UTF-8?q?ro/issues/967?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/conf/redis/RedisMsgListenConfig.java | 78 +++---- .../RedisDeviceStatusMsgListener.java | 192 ++++++++++++++++++ 2 files changed, 232 insertions(+), 38 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisDeviceStatusMsgListener.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index c14ebcdd0..cba73f39f 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -16,61 +16,63 @@ import org.springframework.data.redis.listener.RedisMessageListenerContainer; * @description:Redis中间件配置类,使用spring-data-redis集成,自动从application.yml中加载redis配置 * @author: swwheihei * @date: 2019年5月30日 上午10:58:25 - * */ @Configuration -@Order(value=1) +@Order(value = 1) public class RedisMsgListenConfig { - @Autowired - private RedisGpsMsgListener redisGPSMsgListener; + @Autowired + private RedisGpsMsgListener redisGPSMsgListener; - @Autowired - private RedisAlarmMsgListener redisAlarmMsgListener; + @Autowired + private RedisAlarmMsgListener redisAlarmMsgListener; - @Autowired - private RedisStreamMsgListener redisStreamMsgListener; + @Autowired + private RedisStreamMsgListener redisStreamMsgListener; - @Autowired - private RedisGbPlayMsgListener redisGbPlayMsgListener; + @Autowired + private RedisGbPlayMsgListener redisGbPlayMsgListener; - @Autowired - private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; + @Autowired + private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; - @Autowired - private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener; + @Autowired + private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener; - @Autowired - private RedisPushStreamResponseListener redisPushStreamResponseListener; + @Autowired + private RedisPushStreamResponseListener redisPushStreamResponseListener; - @Autowired - private RedisCloseStreamMsgListener redisCloseStreamMsgListener; + @Autowired + private RedisCloseStreamMsgListener redisCloseStreamMsgListener; - @Autowired - private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener; + @Autowired + private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener; + @Autowired + private RedisDeviceStatusMsgListener redisDeviceStatusMsgListener; - /** - * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 - * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 - * - * @param connectionFactory - * @return - */ - @Bean - RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { + /** + * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 + * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 + * + * @param connectionFactory + * @return + */ + @Bean + RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); - container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); - container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); - container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); - container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); - container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); - container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); - container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); - container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); - container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); + container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); + container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); + container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); + container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); + container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); + container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); + container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); + container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); + container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); + container.addMessageListener(redisDeviceStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_DEVICE_STATUS)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisDeviceStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisDeviceStatusMsgListener.java new file mode 100644 index 000000000..9b91c1588 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisDeviceStatusMsgListener.java @@ -0,0 +1,192 @@ +package com.genersoft.iot.vmp.service.redisMsg; + + +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.service.IDeviceChannelService; +import com.genersoft.iot.vmp.service.IDeviceService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * 监听 redis 设备状态信息 + * @author davidche + */ +@Component +public class RedisDeviceStatusMsgListener implements MessageListener { + + + private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class); + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IDeviceChannelService deviceChannelService; + + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Autowired + private IVideoManagerStorage storager; + + @Autowired + private IDeviceService deviceService; + + @Autowired + private UserSetting userSetting; + + /** + * 处理 接收的 redis 设备状态信息 + * + * @param message message must not be {@literal null}. + * @param pattern pattern matching the channel (if specified) - can be {@literal null}. + */ + @Override + public void onMessage(Message message, byte[] pattern) { + + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + + DeviceStatusMsg deviceStatusMsg = praseMessage(new String(msg.getBody())); + +// Device device = redisCatchStorage.getDevice(deviceStatusMsg.getDeviceId()); + Device device = storager.queryVideoDevice(deviceStatusMsg.getDeviceId()); + DeviceChannel channel = new DeviceChannel(); + channel.setDeviceId(deviceStatusMsg.getDeviceId()); + channel.setChannelId(deviceStatusMsg.getChannelId()); + switch (deviceStatusMsg.getCmd()) { + case CatalogEvent.ON: + // 上线 + logger.info("[收到REDIS的DEVICE通道上线通知] 来自设备: {}, 通道 {}", deviceStatusMsg.getDeviceId(), deviceStatusMsg.getChannelId()); + storager.deviceChannelOnline(deviceStatusMsg.getDeviceId(), deviceStatusMsg.getChannelId()); + break; + + case CatalogEvent.OFF: + // 离线 + logger.info("[收到REDIS的DEVICE通道离线通知] 来自设备: {}, 通道 {}", deviceStatusMsg.getDeviceId(), deviceStatusMsg.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + storager.deviceChannelOffline(deviceStatusMsg.getDeviceId(), deviceStatusMsg.getChannelId()); + } else { + logger.info("[收到REDIS的DEVICE通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", deviceStatusMsg.getDeviceId(), deviceStatusMsg.getChannelId()); + } + break; + case CatalogEvent.ADD: + // 增加 + logger.info("[收到REDIS的DEVICE增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), deviceStatusMsg.getChannelId()); + deviceChannelService.updateChannel(deviceStatusMsg.getDeviceId(), channel); + deviceService.sync(device); + break; + case "DELETE": + + // 删除 + logger.info("[收到REDIS的DEVICE删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), deviceStatusMsg.getChannelId()); + storager.delChannel(deviceStatusMsg.getDeviceId(), deviceStatusMsg.getChannelId()); + break; + + case CatalogEvent.UPDATE: + // 更新 + logger.info("[收到REDIS的DEVICE更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), deviceStatusMsg.getChannelId()); + deviceChannelService.updateChannel(deviceStatusMsg.getDeviceId(), channel); + deviceService.sync(device); + break; case CatalogEvent.VLOST: + // 视频丢失 + logger.info("[收到REDIS的DEVICE通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + storager.deviceChannelOffline(deviceStatusMsg.getDeviceId(), channel.getChannelId()); + }else { + logger.info("[收到REDIS的DEVICE通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + } + break; + case CatalogEvent.DEFECT: + // 故障 + logger.info("[收到REDIS的DEVICE通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + break; + + default: + logger.warn("[ NotifyCatalog ] event not found : {}", msg.getBody()); + } + + } catch (Exception e) { + logger.warn("[REDIS的DEVICE通知] 发现未处理的异常, \r\n{}", msg.getBody()); + logger.error("[REDIS的DEVICE通知] 异常内容: ", e); + } + } + }); + } + } + + /** + * 解析redis 消息 + * + * @param message + * @return + */ + private DeviceStatusMsg praseMessage(String message) { + DeviceStatusMsg deviceStatusMsg = new DeviceStatusMsg(); + + if(message.split(" ")[0].split(":").length>1){ + deviceStatusMsg.setChannelId(message.split(" ")[0].split(":")[1]); + } + deviceStatusMsg.setDeviceId(message.split(" ")[0].split(":")[0]); + deviceStatusMsg.setCmd(message.split(" ")[1]); + + return deviceStatusMsg; + + } + + /** + * redis 消息对象 + */ + private class DeviceStatusMsg { + private String deviceId; + private String channelId; + private String cmd; + + public String getDeviceId() { + return deviceId; + } + + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } + + public String getChannelId() { + return channelId; + } + + public void setChannelId(String channelId) { + this.channelId = channelId; + } + + public String getCmd() { + return cmd; + } + + public void setCmd(String cmd) { + this.cmd = cmd; + } + } + + +}