From a574ff094428decbdc35332d184cd0d210716a44 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 22 Sep 2022 16:56:20 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BD=BF=E7=94=A8=E9=98=9F?= =?UTF-8?q?=E5=88=97=E5=AF=BC=E8=87=B4=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/impl/NotifyRequestProcessor.java | 2 +- .../notify/cmd/AlarmNotifyMessageHandler.java | 13 ++++---- .../MobilePositionNotifyMessageHandler.java | 4 +-- .../iot/vmp/service/impl/PlayServiceImpl.java | 33 ++++++++++--------- .../redisMsg/RedisAlarmMsgListener.java | 8 +++-- .../redisMsg/RedisGbPlayMsgListener.java | 4 +-- .../service/redisMsg/RedisGpsMsgListener.java | 2 +- .../RedisPushStreamResponseListener.java | 4 +-- .../RedisPushStreamStatusListMsgListener.java | 2 +- .../RedisPushStreamStatusMsgListener.java | 4 +-- .../redisMsg/RedisStreamMsgListener.java | 6 ++-- 11 files changed, 44 insertions(+), 38 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 206a1599..84bf0b8c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -79,7 +79,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements private boolean taskQueueHandlerRun = false; - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java index da2cb9c3..ec750152 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java @@ -72,7 +72,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme private boolean taskQueueHandlerRun = false; - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired @@ -92,13 +92,14 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; taskExecutor.execute(() -> { + logger.info("[处理报警通知]待处理数量:{}", taskQueue.size() ); while (!taskQueue.isEmpty()) { SipMsgInfo sipMsgInfo = taskQueue.poll(); // 回复200 OK try { responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK); } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[收到报警通知], 回复200OK失败", e); + logger.error("[处理报警通知], 回复200OK失败", e); } Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID"); @@ -112,7 +113,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod")); String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime"); if (alarmTime == null) { - return; + continue; } deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription"); @@ -182,7 +183,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType")); } } - + logger.info("[收到报警通知]内容:{}", JSONObject.toJSON(deviceAlarm)); if ("7".equals(deviceAlarm.getAlarmMethod()) ) { // 发送给平台的报警信息。 发送redis通知 AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage(); @@ -190,7 +191,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription()); alarmChannelMessage.setGbId(channelId); redisCatchStorage.sendAlarmMsg(alarmChannelMessage); - return; + continue; } logger.debug("存储报警信息、报警分类"); @@ -198,7 +199,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme if (sipConfig.isAlarm()) { deviceAlarmService.add(deviceAlarm); } - logger.info("[收到报警通知]内容:{}", JSONObject.toJSON(deviceAlarm)); + if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) { publisher.deviceAlarmEventPublish(deviceAlarm); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java index 67d26d77..652cd830 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java @@ -58,7 +58,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen private boolean taskQueueHandlerRun = false; - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired @@ -83,7 +83,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen if (rootElementAfterCharset == null) { logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest()); responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST); - return; + continue; } MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 6662738c..cdbbf332 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -131,6 +131,24 @@ public class PlayServiceImpl implements IPlayService { StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); playResult.setDevice(device); + result.onCompletion(()->{ + // 点播结束时调用截图接口 + taskExecutor.execute(()->{ + // TODO 应该在上流时调用更好,结束也可能是错误结束 + String path = "snap"; + String fileName = deviceId + "_" + channelId + ".jpg"; + WVPResult wvpResult = (WVPResult)result.getResult(); + if (Objects.requireNonNull(wvpResult).getCode() == 0) { + StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData(); + MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId()); + String streamUrl = streamInfoForSuccess.getFmp4(); + // 请求截图 + logger.info("[请求截图]: " + fileName); + zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName); + } + }); + }); + if (streamInfo != null) { String streamId = streamInfo.getStream(); if (streamId == null) { @@ -192,21 +210,6 @@ public class PlayServiceImpl implements IPlayService { SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); logger.info(JSONObject.toJSONString(ssrcInfo)); play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{ - // 点播结束时调用截图接口 - taskExecutor.execute(()->{ - // TODO 应该在上流时调用更好,结束也可能是错误结束 - String path = "snap"; - String fileName = deviceId + "_" + channelId + ".jpg"; - WVPResult wvpResult = (WVPResult)result.getResult(); - if (Objects.requireNonNull(wvpResult).getCode() == 0) { - StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData(); - MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId()); - String streamUrl = streamInfoForSuccess.getFmp4(); - // 请求截图 - logger.info("[请求截图]: " + fileName); - zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName); - } - }); if (hookEvent != null) { hookEvent.response(mediaServerItem, response); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java index 84f5ef71..8d1b066e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java @@ -16,6 +16,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; +import javax.validation.constraints.NotNull; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -36,19 +37,20 @@ public class RedisAlarmMsgListener implements MessageListener { private boolean taskQueueHandlerRun = false; - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @Override - public void onMessage(Message message, byte[] bytes) { + public void onMessage(@NotNull Message message, byte[] bytes) { logger.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody())); taskQueue.offer(message); if (!taskQueueHandlerRun) { taskQueueHandlerRun = true; + logger.info("[线程池信息]活动线程数:{}, 最大线程数: {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize()); taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); @@ -56,7 +58,7 @@ public class RedisAlarmMsgListener implements MessageListener { AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class); if (alarmChannelMessage == null) { logger.warn("[REDIS的ALARM通知]消息解析失败"); - return; + continue; } String gbId = alarmChannelMessage.getGbId(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index e6ba9b6b..2d4a82fb 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -90,7 +90,7 @@ public class RedisGbPlayMsgListener implements MessageListener { private boolean taskQueueHandlerRun = false; - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired @@ -121,7 +121,7 @@ public class RedisGbPlayMsgListener implements MessageListener { JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { - return; + continue; } if (WvpRedisMsg.isRequest(wvpRedisMsg)) { logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java index bb2f4ad5..b5d02a5c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java @@ -35,7 +35,7 @@ public class RedisGpsMsgListener implements MessageListener { @Autowired private IVideoManagerStorage storager; - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java index 13e68740..4fff32d9 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java @@ -28,7 +28,7 @@ public class RedisPushStreamResponseListener implements MessageListener { private boolean taskQueueHandlerRun = false; - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired @@ -53,7 +53,7 @@ public class RedisPushStreamResponseListener implements MessageListener { MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ logger.info("[REDIS消息-请求推流结果]:参数不全"); - return; + continue; } // 查看正在等待的invite消息 if (responseEvents.get(response.getApp() + response.getStream()) != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java index 39258364..b69a587a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java @@ -40,7 +40,7 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { private boolean taskQueueHandlerRun = false; - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java index 4b1c2d50..2faf3b8a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java @@ -42,7 +42,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired @@ -61,7 +61,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class); if (statusChangeFromPushStream == null) { logger.warn("[REDIS消息]推流设备状态变化消息解析失败"); - return; + continue; } // 取消定时任务 dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java index 1897b6fc..415787e7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java @@ -35,7 +35,7 @@ public class RedisStreamMsgListener implements MessageListener { private boolean taskQueueHandlerRun = false; - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired @@ -53,13 +53,13 @@ public class RedisStreamMsgListener implements MessageListener { JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); if (steamMsgJson == null) { logger.warn("[收到redis 流变化]消息解析失败"); - return; + continue; } String serverId = steamMsgJson.getString("serverId"); if (userSetting.getServerId().equals(serverId)) { // 自己发送的消息忽略即可 - return; + continue; } logger.info("[收到redis 流变化]: {}", new String(message.getBody())); String app = steamMsgJson.getString("app");