添加拉流代理接口-新增拉流代理

结构优化
648540858 2024-01-07 23:57:33 +08:00
parent c186ce94c1
commit 8db0192a27
8 changed files with 171 additions and 6 deletions

View File

@ -7,10 +7,8 @@ import io.swagger.v3.oas.annotations.media.Schema;
*/
@Schema(description = "拉流代理的信息")
public class StreamProxy {
@Schema(description = "ID")
private int id;
@Schema(description = "类型")
private String type;
@Schema(description = "应用名")
@ -31,11 +29,13 @@ public class StreamProxy {
private String ffmpegCmdKey;
@Schema(description = "rtsp拉流时拉流方式0tcp1udp2组播")
private String rtpType;
@Schema(description = "代理失败的原因")
private String proxyError;
@Schema(description = "是否启用")
private boolean enable;
@Schema(description = "是否启用音频")
private boolean enableAudio;
@Schema(description = "是否启用MP4")
@Schema(description = "是否录制")
private boolean enableMp4;
@Schema(description = "是否 无人观看时删除")
private boolean enableRemoveNoneReader;
@ -267,4 +267,12 @@ public class StreamProxy {
public void setCommonGbChannelId(int commonGbChannelId) {
this.commonGbChannelId = commonGbChannelId;
}
public String getProxyError() {
return proxyError;
}
public void setProxyError(String proxyError) {
this.proxyError = proxyError;
}
}

View File

@ -124,4 +124,8 @@ public interface IStreamProxyService {
*/
List<StreamProxy> getAllForEnable();
/**
*
*/
void add(StreamProxy param, GeneralCallback<StreamInfo> callback);
}

View File

@ -43,6 +43,8 @@ import org.springframework.util.CollectionUtils;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -229,6 +231,113 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
}
@Override
@Transactional
public void add(StreamProxy param, GeneralCallback<StreamInfo> callback) {
MediaServerItem mediaInfo;
if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){
mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null);
}else {
mediaInfo = mediaServerService.getOne(param.getMediaServerId());
}
if (mediaInfo == null) {
logger.warn("[添加拉流代理] 未找到在线的ZLM...");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到可用的ZLM");
}
if ("ffmpeg".equalsIgnoreCase(param.getType())) {
if (ObjectUtils.isEmpty(param.getDstUrl())) {
logger.warn("[添加拉流代理] 未设置目标URL地址DstUrl");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未设置目标URL地址");
}
logger.info("[添加拉流代理] ffmpeg源地址: {}, 目标地址为:{}", param.getUrl(), param.getDstUrl());
if (ObjectUtils.isEmpty(param.getApp()) || ObjectUtils.isEmpty(param.getStream())) {
try {
URL url = new URL(param.getDstUrl());
String path = url.getPath();
if (path.indexOf("/", 1) < 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败, 至少两层路径");
}
int appIndex = path.indexOf("/", 1);
String app = path.substring(1, appIndex);
String stream = path.substring(path.indexOf(app));
param.setApp(app);
param.setStream(stream);
} catch (MalformedURLException e) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "解析DstUrl失败");
}
}
}else {
logger.info("[添加拉流代理] 直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream());
}
param.setMediaServerId(mediaInfo.getId());
StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream());
if (streamProxyInDb != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "app/stream已经存在");
}
if (!param.isEnable()) {
param.setStatus(false);
saveProxyToDb(param);
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
return;
}
String talkKey = UUID.randomUUID().toString();
String delayTalkKey = UUID.randomUUID().toString();
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
dynamicTask.stop(talkKey);
param.setStatus(true);
saveProxyToDb(param);
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
});
dynamicTask.startDelay(delayTalkKey, ()->{
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(talkKey);
callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null);
if (param.isEnableRemoveNoneReader()) {
return;
}
param.setProxyError("启用超时");
param.setStatus(false);
saveProxyToDb(param);
}, 10000);
JSONObject jsonObject = addStreamProxyToZlm(param);
if (jsonObject != null && jsonObject.getInteger("code") != 0) {
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(talkKey);
callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null);
if (param.isEnableRemoveNoneReader()) {
return;
}
param.setProxyError("启用失败: " + jsonObject.getString("msg"));
param.setStatus(false);
saveProxyToDb(param);
}
}
private void saveProxyToDb(StreamProxy param) {
// 未启用的数据可以直接保存了
if (!ObjectUtils.isEmpty(param.getGbId())) {
CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param);
if (commonGbChannelService.add(commonGbChannel) > 0) {
param.setCommonGbChannelId(commonGbChannel.getCommonGbId());
}else {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "添加通用通道失败,请检查是否国标编码重复");
}
}
param.setUpdateTime(DateUtil.getNow());
param.setCreateTime(DateUtil.getNow());
int addStreamProxyResult = streamProxyMapper.add(param);
if (addStreamProxyResult <= 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "添加拉流代理失败");
}
}
private String getSchemaFromFFmpegCmd(String ffmpegCmd) {
ffmpegCmd = ffmpegCmd.replaceAll(" + ", " ");
String[] paramArray = ffmpegCmd.split(" ");
@ -283,8 +392,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
}
System.out.println("addStreamProxyToZlm====");
System.out.println(result);
if (result != null && result.getInteger("code") == 0) {
JSONObject data = result.getJSONObject("data");
if (data == null) {

View File

@ -18,6 +18,7 @@ public interface StreamProxyMapper {
"(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " +
"#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " +
"#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} , #{longitude} , #{latitude}, #{commonGbChannelId} )")
@Options(useGeneratedKeys=true, keyProperty="id", keyColumn="id")
int add(StreamProxy streamProxy);
@Update("UPDATE wvp_stream_proxy " +

View File

@ -123,6 +123,46 @@ public class StreamProxyController {
return result;
}
@Operation(summary = "添加代理", security = @SecurityRequirement(name = JwtUtils.HEADER), parameters = {
@Parameter(name = "param", description = "代理参数", required = true),
})
@PostMapping(value = "/add")
@ResponseBody
public DeferredResult<Object> add(@RequestBody StreamProxy param){
logger.info("添加代理: " + JSONObject.toJSONString(param));
if (ObjectUtils.isEmpty(param.getMediaServerId())) {
param.setMediaServerId("auto");
}
if (ObjectUtils.isEmpty(param.getType())) {
param.setType("default");
}
if (ObjectUtils.isEmpty(param.getRtpType())) {
param.setRtpType("1");
}
if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbId(null);
}
DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
// 录像查询以channelId作为deviceId查询
result.onTimeout(()->{
WVPResult<StreamInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("超时");
result.setResult(wvpResult);
});
streamProxyService.add(param, (code, msg, streamInfo) -> {
logger.info("[添加拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg);
if (code == ErrorCode.SUCCESS.getCode()) {
result.setResult(new StreamContent(streamInfo));
}else {
result.setResult(WVPResult.fail(code, msg));
}
});
return result;
}
@GetMapping(value = "/ffmpeg_cmd/list")
@ResponseBody
@Operation(summary = "获取ffmpeg.cmd模板", security = @SecurityRequirement(name = JwtUtils.HEADER))

View File

@ -140,6 +140,9 @@ alter table wvp_stream_proxy
alter table wvp_stream_proxy
add status bool default false;
alter table wvp_stream_proxy
add proxy_error varchar(255) default NULL;
alter table wvp_device
drop column auto_sync_channel;

View File

@ -245,6 +245,7 @@ create table wvp_stream_proxy (
enable_remove_none_reader bool default false,
create_time character varying(50),
name character varying(255),
proxy_error character varying(255) default null,
update_time character varying(50),
stream_key character varying(255),
enable_disable_none_reader bool default false,

View File

@ -212,7 +212,8 @@ create table wvp_stream_proxy (
status boolean,
enable_remove_none_reader bool default false,
create_time character varying(50),
name character varying(255),
name character varying(255) default null,
proxy_error character varying(255) default null,
update_time character varying(50),
stream_key character varying(255),
enable_disable_none_reader bool default false,