From 26919170f40ba088a2f1c7b750a82df3cb7cfc60 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 5 Sep 2024 17:59:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/media/event/hook/HookData.java | 58 +------------------ .../vmp/service/IReceiveRtpServerService.java | 6 ++ .../service/impl/RtpServerServiceImpl.java | 45 ++++++++------ 3 files changed, 35 insertions(+), 74 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookData.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookData.java index d4a82a7e..6aa3a675 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookData.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookData.java @@ -8,10 +8,12 @@ import com.genersoft.iot.vmp.media.event.media.MediaPublishEvent; import com.genersoft.iot.vmp.media.event.media.MediaRecordMp4Event; import com.genersoft.iot.vmp.media.bean.MediaServer; import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; /** * Hook返回的内容 */ +@Data public class HookData { /** * 应用名 @@ -73,60 +75,4 @@ public class HookData { } return hookData; } - - public String getApp() { - return app; - } - - public void setApp(String app) { - this.app = app; - } - - public String getStream() { - return stream; - } - - public void setStream(String stream) { - this.stream = stream; - } - - public MediaServer getMediaServer() { - return mediaServer; - } - - public void setMediaServer(MediaServer mediaServer) { - this.mediaServer = mediaServer; - } - - public String getSchema() { - return schema; - } - - public void setSchema(String schema) { - this.schema = schema; - } - - public MediaInfo getMediaInfo() { - return mediaInfo; - } - - public void setMediaInfo(MediaInfo mediaInfo) { - this.mediaInfo = mediaInfo; - } - - public String getParams() { - return params; - } - - public void setParams(String params) { - this.params = params; - } - - public RecordInfo getRecordInfo() { - return recordInfo; - } - - public void setRecordInfo(RecordInfo recordInfo) { - this.recordInfo = recordInfo; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java index ad7cbbef..2f0e61b0 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java @@ -1,4 +1,10 @@ package com.genersoft.iot.vmp.service; +import com.genersoft.iot.vmp.media.event.hook.HookData; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.bean.RTPServerParam; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; + public interface IReceiveRtpServerService { + SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java index c02a1746..1a2c0e6c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java @@ -1,17 +1,12 @@ package com.genersoft.iot.vmp.service.impl; -import com.genersoft.iot.vmp.common.InviteInfo; -import com.genersoft.iot.vmp.common.InviteSessionType; -import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookData; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; @@ -26,14 +21,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; -import org.springframework.security.core.parameters.P; import org.springframework.stereotype.Service; -import org.springframework.util.Assert; -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; -import java.util.List; import java.util.UUID; @Slf4j @@ -52,6 +41,9 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { @Autowired private UserSetting userSetting; + @Autowired + private HookSubscribe subscribe; + /** * 流到来的处理 */ @@ -71,7 +63,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { } @Override - public SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback) { + public SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback) { if (callback == null) { log.warn("[开启RTP收流] 失败,回调为NULL"); return null; @@ -112,9 +104,14 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { } if (rtpServerPort == 0) { callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null); + // 释放ssrc + if (rtpServerParam.getPresetSsrc() == null) { + ssrcFactory.releaseSsrc(rtpServerParam.getMediaServerItem().getId(), ssrc); + } return null; } SSRCInfo ssrcInfo = new SSRCInfo(rtpServerPort, ssrc, streamId); + // 设置流超时的定时任务 String timeOutTaskKey = UUID.randomUUID().toString(); dynamicTask.startDelay(timeOutTaskKey, () -> { @@ -126,11 +123,23 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { // 关闭收流端口 mediaServerService.closeRTPServer(rtpServerParam.getMediaServerItem(), streamId); }, userSetting.getPlayTimeout()); + // 开启流到来的监听 + Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", streamId, rtpServerParam.getMediaServerItem().getId()); + subscribe.addSubscribe(rtpHook, (hookData) -> { + dynamicTask.stop(timeOutTaskKey); + // hook响应 + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), hookData); + }); + return ssrcInfo; + } - - - + @Override + public void closeRTPServer(MediaServer mediaServer, String streamId) { + if (mediaServer == null) { + return; + } + // 释放ssrc }