diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java index 9d5843ba..c5403e92 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IDeviceService; @@ -39,7 +40,7 @@ import java.util.Map; public class SipRunner implements CommandLineRunner { @Autowired - private IVideoManagerStorage storager; + private IStreamSendManager streamSendManager; @Autowired private IRedisCatchStorage redisCatchStorage; @@ -102,11 +103,11 @@ public class SipRunner implements CommandLineRunner { // 查找国标推流 - List sendRtpItems = redisCatchStorage.queryAllSendRTPServer(); + List sendRtpItems = streamSendManager.getAll(); if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStreamId()); + streamSendManager.remove(sendRtpItem); if (mediaServerItem != null) { ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); Map param = new HashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index a6b8dbbc..7bb5a1af 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -60,7 +61,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { private ZLMServerFactory zlmServerFactory; @Autowired - private SipLayer sipLayer; + private IStreamSendManager streamSendManager; @Autowired private SIPSender sipSender; @@ -816,7 +817,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { if (platform == null) { return; } - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), null, null, callId); + SendRtpItem sendRtpItem = streamSendManager.getByCallId(callId); if (sendRtpItem != null) { streamByeCmd(platform, sendRtpItem); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index d189048b..6cbb2cb2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -70,7 +71,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In private ZLMServerFactory zlmServerFactory; @Autowired - private ZlmHttpHookSubscribe hookSubscribe; + private IStreamSendManager streamSendManager; @Autowired private IMediaServerService mediaServerService; @@ -106,7 +107,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In // 取消设置的超时任务 dynamicTask.stop(callIdHeader.getCallId()); String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); + SendRtpItem sendRtpItem = streamSendManager.getByCallId(callIdHeader.getCallId()); if (sendRtpItem == null) { logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); return; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 15489114..1edf1776 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.conf.UserSetting; @@ -10,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.*; @@ -44,6 +46,12 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private ISIPCommander cmder; + @Autowired + private IStreamSendManager streamSendManager; + + @Autowired + private ICommonGbChannelService commonGbChannelService; + @Autowired private IRedisCatchStorage redisCatchStorage; @@ -77,6 +85,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private UserSetting userSetting; + @Autowired + private Map resourceServiceMap; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -97,10 +108,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In } CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); - + SendRtpItem sendRtpItem = streamSendManager.getByCallId(callIdHeader.getCallId()); if (sendRtpItem != null){ - logger.info("[收到bye] 来自平台{}, 停止通道:{}", sendRtpItem.getDestId(), sendRtpItem.getChannelId()); + logger.info("[收到bye] 来自{}, 停止通道:{}", sendRtpItem.getDestId(), sendRtpItem.getChannelId()); String streamId = sendRtpItem.getStreamId(); Map param = new HashMap<>(); param.put("vhost","__defaultVhost__"); @@ -109,9 +119,13 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In param.put("ssrc",sendRtpItem.getSsrc()); logger.info("[收到bye] 停止向上级推流:{}", streamId); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(), sendRtpItem.getChannelId(), - callIdHeader.getCallId(), null); zlmServerFactory.stopSendRtpStream(mediaInfo, param); + streamSendManager.remove(sendRtpItem); + CommonGbChannel channel = commonGbChannelService.getChannel(sendRtpItem.getChannelId()); + IResourceService resourceService = resourceServiceMap.get(channel.getType()); + if (resourceService != null) { + resourceService.stopPlay(channel, null); + } if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getDestId()); if (platform != null) { @@ -124,25 +138,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getDestId()); } } - - int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); - if (totalReaderCount <= 0) { - logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); - if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { - - Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); - if (device == null) { - logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); - } - try { - logger.info("[停止点播] {}", sendRtpItem.getChannelId()); - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); - } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { - logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); - } - } - } }else { // 可能是设备发送的停止 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 375daf30..12a4793b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -14,11 +14,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.media.zlm.service.ISendRtpService; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; @@ -81,6 +83,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IRedisCatchStorage redisCatchStorage; + private IStreamSendManager streamSendManager; + @Autowired private SSRCFactory ssrcFactory; @@ -265,7 +269,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); } sendRtpItem.setStatus(1); - redisCatchStorage.updateSendRTPSever(sendRtpItem); + streamSendManager.update(sendRtpItem); StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); @@ -347,191 +351,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements resourceService.startDownload(channel, gb28181Sdp.getStartTime(), gb28181Sdp.getStopTime(), gb28181Sdp.getDownloadSpeed(), callback); } - - - - -// -// Device device = null; -// // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 -// if (channel != null) { -// -// ErrorCallback hookEvent = (code, msg, data) -> { -// StreamInfo streamInfo = (StreamInfo)data; -// MediaServerItem mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId()); -// logger.info("[上级Invite]下级已经开始推流。 回复200OK(SDP), {}/{}", streamInfo.getApp(), streamInfo.getStream()); -// // * 0 等待设备推流上来 -// // * 1 下级已经推流,等待上级平台回复ack -// // * 2 推流中 -// sendRtpItem.setStatus(1); -// redisCatchStorage.updateSendRTPSever(sendRtpItem); -// -// -// -// -// try { -// // 超时未收到Ack应该回复bye,当前等待时间为10秒 -// dynamicTask.startDelay(callIdHeader.getCallId(), () -> { -// logger.info("Ack 等待超时"); -// mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); -// // 回复bye -// try { -// cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); -// } catch (SipException | InvalidArgumentException | ParseException e) { -// logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); -// } -// }, 60 * 1000); -// responseSdpAck(request, content.toString(), platform); -// // tcp主动模式,回复sdp后开启监听 -// if (sendRtpItem.isTcpActive()) { -// MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); -// Map param = new HashMap<>(12); -// param.put("vhost","__defaultVhost__"); -// param.put("app",sendRtpItem.getApp()); -// param.put("stream",sendRtpItem.getStreamId()); -// param.put("ssrc", sendRtpItem.getSsrc()); -// if (!sendRtpItem.isTcpActive()) { -// param.put("dst_url",sendRtpItem.getIp()); -// param.put("dst_port", sendRtpItem.getPort()); -// } -// String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; -// param.put("is_udp", is_Udp); -// param.put("src_port", localPort); -// param.put("pt", sendRtpItem.getPt()); -// param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); -// param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); -// if (!sendRtpItem.isTcp()) { -// // 开启rtcp保活 -// param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); -// } -// JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param); -// if (startSendRtpStreamResult != null) { -// startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader); -// } -// } -// } catch (SipException | InvalidArgumentException | ParseException e) { -// logger.error("[命令发送失败] 国标级联 回复SdpAck", e); -// } -// }; -// ErrorCallback errorEvent = ((statusCode, msg, data) -> { -// // 未知错误。直接转发设备点播的错误 -// try { -// if (statusCode > 0) { -// Response response = getMessageFactory().createResponse(statusCode, evt.getRequest()); -// sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response); -// } -// } catch (ParseException | SipException e) { -// logger.error("未处理的异常 ", e); -// } -// }); -// sendRtpItem.setApp("rtp"); -// if ("Playback".equalsIgnoreCase(sessionName)) { -// sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); -// String startTimeStr = DateUtil.urlFormatter.format(start); -// String endTimeStr = DateUtil.urlFormatter.format(end); -// String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr; -// SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); -// sendRtpItem.setStreamId(ssrcInfo.getStream()); -// // 写入redis, 超时时回复 -// redisCatchStorage.updateSendRTPSever(sendRtpItem); -// playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), -// DateUtil.formatter.format(end), -// (code, msg, data) -> { -// if (code == InviteErrorCode.SUCCESS.getCode()){ -// hookEvent.run(code, msg, data); -// }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){ -// logger.info("[录像回放]超时, 用户:{}, 通道:{}", username, channelId); -// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); -// errorEvent.run(code, msg, data); -// }else { -// errorEvent.run(code, msg, data); -// } -// }); -// } else if ("Download".equalsIgnoreCase(sessionName)) { -// // 获取指定的下载速度 -// Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true); -// MediaDescription mediaDescription = null; -// String downloadSpeed = "1"; -// if (sdpMediaDescriptions.size() > 0) { -// mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0); -// } -// if (mediaDescription != null) { -// downloadSpeed = mediaDescription.getAttribute("downloadspeed"); -// } -// -// sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); -// SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); -// sendRtpItem.setStreamId(ssrcInfo.getStream()); -// // 写入redis, 超时时回复 -// redisCatchStorage.updateSendRTPSever(sendRtpItem); -// playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), -// DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed), -// (code, msg, data) -> { -// if (code == InviteErrorCode.SUCCESS.getCode()) { -// hookEvent.run(code, msg, data); -// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { -// logger.info("[录像下载]超时, 用户:{}, 通道:{}", username, channelId); -// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); -// errorEvent.run(code, msg, data); -// } else { -// errorEvent.run(code, msg, data); -// } -// }); -// } else { -// -// SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> { -// if (code == InviteErrorCode.SUCCESS.getCode()) { -// hookEvent.run(code, msg, data); -// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { -// logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); -// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); -// errorEvent.run(code, msg, data); -// } else { -// errorEvent.run(code, msg, data); -// } -// })); -// sendRtpItem.setPlayType(InviteStreamType.PLAY); -// String streamId = String.format("%s_%s", device.getDeviceId(), channelId); -// sendRtpItem.setStreamId(streamId); -// sendRtpItem.setSsrc(ssrcInfo.getSsrc()); -// redisCatchStorage.updateSendRTPSever(sendRtpItem); -// -// } -// } else if (gbStream != null) { -// -// String ssrc; -// if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { -// // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 -// ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); -// }else { -// ssrc = gb28181Sdp.getSsrc(); -// } -// -// if("push".equals(gbStream.getStreamType())) { -// if (streamPushItem != null && streamPushItem.isPushIng()) { -// // 推流状态 -// pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, -// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); -// } else { -// // 未推流 拉起 -// notifyStreamOnline(evt, request,gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, -// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); -// } -// }else if ("proxy".equals(gbStream.getStreamType())){ -// if (null != proxyByAppAndStream) { -// if(proxyByAppAndStream.isStatus()){ -// pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, -// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); -// }else{ -// //开启代理拉流 -// notifyStreamOnline(evt, request,gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, -// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); -// } -// } -// -// -// } -// } } } catch (SdpParseException e) { logger.error("sdp解析错误", e); @@ -587,7 +406,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (response != null) { sendRtpItem.setToTag(response.getToTag()); } - redisCatchStorage.updateSendRTPSever(sendRtpItem); + streamSendManager.update(sendRtpItem); } } private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPush streamPushItem, ParentPlatform platform, @@ -625,7 +444,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setToTag(response.getToTag()); } - redisCatchStorage.updateSendRTPSever(sendRtpItem); + streamSendManager.update(sendRtpItem); } else { // 不在线 拉起 @@ -740,7 +559,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (response != null) { sendRtpItem.setToTag(response.getToTag()); } - redisCatchStorage.updateSendRTPSever(sendRtpItem); + streamSendManager.update(sendRtpItem); } else { // 其他平台内容 otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, @@ -803,7 +622,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (response != null) { sendRtpItem.setToTag(response.getToTag()); } - redisCatchStorage.updateSendRTPSever(sendRtpItem); + streamSendManager.update(sendRtpItem); }, (wvpResult) -> { // 错误 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java index c794fdb2..8a394fca 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -47,6 +48,9 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IStreamSendManager streamSendManager; + @Autowired private IInviteStreamService inviteStreamService; @@ -108,7 +112,8 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I String contentType = header.getContentType(); String contentSubType = header.getContentSubType(); if ("Application".equalsIgnoreCase(contentType) && "MANSRTSP".equalsIgnoreCase(contentSubType)) { - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); + + SendRtpItem sendRtpItem = streamSendManager.getByCallId(callIdHeader.getCallId()); String streamId = sendRtpItem.getStreamId(); InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId); if (null == inviteInfo) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java index 30d4370d..65e8758f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; +import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; @@ -33,6 +34,7 @@ import javax.sip.SipException; import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; +import java.util.List; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @@ -57,6 +59,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IStreamSendManager streamSendManager; + @Autowired private IVideoManagerStorage storage; @@ -110,7 +115,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i subscribe.removeSubscribe(hookSubscribe); // 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题,需要将点播CallId进行上下级绑定 - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null); + SendRtpItem sendRtpItem = streamSendManager.getByCallId(ssrcTransaction.getCallId()); if (sendRtpItem != null) { ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(sendRtpItem.getDestId()); if (parentPlatform == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java index 72add757..275ca7c8 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java @@ -7,10 +7,10 @@ import java.util.List; public interface IStreamSendManager { - void add(SendRtpItem sendRtpItem); - void update(SendRtpItem sendRtpItem); + List getAll(); + SendRtpItem getByCallId(String callId); List getByAppAndStream(String app, String stream); @@ -27,5 +27,8 @@ public interface IStreamSendManager { void removeByCallID(String id); + void remove(SendRtpItem sendRtpItem); + void remove(List sendRtpItemList); + } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 807569b9..547f3727 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -73,6 +73,9 @@ public class ZLMHttpHookListener { @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IStreamSendManager streamSendManager; + @Autowired private IInviteStreamService inviteStreamService; @@ -441,8 +444,8 @@ public class ZLMHttpHookListener { } } if (!param.isRegist()) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); - if (sendRtpItems.size() > 0) { + List sendRtpItems = streamSendManager.getByAppAndStream(param.getApp(), param.getStream()); + if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { String platformId = sendRtpItem.getDestId(); @@ -452,8 +455,7 @@ public class ZLMHttpHookListener { try { if (platform != null) { commanderFroPlatform.streamByeCmd(platform, sendRtpItem); - redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + streamSendManager.remove(sendRtpItem); } else { cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); } @@ -495,26 +497,22 @@ public class ZLMHttpHookListener { return ret; } // 收到无人观看说明流也没有在往上级推送 - if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( - inviteInfo.getChannelId()); - if (sendRtpItems.size() > 0) { - for (SendRtpItem sendRtpItem : sendRtpItems) { - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId()); - try { - commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); - } - redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStreamId()); - if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), - sendRtpItem.getDestId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); - redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); - } + List sendRtpItems = streamSendManager.getByByChanelId(inviteInfo.getChannelId()); + if (!sendRtpItems.isEmpty()) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId()); + try { + commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + streamSendManager.remove(sendRtpItem); + if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), + sendRtpItem.getDestId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); } } } @@ -726,8 +724,8 @@ public class ZLMHttpHookListener { return HookResult.SUCCESS(); } taskExecutor.execute(() -> { - List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); - if (sendRtpItems.size() > 0) { + List sendRtpItems = streamSendManager.getByAppAndStream(param.getApp(), param.getStream()); + if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId()); ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); @@ -736,8 +734,7 @@ public class ZLMHttpHookListener { } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } - redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + streamSendManager.remove(sendRtpItem); } } }); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java index a9cdbe92..a90de2e9 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java @@ -1,12 +1,15 @@ package com.genersoft.iot.vmp.media.zlm.impl; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; import java.util.ArrayList; import java.util.List; @@ -19,19 +22,12 @@ import java.util.UUID; @Component public class StreamSendManagerImpl implements IStreamSendManager { - private Logger logger = LoggerFactory.getLogger("StreamSendManagerImpl"); - private final static String datePrefix = "VMP_SEND_STREAM:DATA:"; private final static String queryPrefix = "VMP_SEND_STREAM:QUERY:"; @Autowired private RedisTemplate redisTemplate; - @Override - public void add(SendRtpItem sendRtpItem) { - - } - @Override public void update(SendRtpItem sendRtpItem) { if (sendRtpItem.getId() != null) { @@ -84,6 +80,20 @@ public class StreamSendManagerImpl implements IStreamSendManager { return queryPrefix + "CHANNEL:" + channelId; } + @Override + public List getAll() { + String key = datePrefix + "_*_"; + List scan = RedisUtil.scan(redisTemplate, key); + List result = new ArrayList<>(); + if (!scan.isEmpty()) { + for (Object keyStr : scan) { + SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(keyStr); + result.add(sendRtpItem); + } + } + return result; + } + @Override public SendRtpItem getByCallId(String callId) { String dateId = (String) redisTemplate.opsForValue().get(getCallIdKey(callId)); @@ -135,6 +145,29 @@ public class StreamSendManagerImpl implements IStreamSendManager { return getSendRtpItems(dateIds); } + @Override + public void remove(SendRtpItem sendRtpItem) { + redisTemplate.delete(datePrefix); + if (sendRtpItem.getCallId() != null) { + redisTemplate.delete(getCallIdKey(sendRtpItem.getCallId())); + } + if (sendRtpItem.getApp() != null && sendRtpItem.getStreamId() != null) { + redisTemplate.opsForSet().remove(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStreamId())); + } + if (sendRtpItem.getMediaServerId() != null) { + redisTemplate.opsForSet().remove(getMediaServerIdKey(sendRtpItem.getMediaServerId())); + } + if (sendRtpItem.getDestId() != null) { + redisTemplate.opsForSet().remove(getDestIdKey(sendRtpItem.getDestId())); + } + if (sendRtpItem.getSourceId() != null) { + redisTemplate.opsForSet().remove(getSourceIdKey(sendRtpItem.getSourceId())); + } + if (sendRtpItem.getChannelId() != null) { + redisTemplate.opsForSet().remove(getChannelIdKey(sendRtpItem.getChannelId())); + } + } + @Override public void remove(String id) { if (id == null) { @@ -144,11 +177,28 @@ public class StreamSendManagerImpl implements IStreamSendManager { if (sendRtpItem == null) { return; } - + remove(sendRtpItem); } @Override public void remove(List sendRtpItemList) { + if (sendRtpItemList == null || sendRtpItemList.isEmpty()) { + return; + } + for (SendRtpItem sendRtpItem : sendRtpItemList) { + remove(sendRtpItem); + } + } + @Override + public void removeByCallID(String callId) { + if (ObjectUtils.isEmpty(callId)) { + return; + } + SendRtpItem sendRtpItem = getByCallId(callId); + if (sendRtpItem == null) { + return; + } + remove(sendRtpItem); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index c32775d9..d50c1308 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -63,6 +64,9 @@ public class PlatformServiceImpl implements IPlatformService { @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IStreamSendManager streamSendManager; + @Autowired private SSRCFactory ssrcFactory; @@ -360,11 +364,12 @@ public class PlatformServiceImpl implements IPlatformService { } private void stopAllPush(String platformId) { - List sendRtpItems = redisCatchStorage.querySendRTPServer(platformId); - if (sendRtpItems != null && sendRtpItems.size() > 0) { + List sendRtpItems = streamSendManager.getByDestId(platformId); + + if (sendRtpItems != null && !sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); - redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null); + streamSendManager.remove(sendRtpItem); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); Map param = new HashMap<>(3); param.put("vhost", "__defaultVhost__"); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 51085f00..435a1a9a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; @@ -72,6 +73,9 @@ public class PlayServiceImpl implements IPlayService { @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IStreamSendManager streamSendManager; + @Autowired private IInviteStreamService inviteStreamService; @@ -882,8 +886,8 @@ public class PlayServiceImpl implements IPlayService { @Override public void zlmServerOffline(String mediaServerId) { // 处理正在向上推流的上级平台 - List sendRtpItems = redisCatchStorage.querySendRTPServer(null); - if (sendRtpItems.size() > 0) { + List sendRtpItems = streamSendManager.getByMediaServerId(mediaServerId); + if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { if (sendRtpItem.getMediaServerId().equals(mediaServerId)) { ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index eb261e34..c81b932d 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; @@ -79,7 +80,7 @@ public class RedisGbPlayMsgListener implements MessageListener { private IMediaServerService mediaServerService; @Autowired - private IRedisCatchStorage redisCatchStorage; + private IStreamSendManager streamSendManager; @Autowired @@ -326,7 +327,7 @@ public class RedisGbPlayMsgListener implements MessageListener { responseSendItemMsg.setSendRtpItem(sendRtpItem); responseSendItemMsg.setMediaServerItem(mediaServerItem); result.setData(responseSendItemMsg); - redisCatchStorage.updateSendRTPSever(sendRtpItem); + streamSendManager.update(sendRtpItem); WvpRedisMsg response = WvpRedisMsg.getResponseInstance( userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result) diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java index 90a98cfe..f25f200c 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPush; @@ -45,6 +46,9 @@ public class RedisPushStreamCloseResponseListener implements MessageListener { @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IStreamSendManager streamSendManager; + @Autowired private IVideoManagerStorage storager; @@ -73,13 +77,12 @@ public class RedisPushStreamCloseResponseListener implements MessageListener { MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class); StreamPush push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream()); if (push != null) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( - push.getGbId()); + List sendRtpItems = streamSendManager.getByAppAndStream(pushChannel.getApp(), pushChannel.getStream()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId()); if (parentPlatform != null) { - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + streamSendManager.remove(sendRtpItem); try { commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -96,7 +99,7 @@ public class RedisPushStreamCloseResponseListener implements MessageListener { param.put("ssrc",sendRtpItem.getSsrc()); logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + streamSendManager.remove(sendRtpItem); zlmServerFactory.stopSendRtpStream(mediaInfo, param); if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 9b0f93b2..716cf4f4 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -41,31 +41,6 @@ public interface IRedisCatchStorage { void delPlatformRegisterInfo(String callId); - void updateSendRTPSever(SendRtpItem sendRtpItem); - - /** - * 查询RTP推送信息缓存 - * @param platformGbId - * @param channelId - * @return sendRtpItem - */ - SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId); - - List querySendRTPServer(String platformGbId); - - /** - * 删除RTP推送信息缓存 - * @param platformGbId - * @param channelId - */ - void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId); - - /** - * 查询某个通道是否存在上级点播(RTP推送) - * @param channelId - */ - boolean isChannelSendingRTP(String channelId); - /** * 在redis添加wvp的信息 */ @@ -183,10 +158,6 @@ public interface IRedisCatchStorage { */ void sendStreamPushRequestedMsgForStatus(); - List querySendRTPServerByChnnelId(String channelId); - - List querySendRTPServerByStream(String stream); - SystemAllInfo getSystemInfo(); int getPushStreamCount(String id); @@ -197,8 +168,6 @@ public interface IRedisCatchStorage { void addDiskInfo(List> diskInfo); - List queryAllSendRTPServer(); - List getAllDevices(); void removeAllDevice(); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 6c1ff13c..c6d10b95 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -142,150 +142,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { redisTemplate.delete(VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + callId); } - @Override - public void updateSendRTPSever(SendRtpItem sendRtpItem) { - - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + - userSetting.getServerId() + "_" - + sendRtpItem.getMediaServerId() + "_" - + sendRtpItem.getDestId() + "_" - + sendRtpItem.getChannelId() + "_" - + sendRtpItem.getStreamId() + "_" - + sendRtpItem.getCallId(); - redisTemplate.opsForValue().set(key, sendRtpItem); - } - - @Override - public SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { - if (platformGbId == null) { - platformGbId = "*"; - } - if (channelId == null) { - channelId = "*"; - } - if (streamId == null) { - streamId = "*"; - } - if (callId == null) { - callId = "*"; - } - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" - + platformGbId + "_" - + channelId + "_" - + streamId + "_" - + callId; - List scan = RedisUtil.scan(redisTemplate, key); - if (scan.size() > 0) { - return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0)); - }else { - return null; - } - } - - @Override - public List querySendRTPServerByChnnelId(String channelId) { - if (channelId == null) { - return null; - } - String platformGbId = "*"; - String callId = "*"; - String streamId = "*"; - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" - + platformGbId + "_" - + channelId + "_" - + streamId + "_" - + callId; - List scan = RedisUtil.scan(redisTemplate, key); - List result = new ArrayList<>(); - for (Object o : scan) { - result.add((SendRtpItem) redisTemplate.opsForValue().get(o)); - } - return result; - } - - @Override - public List querySendRTPServerByStream(String stream) { - if (stream == null) { - return null; - } - String platformGbId = "*"; - String callId = "*"; - String channelId = "*"; - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" - + platformGbId + "_" - + channelId + "_" - + stream + "_" - + callId; - List scan = RedisUtil.scan(redisTemplate, key); - List result = new ArrayList<>(); - for (Object o : scan) { - result.add((SendRtpItem) redisTemplate.opsForValue().get(o)); - } - return result; - } - - @Override - public List querySendRTPServer(String platformGbId) { - if (platformGbId == null) { - platformGbId = "*"; - } - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" - + platformGbId + "_*" + "_*" + "_*"; - List queryResult = RedisUtil.scan(redisTemplate, key); - List result= new ArrayList<>(); - - for (Object o : queryResult) { - String keyItem = (String) o; - result.add((SendRtpItem) redisTemplate.opsForValue().get(keyItem)); - } - - return result; - } - - /** - * 删除RTP推送信息缓存 - */ - @Override - public void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId) { - if (streamId == null) { - streamId = "*"; - } - if (callId == null) { - callId = "*"; - } - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*_" - + platformGbId + "_" - + channelId + "_" - + streamId + "_" - + callId; - List scan = RedisUtil.scan(redisTemplate, key); - if (scan.size() > 0) { - for (Object keyStr : scan) { - redisTemplate.delete(keyStr); - } - } - } - - @Override - public List queryAllSendRTPServer() { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX - + userSetting.getServerId() + "_*"; - List queryResult = RedisUtil.scan(redisTemplate, key); - List result= new ArrayList<>(); - - for (Object o : queryResult) { - String keyItem = (String) o; - result.add((SendRtpItem) redisTemplate.opsForValue().get(keyItem)); - } - - return result; - } - /** * 查询某个通道是否存在上级点播(RTP推送) */