添加编辑拉流代理接口

结构优化
648540858 2024-01-09 17:43:16 +08:00
parent 7c7b35ca80
commit 48d4871ea4
3 changed files with 171 additions and 0 deletions

View File

@ -130,4 +130,9 @@ public interface IStreamProxyService {
void add(StreamProxy param, GeneralCallback<StreamInfo> callback); void add(StreamProxy param, GeneralCallback<StreamInfo> callback);
void delById(int gbId); void delById(int gbId);
/**
*
*/
void edit(StreamProxy param, GeneralCallback<StreamInfo> callback);
} }

View File

@ -320,6 +320,120 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
} }
} }
@Override
public void edit(StreamProxy param, GeneralCallback<StreamInfo> callback) {
MediaServerItem mediaInfo;
StreamProxy streamProxyInDb = streamProxyMapper.selectOneById(param.getId());
if (streamProxyInDb != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在");
}
if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){
mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null);
}else {
mediaInfo = mediaServerService.getOne(param.getMediaServerId());
}
if (mediaInfo == null) {
logger.warn("[编辑拉流代理] 未找到在线的ZLM...");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到可用的ZLM");
}
// 国标编号发生变化,修改通用通道国标变化,通用通道应发送删除再发送添加命令通知上级
// 类型变化,启用->启用:需要重新拉起视频流, 启用->未启用: 停止旧的视频流, 未启用->启用:发起新的视频流
// 流地址发生变化。停止旧的,拉起新的
// ffmpeg类型下目标流地址变化停止旧的拉起新的
// 节点变化: 停止旧的,拉起新的
// ffmpeg命令模板变化 停止旧的,拉起新的
if (ObjectUtils.isEmpty(streamProxyInDb.getGbId())) {
if (!ObjectUtils.isEmpty(param.getGbId())) {
// 之前是空的,现在添加了国标编号
}
}else {
if (ObjectUtils.isEmpty(param.getGbId())) {
// 移除了国标编号
}else {
if (!streamProxyInDb.getGbId().equals(param.getGbId())) {
// 修改了国标编号
}
}
}
if ("ffmpeg".equalsIgnoreCase(param.getType())) {
if (ObjectUtils.isEmpty(param.getDstUrl())) {
logger.warn("[添加拉流代理] 未设置目标URL地址DstUrl");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未设置目标URL地址");
}
logger.info("[添加拉流代理] ffmpeg源地址: {}, 目标地址为:{}", param.getUrl(), param.getDstUrl());
if (ObjectUtils.isEmpty(param.getApp()) || ObjectUtils.isEmpty(param.getStream())) {
try {
URL url = new URL(param.getDstUrl());
String path = url.getPath();
if (path.indexOf("/", 1) < 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败, 至少两层路径");
}
String app = path.substring(1, path.indexOf("/", 2));
String stream = path.substring(path.indexOf("/", 2) + 1);
param.setApp(app);
param.setStream(stream);
} catch (MalformedURLException e) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败");
}
}
}else {
logger.info("[添加拉流代理] 直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream());
}
param.setMediaServerId(mediaInfo.getId());
StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream());
if (streamProxyInDb != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "app/stream已经存在");
}
if (!param.isEnable()) {
param.setStatus(false);
saveProxyToDb(param);
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
return;
}
String talkKey = UUID.randomUUID().toString();
String delayTalkKey = UUID.randomUUID().toString();
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
dynamicTask.stop(talkKey);
param.setStatus(true);
saveProxyToDb(param);
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
});
dynamicTask.startDelay(delayTalkKey, ()->{
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(talkKey);
callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null);
if (param.isEnableRemoveNoneReader()) {
return;
}
param.setProxyError("启用超时");
param.setStatus(false);
saveProxyToDb(param);
}, 10000);
JSONObject jsonObject = addStreamProxyToZlm(param);
if (jsonObject != null && jsonObject.getInteger("code") != 0) {
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(talkKey);
callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null);
if (param.isEnableRemoveNoneReader()) {
return;
}
param.setProxyError("启用失败: " + jsonObject.getString("msg"));
param.setStatus(false);
saveProxyToDb(param);
}
}
private void saveProxyToDb(StreamProxy param) { private void saveProxyToDb(StreamProxy param) {
// 未启用的数据可以直接保存了 // 未启用的数据可以直接保存了
if (!ObjectUtils.isEmpty(param.getGbId())) { if (!ObjectUtils.isEmpty(param.getGbId())) {

View File

@ -142,6 +142,11 @@ public class StreamProxyController {
if (ObjectUtils.isEmpty(param.getGbId())) { if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbId(null); param.setGbId(null);
} }
if (ObjectUtils.isEmpty(param.getSrcUrl())) {
param.setSrcUrl(param.getUrl());
}else {
param.setUrl(param.getSrcUrl());
}
DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
// 录像查询以channelId作为deviceId查询 // 录像查询以channelId作为deviceId查询
@ -162,6 +167,53 @@ public class StreamProxyController {
}); });
return result; return result;
} }
@Operation(summary = "编辑代理", security = @SecurityRequirement(name = JwtUtils.HEADER), parameters = {
@Parameter(name = "param", description = "代理参数", required = true),
})
@PostMapping(value = "/edit")
@ResponseBody
public DeferredResult<Object> edit(@RequestBody StreamProxy param){
logger.info("编辑代理: " + JSONObject.toJSONString(param));
if (param.getId() <= 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "缺少主键ID");
}
if (ObjectUtils.isEmpty(param.getMediaServerId())) {
param.setMediaServerId("auto");
}
if (ObjectUtils.isEmpty(param.getType())) {
param.setType("default");
}
if (ObjectUtils.isEmpty(param.getRtpType())) {
param.setRtpType("1");
}
if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbId(null);
}
if (ObjectUtils.isEmpty(param.getSrcUrl())) {
param.setSrcUrl(param.getUrl());
}else {
param.setUrl(param.getSrcUrl());
}
DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
// 录像查询以channelId作为deviceId查询
result.onTimeout(()->{
WVPResult<StreamInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("超时");
result.setResult(wvpResult);
});
streamProxyService.edit(param, (code, msg, streamInfo) -> {
logger.info("[添加拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg);
if (code == ErrorCode.SUCCESS.getCode()) {
result.setResult(new StreamContent(streamInfo));
}else {
result.setResult(WVPResult.fail(code, msg));
}
});
return result;
}
@GetMapping(value = "/ffmpeg_cmd/list") @GetMapping(value = "/ffmpeg_cmd/list")
@ResponseBody @ResponseBody