临时提交

pull/1642/head
panlinlin 2024-06-28 07:36:28 +08:00
parent 9c0e69c6c5
commit 019af90f6e
3 changed files with 100 additions and 93 deletions

View File

@ -821,13 +821,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
} }
@Override @Override
public Boolean isStreamReady(MediaServer mediaServer, String rtp, String streamId) { public Boolean isStreamReady(MediaServer mediaServer, String app, String streamId) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) { if (mediaNodeServerService == null) {
logger.info("[isStreamReady] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType()); logger.info("[isStreamReady] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return false; return false;
} }
MediaInfo mediaInfo = mediaNodeServerService.getMediaInfo(mediaServer, rtp, streamId); MediaInfo mediaInfo = mediaNodeServerService.getMediaInfo(mediaServer, app, streamId);
return mediaInfo != null; return mediaInfo != null;
} }

View File

@ -394,6 +394,63 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
@Override @Override
public StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy) { public StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
String dstUrl;
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())) {
String ffmpegCmd = getFfmpegCmd(mediaServer, streamProxy.getFfmpegCmdKey());
if (ffmpegCmd == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法获取ffmpeg cmd");
}
String schema = getSchemaFromFFmpegCmd(ffmpegCmd);
if (schema == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法从ffmpeg cmd中获取到输出格式");
}
int port;
String schemaForUri;
if (schema.equalsIgnoreCase("rtsp")) {
port = mediaServer.getRtspPort();
schemaForUri = schema;
}else if (schema.equalsIgnoreCase("flv")) {
port = mediaServer.getRtmpPort();
schemaForUri = schema;
}else {
port = mediaServer.getRtmpPort();
schemaForUri = schema;
}
dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, streamProxy.getApp(),
streamProxy.getStream());
}else {
dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaServer.getRtspPort(), streamProxy.getApp(),
streamProxy.getStream());
}
MediaInfo mediaInfo = getMediaInfo(mediaServer, streamProxy.getApp(), streamProxy.getStream());
if (mediaInfo != null) {
mediaServerService.closeStreams(mediaServer, param.getApp(), param.getStream());
}
if (isStreamReady(mediaServer, param.getApp(), param.getStream())) {
mediaServerService.closeStreams(mediaServer, param.getApp(), param.getStream());
}
return null;
}
private String getSchemaFromFFmpegCmd(String ffmpegCmd) {
ffmpegCmd = ffmpegCmd.replaceAll(" + ", " ");
String[] paramArray = ffmpegCmd.split(" ");
if (paramArray.length == 0) {
return null;
}
for (int i = 0; i < paramArray.length; i++) {
if (paramArray[i].equalsIgnoreCase("-f")) {
if (i + 1 < paramArray.length - 1) {
return paramArray[i+1];
}else {
return null;
}
}
}
return null; return null;
} }
} }

View File

@ -148,39 +148,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM");
} }
String dstUrl;
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())) {
String ffmpegCmd = mediaServerService.getFfmpegCmd(mediaServer, streamProxy.getFfmpegCmdKey());
if (ffmpegCmd == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法获取ffmpeg cmd");
}
String schema = getSchemaFromFFmpegCmd(ffmpegCmd);
if (schema == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法从ffmpeg cmd中获取到输出格式");
}
int port;
String schemaForUri;
if (schema.equalsIgnoreCase("rtsp")) {
port = mediaServer.getRtspPort();
schemaForUri = schema;
}else if (schema.equalsIgnoreCase("flv")) {
port = mediaServer.getRtmpPort();
schemaForUri = schema;
}else {
port = mediaServer.getRtmpPort();
schemaForUri = schema;
}
dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, streamProxy.getApp(),
streamProxy.getStream());
}else {
dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaServer.getRtspPort(), streamProxy.getApp(),
streamProxy.getStream());
}
streamProxy.setDstUrl(dstUrl);
log.info("[拉流代理] 输出地址为:{}", dstUrl);
streamProxy.setMediaServerId(mediaServer.getId()); streamProxy.setMediaServerId(mediaServer.getId());
boolean saveResult; boolean saveResult;
// 更新 // 更新
@ -195,53 +162,53 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
} }
if (streamProxy.isEnable()) { if (streamProxy.isEnable()) {
StreamInfo streamInfo = mediaServerService.startProxy(streamProxy); StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy);
if (streamInfo != null) { if (streamInfo != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}else { }else {
callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null);
} }
//
//
Hook hook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId()); // Hook hook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId());
hookSubscribe.addSubscribe(hook, (hookData) -> { // hookSubscribe.addSubscribe(hook, (hookData) -> {
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( // StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); // mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); // callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}); // });
String talkKey = UUID.randomUUID().toString(); // String talkKey = UUID.randomUUID().toString();
String delayTalkKey = UUID.randomUUID().toString(); // String delayTalkKey = UUID.randomUUID().toString();
dynamicTask.startDelay(delayTalkKey, ()->{ // dynamicTask.startDelay(delayTalkKey, ()->{
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId(), false); // StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId(), false);
if (streamInfo != null) { // if (streamInfo != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); // callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}else { // }else {
dynamicTask.stop(talkKey); // dynamicTask.stop(talkKey);
callback.run(ErrorCode.ERROR100.getCode(), "超时", null); // callback.run(ErrorCode.ERROR100.getCode(), "超时", null);
} // }
}, 7000); // }, 7000);
WVPResult<String> result = addStreamProxyToZlm(streamProxy); // WVPResult<String> result = addStreamProxyToZlm(streamProxy);
if (result != null && result.getCode() == 0) { // if (result != null && result.getCode() == 0) {
hookSubscribe.removeSubscribe(hook); // hookSubscribe.removeSubscribe(hook);
dynamicTask.stop(talkKey); // dynamicTask.stop(talkKey);
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( // StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); // mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); // callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}else { // }else {
streamProxy.setEnable(false); // streamProxy.setEnable(false);
// 直接移除 // // 直接移除
if (streamProxy.isEnableRemoveNoneReader()) { // if (streamProxy.isEnableRemoveNoneReader()) {
del(streamProxy.getApp(), streamProxy.getStream()); // del(streamProxy.getApp(), streamProxy.getStream());
}else { // }else {
updateStreamProxy(streamProxy); // updateStreamProxy(streamProxy);
} // }
if (result == null){ // if (result == null){
callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); // callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null);
}else { // }else {
callback.run(ErrorCode.ERROR100.getCode(), result.getMsg(), null); // callback.run(ErrorCode.ERROR100.getCode(), result.getMsg(), null);
} // }
} // }
}else{ }else{
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null);
@ -249,24 +216,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
} }
} }
private String getSchemaFromFFmpegCmd(String ffmpegCmd) {
ffmpegCmd = ffmpegCmd.replaceAll(" + ", " ");
String[] paramArray = ffmpegCmd.split(" ");
if (paramArray.length == 0) {
return null;
}
for (int i = 0; i < paramArray.length; i++) {
if (paramArray[i].equalsIgnoreCase("-f")) {
if (i + 1 < paramArray.length - 1) {
return paramArray[i+1];
}else {
return null;
}
}
}
return null;
}
/** /**
* *