临时提交

pull/1642/head
648540858 2024-09-04 23:06:27 +08:00
parent ad5b66eb71
commit 568abbcd9d
4 changed files with 110 additions and 12 deletions

View File

@ -160,4 +160,7 @@ public interface IMediaServerService {
void stopProxy(MediaServer mediaServer, String streamKey); void stopProxy(MediaServer mediaServer, String streamKey);
StreamInfo getMediaByAppAndStream(String app, String stream); StreamInfo getMediaByAppAndStream(String app, String stream);
int createRTPServer(MediaServer mediaServerItem, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode);
} }

View File

@ -186,6 +186,22 @@ public class MediaServerServiceImpl implements IMediaServerService {
return new SSRCInfo(rtpServerPort, ssrc, streamId); return new SSRCInfo(rtpServerPort, ssrc, streamId);
} }
@Override
public int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode) {
int rtpServerPort;
if (mediaServer.isRtpEnable()) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[openRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return 0;
}
rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, streamId, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode);
} else {
rtpServerPort = mediaServer.getRtpProxyPort();
}
return rtpServerPort;
}
@Override @Override
public void closeRTPServer(MediaServer mediaServer, String streamId) { public void closeRTPServer(MediaServer mediaServer, String streamId) {
if (mediaServer == null) { if (mediaServer == null) {

View File

@ -6,14 +6,18 @@ import lombok.Data;
@Data @Data
public class RTPServerParam { public class RTPServerParam {
MediaServer mediaServerItem; private MediaServer mediaServerItem;
String streamId; private String streamId;
String presetSsrc; private String presetSsrc;
boolean ssrcCheck; private boolean ssrcCheck;
boolean isPlayback; private boolean playback;
Integer port; private Integer port;
Boolean onlyAuto; private boolean onlyAuto;
Boolean disableAudio; private boolean disableAudio;
Boolean reUsePort; private boolean reUsePort;
Integer tcpMode;
/**
* tcp0tcp1tcp2tcp
*/
private Integer tcpMode;
} }

View File

@ -1,29 +1,42 @@
package com.genersoft.iot.vmp.service.impl; package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService; import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.RTPServerParam; import com.genersoft.iot.vmp.service.bean.RTPServerParam;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.security.core.parameters.P;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.SipException; import javax.sip.SipException;
import java.text.ParseException; import java.text.ParseException;
import java.util.List; import java.util.List;
import java.util.UUID;
@Slf4j
@Service @Service
public class RtpServerServiceImpl implements IReceiveRtpServerService { public class RtpServerServiceImpl implements IReceiveRtpServerService {
@ -33,6 +46,12 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
@Autowired @Autowired
private DynamicTask dynamicTask; private DynamicTask dynamicTask;
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private UserSetting userSetting;
/** /**
* *
*/ */
@ -53,9 +72,65 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
@Override @Override
public SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<StreamInfo> callback) { public SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<StreamInfo> callback) {
// 开启流到来的监听 if (callback == null) {
log.warn("[开启RTP收流] 失败回调为NULL");
return null;
}
if (rtpServerParam.getMediaServerItem() == null) {
log.warn("[开启RTP收流] 失败媒体节点为NULL");
return null;
}
// 获取mediaServer可用的ssrc
final String ssrc;
if (rtpServerParam.getPresetSsrc() != null) {
ssrc = rtpServerParam.getPresetSsrc();
}else {
if (rtpServerParam.isPlayback()) {
ssrc = ssrcFactory.getPlayBackSsrc(rtpServerParam.getMediaServerItem().getId());
}else {
ssrc = ssrcFactory.getPlaySsrc(rtpServerParam.getMediaServerItem().getId());
}
}
final String streamId;
if (rtpServerParam.getStreamId() == null) {
streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
}else {
streamId = rtpServerParam.getStreamId();
}
if (rtpServerParam.isSsrcCheck() && rtpServerParam.getTcpMode() > 0) {
// 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验
log.warn("[openRTPServer] 平台对接时下级可能自定义ssrc但是tcp模式zlm收流目前无法更新ssrc可能收流超时此时请使用udp收流或者关闭ssrc校验");
}
int rtpServerPort;
if (rtpServerParam.getMediaServerItem().isRtpEnable()) {
rtpServerPort = mediaServerService.createRTPServer(rtpServerParam.getMediaServerItem(), streamId,
rtpServerParam.isSsrcCheck() ? Long.parseLong(ssrc) : 0, rtpServerParam.getPort(), rtpServerParam.isOnlyAuto(),
rtpServerParam.isDisableAudio(), rtpServerParam.isReUsePort(), rtpServerParam.getTcpMode());
} else {
rtpServerPort = rtpServerParam.getMediaServerItem().getRtpProxyPort();
}
if (rtpServerPort == 0) {
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
return null;
}
SSRCInfo ssrcInfo = new SSRCInfo(rtpServerPort, ssrc, streamId);
// 设置流超时的定时任务 // 设置流超时的定时任务
// 调用节点,开启端口监听 String timeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 收流超时
// 释放ssrc
if (rtpServerParam.getPresetSsrc() == null) {
ssrcFactory.releaseSsrc(rtpServerParam.getMediaServerItem().getId(), ssrc);
}
// 关闭收流端口
mediaServerService.closeRTPServer(rtpServerParam.getMediaServerItem(), streamId);
}, userSetting.getPlayTimeout());
// 开启流到来的监听
} }