From 7cc4c9d14ab13db6317bab8c94790f7d548a085f Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 9 Sep 2024 17:56:46 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 2 +- .../iot/vmp/gb28181/service/IPlayService.java | 2 - .../service/impl/InviteStreamServiceImpl.java | 3 + .../gb28181/service/impl/PlayServiceImpl.java | 463 ++++++++---------- .../gb28181/transmit/cmd/ISIPCommander.java | 8 +- .../transmit/cmd/impl/SIPCommander.java | 24 +- .../cmd/impl/SIPCommanderFroPlatform.java | 1 - .../service/impl/MediaServerServiceImpl.java | 6 +- .../vmp/service/IReceiveRtpServerService.java | 2 +- .../iot/vmp/service/bean/RTPServerParam.java | 2 + .../iot/vmp/service/bean/SSRCInfo.java | 33 +- .../service/impl/RtpServerServiceImpl.java | 27 +- 12 files changed, 251 insertions(+), 322 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index ae27400f..8b5028cc 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -26,7 +26,7 @@ public class VideoManagerConstants { public static final String KEEPLIVEKEY_PREFIX = "VMP_KEEPALIVE_"; // TODO 此处多了一个_,暂不修改 - public static final String INVITE_PREFIX = "VMP_INVITE_INFO_"; + public static final String INVITE_PREFIX = "VMP_INVITE_INFO"; public static final String PLAYER_PREFIX = "VMP_INVITE_PLAY_"; public static final String PLAY_BLACK_PREFIX = "VMP_INVITE_PLAYBACK_"; public static final String DOWNLOAD_PREFIX = "VMP_INVITE_DOWNLOAD_"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java index c9921065..e4c5befe 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java @@ -21,8 +21,6 @@ import java.text.ParseException; */ public interface IPlayService { - void play(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, - ErrorCallback callback); SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback); StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, Device device, DeviceChannel channel); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java index 9b983336..116414bd 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java @@ -160,6 +160,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { public InviteInfo getInviteInfo(InviteSessionType type, Integer channelId, String stream) { String key = VideoManagerConstants.INVITE_PREFIX + ":" + (type != null ? type : "*") + + ":*" + ":" + (channelId != null ? channelId : "*") + ":" + (stream != null ? stream : "*") + ":*"; @@ -178,6 +179,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { public List getAllInviteInfo(InviteSessionType type, Integer channelId, String stream) { String key = VideoManagerConstants.INVITE_PREFIX + ":" + (type != null ? type : "*") + + ":*" + ":" + (channelId != null ? channelId : "*") + ":" + (stream != null ? stream : "*") + ":*"; @@ -206,6 +208,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { public void removeInviteInfo(InviteSessionType type, Integer channelId, String stream) { String scanKey = VideoManagerConstants.INVITE_PREFIX + ":" + (type != null ? type : "*") + + ":*" + ":" + (channelId != null ? channelId : "*") + ":" + (stream != null ? stream : "*") + ":*"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index 6d36b8ab..5d550b78 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -28,14 +28,10 @@ import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.service.IReceiveRtpServerService; import com.genersoft.iot.vmp.service.ISendRtpServerService; -import com.genersoft.iot.vmp.service.bean.DownloadFileInfo; -import com.genersoft.iot.vmp.service.bean.ErrorCallback; -import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.CloudRecordUtils; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; @@ -71,11 +67,6 @@ import java.util.Vector; @DS("master") public class PlayServiceImpl implements IPlayService { - - - @Autowired - private IVideoManagerStorage storager; - @Autowired private ISIPCommander cmder; @@ -97,9 +88,6 @@ public class PlayServiceImpl implements IPlayService { @Autowired private HookSubscribe subscribe; - @Autowired - private SendRtpPortManager sendRtpPortManager; - @Autowired private IMediaServerService mediaServerService; @@ -130,6 +118,9 @@ public class PlayServiceImpl implements IPlayService { @Autowired private ISendRtpServerService sendRtpServerService; + @Autowired + private IReceiveRtpServerService receiveRtpServerService; + /** * 流到来的处理 */ @@ -279,10 +270,7 @@ public class PlayServiceImpl implements IPlayService { startTime, endTime ); - int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(event.getMediaServer(), event.getStream(), null, - device.isSsrcCheck(), true, 0, false, !deviceChannel.isHasAudio(), false, tcpMode); - playBack(event.getMediaServer(), ssrcInfo, device, deviceChannel, startTime, endTime, null); + playBack(event.getMediaServer(), device, deviceChannel, startTime, endTime, null); } } @@ -341,21 +329,130 @@ public class PlayServiceImpl implements IPlayService { } } } - String streamId = String.format("%s_%s", device.getDeviceId(), channelId); + + return play(mediaServerItem, device, channel, ssrc, callback); + } + + private SSRCInfo play(MediaServer mediaServerItem, Device device, DeviceChannel channel, String ssrc, + ErrorCallback callback) { + if (mediaServerItem == null ) { + if (callback != null) { + callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), + InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), + null); + } + return null; + } + String streamId = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId()); int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(), false, 0, false, !channel.isHasAudio(), false, tcpMode); - if (ssrcInfo == null) { - callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null); + RTPServerParam rtpServerParam = new RTPServerParam(); + rtpServerParam.setMediaServerItem(mediaServerItem); + rtpServerParam.setStreamId(streamId); + rtpServerParam.setPresetSsrc(ssrc); + rtpServerParam.setSsrcCheck(device.isSsrcCheck()); + rtpServerParam.setPlayback(false); + rtpServerParam.setPort(0); + rtpServerParam.setTcpMode(tcpMode); + rtpServerParam.setOnlyAuto(false); + rtpServerParam.setDisableAudio(!channel.isHasAudio()); + SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, hookData) -> { + if (code == InviteErrorCode.SUCCESS.getCode()) { + // hook响应 + StreamInfo streamInfo = onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), device, channel); + if (streamInfo == null){ + if (callback != null) { + callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); + } + inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); + return; + } + if (callback != null) { + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + } + inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, + InviteErrorCode.SUCCESS.getCode(), + InviteErrorCode.SUCCESS.getMsg(), + streamInfo); + + log.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channel.getDeviceId(), + channel.getStreamIdentification()); + snapOnPlay(hookData.getMediaServer(), device.getDeviceId(), channel.getDeviceId(), streamId); + }else { + if (callback != null) { + callback.run(code, msg, null); + } + inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, code, msg, null); + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); + SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(streamId); + if (ssrcTransaction != null) { + try { + cmder.streamByeCmd(device, channel.getDeviceId(), streamId, null); + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + log.error("[点播超时], 发送BYE失败 {}", e.getMessage()); + } finally { + sessionManager.removeByStream(streamId); + } + } + } + }); + if (ssrcInfo == null || ssrcInfo.getPort() <= 0) { + log.info("[点播端口/SSRC]获取失败,deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channel.getDeviceId(), ssrcInfo); + callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "获取端口或者ssrc失败", null); + sessionManager.removeByStream(streamId); inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null); return null; } - play(mediaServerItem, ssrcInfo, device, channel, callback); + log.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 码流:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", + device.getDeviceId(), channel.getDeviceId(), channel.getStreamIdentification(), ssrcInfo.getPort(), ssrcInfo.getStream(), + device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); + + // 初始化redis中的invite消息状态 + InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, + mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY, + InviteSessionStatus.ready); + inviteStreamService.updateInviteInfo(inviteInfo); + + try { + cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (eventResult) -> { + // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 + InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel, callback, inviteInfo, InviteSessionType.PLAY); + }, (event) -> { + log.info("[点播失败] deviceId: {}, channelId:{}, {}: {}", device.getDeviceId(), channel.getDeviceId(), event.statusCode, event.msg); + receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); + + sessionManager.removeByStream(ssrcInfo.getStream()); + if (callback != null) { + callback.run(event.statusCode, event.msg, null); + } + inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, + event.statusCode, event.msg, null); + + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); + }); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 点播消息: {}", e.getMessage()); + receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); + sessionManager.removeByStream(ssrcInfo.getStream()); + if (callback != null) { + callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), + InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null); + } + inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, + InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), + InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null); + + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); + } return ssrcInfo; } + private void talk(MediaServer mediaServerItem, Device device, DeviceChannel channel, String stream, HookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, Runnable timeoutCallback, AudioBroadcastEvent audioEvent) { @@ -475,154 +572,8 @@ public class PlayServiceImpl implements IPlayService { } - - - @Override - public void play(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, - ErrorCallback callback) { - - if (mediaServerItem == null || ssrcInfo == null) { - if (callback != null) { - callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), - InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), - null); - } - return; - } - log.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 码流:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", - device.getDeviceId(), channel.getDeviceId(), channel.getStreamIdentification(), ssrcInfo.getPort(), ssrcInfo.getStream(), - device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); - //端口获取失败的ssrcInfo 没有必要发送点播指令 - if (ssrcInfo.getPort() <= 0) { - log.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channel.getDeviceId(), ssrcInfo); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - sessionManager.removeByStream(ssrcInfo.getStream()); - if (callback != null) { - callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null); - } - inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, - InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null); - return; - } - - // 初始化redis中的invite消息状态 - InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, - mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY, - InviteSessionStatus.ready); - inviteStreamService.updateInviteInfo(inviteInfo); - // 超时处理 - String timeOutTaskKey = UUID.randomUUID().toString(); - dynamicTask.startDelay(timeOutTaskKey, () -> { - // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 - InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); - if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) { - log.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}", - device.getDeviceId(), channel.getDeviceId(), channel.getStreamIdentification(), - ssrcInfo.getPort(), ssrcInfo.getSsrc()); - if (callback != null) { - callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); - } - inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, - InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); - inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); - - try { - cmder.streamByeCmd(device, channel.getDeviceId(), ssrcInfo.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { - log.error("[点播超时], 发送BYE失败 {}", e.getMessage()); - } finally { - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - sessionManager.removeByStream(ssrcInfo.getStream()); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - // 取消订阅消息监听 - subscribe.removeSubscribe(Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream())); - } - }else { - log.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}", - device.getDeviceId(), channel.getDeviceId(), channel.getStreamIdentification(), - ssrcInfo.getPort(), ssrcInfo.getSsrc()); - - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - mediaServerService.closeRTPServer(mediaServerItem.getId(), ssrcInfo.getStream()); - sessionManager.removeByStream(ssrcInfo.getStream()); - } - }, userSetting.getPlayTimeout()); - - try { - cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (hookData ) -> { - log.info("收到订阅消息: " + hookData); - dynamicTask.stop(timeOutTaskKey); - // hook响应 - StreamInfo streamInfo = onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), device, channel); - if (streamInfo == null){ - if (callback != null) { - callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); - } - - inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); - return; - } - if (callback != null) { - callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - } - inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, - InviteErrorCode.SUCCESS.getCode(), - InviteErrorCode.SUCCESS.getMsg(), - streamInfo); - log.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channel.getDeviceId(), - channel.getStreamIdentification()); - snapOnPlay(hookData.getMediaServer(), device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getStream()); - }, (eventResult) -> { - // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 - InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel, - timeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAY); - }, (event) -> { - log.info("[点播失败] deviceId: {}, channelId:{}, {}: {}", device.getDeviceId(), channel.getDeviceId(), event.statusCode, event.msg); - dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - sessionManager.removeByStream(ssrcInfo.getStream()); - if (callback != null) { - callback.run(event.statusCode, event.msg, null); - } - inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, - InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), - String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null); - - inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); - }); - } catch (InvalidArgumentException | SipException | ParseException e) { - - log.error("[命令发送失败] 点播消息: {}", e.getMessage()); - dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - sessionManager.removeByStream(ssrcInfo.getStream()); - if (callback != null) { - callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), - InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null); - } - inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, - InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), - InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null); - - inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); - } - } - private void tcpActiveHandler(Device device, DeviceChannel channel, String contentString, - MediaServer mediaServerItem, String timeOutTaskKey, - SSRCInfo ssrcInfo, ErrorCallback callback){ + MediaServer mediaServerItem, SSRCInfo ssrcInfo, ErrorCallback callback){ if (!device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { return; } @@ -652,13 +603,8 @@ public class PlayServiceImpl implements IPlayService { log.info("[TCP主动连接对方] 结果: {}" , result); if (!result) { // 主动连接失败,结束流程, 清理数据 - dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - + receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); sessionManager.removeByStream(ssrcInfo.getStream()); - callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); inviteStreamService.call(InviteSessionType.BROADCAST, channel.getId(), null, @@ -667,10 +613,7 @@ public class PlayServiceImpl implements IPlayService { } } catch (SdpException e) { log.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channel.getDeviceId(), e); - dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); sessionManager.removeByStream(ssrcInfo.getStream()); @@ -759,34 +702,86 @@ public class PlayServiceImpl implements IPlayService { @Override public void playBack(Device device, DeviceChannel channel, String startTime, String endTime, ErrorCallback callback) { - + if (device == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在"); + } + if (channel == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道不存在"); + } MediaServer newMediaServerItem = getNewMediaServerItem(device); + if (newMediaServerItem == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的节点"); + } if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && ! newMediaServerItem.isRtpEnable()) { log.warn("[录像回放] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", device.getDeviceId(), channel.getDeviceId()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流"); } + + playBack(newMediaServerItem, device, channel, startTime, endTime, callback); + } + + private void playBack(MediaServer mediaServerItem, + Device device, DeviceChannel channel, String startTime, + String endTime, ErrorCallback callback) { + String startTimeStr = startTime.replace("-", "") .replace(":", "") .replace(" ", ""); String endTimeTimeStr = endTime.replace("-", "") .replace(":", "") .replace(" ", ""); + String stream = device.getDeviceId() + "_" + channel.getDeviceId() + "_" + startTimeStr + "_" + endTimeTimeStr; int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), - true, 0, false, !channel.isHasAudio(), false, tcpMode); - playBack(newMediaServerItem, ssrcInfo, device, channel, startTime, endTime, callback); - } - public void playBack(MediaServer mediaServerItem, SSRCInfo ssrcInfo, - Device device, DeviceChannel channel, String startTime, - String endTime, ErrorCallback callback) { - if (device == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在"); - } - if (mediaServerItem == null || ssrcInfo == null) { - callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), - InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), + RTPServerParam rtpServerParam = new RTPServerParam(); + rtpServerParam.setMediaServerItem(mediaServerItem); + rtpServerParam.setStreamId(stream); + rtpServerParam.setSsrcCheck(device.isSsrcCheck()); + rtpServerParam.setPlayback(true); + rtpServerParam.setPort(0); + rtpServerParam.setTcpMode(tcpMode); + rtpServerParam.setOnlyAuto(false); + rtpServerParam.setDisableAudio(!channel.isHasAudio()); + SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, hookData) -> { + if (code == InviteErrorCode.SUCCESS.getCode()) { + // hook响应 + StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), device, channel, startTime, endTime); + if (streamInfo == null) { + log.warn("设备回放API调用失败!"); + callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); + return; + } + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + log.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channel.getGbDeviceId(), startTime, endTime); + }else { + if (callback != null) { + callback.run(code, msg, null); + } + inviteStreamService.call(InviteSessionType.PLAYBACK, channel.getId(), null, code, msg, null); + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, channel.getId()); + SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(stream); + if (ssrcTransaction != null) { + try { + cmder.streamByeCmd(device, channel.getDeviceId(), stream, null); + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + log.error("[录像回放] 发送BYE失败 {}", e.getMessage()); + } finally { + sessionManager.removeByStream(stream); + } + } + } + }); + if (ssrcInfo == null || ssrcInfo.getPort() <= 0) { + log.info("[回放端口/SSRC]获取失败,deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channel.getDeviceId(), ssrcInfo); + if (callback != null) { + callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "获取端口或者ssrc失败", null); + } + sessionManager.removeByStream(stream); + inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, + InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), + InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null); return; } @@ -799,70 +794,37 @@ public class PlayServiceImpl implements IPlayService { mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK, InviteSessionStatus.ready); inviteStreamService.updateInviteInfo(inviteInfo); - String playBackTimeOutTaskKey = UUID.randomUUID().toString(); - dynamicTask.startDelay(playBackTimeOutTaskKey, () -> { - log.warn("[录像回放] 超时,deviceId:{} ,channelId:{}", device.getDeviceId(), channel.getGbDeviceId()); - inviteStreamService.removeInviteInfo(inviteInfo); - callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); - - try { - cmder.streamByeCmd(device, channel.getGbDeviceId(), ssrcInfo.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException e) { - log.error("[录像回放] 超时 发送BYE失败 {}", e.getMessage()); - } catch (SsrcTransactionNotFoundException e) { - // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - sessionManager.removeByStream(ssrcInfo.getStream()); - } - }, userSetting.getPlayTimeout()); - - SipSubscribe.Event errorEvent = event -> { - log.info("[录像回放] 失败,{} {}", event.statusCode, event.msg); - dynamicTask.stop(playBackTimeOutTaskKey); - callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(), - String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg), null); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - sessionManager.removeByStream(ssrcInfo.getStream()); - inviteStreamService.removeInviteInfo(inviteInfo); - }; - - HookSubscribe.Event hookEvent = (hookData) -> { - log.info("收到回放订阅消息: " + hookData); - dynamicTask.stop(playBackTimeOutTaskKey); - StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), device, channel, startTime, endTime); - if (streamInfo == null) { - log.warn("设备回放API调用失败!"); - callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); - return; - } - callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - log.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channel.getGbDeviceId(), startTime, endTime); - }; try { cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channel, startTime, endTime, - hookEvent, eventResult -> { + eventResult -> { // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel, - playBackTimeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAYBACK); - }, errorEvent); + callback, inviteInfo, InviteSessionType.PLAYBACK); + }, eventResult -> { + log.info("[录像回放] 失败,{} {}", eventResult.statusCode, eventResult.msg); + if (callback != null) { + callback.run(eventResult.statusCode, eventResult.msg, null); + } + + receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); + sessionManager.removeByStream(ssrcInfo.getStream()); + inviteStreamService.removeInviteInfo(inviteInfo); + }); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 录像回放: {}", e.getMessage()); - - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(); - eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent; - eventResult.statusCode = -1; - eventResult.msg = "命令发送失败"; - errorEvent.response(eventResult); + if (callback != null) { + callback.run(InviteErrorCode.FAIL.getCode(), e.getMessage(), null); + } + receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); + sessionManager.removeByStream(ssrcInfo.getStream()); + inviteStreamService.removeInviteInfo(inviteInfo); } } private void InviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, MediaServer mediaServerItem, - Device device, DeviceChannel channel, String timeOutTaskKey, ErrorCallback callback, + Device device, DeviceChannel channel, ErrorCallback callback, InviteInfo inviteInfo, InviteSessionType inviteSessionType){ inviteInfo.setStatus(InviteSessionStatus.ok); ResponseEvent responseEvent = (ResponseEvent) eventResult.event; @@ -877,7 +839,7 @@ public class PlayServiceImpl implements IPlayService { if (mediaServerItem.isRtpEnable()) { // 多端口 if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - tcpActiveHandler(device, channel, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback); + tcpActiveHandler(device, channel, contentString, mediaServerItem, ssrcInfo, callback); } }else { // 单端口 @@ -906,7 +868,6 @@ public class PlayServiceImpl implements IPlayService { log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage()); } - dynamicTask.stop(timeOutTaskKey); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); @@ -924,7 +885,7 @@ public class PlayServiceImpl implements IPlayService { inviteInfo.setStream(ssrcInfo.getStream()); if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { if (mediaServerItem.isRtpEnable()) { - tcpActiveHandler(device, channel, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback); + tcpActiveHandler(device, channel, contentString, mediaServerItem, ssrcInfo, callback); }else { log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流"); } @@ -1026,10 +987,10 @@ public class PlayServiceImpl implements IPlayService { }; try { cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channel, startTime, endTime, downloadSpeed, - hookEvent, errorEvent, eventResult ->{ + errorEvent, eventResult ->{ // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel, - downLoadTimeOutTaskKey, callback, inviteInfo, InviteSessionType.DOWNLOAD); + callback, inviteInfo, InviteSessionType.DOWNLOAD); // 注册录像回调事件,录像下载结束后写入下载地址 HookSubscribe.Event hookEventForRecord = (hookData) -> { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index d4f462ac..f8225edb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -100,7 +100,7 @@ public interface ISIPCommander { * @param device 视频设备 * @param channel 预览通道 */ - void playStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, HookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + void playStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; /** * 请求回放视频流 @@ -110,19 +110,19 @@ public interface ISIPCommander { * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss */ - void playbackStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInf, Device device, DeviceChannel channel, String startTime, String endTime, HookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + void playbackStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInf, Device device, DeviceChannel channel, String startTime, String endTime, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; /** * 请求历史媒体下载 * * @param device 视频设备 - * @param channelId 预览通道 + * @param channel 预览通道 * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss * @param downloadSpeed 下载倍速参数 */ void downloadStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, - String startTime, String endTime, int downloadSpeed, HookSubscribe.Event hookEvent, + String startTime, String endTime, int downloadSpeed, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 2ce27501..1b0c0ab6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -258,26 +258,16 @@ public class SIPCommander implements ISIPCommander { * * @param device 视频设备 * @param channel 预览通道 - * @param event hook订阅 * @param errorEvent sip错误订阅 */ @Override public void playStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, - HookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { String stream = ssrcInfo.getStream(); if (device == null) { return; } - - log.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort()); - Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServerItem.getId()); - subscribe.addSubscribe(rtpHook, (hookData) -> { - if (event != null) { - event.response(hookData); - subscribe.removeSubscribe(rtpHook); - } - }); String sdpIp; if (!ObjectUtils.isEmpty(device.getSdpIp())) { sdpIp = device.getSdpIp(); @@ -373,7 +363,7 @@ public class SIPCommander implements ISIPCommander { */ @Override public void playbackStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, - String startTime, String endTime, HookSubscribe.Event hookEvent, + String startTime, String endTime, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { @@ -446,14 +436,6 @@ public class SIPCommander implements ISIPCommander { //ssrc content.append("y=" + ssrcInfo.getSsrc() + "\r\n"); - Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()); - // 添加订阅 - subscribe.addSubscribe(rtpHook, (hookData) -> { - if (hookEvent != null) { - hookEvent.response(hookData); - } - subscribe.removeSubscribe(rtpHook); - }); Request request = headerProvider.createPlaybackInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc()); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { @@ -472,7 +454,6 @@ public class SIPCommander implements ISIPCommander { @Override public void downloadStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, - HookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException { log.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort()); @@ -549,7 +530,6 @@ public class SIPCommander implements ISIPCommander { String callId= newCallIdHeader.getCallId(); subscribe.addSubscribe(rtpHook, (hookData) -> { log.debug("sipc 添加订阅===callId {}",callId); - hookEvent.response(hookData); subscribe.removeSubscribe(rtpHook); // 添加流注销的订阅,注销了后向设备发送bye Hook departureHook = Hook.getInstance(HookType.on_media_departure, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 5c36f206..950f8ff3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -139,7 +139,6 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { errorEvent.response(event); } }); - }else { request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, fromTag, toTag, www, callIdHeader, isRegister? parentPlatform.getExpires() : 0); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index e55758a2..97ec3b59 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -7,8 +7,8 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.gb28181.bean.PlayException; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; +import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; @@ -18,10 +18,8 @@ import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent; import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; -import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -178,7 +176,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } else { rtpServerPort = mediaServer.getRtpProxyPort(); } - return new SSRCInfo(rtpServerPort, ssrc, streamId); + return new SSRCInfo(rtpServerPort, ssrc, streamId, null); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java index 1f18dcc0..06620367 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java @@ -9,5 +9,5 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo; public interface IReceiveRtpServerService { SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback); - void closeRTPServer(MediaServer mediaServer, String stream); + void closeRTPServer(MediaServer mediaServer, SSRCInfo ssrcInfo); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java b/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java index d5beacd6..2baf335a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java @@ -20,4 +20,6 @@ public class RTPServerParam { * tcp模式,0时为不启用tcp监听,1时为启用tcp监听,2时为tcp主动连接模式 */ private Integer tcpMode; + + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java b/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java index 1723bc59..ac0e2b3b 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java @@ -1,38 +1,19 @@ package com.genersoft.iot.vmp.service.bean; +import lombok.Data; + +@Data public class SSRCInfo { private int port; private String ssrc; private String Stream; + private String timeOutTaskKey; - public SSRCInfo(int port, String ssrc, String stream) { + public SSRCInfo(int port, String ssrc, String stream, String timeOutTaskKey) { this.port = port; this.ssrc = ssrc; - Stream = stream; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public String getSsrc() { - return ssrc; - } - - public void setSsrc(String ssrc) { - this.ssrc = ssrc; - } - - public String getStream() { - return Stream; - } - - public void setStream(String stream) { - Stream = stream; + this.Stream = stream; + this.timeOutTaskKey = timeOutTaskKey; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java index 91f55b84..351ffb1e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java @@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.service.impl; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.media.bean.MediaServer; @@ -107,17 +106,21 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { rtpServerPort = rtpServerParam.getMediaServerItem().getRtpProxyPort(); } if (rtpServerPort == 0) { - callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null); + callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "开启RTPServer失败", null); // 释放ssrc if (rtpServerParam.getPresetSsrc() == null) { ssrcFactory.releaseSsrc(rtpServerParam.getMediaServerItem().getId(), ssrc); } return null; } - SSRCInfo ssrcInfo = new SSRCInfo(rtpServerPort, ssrc, streamId); // 设置流超时的定时任务 String timeOutTaskKey = UUID.randomUUID().toString(); + + SSRCInfo ssrcInfo = new SSRCInfo(rtpServerPort, ssrc, streamId, timeOutTaskKey); + + + Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", streamId, rtpServerParam.getMediaServerItem().getId()); dynamicTask.startDelay(timeOutTaskKey, () -> { // 收流超时 // 释放ssrc @@ -126,28 +129,32 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { } // 关闭收流端口 mediaServerService.closeRTPServer(rtpServerParam.getMediaServerItem(), streamId); + subscribe.removeSubscribe(rtpHook); + callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); }, userSetting.getPlayTimeout()); // 开启流到来的监听 - Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", streamId, rtpServerParam.getMediaServerItem().getId()); subscribe.addSubscribe(rtpHook, (hookData) -> { dynamicTask.stop(timeOutTaskKey); // hook响应 callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), hookData); + subscribe.removeSubscribe(rtpHook); }); return ssrcInfo; } @Override - public void closeRTPServer(MediaServer mediaServer, String stream) { + public void closeRTPServer(MediaServer mediaServer, SSRCInfo ssrcInfo) { if (mediaServer == null) { return; } - SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(stream); - if (ssrcTransaction != null) { - // 释放ssrc - ssrcFactory.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); + if (ssrcInfo.getTimeOutTaskKey() != null) { + dynamicTask.stop(ssrcInfo.getTimeOutTaskKey()); } - mediaServerService.closeRTPServer(mediaServer, stream); + if (ssrcInfo.getSsrc() != null) { + // 释放ssrc + ssrcFactory.releaseSsrc(mediaServer.getId(), ssrcInfo.getSsrc()); + } + mediaServerService.closeRTPServer(mediaServer, ssrcInfo.getStream()); } }