diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxy.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxy.java index 8124aae8..64d3af60 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxy.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxy.java @@ -7,10 +7,8 @@ import io.swagger.v3.oas.annotations.media.Schema; */ @Schema(description = "拉流代理的信息") public class StreamProxy { - @Schema(description = "ID") private int id; - @Schema(description = "类型") private String type; @Schema(description = "应用名") @@ -31,11 +29,13 @@ public class StreamProxy { private String ffmpegCmdKey; @Schema(description = "rtsp拉流时,拉流方式,0:tcp,1:udp,2:组播") private String rtpType; + @Schema(description = "代理失败的原因") + private String proxyError; @Schema(description = "是否启用") private boolean enable; @Schema(description = "是否启用音频") private boolean enableAudio; - @Schema(description = "是否启用MP4") + @Schema(description = "是否录制") private boolean enableMp4; @Schema(description = "是否 无人观看时删除") private boolean enableRemoveNoneReader; @@ -267,4 +267,12 @@ public class StreamProxy { public void setCommonGbChannelId(int commonGbChannelId) { this.commonGbChannelId = commonGbChannelId; } + + public String getProxyError() { + return proxyError; + } + + public void setProxyError(String proxyError) { + this.proxyError = proxyError; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index b656931f..717e021c 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -124,4 +124,8 @@ public interface IStreamProxyService { */ List getAllForEnable(); + /** + * 添加拉流代理 + */ + void add(StreamProxy param, GeneralCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index aa91a69b..dd3480a3 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -43,6 +43,8 @@ import org.springframework.util.CollectionUtils; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; +import java.net.MalformedURLException; +import java.net.URL; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -229,6 +231,113 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } } + @Override + @Transactional + public void add(StreamProxy param, GeneralCallback callback) { + MediaServerItem mediaInfo; + if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){ + mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null); + }else { + mediaInfo = mediaServerService.getOne(param.getMediaServerId()); + } + if (mediaInfo == null) { + logger.warn("[添加拉流代理] 未找到在线的ZLM..."); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到可用的ZLM"); + } + if ("ffmpeg".equalsIgnoreCase(param.getType())) { + if (ObjectUtils.isEmpty(param.getDstUrl())) { + logger.warn("[添加拉流代理] 未设置目标URL地址(DstUrl)"); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未设置目标URL地址"); + } + logger.info("[添加拉流代理] ffmpeg,源地址: {}, 目标地址为:{}", param.getUrl(), param.getDstUrl()); + if (ObjectUtils.isEmpty(param.getApp()) || ObjectUtils.isEmpty(param.getStream())) { + try { + URL url = new URL(param.getDstUrl()); + String path = url.getPath(); + if (path.indexOf("/", 1) < 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败, 至少两层路径"); + } + int appIndex = path.indexOf("/", 1); + String app = path.substring(1, appIndex); + String stream = path.substring(path.indexOf(app)); + param.setApp(app); + param.setStream(stream); + } catch (MalformedURLException e) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败"); + } + } + }else { + logger.info("[添加拉流代理] 直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream()); + } + param.setMediaServerId(mediaInfo.getId()); + StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream()); + if (streamProxyInDb != null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "app/stream已经存在"); + } + if (!param.isEnable()) { + param.setStatus(false); + saveProxyToDb(param); + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, param.getApp(), param.getStream(), null, null); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + return; + } + String talkKey = UUID.randomUUID().toString(); + String delayTalkKey = UUID.randomUUID().toString(); + + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { + dynamicTask.stop(talkKey); + param.setStatus(true); + saveProxyToDb(param); + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, param.getApp(), param.getStream(), null, null); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + }); + + dynamicTask.startDelay(delayTalkKey, ()->{ + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + dynamicTask.stop(talkKey); + callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null); + if (param.isEnableRemoveNoneReader()) { + return; + } + param.setProxyError("启用超时"); + param.setStatus(false); + saveProxyToDb(param); + }, 10000); + JSONObject jsonObject = addStreamProxyToZlm(param); + if (jsonObject != null && jsonObject.getInteger("code") != 0) { + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + dynamicTask.stop(talkKey); + callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null); + if (param.isEnableRemoveNoneReader()) { + return; + } + param.setProxyError("启用失败: " + jsonObject.getString("msg")); + param.setStatus(false); + saveProxyToDb(param); + } + } + + private void saveProxyToDb(StreamProxy param) { + // 未启用的数据可以直接保存了 + if (!ObjectUtils.isEmpty(param.getGbId())) { + CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param); + if (commonGbChannelService.add(commonGbChannel) > 0) { + param.setCommonGbChannelId(commonGbChannel.getCommonGbId()); + }else { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "添加通用通道失败,请检查是否国标编码重复"); + } + } + param.setUpdateTime(DateUtil.getNow()); + param.setCreateTime(DateUtil.getNow()); + int addStreamProxyResult = streamProxyMapper.add(param); + if (addStreamProxyResult <= 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "添加拉流代理失败"); + } + } + private String getSchemaFromFFmpegCmd(String ffmpegCmd) { ffmpegCmd = ffmpegCmd.replaceAll(" + ", " "); String[] paramArray = ffmpegCmd.split(" "); @@ -283,8 +392,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(), param.isEnableAudio(), param.isEnableMp4(), param.getRtpType()); } - System.out.println("addStreamProxyToZlm===="); - System.out.println(result); if (result != null && result.getInteger("code") == 0) { JSONObject data = result.getJSONObject("data"); if (data == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index 07275b92..ecb412b1 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -18,6 +18,7 @@ public interface StreamProxyMapper { "(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " + "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " + "#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} , #{longitude} , #{latitude}, #{commonGbChannelId} )") + @Options(useGeneratedKeys=true, keyProperty="id", keyColumn="id") int add(StreamProxy streamProxy); @Update("UPDATE wvp_stream_proxy " + diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java index 7393a37f..32608fc4 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java @@ -123,6 +123,46 @@ public class StreamProxyController { return result; } + @Operation(summary = "添加代理", security = @SecurityRequirement(name = JwtUtils.HEADER), parameters = { + @Parameter(name = "param", description = "代理参数", required = true), + }) + @PostMapping(value = "/add") + @ResponseBody + public DeferredResult add(@RequestBody StreamProxy param){ + logger.info("添加代理: " + JSONObject.toJSONString(param)); + if (ObjectUtils.isEmpty(param.getMediaServerId())) { + param.setMediaServerId("auto"); + } + if (ObjectUtils.isEmpty(param.getType())) { + param.setType("default"); + } + if (ObjectUtils.isEmpty(param.getRtpType())) { + param.setRtpType("1"); + } + if (ObjectUtils.isEmpty(param.getGbId())) { + param.setGbId(null); + } + + DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + // 录像查询以channelId作为deviceId查询 + result.onTimeout(()->{ + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(ErrorCode.ERROR100.getCode()); + wvpResult.setMsg("超时"); + result.setResult(wvpResult); + }); + + streamProxyService.add(param, (code, msg, streamInfo) -> { + logger.info("[添加拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg); + if (code == ErrorCode.SUCCESS.getCode()) { + result.setResult(new StreamContent(streamInfo)); + }else { + result.setResult(WVPResult.fail(code, msg)); + } + }); + return result; + } + @GetMapping(value = "/ffmpeg_cmd/list") @ResponseBody @Operation(summary = "获取ffmpeg.cmd模板", security = @SecurityRequirement(name = JwtUtils.HEADER)) diff --git a/数据库/结构优化/common.sql b/数据库/结构优化/common.sql index 2ca1ee97..42276527 100644 --- a/数据库/结构优化/common.sql +++ b/数据库/结构优化/common.sql @@ -140,6 +140,9 @@ alter table wvp_stream_proxy alter table wvp_stream_proxy add status bool default false; +alter table wvp_stream_proxy + add proxy_error varchar(255) default NULL; + alter table wvp_device drop column auto_sync_channel; diff --git a/数据库/结构优化/初始化-mysql.sql b/数据库/结构优化/初始化-mysql.sql index d4374deb..32edfb85 100644 --- a/数据库/结构优化/初始化-mysql.sql +++ b/数据库/结构优化/初始化-mysql.sql @@ -245,6 +245,7 @@ create table wvp_stream_proxy ( enable_remove_none_reader bool default false, create_time character varying(50), name character varying(255), + proxy_error character varying(255) default null, update_time character varying(50), stream_key character varying(255), enable_disable_none_reader bool default false, diff --git a/数据库/结构优化/初始化-postgresql-kingbase.sql b/数据库/结构优化/初始化-postgresql-kingbase.sql index 2a8e884f..25563b1a 100644 --- a/数据库/结构优化/初始化-postgresql-kingbase.sql +++ b/数据库/结构优化/初始化-postgresql-kingbase.sql @@ -212,7 +212,8 @@ create table wvp_stream_proxy ( status boolean, enable_remove_none_reader bool default false, create_time character varying(50), - name character varying(255), + name character varying(255) default null, + proxy_error character varying(255) default null, update_time character varying(50), stream_key character varying(255), enable_disable_none_reader bool default false,