diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java index 2566e0bee..c07797d9a 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import java.util.List; @@ -62,4 +63,6 @@ public interface IMediaNodeServerService { void startSendRtpStream(MediaServer mediaServer, SendRtpItem sendRtpItem); Long updateDownloadProcess(MediaServer mediaServer, String app, String stream); + + StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index a8c53381e..8f5615bab 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import java.util.List; @@ -152,4 +153,6 @@ public interface IMediaServerService { MediaServer getMediaServerByAppAndStream(String app, String stream); Long updateDownloadProcess(MediaServer mediaServerItem, String app, String stream); + + StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index d8f5ba955..b551ab09b 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -25,6 +25,7 @@ import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.MediaServerMapper; +import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.JsonUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; @@ -919,4 +920,14 @@ public class MediaServerServiceImpl implements IMediaServerService { } return mediaNodeServerService.updateDownloadProcess(mediaServer, app, stream); } + + @Override + public StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy) { + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); + if (mediaNodeServerService == null) { + logger.info("[startProxy] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类"); + } + return mediaNodeServerService.startProxy(mediaServer, streamProxy); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index 43443433d..3953d9931 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; +import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; @@ -390,4 +391,9 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { } return mediaInfo.getDuration(); } + + @Override + public StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy) { + return null; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxy.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxy.java index 13d521b44..adddde6c9 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxy.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxy.java @@ -34,7 +34,6 @@ public class StreamProxy extends CommonGBChannel { @Schema(description = "拉流地址") private String srcUrl; - @Schema(description = "超时时间") private int timeout; diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index 02d8f0236..840f6515e 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -136,21 +136,22 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override - public void save(StreamProxy param, GeneralCallback callback) { + public void save(StreamProxy streamProxy, GeneralCallback callback) { MediaServer mediaServer; - if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){ + if (ObjectUtils.isEmpty(streamProxy.getMediaServerId()) || "auto".equals(streamProxy.getMediaServerId())){ mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); }else { - mediaServer = mediaServerService.getOne(param.getMediaServerId()); + mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId()); } if (mediaServer == null) { log.warn("保存代理未找到在线的ZLM..."); throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM"); } - String dstUrl; - if ("ffmpeg".equalsIgnoreCase(param.getType())) { - String ffmpegCmd = mediaServerService.getFfmpegCmd(mediaServer, param.getFfmpegCmdKey()); + String dstUrl; + if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())) { + + String ffmpegCmd = mediaServerService.getFfmpegCmd(mediaServer, streamProxy.getFfmpegCmdKey()); if (ffmpegCmd == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法获取ffmpeg cmd"); @@ -172,37 +173,47 @@ public class StreamProxyServiceImpl implements IStreamProxyService { schemaForUri = schema; } - dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, param.getApp(), - param.getStream()); + dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, streamProxy.getApp(), + streamProxy.getStream()); }else { - dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaServer.getRtspPort(), param.getApp(), - param.getStream()); + dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaServer.getRtspPort(), streamProxy.getApp(), + streamProxy.getStream()); } - param.setDstUrl(dstUrl); + streamProxy.setDstUrl(dstUrl); log.info("[拉流代理] 输出地址为:{}", dstUrl); - param.setMediaServerId(mediaServer.getId()); + streamProxy.setMediaServerId(mediaServer.getId()); boolean saveResult; // 更新 - if (streamProxyMapper.selectOne(param.getApp(), param.getStream()) != null) { - saveResult = updateStreamProxy(param); + if (streamProxyMapper.selectOne(streamProxy.getApp(), streamProxy.getStream()) != null) { + saveResult = updateStreamProxy(streamProxy); }else { // 新增 - saveResult = addStreamProxy(param); + saveResult = addStreamProxy(streamProxy); } if (!saveResult) { callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null); return; } - Hook hook = Hook.getInstance(HookType.on_media_arrival, param.getApp(), param.getStream(), mediaServer.getId()); - hookSubscribe.addSubscribe(hook, (hookData) -> { - StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( - mediaServer, param.getApp(), param.getStream(), null, null); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - }); - if (param.isEnable()) { + + if (streamProxy.isEnable()) { + StreamInfo streamInfo = mediaServerService.startProxy(streamProxy); + if (streamInfo != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + }else { + callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); + } + + + + Hook hook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId()); + hookSubscribe.addSubscribe(hook, (hookData) -> { + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( + mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + }); String talkKey = UUID.randomUUID().toString(); String delayTalkKey = UUID.randomUUID().toString(); dynamicTask.startDelay(delayTalkKey, ()->{ - StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaServer.getId(), false); + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId(), false); if (streamInfo != null) { callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); }else { @@ -210,20 +221,20 @@ public class StreamProxyServiceImpl implements IStreamProxyService { callback.run(ErrorCode.ERROR100.getCode(), "超时", null); } }, 7000); - WVPResult result = addStreamProxyToZlm(param); + WVPResult result = addStreamProxyToZlm(streamProxy); if (result != null && result.getCode() == 0) { hookSubscribe.removeSubscribe(hook); dynamicTask.stop(talkKey); StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( - mediaServer, param.getApp(), param.getStream(), null, null); + mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); }else { - param.setEnable(false); + streamProxy.setEnable(false); // 直接移除 - if (param.isEnableRemoveNoneReader()) { - del(param.getApp(), param.getStream()); + if (streamProxy.isEnableRemoveNoneReader()) { + del(streamProxy.getApp(), streamProxy.getStream()); }else { - updateStreamProxy(param); + updateStreamProxy(streamProxy); } if (result == null){ callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); @@ -231,10 +242,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { callback.run(ErrorCode.ERROR100.getCode(), result.getMsg(), null); } } - } - else{ + }else{ StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( - mediaServer, param.getApp(), param.getStream(), null, null); + mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); } }