优化拉流代理启停

结构优化
648540858 2024-01-13 00:40:41 +08:00
parent 1d28869fd3
commit 8805966f89
7 changed files with 23 additions and 21 deletions

View File

@ -759,7 +759,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
streamMediaInfo.setBytesSpeed(mediaInfoJson.getInteger("bytesSpeed"));
JSONArray tracks = mediaInfoJson.getJSONArray("tracks");
if (!tracks.isEmpty()) {
if (tracks != null && !tracks.isEmpty()) {
for (int i = 0; i < tracks.size(); i++) {
JSONObject tracksJson = tracks.getJSONObject(i);
StreamMediaTrack streamMediaTrack = new StreamMediaTrack();

View File

@ -113,7 +113,7 @@ public class MediaServiceImpl implements IMediaService {
@Override
public boolean isReady(MediaServerItem mediaInfo, String app, String stream) {
JSONObject jsonObject = zlmresTfulUtils.getMediaInfo(mediaInfo, app, "rtsp", stream);
return jsonObject != null && jsonObject.getInteger("code") == 0 && jsonObject.getBoolean("online");
return jsonObject != null && jsonObject.getInteger("code") == 0 && jsonObject.getInteger("aliveSecond") > 0;
}
@Override

View File

@ -260,6 +260,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
boolean stopOldProxy = !streamProxyInDb.getType().equals(param.getType())
|| !streamProxyInDb.getUrl().equals(param.getUrl())
|| !streamProxyInDb.getMediaServerId().equals(param.getMediaServerId())
|| (streamProxyInDb.isEnable() && !param.isEnable())
|| (streamProxyInDb.getType().equals("ffmpeg") && (
streamProxyInDb.getDstUrl().equals(param.getDstUrl())
|| streamProxyInDb.getFfmpegCmdKey().equals(param.getFfmpegCmdKey())
@ -281,15 +282,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
};
if(stopOldProxy) {
stopProxy(param, mediaInfo, (code, msg, data) -> {
if (code == ErrorCode.SUCCESS.getCode()) {
if (param.isEnable()) {
startProxy(param, mediaInfo, startProxyCallback);
}else {
callback.run(code, msg, null);
}
}else {
callback.run(code, "停止旧的代理失败: " + msg, null);
}
});
}else {
if (param.isEnable()) {
@ -303,6 +300,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback) {
logger.info("[开始拉流代理] {}/{}", streamProxy.getApp(), streamProxy.getStream());
// 检测是否在线
boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
if (ready) {
@ -317,26 +315,26 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}else {
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
logger.info("[开始拉流代理] 已拉起,直接返回 {}/{}", streamProxy.getApp(), streamProxy.getStream());
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}
return;
}
String talkKey = UUID.randomUUID().toString();
String delayTalkKey = UUID.randomUUID().toString();
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(streamProxy.getApp(), streamProxy.getStream(), true, "rtsp", mediaInfo.getId());
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
dynamicTask.stop(talkKey);
dynamicTask.stop(delayTalkKey);
streamProxy.setStatus(true);
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
logger.info("[拉流代理] 启用成功: {}/{}", streamProxy.getApp(), streamProxy.getStream());
logger.info("[开始拉流代理] 成功: {}/{}", streamProxy.getApp(), streamProxy.getStream());
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
});
dynamicTask.startDelay(delayTalkKey, ()->{
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(talkKey);
dynamicTask.stop(delayTalkKey);
callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null);
streamProxy.setProxyError("启用超时");
}, 10000);
@ -355,7 +353,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
if (result.getInteger("code") != 0) {
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(talkKey);
dynamicTask.stop(delayTalkKey);
callback.run(result.getInteger("code"), result.getString("msg"), null);
}else {
JSONObject data = result.getJSONObject("data");
@ -375,6 +373,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
public void stopProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback) {
logger.info("[停止拉流代理] {}/{}", streamProxy.getApp(), streamProxy.getStream());
boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
if (ready) {
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){
@ -392,6 +391,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
if (redisTemplate.opsForValue().get(key) == null) {
redisTemplate.delete(key);
}
logger.info("[停止拉流代理] 成功 {}/{}", streamProxy.getApp(), streamProxy.getStream());
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null);
}
@ -418,7 +418,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
}
}else {
logger.info("[拉流代理参数处理] 直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream());
logger.info("[拉流代理参数处理] 方式:直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream());
}
}

View File

@ -296,14 +296,14 @@ public interface CommonChannelMapper {
int deleteByDeviceIDs(List<String> clearChannels);
@Update("<script> "+
"UPDATE wvp_common_channel SET commonGbStatus = true WHERE common_gb_id in" +
"UPDATE wvp_common_channel SET common_gb_status = true WHERE common_gb_id in" +
"<foreach collection='channelList' item='item' open='(' separator=',' close=')' > #{item.commonGbId}</foreach>" +
"</script>")
void channelsOnlineFromList(List<CommonGbChannel> channelList);
@Update("<script> "+
"UPDATE wvp_common_channel SET commonGbStatus = false WHERE common_gb_id in" +
"UPDATE wvp_common_channel SET common_gb_status = false WHERE common_gb_id in" +
"<foreach collection='channelList' item='item' open='(' separator=',' close=')' > #{item.commonGbId}</foreach>" +
"</script>")
void channelsOfflineFromList(List<CommonGbChannel> channelList);

View File

@ -12,11 +12,11 @@ import java.util.List;
@Repository
public interface StreamProxyMapper {
@Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, src_url, dst_url, " +
@Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, dst_url, " +
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, stream_key, " +
"enable_remove_none_reader, enable_disable_none_reader, create_time, longitude, latitude, " +
"common_gb_channel_id, gb_id) VALUES " +
"(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " +
"(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{dstUrl}, " +
"#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " +
"#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} , #{longitude} , #{latitude}, " +
"#{commonGbChannelId}, #{gbId})")
@ -30,7 +30,6 @@ public interface StreamProxyMapper {
"stream=#{stream}," +
"url=#{url}, " +
"media_server_id=#{mediaServerId}, " +
"src_url=#{srcUrl}," +
"dst_url=#{dstUrl}, " +
"timeout_ms=#{timeoutMs}, " +
"ffmpeg_cmd_key=#{ffmpegCmdKey}, " +

View File

@ -192,7 +192,7 @@ public class StreamProxyController {
});
streamProxyService.edit(param, (code, msg, streamInfo) -> {
logger.info("[添加拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg);
logger.info("[编辑拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg);
if (code == ErrorCode.SUCCESS.getCode()) {
result.setResult(new StreamContent(streamInfo));
}else {

View File

@ -152,6 +152,9 @@ alter table wvp_stream_push
alter table wvp_stream_push
drop column alive_second;
alter table wvp_stream_proxy
drop column src_url;