diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index b551ab09..fe505b19 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -821,13 +821,13 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public Boolean isStreamReady(MediaServer mediaServer, String rtp, String streamId) { + public Boolean isStreamReady(MediaServer mediaServer, String app, String streamId) { IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); if (mediaNodeServerService == null) { logger.info("[isStreamReady] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); return false; } - MediaInfo mediaInfo = mediaNodeServerService.getMediaInfo(mediaServer, rtp, streamId); + MediaInfo mediaInfo = mediaNodeServerService.getMediaInfo(mediaServer, app, streamId); return mediaInfo != null; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index 3953d993..a88f64f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -394,6 +394,63 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { @Override public StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy) { + String dstUrl; + if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())) { + + String ffmpegCmd = getFfmpegCmd(mediaServer, streamProxy.getFfmpegCmdKey()); + + if (ffmpegCmd == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法获取ffmpeg cmd"); + } + String schema = getSchemaFromFFmpegCmd(ffmpegCmd); + if (schema == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法从ffmpeg cmd中获取到输出格式"); + } + int port; + String schemaForUri; + if (schema.equalsIgnoreCase("rtsp")) { + port = mediaServer.getRtspPort(); + schemaForUri = schema; + }else if (schema.equalsIgnoreCase("flv")) { + port = mediaServer.getRtmpPort(); + schemaForUri = schema; + }else { + port = mediaServer.getRtmpPort(); + schemaForUri = schema; + } + + dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, streamProxy.getApp(), + streamProxy.getStream()); + }else { + dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaServer.getRtspPort(), streamProxy.getApp(), + streamProxy.getStream()); + } + MediaInfo mediaInfo = getMediaInfo(mediaServer, streamProxy.getApp(), streamProxy.getStream()); + if (mediaInfo != null) { + mediaServerService.closeStreams(mediaServer, param.getApp(), param.getStream()); + } + if (isStreamReady(mediaServer, param.getApp(), param.getStream())) { + mediaServerService.closeStreams(mediaServer, param.getApp(), param.getStream()); + } + return null; + } + + private String getSchemaFromFFmpegCmd(String ffmpegCmd) { + ffmpegCmd = ffmpegCmd.replaceAll(" + ", " "); + String[] paramArray = ffmpegCmd.split(" "); + if (paramArray.length == 0) { + return null; + } + for (int i = 0; i < paramArray.length; i++) { + if (paramArray[i].equalsIgnoreCase("-f")) { + if (i + 1 < paramArray.length - 1) { + return paramArray[i+1]; + }else { + return null; + } + + } + } return null; } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index 840f6515..347d547f 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -148,39 +148,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM"); } - String dstUrl; - if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())) { - - String ffmpegCmd = mediaServerService.getFfmpegCmd(mediaServer, streamProxy.getFfmpegCmdKey()); - - if (ffmpegCmd == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法获取ffmpeg cmd"); - } - String schema = getSchemaFromFFmpegCmd(ffmpegCmd); - if (schema == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法从ffmpeg cmd中获取到输出格式"); - } - int port; - String schemaForUri; - if (schema.equalsIgnoreCase("rtsp")) { - port = mediaServer.getRtspPort(); - schemaForUri = schema; - }else if (schema.equalsIgnoreCase("flv")) { - port = mediaServer.getRtmpPort(); - schemaForUri = schema; - }else { - port = mediaServer.getRtmpPort(); - schemaForUri = schema; - } - - dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, streamProxy.getApp(), - streamProxy.getStream()); - }else { - dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaServer.getRtspPort(), streamProxy.getApp(), - streamProxy.getStream()); - } - streamProxy.setDstUrl(dstUrl); - log.info("[拉流代理] 输出地址为:{}", dstUrl); streamProxy.setMediaServerId(mediaServer.getId()); boolean saveResult; // 更新 @@ -195,53 +162,53 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } if (streamProxy.isEnable()) { - StreamInfo streamInfo = mediaServerService.startProxy(streamProxy); + StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy); if (streamInfo != null) { callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); }else { callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); } - - - Hook hook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId()); - hookSubscribe.addSubscribe(hook, (hookData) -> { - StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( - mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - }); - String talkKey = UUID.randomUUID().toString(); - String delayTalkKey = UUID.randomUUID().toString(); - dynamicTask.startDelay(delayTalkKey, ()->{ - StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId(), false); - if (streamInfo != null) { - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - }else { - dynamicTask.stop(talkKey); - callback.run(ErrorCode.ERROR100.getCode(), "超时", null); - } - }, 7000); - WVPResult result = addStreamProxyToZlm(streamProxy); - if (result != null && result.getCode() == 0) { - hookSubscribe.removeSubscribe(hook); - dynamicTask.stop(talkKey); - StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( - mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - }else { - streamProxy.setEnable(false); - // 直接移除 - if (streamProxy.isEnableRemoveNoneReader()) { - del(streamProxy.getApp(), streamProxy.getStream()); - }else { - updateStreamProxy(streamProxy); - } - if (result == null){ - callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); - }else { - callback.run(ErrorCode.ERROR100.getCode(), result.getMsg(), null); - } - } +// +// +// Hook hook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId()); +// hookSubscribe.addSubscribe(hook, (hookData) -> { +// StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( +// mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); +// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); +// }); +// String talkKey = UUID.randomUUID().toString(); +// String delayTalkKey = UUID.randomUUID().toString(); +// dynamicTask.startDelay(delayTalkKey, ()->{ +// StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId(), false); +// if (streamInfo != null) { +// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); +// }else { +// dynamicTask.stop(talkKey); +// callback.run(ErrorCode.ERROR100.getCode(), "超时", null); +// } +// }, 7000); +// WVPResult result = addStreamProxyToZlm(streamProxy); +// if (result != null && result.getCode() == 0) { +// hookSubscribe.removeSubscribe(hook); +// dynamicTask.stop(talkKey); +// StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( +// mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); +// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); +// }else { +// streamProxy.setEnable(false); +// // 直接移除 +// if (streamProxy.isEnableRemoveNoneReader()) { +// del(streamProxy.getApp(), streamProxy.getStream()); +// }else { +// updateStreamProxy(streamProxy); +// } +// if (result == null){ +// callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); +// }else { +// callback.run(ErrorCode.ERROR100.getCode(), result.getMsg(), null); +// } +// } }else{ StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); @@ -249,24 +216,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } } - private String getSchemaFromFFmpegCmd(String ffmpegCmd) { - ffmpegCmd = ffmpegCmd.replaceAll(" + ", " "); - String[] paramArray = ffmpegCmd.split(" "); - if (paramArray.length == 0) { - return null; - } - for (int i = 0; i < paramArray.length; i++) { - if (paramArray[i].equalsIgnoreCase("-f")) { - if (i + 1 < paramArray.length - 1) { - return paramArray[i+1]; - }else { - return null; - } - } - } - return null; - } /** * 新增代理流