From 489fbe31a5974dd6e1dadafbf3042b8c9a3fad24 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 9 Sep 2024 00:07:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 5 +- .../iot/vmp/gb28181/bean/SendRtpInfo.java | 46 +++-- .../service/impl/DeviceServiceImpl.java | 10 +- .../service/impl/PlatformServiceImpl.java | 32 +-- .../iot/vmp/gb28181/task/SipRunner.java | 7 +- .../request/impl/AckRequestProcessor.java | 8 +- .../request/impl/ByeRequestProcessor.java | 14 +- .../request/impl/InviteRequestProcessor.java | 22 +-- .../impl/info/InfoRequestProcessor.java | 6 +- .../cmd/BroadcastNotifyMessageHandler.java | 6 +- .../cmd/MediaStatusNotifyMessageHandler.java | 6 +- .../media/service/IMediaServerService.java | 5 - .../service/impl/MediaServerServiceImpl.java | 48 ----- .../iot/vmp/media/zlm/SendRtpPortManager.java | 2 +- .../vmp/service/ISendRtpServerService.java | 30 +++ .../vmp/service/impl/MediaServiceImpl.java | 1 + .../impl/SendRtpServerServiceImpl.java | 132 ++++++++++++- .../service/redisMsg/IRedisRpcService.java | 8 +- .../redisMsg/control/RedisRpcController.java | 14 +- .../iot/vmp/storager/IRedisCatchStorage.java | 33 ---- .../storager/impl/RedisCatchStorageImpl.java | 182 +----------------- 21 files changed, 279 insertions(+), 338 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 82098bd6..ae27400f 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -39,7 +39,10 @@ public class VideoManagerConstants { public static final String PLATFORM_REGISTER_INFO_PREFIX = "VMP_PLATFORM_REGISTER_INFO_"; - public static final String SEND_RTP_INFO_PREFIX = "VMP_SEND_RTP_INFO"; + public static final String SEND_RTP_INFO = "VMP_SEND_RTP_INFO:"; + public static final String SEND_RTP_INFO_CALLID = "VMP_SEND_RTP_INFO:CALL_ID"; + public static final String SEND_RTP_INFO_STREAM = "VMP_SEND_RTP_INFO:STREAM"; + public static final String SEND_RTP_INFO_CHANNEL = "VMP_SEND_RTP_INFO:CHANNEL"; public static final String EVENT_ONLINE_REGISTER = "1"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java index 898d7722..29d40db7 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.bean; +import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.common.VideoManagerConstants; @@ -24,19 +25,14 @@ public class SendRtpInfo { private String ssrc; /** - * 平台id + * 目标平台或设备的编号 */ - private String platformId; + private String targetId; /** - * 平台名称 + * 目标平台或设备的名称 */ - private String platformName; - - /** - * 对应设备id - */ - private String deviceId; + private String targetName; /** * 直播流的应用名 @@ -176,6 +172,28 @@ public class SendRtpInfo { return sendRtpItem; } + public static SendRtpInfo getInstance(Integer localPort, MediaServer mediaServer, String ip, int port, String ssrc, + String deviceId, String platformId, Integer channelId, boolean isTcp, boolean rtcp, + String serverId) { + if (localPort == 0) { + return null; + } + SendRtpInfo sendRtpItem = new SendRtpInfo(); + sendRtpItem.setIp(ip); + sendRtpItem.setPort(port); + sendRtpItem.setSsrc(ssrc); + sendRtpItem.setDeviceId(deviceId); + sendRtpItem.setPlatformId(platformId); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setTcp(isTcp); + sendRtpItem.setRtcp(rtcp); + sendRtpItem.setApp("rtp"); + sendRtpItem.setLocalPort(localPort); + sendRtpItem.setServerId(serverId); + sendRtpItem.setMediaServerId(mediaServer.getId()); + return sendRtpItem; + } + @Override public String toString() { return "SendRtpItem{" + @@ -208,13 +226,5 @@ public class SendRtpInfo { '}'; } - public String getRedisKey() { - return VideoManagerConstants.SEND_RTP_INFO_PREFIX + - serverId + "_" - + mediaServerId + "_" - + platformId + "_" - + channelId + "_" - + stream + "_" - + callId; - } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 5a7ae2c8..46654cb3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -23,6 +23,7 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -83,10 +84,7 @@ public class DeviceServiceImpl implements IDeviceService { private DeviceChannelMapper deviceChannelMapper; @Autowired - DataSourceTransactionManager dataSourceTransactionManager; - - @Autowired - TransactionDefinition transactionDefinition; + private ISendRtpServerService sendRtpServerService; @Autowired private UserSetting userSetting; @@ -239,9 +237,9 @@ public class DeviceServiceImpl implements IDeviceService { if (audioBroadcastCatches.size() > 0) { for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) { - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(audioBroadcastCatch.getChannelId()); if (sendRtpItem != null) { - redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); + sendRtpServerService.delete(sendRtpItem); MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index 8483a75a..e19b453f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -25,6 +25,7 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; @@ -101,28 +102,29 @@ public class PlatformServiceImpl implements IPlatformService { @Autowired private IGbChannelService channelService; + @Autowired + private ISendRtpServerService sendRtpServerService; + /** * 流离开的处理 */ @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream()); - if (!sendRtpItems.isEmpty()) { - for (SendRtpInfo sendRtpItem : sendRtpItems) { - if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) { - String platformId = sendRtpItem.getPlatformId(); - Platform platform = platformMapper.getParentPlatByServerGBId(platformId); - CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); - try { - if (platform != null && channel != null) { - commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel); - redisCatchStorage.deleteSendRTPServer(platformId, channel.getGbDeviceId(), - sendRtpItem.getCallId(), sendRtpItem.getStream()); - } - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 发送BYE: {}", e.getMessage()); + SendRtpInfo sendRtpItems = sendRtpServerService.queryByStream(event.getStream()); + if (sendRtpItems != null) { + if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) { + String platformId = sendRtpItem.getPlatformId(); + Platform platform = platformMapper.getParentPlatByServerGBId(platformId); + CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); + try { + if (platform != null && channel != null) { + commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel); + redisCatchStorage.deleteSendRTPServer(platformId, channel.getGbDeviceId(), + sendRtpItem.getCallId(), sendRtpItem.getStream()); } + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 发送BYE: {}", e.getMessage()); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java index b97569bf..27e5f560 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java @@ -11,6 +11,8 @@ import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IPlatformService; +import com.genersoft.iot.vmp.service.ISendRtpServerService; +import com.genersoft.iot.vmp.service.impl.SendRtpServerServiceImpl; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -56,6 +58,9 @@ public class SipRunner implements CommandLineRunner { @Autowired private ISIPCommanderForPlatform commanderForPlatform; + @Autowired + private ISendRtpServerService sendRtpServerService; + @Override public void run(String... args) throws Exception { List deviceList = deviceService.getAllOnlineDevice(); @@ -99,7 +104,7 @@ public class SipRunner implements CommandLineRunner { if (channel == null){ continue; } - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(),channel.getGbDeviceId(), sendRtpItem.getCallId(),sendRtpItem.getStream()); + sendRtpServerService.delete(sendRtpItem); if (mediaServerItem != null) { ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); boolean stopResult = mediaServerService.initStopSendRtp(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index d91910b3..b0a12b24 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; @@ -77,6 +78,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IPlayService playService; + @Autowired + private ISendRtpServerService sendRtpServerService; + /** * 处理 ACK请求 @@ -88,7 +92,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In String fromUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); String toUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); log.info("[收到ACK]: 来自->{}", fromUserId); - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callIdHeader.getCallId()); if (sendRtpItem == null) { log.warn("[收到ACK]:未找到来自{},callId: {}", fromUserId, callIdHeader.getCallId()); return; @@ -112,7 +116,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In if (parentPlatform != null) { DeviceChannel deviceChannel = deviceChannelService.getOneById(sendRtpItem.getChannelId()); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getRedisKey(), sendRtpItem); + WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getChannelId(), sendRtpItem); if (wvpResult.getCode() == 0) { redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, deviceChannel, parentPlatform); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 32882564..ede7dbf4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -46,7 +47,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In private ISIPCommander cmder; @Autowired - private ISIPCommanderForPlatform commanderForPlatform; + private ISendRtpServerService sendRtpServerService; @Autowired private IRedisCatchStorage redisCatchStorage; @@ -112,7 +113,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In log.error("[回复BYE信息失败],{}", e.getMessage()); } CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callIdHeader.getCallId()); // 收流端发送的停止 if (sendRtpItem != null){ @@ -128,11 +129,11 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In if (platform != null) { redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform, channel); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - redisRpcService.stopSendRtp(sendRtpItem.getRedisKey()); - redisCatchStorage.deleteSendRTPServer(null, null, sendRtpItem.getCallId(), null); + redisRpcService.stopSendRtp(sendRtpItem.getChannelId()); + sendRtpServerService.deleteByCallId(sendRtpItem.getCallId()); }else { MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(null, null, callIdHeader.getCallId(), null); + sendRtpServerService.deleteByCallId(callIdHeader.getCallId()); mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); if (userSetting.getUseCustomSsrcForParentInvite()) { mediaServerService.releaseSsrc(mediaServer.getId(), sendRtpItem.getSsrc()); @@ -143,8 +144,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In } }else { MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), - callIdHeader.getCallId(), null); + sendRtpServerService.delete(sendRtpItem); mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); if (userSetting.getUseCustomSsrcForParentInvite()) { mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index f713e1ef..d8973dc3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -20,8 +20,8 @@ import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -75,13 +75,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private IGbChannelPlayService channelPlayService; @Autowired - private IStreamProxyService streamProxyService; + private ISendRtpServerService sendRtpServerService; @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired - private IRedisRpcService redisRpcService; + private IMediaServerService mediaServerService; @Autowired private RedisTemplate redisTemplate; @@ -101,9 +101,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private AudioBroadcastManager audioBroadcastManager; - @Autowired - private IMediaServerService mediaServerService; - @Autowired private HookSubscribe hookSubscribe; @@ -182,7 +179,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements }else { // 点播成功, TODO 可以在此处检测cancel命令是否存在,存在则不发送 // 构建sendRTP内容 - SendRtpInfo sendRtpItem = mediaServerService.createSendRtpItem(streamInfo.getMediaServer(), + SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(streamInfo.getMediaServer(), inviteInfo.getIp(), inviteInfo.getPort(), inviteInfo.getSsrc(), platform.getServerGBId(), streamInfo.getApp(), streamInfo.getStream(), channel.getGbId(), inviteInfo.isTcp(), platform.isRtcp()); @@ -193,7 +190,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setCallId(inviteInfo.getCallId()); sendRtpItem.setPlayType("Play".equalsIgnoreCase(inviteInfo.getSessionName()) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); - redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendRtpServerService.update(sendRtpItem); String sdpIp = streamInfo.getMediaServer().getSdpIp(); if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { sdpIp = platform.getSendStreamIp(); @@ -733,7 +730,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private void sendBye(Platform platform, String callId) { try { - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), null, null, callId); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId); if (sendRtpItem == null) { return; } @@ -878,7 +875,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue()); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); - SendRtpInfo sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId, + SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId, device.getDeviceId(), deviceChannel.getId(), mediaTransmissionTCP, false); @@ -894,7 +891,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements return; } - sendRtpItem.setPlayType(InviteStreamType.BROADCAST); sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setPlatformId(requesterId); @@ -910,7 +906,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setTcpActive(tcpActive); } - redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendRtpServerService.update(sendRtpItem); Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, broadcastCatch.getApp(), broadcastCatch.getStream()); if (streamReady) { @@ -944,7 +940,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements SIPResponse sipResponse = null; try { sendRtpItem.setStatus(2); - redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendRtpServerService.update(sendRtpItem); StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o=" + config.getId() + " " + sdp.getOrigin().getSessionId() + " " + sdp.getOrigin().getSessionVersion() + " IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java index 68482137..d56f59ed 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import gov.nist.javax.sip.message.SIPRequest; import lombok.extern.slf4j.Slf4j; @@ -64,6 +65,9 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I @Autowired private SipInviteSessionManager sessionManager; + @Autowired + private ISendRtpServerService sendRtpServerService; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -108,7 +112,7 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I String contentType = header.getContentType(); String contentSubType = header.getContentSubType(); if ("Application".equalsIgnoreCase(contentType) && "MANSRTSP".equalsIgnoreCase(contentSubType)) { - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callIdHeader.getCallId()); String streamId = sendRtpItem.getStream(); InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId); if (null == inviteInfo) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java index 5a3f703d..f2276072 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; @@ -67,6 +68,9 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private ISendRtpServerService sendRtpServerService; + @Override public void afterPropertiesSet() throws Exception { notifyMessageHandler.addHandler(cmdType, this); @@ -147,7 +151,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp broadcastCatch.setMediaServerItem(hookData.getMediaServer()); audioBroadcastManager.update(broadcastCatch); // 推流到设备 - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, hookData.getStream(), null); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByStream(null, targetId, hookData.getStream(), null); if (sendRtpItem == null) { log.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, hookData.getStream()); log.info("[国标级联] 语音喊话 重新开始,channelId: {}, stream: {}", targetId, hookData.getStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java index c9d835fc..5e06c65b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java @@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify. import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import gov.nist.javax.sip.message.SIPRequest; import lombok.extern.slf4j.Slf4j; @@ -69,6 +70,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i @Autowired private IDeviceChannelService deviceChannelService; + @Autowired + private ISendRtpServerService sendRtpServerService; + @Override public void afterPropertiesSet() throws Exception { notifyMessageHandler.addHandler(cmdType, this); @@ -110,7 +114,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId()); subscribe.removeSubscribe(hook); // 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题,需要将点播CallId进行上下级绑定 - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(ssrcTransaction.getChannelId()); if (sendRtpItem != null) { Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); if (parentPlatform == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index 59fee102..3d1f7880 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -146,11 +146,6 @@ public interface IMediaServerService { void startSendRtp(MediaServer mediaServer, SendRtpInfo sendRtpItem); - SendRtpInfo createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, Integer channelId, boolean mediaTransmissionTCP, boolean rtcp); - - SendRtpInfo createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId, - String app, String stream, Integer channelId, boolean tcp, boolean rtcp); - MediaServer getMediaServerByAppAndStream(String app, String stream); Long updateDownloadProcess(MediaServer mediaServerItem, String app, String stream); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 6d26e0a6..e55758a2 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -82,11 +82,6 @@ public class MediaServerServiceImpl implements IMediaServerService { @Autowired private MediaConfig mediaConfig; - @Autowired - private SendRtpPortManager sendRtpPortManager; - - - /** * 流到来的处理 */ @@ -867,50 +862,7 @@ public class MediaServerServiceImpl implements IMediaServerService { mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem); } - @Override - public SendRtpInfo createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, Integer channelId, boolean isTcp, boolean rtcp) { - int localPort = sendRtpPortManager.getNextPort(mediaServer); - if (localPort == 0) { - return null; - } - SendRtpInfo sendRtpItem = new SendRtpInfo(); - sendRtpItem.setIp(ip); - sendRtpItem.setPort(port); - sendRtpItem.setSsrc(ssrc); - sendRtpItem.setDeviceId(deviceId); - sendRtpItem.setChannelId(channelId); - sendRtpItem.setTcp(isTcp); - sendRtpItem.setRtcp(rtcp); - sendRtpItem.setApp("rtp"); - sendRtpItem.setLocalPort(localPort); - sendRtpItem.setServerId(userSetting.getServerId()); - sendRtpItem.setMediaServerId(mediaServer.getId()); - return sendRtpItem; - } - @Override - public SendRtpInfo createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId, - String app, String stream, Integer channelId, boolean tcp, boolean rtcp){ - - int localPort = sendRtpPortManager.getNextPort(serverItem); - if (localPort <= 0) { - throw new PlayException(javax.sip.message.Response.SERVER_INTERNAL_ERROR, "server internal error"); - } - SendRtpInfo sendRtpItem = new SendRtpInfo(); - sendRtpItem.setIp(ip); - sendRtpItem.setPort(port); - sendRtpItem.setSsrc(ssrc); - sendRtpItem.setApp(app); - sendRtpItem.setStream(stream); - sendRtpItem.setPlatformId(platformId); - sendRtpItem.setChannelId(channelId); - sendRtpItem.setTcp(tcp); - sendRtpItem.setLocalPort(localPort); - sendRtpItem.setServerId(userSetting.getServerId()); - sendRtpItem.setMediaServerId(serverItem.getId()); - sendRtpItem.setRtcp(rtcp); - return sendRtpItem; - } @Override public MediaServer getMediaServerByAppAndStream(String app, String stream) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java index 9db19e9d..b756b9ec 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java @@ -34,7 +34,7 @@ public class SendRtpPortManager { return -1; } String sendIndexKey = KEY + userSetting.getServerId() + "_" + mediaServer.getId(); - String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX + String key = VideoManagerConstants.SEND_RTP_INFO + userSetting.getServerId() + "_*"; List queryResult = RedisUtil.scan(redisTemplate, key); Map sendRtpItemMap = new HashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java b/src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java index 1912c82d..7c3590c8 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java @@ -1,5 +1,35 @@ package com.genersoft.iot.vmp.service; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; +import com.genersoft.iot.vmp.media.bean.MediaServer; + +import java.util.List; + public interface ISendRtpServerService { + SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, + String deviceId, Integer channelId, boolean isTcp, boolean rtcp); + + SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, int port, String ssrc, String platformId, + String app, String stream, Integer channelId, boolean tcp, boolean rtcp); + + void update(SendRtpInfo sendRtpItem); + + SendRtpInfo queryByChannelId(Integer channelId); + + SendRtpInfo queryByCallId(String callId); + + SendRtpInfo queryByStream(String stream); + + void delete(SendRtpInfo sendRtpInfo); + + void deleteByCallId(String callId); + + void deleteByStream(String Stream); + + void deleteByChannel(Integer channelId); + + List queryAll(); + + boolean isChannelSendingRTP(Integer channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index ace08fe4..9df65333 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -252,6 +252,7 @@ public class MediaServiceImpl implements IMediaService { sendRtpItem.getCallId(), sendRtpItem.getStream()); if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, parentPlatform, channel); + redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, parentPlatform, channel); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java index 506a5ad5..d2975dc9 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java @@ -1,12 +1,142 @@ package com.genersoft.iot.vmp.service.impl; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.PlayException; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.service.ISendRtpServerService; +import com.genersoft.iot.vmp.utils.JsonUtil; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; +import java.util.ArrayList; +import java.util.List; + @Service @Slf4j public class SendRtpServerServiceImpl implements ISendRtpServerService { - + @Autowired + private UserSetting userSetting; + + @Autowired + private SendRtpPortManager sendRtpPortManager; + + @Autowired + private RedisTemplate redisTemplate; + + @Override + public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, + String deviceId, Integer channelId, boolean isTcp, boolean rtcp) { + int localPort = sendRtpPortManager.getNextPort(mediaServer); + if (localPort == 0) { + return null; + } + return SendRtpInfo.getInstance(localPort, mediaServer, ip, port, ssrc, deviceId, null, channelId, + isTcp, rtcp, userSetting.getServerId()); + } + + @Override + public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, int port, String ssrc, String platformId, + String app, String stream, Integer channelId, boolean tcp, boolean rtcp){ + + int localPort = sendRtpPortManager.getNextPort(mediaServer); + if (localPort <= 0) { + throw new PlayException(javax.sip.message.Response.SERVER_INTERNAL_ERROR, "server internal error"); + } + return SendRtpInfo.getInstance(localPort, mediaServer, ip, port, ssrc, null, platformId, channelId, + tcp, rtcp, userSetting.getServerId()); + } + + @Override + public void update(SendRtpInfo sendRtpItem) { + redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_CALLID + sendRtpItem.getCallId(), sendRtpItem); + redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpItem.getStream(), sendRtpItem); + redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpItem.getChannelId(), sendRtpItem); + } + + @Override + public SendRtpInfo queryByChannelId(Integer channelId) { + String key = VideoManagerConstants.SEND_RTP_INFO_CHANNEL + channelId; + return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpInfo.class); + } + + @Override + public SendRtpInfo queryByCallId(String callId) { + String key = VideoManagerConstants.SEND_RTP_INFO_CALLID + callId; + return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpInfo.class); + } + + @Override + public SendRtpInfo queryByStream(String stream, String targetId) { + String key = VideoManagerConstants.SEND_RTP_INFO_STREAM + stream; + return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpInfo.class); + } + + /** + * 删除RTP推送信息缓存 + */ + @Override + public void delete(SendRtpInfo sendRtpInfo) { + if (sendRtpInfo == null) { + return; + } + redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_CALLID + sendRtpInfo.getCallId()); + redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpInfo.getStream()); + redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpInfo.getChannelId()); + } + @Override + public void deleteByCallId(String callId) { + SendRtpInfo sendRtpInfo = queryByCallId(callId); + if (sendRtpInfo == null) { + return; + } + delete(sendRtpInfo); + } + @Override + public void deleteByStream(String Stream) { + SendRtpInfo sendRtpInfo = queryByStream(Stream); + if (sendRtpInfo == null) { + return; + } + delete(sendRtpInfo); + } + @Override + public void deleteByChannel(Integer channelId) { + SendRtpInfo sendRtpInfo = queryByChannelId(channelId); + if (sendRtpInfo == null) { + return; + } + delete(sendRtpInfo); + } + + @Override + public List queryAll() { + String key = VideoManagerConstants.SEND_RTP_INFO_CALLID + + userSetting.getServerId() + ":*"; + List queryResult = RedisUtil.scan(redisTemplate, key); + List result= new ArrayList<>(); + + for (Object o : queryResult) { + String keyItem = (String) o; + result.add((SendRtpInfo) redisTemplate.opsForValue().get(keyItem)); + } + + return result; + } + + /** + * 查询某个通道是否存在上级点播(RTP推送) + */ + @Override + public boolean isChannelSendingRTP(Integer channelId) { + SendRtpInfo sendRtpInfo = queryByChannelId(channelId); + return sendRtpInfo != null; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index 98a3a99c..29bb5cbf 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -7,17 +7,17 @@ import com.genersoft.iot.vmp.vmanager.bean.WVPResult; public interface IRedisRpcService { - SendRtpInfo getSendRtpItem(String sendRtpItemKey); + SendRtpInfo getSendRtpItem(Integer sendRtpChannelId); - WVPResult startSendRtp(String sendRtpItemKey, SendRtpInfo sendRtpItem); + WVPResult startSendRtp(Integer sendRtpChannelId, SendRtpInfo sendRtpItem); - WVPResult stopSendRtp(String sendRtpItemKey); + WVPResult stopSendRtp(Integer sendRtpChannelId); long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback callback); void stopWaitePushStreamOnline(SendRtpInfo sendRtpItem); - void rtpSendStopped(String sendRtpItemKey); + void rtpSendStopped(Integer sendRtpChannelId); void removeCallback(long key); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java index 43580e07..e14c19e5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -19,6 +19,8 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.service.ISendRtpServerService; +import com.genersoft.iot.vmp.service.impl.SendRtpServerServiceImpl; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; @@ -62,6 +64,8 @@ public class RedisRpcController { @Autowired private IPlatformService platformService; + @Autowired + private ISendRtpServerService sendRtpServerService; /** @@ -99,7 +103,7 @@ public class RedisRpcController { String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); sendRtpItem.setSsrc(ssrc); } - redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendRtpServerService.update(sendRtpItem); redisTemplate.opsForValue().set(sendRtpItemKey, sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); @@ -127,9 +131,9 @@ public class RedisRpcController { sendRtpItem.setLocalIp(mediaServer.getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); - redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); + sendRtpServerService.update(sendRtpItem); RedisRpcResponse response = request.getResponse(); - response.setBody(sendRtpItem.getRedisKey()); + response.setBody(sendRtpItem.getChannelId()); response.setStatusCode(200); } // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 @@ -146,9 +150,9 @@ public class RedisRpcController { sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); - redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); + redisTemplate.opsForValue().set(sendRtpItem.getChannelId(), sendRtpItem); RedisRpcResponse response = request.getResponse(); - response.setBody(sendRtpItem.getRedisKey()); + response.setBody(sendRtpItem.getChannelId()); response.setStatusCode(200); // 手动发送结果 sendResponse(response); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 308dd95c..6d705101 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -38,33 +38,6 @@ public interface IRedisCatchStorage { void delPlatformRegisterInfo(String callId); - void updateSendRTPSever(SendRtpInfo sendRtpItem); - - List querySendRTPServer(String platformGbId, String channelId, String streamId); - - /** - * 查询RTP推送信息缓存 - * @param platformGbId - * @param channelId - * @return sendRtpItem - */ - SendRtpInfo querySendRTPServer(String platformGbId, String channelId, String streamId, String callId); - - List querySendRTPServer(String platformGbId); - - /** - * 删除RTP推送信息缓存 - * @param platformGbId - * @param channelId - */ - void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId); - - /** - * 查询某个通道是否存在上级点播(RTP推送) - * @param channelId - */ - boolean isChannelSendingRTP(String channelId); - /** * 在redis添加wvp的信息 */ @@ -182,10 +155,6 @@ public interface IRedisCatchStorage { */ void sendStreamPushRequestedMsgForStatus(); - List querySendRTPServerByChannelId(String channelId); - - List querySendRTPServerByStream(String stream); - SystemAllInfo getSystemInfo(); int getPushStreamCount(String id); @@ -196,8 +165,6 @@ public interface IRedisCatchStorage { void addDiskInfo(List> diskInfo); - void deleteSendRTPServer(SendRtpInfo sendRtpItem); - List queryAllSendRTPServer(); List getAllDevices(); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index fb40b946..33002365 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -33,6 +33,7 @@ import java.util.*; @Component public class RedisCatchStorageImpl implements IRedisCatchStorage { + @Autowired private DeviceChannelMapper deviceChannelMapper; @@ -48,6 +49,11 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Autowired private StringRedisTemplate stringRedisTemplate; + @Override + public List queryAllSendRTPServer() { + return Collections.emptyList(); + } + @Override public Long getCSEQ() { String key = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetting.getServerId(); @@ -133,181 +139,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { redisTemplate.delete(VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + callId); } - @Override - public void updateSendRTPSever(SendRtpInfo sendRtpItem) { - redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); - } - @Override - public List querySendRTPServer(String platformGbId, String channelId, String streamId) { - String scanKey = VideoManagerConstants.SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" - + platformGbId + "_" - + channelId + "_" - + streamId + "_" - + "*"; - List result = new ArrayList<>(); - List scan = RedisUtil.scan(redisTemplate, scanKey); - if (!scan.isEmpty()) { - for (Object o : scan) { - String key = (String) o; - result.add(JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpInfo.class)); - } - } - return result; - } - - @Override - public SendRtpInfo querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { - if (platformGbId == null) { - platformGbId = "*"; - } - if (channelId == null) { - channelId = "*"; - } - if (streamId == null) { - streamId = "*"; - } - if (callId == null) { - callId = "*"; - } - String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX - + "*_*_" - + platformGbId + "_" - + channelId + "_" - + streamId + "_" - + callId; - List scan = RedisUtil.scan(redisTemplate, key); - if (scan.size() > 0) { - return (SendRtpInfo)redisTemplate.opsForValue().get(scan.get(0)); - }else { - return null; - } - } - - @Override - public List querySendRTPServerByChannelId(String channelId) { - if (channelId == null) { - return null; - } - String platformGbId = "*"; - String callId = "*"; - String streamId = "*"; - String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" - + platformGbId + "_" - + channelId + "_" - + streamId + "_" - + callId; - List scan = RedisUtil.scan(redisTemplate, key); - List result = new ArrayList<>(); - for (Object o : scan) { - result.add((SendRtpInfo) redisTemplate.opsForValue().get(o)); - } - return result; - } - - @Override - public List querySendRTPServerByStream(String stream) { - if (stream == null) { - return null; - } - String platformGbId = "*"; - String callId = "*"; - String channelId = "*"; - String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" - + platformGbId + "_" - + channelId + "_" - + stream + "_" - + callId; - List scan = RedisUtil.scan(redisTemplate, key); - List result = new ArrayList<>(); - for (Object o : scan) { - result.add((SendRtpInfo) redisTemplate.opsForValue().get(o)); - } - return result; - } - - @Override - public List querySendRTPServer(String platformGbId) { - if (platformGbId == null) { - platformGbId = "*"; - } - String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" - + platformGbId + "_*" + "_*" + "_*"; - List queryResult = RedisUtil.scan(redisTemplate, key); - List result= new ArrayList<>(); - - for (Object o : queryResult) { - String keyItem = (String) o; - result.add((SendRtpInfo) redisTemplate.opsForValue().get(keyItem)); - } - - return result; - } - - /** - * 删除RTP推送信息缓存 - */ - @Override - public void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId) { - if (streamId == null) { - streamId = "*"; - } - if (callId == null) { - callId = "*"; - } - String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" - + platformGbId + "_" - + channelId + "_" - + streamId + "_" - + callId; - List scan = RedisUtil.scan(redisTemplate, key); - if (scan.size() > 0) { - for (Object keyStr : scan) { - log.info("[删除 redis的SendRTP]: {}", keyStr.toString()); - redisTemplate.delete(keyStr); - } - } - } - - /** - * 删除RTP推送信息缓存 - */ - @Override - public void deleteSendRTPServer(SendRtpInfo sendRtpItem) { - deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),sendRtpItem.getCallId(), sendRtpItem.getStream()); - } - - @Override - public List queryAllSendRTPServer() { - String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*"; - List queryResult = RedisUtil.scan(redisTemplate, key); - List result= new ArrayList<>(); - - for (Object o : queryResult) { - String keyItem = (String) o; - result.add((SendRtpInfo) redisTemplate.opsForValue().get(keyItem)); - } - - return result; - } - - /** - * 查询某个通道是否存在上级点播(RTP推送) - */ - @Override - public boolean isChannelSendingRTP(String channelId) { - String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_*_" - + channelId + "*_" + "*_"; - List RtpStreams = RedisUtil.scan(redisTemplate, key); - return RtpStreams.size() > 0; - } @Override public void updateWVPInfo(JSONObject jsonObject, int time) { @@ -626,7 +458,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public int getGbSendCount(String id) { - String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX + String key = VideoManagerConstants.SEND_RTP_INFO + userSetting.getServerId() + "_*_" + id + "_*"; return RedisUtil.scan(redisTemplate, key).size(); }