diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index 48b81bc8..248bc10c 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -160,4 +160,7 @@ public interface IMediaServerService { void stopProxy(MediaServer mediaServer, String streamKey); StreamInfo getMediaByAppAndStream(String app, String stream); + + int createRTPServer(MediaServer mediaServerItem, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode); + } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 1efeca7a..59dc3d87 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -186,6 +186,22 @@ public class MediaServerServiceImpl implements IMediaServerService { return new SSRCInfo(rtpServerPort, ssrc, streamId); } + @Override + public int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode) { + int rtpServerPort; + if (mediaServer.isRtpEnable()) { + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); + if (mediaNodeServerService == null) { + log.info("[openRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); + return 0; + } + rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, streamId, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode); + } else { + rtpServerPort = mediaServer.getRtpProxyPort(); + } + return rtpServerPort; + } + @Override public void closeRTPServer(MediaServer mediaServer, String streamId) { if (mediaServer == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java b/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java index 54839e89..d5beacd6 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java @@ -6,14 +6,18 @@ import lombok.Data; @Data public class RTPServerParam { - MediaServer mediaServerItem; - String streamId; - String presetSsrc; - boolean ssrcCheck; - boolean isPlayback; - Integer port; - Boolean onlyAuto; - Boolean disableAudio; - Boolean reUsePort; - Integer tcpMode; + private MediaServer mediaServerItem; + private String streamId; + private String presetSsrc; + private boolean ssrcCheck; + private boolean playback; + private Integer port; + private boolean onlyAuto; + private boolean disableAudio; + private boolean reUsePort; + + /** + * tcp模式,0时为不启用tcp监听,1时为启用tcp监听,2时为tcp主动连接模式 + */ + private Integer tcpMode; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java index 42d65f39..c02a1746 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java @@ -1,29 +1,42 @@ package com.genersoft.iot.vmp.service.impl; +import com.genersoft.iot.vmp.common.InviteInfo; +import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.IReceiveRtpServerService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.RTPServerParam; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; +import org.springframework.security.core.parameters.P; import org.springframework.stereotype.Service; +import org.springframework.util.Assert; import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; import java.util.List; +import java.util.UUID; +@Slf4j @Service public class RtpServerServiceImpl implements IReceiveRtpServerService { @@ -33,6 +46,12 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { @Autowired private DynamicTask dynamicTask; + @Autowired + private SSRCFactory ssrcFactory; + + @Autowired + private UserSetting userSetting; + /** * 流到来的处理 */ @@ -53,9 +72,65 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { @Override public SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback) { - // 开启流到来的监听 + if (callback == null) { + log.warn("[开启RTP收流] 失败,回调为NULL"); + return null; + } + if (rtpServerParam.getMediaServerItem() == null) { + log.warn("[开启RTP收流] 失败,媒体节点为NULL"); + return null; + } + + // 获取mediaServer可用的ssrc + final String ssrc; + if (rtpServerParam.getPresetSsrc() != null) { + ssrc = rtpServerParam.getPresetSsrc(); + }else { + if (rtpServerParam.isPlayback()) { + ssrc = ssrcFactory.getPlayBackSsrc(rtpServerParam.getMediaServerItem().getId()); + }else { + ssrc = ssrcFactory.getPlaySsrc(rtpServerParam.getMediaServerItem().getId()); + } + } + final String streamId; + if (rtpServerParam.getStreamId() == null) { + streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase(); + }else { + streamId = rtpServerParam.getStreamId(); + } + if (rtpServerParam.isSsrcCheck() && rtpServerParam.getTcpMode() > 0) { + // 目前zlm不支持 tcp模式更新ssrc,暂时关闭ssrc校验 + log.warn("[openRTPServer] 平台对接时下级可能自定义ssrc,但是tcp模式zlm收流目前无法更新ssrc,可能收流超时,此时请使用udp收流或者关闭ssrc校验"); + } + int rtpServerPort; + if (rtpServerParam.getMediaServerItem().isRtpEnable()) { + rtpServerPort = mediaServerService.createRTPServer(rtpServerParam.getMediaServerItem(), streamId, + rtpServerParam.isSsrcCheck() ? Long.parseLong(ssrc) : 0, rtpServerParam.getPort(), rtpServerParam.isOnlyAuto(), + rtpServerParam.isDisableAudio(), rtpServerParam.isReUsePort(), rtpServerParam.getTcpMode()); + } else { + rtpServerPort = rtpServerParam.getMediaServerItem().getRtpProxyPort(); + } + if (rtpServerPort == 0) { + callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null); + return null; + } + SSRCInfo ssrcInfo = new SSRCInfo(rtpServerPort, ssrc, streamId); // 设置流超时的定时任务 - // 调用节点,开启端口监听 + String timeOutTaskKey = UUID.randomUUID().toString(); + dynamicTask.startDelay(timeOutTaskKey, () -> { + // 收流超时 + // 释放ssrc + if (rtpServerParam.getPresetSsrc() == null) { + ssrcFactory.releaseSsrc(rtpServerParam.getMediaServerItem().getId(), ssrc); + } + // 关闭收流端口 + mediaServerService.closeRTPServer(rtpServerParam.getMediaServerItem(), streamId); + }, userSetting.getPlayTimeout()); + // 开启流到来的监听 + + + + }