From 3291c4b2e67d510186ca5fbfac8ec5af1a9d4f16 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 21 Mar 2024 16:54:44 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=9A=E5=B9=B3=E5=8F=B0?= =?UTF-8?q?=E6=8E=A8=E6=B5=81=E6=97=A0=E4=BA=BA=E8=A7=82=E7=9C=8Bredis?= =?UTF-8?q?=E9=80=9A=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/impl/ByeRequestProcessor.java | 116 +++++++++++------- .../vmp/media/zlm/ZLMHttpHookListener.java | 2 +- .../bean/RequestStopPushStreamMsg.java | 49 ++++++++ .../iot/vmp/service/bean/WvpRedisMsgCmd.java | 10 ++ .../redisMsg/RedisGbPlayMsgListener.java | 50 +++++++- .../RedisPushStreamCloseResponseListener.java | 2 +- .../iot/vmp/storager/IRedisCatchStorage.java | 2 +- .../storager/impl/RedisCatchStorageImpl.java | 2 +- 8 files changed, 186 insertions(+), 47 deletions(-) create mode 100755 src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 5de1f193..ff7427bc 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -15,8 +15,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg; +import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; @@ -92,6 +95,12 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private UserSetting userSetting; + @Autowired + private IStreamPushService pushService; + + @Autowired + private RedisGbPlayMsgListener redisGbPlayMsgListener; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -124,58 +133,81 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In param.put("stream",streamId); param.put("ssrc",sendRtpItem.getSsrc()); logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId()); - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), - callIdHeader.getCallId(), null); - zlmServerFactory.stopSendRtpStream(mediaInfo, param); - if (userSetting.getUseCustomSsrcForParentInvite()) { - mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); - } + if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { - ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); - if (platform != null) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(platform.getId()); - redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + // 查询这路流是否是本平台的 + StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream()); + if (push!= null && !push.isSelf()) { + // 不是本平台的就发送redis消息让其他wvp停止发流 + ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); + if (platform != null) { + RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId()); + redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg); + } }else { - logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId()); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), + callIdHeader.getCallId(), null); + zlmServerFactory.stopSendRtpStream(mediaInfo, param); + if (userSetting.getUseCustomSsrcForParentInvite()) { + mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); + } + + ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); + if (platform != null) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(platform.getId()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + }else { + logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId()); + } + } + }else { + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), + callIdHeader.getCallId(), null); + zlmServerFactory.stopSendRtpStream(mediaInfo, param); + if (userSetting.getUseCustomSsrcForParentInvite()) { + mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); } } + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaInfo != null) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) { + // 来自上级平台的停止对讲 + logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + } - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) { - // 来自上级平台的停止对讲 - logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - } - - int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); - if (totalReaderCount <= 0) { - logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); - if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { - Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); - if (device == null) { - logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); - } - try { - logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); - } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { - logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); + int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); + if (totalReaderCount <= 0) { + logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); + if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { + Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); + if (device == null) { + logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); + } + try { + logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); + } catch (InvalidArgumentException | ParseException | SipException | + SsrcTransactionNotFoundException e) { + logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); + } } } } } - // 可能是设备发送的停止 - SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId()); - if (ssrcTransaction == null) { - return; - } - logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId()); + // 可能是设备发送的停止 + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId()); + if (ssrcTransaction == null) { + return; + } + logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId()); ParentPlatform platform = platformService.queryPlatformByServerGBId(ssrcTransaction.getDeviceId()); if (platform != null ) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 7aed5752..9346eb8a 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -579,7 +579,7 @@ public class ZLMHttpHookListener { } // 收到无人观看说明流也没有在往上级推送 if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( + List sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( inviteInfo.getChannelId()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java new file mode 100755 index 00000000..fcba5111 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestStopPushStreamMsg.java @@ -0,0 +1,49 @@ +package com.genersoft.iot.vmp.service.bean; + +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; + + +public class RequestStopPushStreamMsg { + + + private SendRtpItem sendRtpItem; + + + private String platformName; + + + private int platFormIndex; + + public SendRtpItem getSendRtpItem() { + return sendRtpItem; + } + + public void setSendRtpItem(SendRtpItem sendRtpItem) { + this.sendRtpItem = sendRtpItem; + } + + public String getPlatformName() { + return platformName; + } + + public void setPlatformName(String platformName) { + this.platformName = platformName; + } + + + public int getPlatFormIndex() { + return platFormIndex; + } + + public void setPlatFormIndex(int platFormIndex) { + this.platFormIndex = platFormIndex; + } + + public static RequestStopPushStreamMsg getInstance(SendRtpItem sendRtpItem, String platformName, int platFormIndex) { + RequestStopPushStreamMsg streamMsg = new RequestStopPushStreamMsg(); + streamMsg.setSendRtpItem(sendRtpItem); + streamMsg.setPlatformName(platformName); + streamMsg.setPlatFormIndex(platFormIndex); + return streamMsg; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java index cb118865..e9ee4cba 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java @@ -6,7 +6,17 @@ package com.genersoft.iot.vmp.service.bean; public class WvpRedisMsgCmd { + /** + * 请求获取推流信息 + */ public static final String GET_SEND_ITEM = "GetSendItem"; + /** + * 请求推流的请求 + */ public static final String REQUEST_PUSH_STREAM = "RequestPushStream"; + /** + * 停止推流的请求 + */ + public static final String REQUEST_STOP_PUSH_STREAM = "RequestStopPushStream"; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index eb261e34..3b990f00 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -133,7 +133,10 @@ public class RedisGbPlayMsgListener implements MessageListener { case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - + break; + case WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM: + RequestStopPushStreamMsg streamMsg = JSON.to(RequestStopPushStreamMsg.class, wvpRedisMsg.getContent()); + requestStopPushStreamMsgHand(streamMsg, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); break; default: break; @@ -397,6 +400,19 @@ public class RedisGbPlayMsgListener implements MessageListener { redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } + /** + * 发送请求推流的消息 + */ + public void sendMsgForStopSendRtpStream(String serverId, RequestStopPushStreamMsg streamMsg) { + String key = UUID.randomUUID().toString(); + WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, + WvpRedisMsgCmd.REQUEST_STOP_PUSH_STREAM, key, JSON.toJSONString(streamMsg)); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); + logger.info("[REDIS 请求其他平台停止推流] {}: {}", serverId, jsonObject); + redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { if (platformGbId == null) { platformGbId = "*"; @@ -423,4 +439,36 @@ public class RedisGbPlayMsgListener implements MessageListener { return null; } } + + /** + * 处理收到的请求推流的请求 + */ + private void requestStopPushStreamMsgHand(RequestStopPushStreamMsg streamMsg, String fromId, String serial) { + SendRtpItem sendRtpItem = streamMsg.getSendRtpItem(); + if (sendRtpItem == null) { + logger.info("[REDIS 执行其他平台的请求停止推流] 失败: sendRtpItem为NULL"); + return; + } + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaInfo == null) { + // TODO 回复错误 + return; + } + Map param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + + if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) { + logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); + // 发送redis消息 + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), streamMsg.getPlatformName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(streamMsg.getPlatFormIndex()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + } + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java index 1d7c2fd5..fe0ccd2a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java @@ -73,7 +73,7 @@ public class RedisPushStreamCloseResponseListener implements MessageListener { MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class); StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream()); if (push != null) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( + List sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( push.getGbId()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index b663c5c6..78fd2801 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -181,7 +181,7 @@ public interface IRedisCatchStorage { */ void sendStreamPushRequestedMsgForStatus(); - List querySendRTPServerByChnnelId(String channelId); + List querySendRTPServerByChannelId(String channelId); List querySendRTPServerByStream(String stream); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 27bbdba6..18a037dd 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -184,7 +184,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override - public List querySendRTPServerByChnnelId(String channelId) { + public List querySendRTPServerByChannelId(String channelId) { if (channelId == null) { return null; }