From 48d4871ea4dbfcfcec70e69b4a9746a23f8a150a Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 9 Jan 2024 17:43:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=BC=96=E8=BE=91=E6=8B=89?= =?UTF-8?q?=E6=B5=81=E4=BB=A3=E7=90=86=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/service/IStreamProxyService.java | 5 + .../service/impl/StreamProxyServiceImpl.java | 114 ++++++++++++++++++ .../streamProxy/StreamProxyController.java | 52 ++++++++ 3 files changed, 171 insertions(+) diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index e9ecb3cc..d7a6aed1 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -130,4 +130,9 @@ public interface IStreamProxyService { void add(StreamProxy param, GeneralCallback callback); void delById(int gbId); + + /** + * 编辑拉流代理 + */ + void edit(StreamProxy param, GeneralCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index bf4549e5..a07165e7 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -320,6 +320,120 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } } + @Override + public void edit(StreamProxy param, GeneralCallback callback) { + MediaServerItem mediaInfo; + StreamProxy streamProxyInDb = streamProxyMapper.selectOneById(param.getId()); + if (streamProxyInDb != null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在"); + } + if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){ + mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null); + }else { + mediaInfo = mediaServerService.getOne(param.getMediaServerId()); + } + if (mediaInfo == null) { + logger.warn("[编辑拉流代理] 未找到在线的ZLM..."); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到可用的ZLM"); + } + // 国标编号发生变化,修改通用通道国标变化,通用通道应发送删除再发送添加命令通知上级 + // 类型变化,启用->启用:需要重新拉起视频流, 启用->未启用: 停止旧的视频流, 未启用->启用:发起新的视频流 + // 流地址发生变化。停止旧的,拉起新的 + // ffmpeg类型下,目标流地址变化,停止旧的,拉起新的 + // 节点变化: 停止旧的,拉起新的 + // ffmpeg命令模板变化: 停止旧的,拉起新的 + if (ObjectUtils.isEmpty(streamProxyInDb.getGbId())) { + if (!ObjectUtils.isEmpty(param.getGbId())) { + // 之前是空的,现在添加了国标编号 + + } + }else { + if (ObjectUtils.isEmpty(param.getGbId())) { + // 移除了国标编号 + }else { + if (!streamProxyInDb.getGbId().equals(param.getGbId())) { + // 修改了国标编号 + } + } + } + + + + if ("ffmpeg".equalsIgnoreCase(param.getType())) { + if (ObjectUtils.isEmpty(param.getDstUrl())) { + logger.warn("[添加拉流代理] 未设置目标URL地址(DstUrl)"); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未设置目标URL地址"); + } + logger.info("[添加拉流代理] ffmpeg,源地址: {}, 目标地址为:{}", param.getUrl(), param.getDstUrl()); + if (ObjectUtils.isEmpty(param.getApp()) || ObjectUtils.isEmpty(param.getStream())) { + try { + URL url = new URL(param.getDstUrl()); + String path = url.getPath(); + if (path.indexOf("/", 1) < 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败, 至少两层路径"); + } + String app = path.substring(1, path.indexOf("/", 2)); + String stream = path.substring(path.indexOf("/", 2) + 1); + param.setApp(app); + param.setStream(stream); + } catch (MalformedURLException e) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败"); + } + } + }else { + logger.info("[添加拉流代理] 直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream()); + } + param.setMediaServerId(mediaInfo.getId()); + StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream()); + if (streamProxyInDb != null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "app/stream已经存在"); + } + if (!param.isEnable()) { + param.setStatus(false); + saveProxyToDb(param); + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, param.getApp(), param.getStream(), null, null); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + return; + } + String talkKey = UUID.randomUUID().toString(); + String delayTalkKey = UUID.randomUUID().toString(); + + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { + dynamicTask.stop(talkKey); + param.setStatus(true); + saveProxyToDb(param); + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, param.getApp(), param.getStream(), null, null); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + }); + + dynamicTask.startDelay(delayTalkKey, ()->{ + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + dynamicTask.stop(talkKey); + callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null); + if (param.isEnableRemoveNoneReader()) { + return; + } + param.setProxyError("启用超时"); + param.setStatus(false); + saveProxyToDb(param); + }, 10000); + JSONObject jsonObject = addStreamProxyToZlm(param); + if (jsonObject != null && jsonObject.getInteger("code") != 0) { + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + dynamicTask.stop(talkKey); + callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null); + if (param.isEnableRemoveNoneReader()) { + return; + } + param.setProxyError("启用失败: " + jsonObject.getString("msg")); + param.setStatus(false); + saveProxyToDb(param); + } + } + private void saveProxyToDb(StreamProxy param) { // 未启用的数据可以直接保存了 if (!ObjectUtils.isEmpty(param.getGbId())) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java index 4cef4a03..87f29d1e 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java @@ -142,6 +142,11 @@ public class StreamProxyController { if (ObjectUtils.isEmpty(param.getGbId())) { param.setGbId(null); } + if (ObjectUtils.isEmpty(param.getSrcUrl())) { + param.setSrcUrl(param.getUrl()); + }else { + param.setUrl(param.getSrcUrl()); + } DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); // 录像查询以channelId作为deviceId查询 @@ -162,6 +167,53 @@ public class StreamProxyController { }); return result; } + @Operation(summary = "编辑代理", security = @SecurityRequirement(name = JwtUtils.HEADER), parameters = { + @Parameter(name = "param", description = "代理参数", required = true), + }) + @PostMapping(value = "/edit") + @ResponseBody + public DeferredResult edit(@RequestBody StreamProxy param){ + logger.info("编辑代理: " + JSONObject.toJSONString(param)); + if (param.getId() <= 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "缺少主键ID"); + } + if (ObjectUtils.isEmpty(param.getMediaServerId())) { + param.setMediaServerId("auto"); + } + if (ObjectUtils.isEmpty(param.getType())) { + param.setType("default"); + } + if (ObjectUtils.isEmpty(param.getRtpType())) { + param.setRtpType("1"); + } + if (ObjectUtils.isEmpty(param.getGbId())) { + param.setGbId(null); + } + if (ObjectUtils.isEmpty(param.getSrcUrl())) { + param.setSrcUrl(param.getUrl()); + }else { + param.setUrl(param.getSrcUrl()); + } + + DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + // 录像查询以channelId作为deviceId查询 + result.onTimeout(()->{ + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(ErrorCode.ERROR100.getCode()); + wvpResult.setMsg("超时"); + result.setResult(wvpResult); + }); + + streamProxyService.edit(param, (code, msg, streamInfo) -> { + logger.info("[添加拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg); + if (code == ErrorCode.SUCCESS.getCode()) { + result.setResult(new StreamContent(streamInfo)); + }else { + result.setResult(WVPResult.fail(code, msg)); + } + }); + return result; + } @GetMapping(value = "/ffmpeg_cmd/list") @ResponseBody