diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 1ef0e219..86c9f27b 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -759,7 +759,7 @@ public class MediaServerServiceImpl implements IMediaServerService { streamMediaInfo.setBytesSpeed(mediaInfoJson.getInteger("bytesSpeed")); JSONArray tracks = mediaInfoJson.getJSONArray("tracks"); - if (!tracks.isEmpty()) { + if (tracks != null && !tracks.isEmpty()) { for (int i = 0; i < tracks.size(); i++) { JSONObject tracksJson = tracks.getJSONObject(i); StreamMediaTrack streamMediaTrack = new StreamMediaTrack(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index 4f042d16..46400215 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -113,7 +113,7 @@ public class MediaServiceImpl implements IMediaService { @Override public boolean isReady(MediaServerItem mediaInfo, String app, String stream) { JSONObject jsonObject = zlmresTfulUtils.getMediaInfo(mediaInfo, app, "rtsp", stream); - return jsonObject != null && jsonObject.getInteger("code") == 0 && jsonObject.getBoolean("online"); + return jsonObject != null && jsonObject.getInteger("code") == 0 && jsonObject.getInteger("aliveSecond") > 0; } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 4430429b..090c84ff 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -260,6 +260,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { boolean stopOldProxy = !streamProxyInDb.getType().equals(param.getType()) || !streamProxyInDb.getUrl().equals(param.getUrl()) || !streamProxyInDb.getMediaServerId().equals(param.getMediaServerId()) + || (streamProxyInDb.isEnable() && !param.isEnable()) || (streamProxyInDb.getType().equals("ffmpeg") && ( streamProxyInDb.getDstUrl().equals(param.getDstUrl()) || streamProxyInDb.getFfmpegCmdKey().equals(param.getFfmpegCmdKey()) @@ -281,14 +282,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService { }; if(stopOldProxy) { stopProxy(param, mediaInfo, (code, msg, data) -> { - if (code == ErrorCode.SUCCESS.getCode()) { - if (param.isEnable()) { - startProxy(param, mediaInfo, startProxyCallback); - }else { - callback.run(code, msg, null); - } + if (param.isEnable()) { + startProxy(param, mediaInfo, startProxyCallback); }else { - callback.run(code, "停止旧的代理失败: " + msg, null); + callback.run(code, msg, null); } }); }else { @@ -303,6 +300,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback) { + logger.info("[开始拉流代理] {}/{}", streamProxy.getApp(), streamProxy.getStream()); // 检测是否在线 boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); if (ready) { @@ -317,26 +315,26 @@ public class StreamProxyServiceImpl implements IStreamProxyService { }else { StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null); + logger.info("[开始拉流代理] 已拉起,直接返回 {}/{}", streamProxy.getApp(), streamProxy.getStream()); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); } return; } - String talkKey = UUID.randomUUID().toString(); String delayTalkKey = UUID.randomUUID().toString(); HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(streamProxy.getApp(), streamProxy.getStream(), true, "rtsp", mediaInfo.getId()); hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { - dynamicTask.stop(talkKey); + dynamicTask.stop(delayTalkKey); streamProxy.setStatus(true); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null); - logger.info("[拉流代理] 启用成功: {}/{}", streamProxy.getApp(), streamProxy.getStream()); + logger.info("[开始拉流代理] 成功: {}/{}", streamProxy.getApp(), streamProxy.getStream()); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); }); dynamicTask.startDelay(delayTalkKey, ()->{ hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); - dynamicTask.stop(talkKey); + dynamicTask.stop(delayTalkKey); callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null); streamProxy.setProxyError("启用超时"); }, 10000); @@ -355,7 +353,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } if (result.getInteger("code") != 0) { hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); - dynamicTask.stop(talkKey); + dynamicTask.stop(delayTalkKey); callback.run(result.getInteger("code"), result.getString("msg"), null); }else { JSONObject data = result.getJSONObject("data"); @@ -375,6 +373,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } public void stopProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback) { + logger.info("[停止拉流代理] {}/{}", streamProxy.getApp(), streamProxy.getStream()); boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); if (ready) { if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){ @@ -392,6 +391,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { if (redisTemplate.opsForValue().get(key) == null) { redisTemplate.delete(key); } + logger.info("[停止拉流代理] 成功 {}/{}", streamProxy.getApp(), streamProxy.getStream()); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null); } @@ -418,7 +418,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } } }else { - logger.info("[拉流代理参数处理] 直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream()); + logger.info("[拉流代理参数处理] 方式:直接拉流,源地址: {}, app: {}, stream: {}", param.getUrl(), param.getApp(), param.getStream()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonChannelMapper.java index 4f3ae8af..da30e673 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonChannelMapper.java @@ -296,14 +296,14 @@ public interface CommonChannelMapper { int deleteByDeviceIDs(List clearChannels); @Update("") void channelsOnlineFromList(List channelList); @Update("") void channelsOfflineFromList(List channelList); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index f712e8ab..01665211 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -12,11 +12,11 @@ import java.util.List; @Repository public interface StreamProxyMapper { - @Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, src_url, dst_url, " + + @Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, dst_url, " + "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, stream_key, " + "enable_remove_none_reader, enable_disable_none_reader, create_time, longitude, latitude, " + "common_gb_channel_id, gb_id) VALUES " + - "(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " + + "(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{dstUrl}, " + "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " + "#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} , #{longitude} , #{latitude}, " + "#{commonGbChannelId}, #{gbId})") @@ -30,7 +30,6 @@ public interface StreamProxyMapper { "stream=#{stream}," + "url=#{url}, " + "media_server_id=#{mediaServerId}, " + - "src_url=#{srcUrl}," + "dst_url=#{dstUrl}, " + "timeout_ms=#{timeoutMs}, " + "ffmpeg_cmd_key=#{ffmpegCmdKey}, " + diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java index d65ef195..b2ca1d3e 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java @@ -192,7 +192,7 @@ public class StreamProxyController { }); streamProxyService.edit(param, (code, msg, streamInfo) -> { - logger.info("[添加拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg); + logger.info("[编辑拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg); if (code == ErrorCode.SUCCESS.getCode()) { result.setResult(new StreamContent(streamInfo)); }else { diff --git a/数据库/结构优化/common.sql b/数据库/结构优化/common.sql index 42276527..52b8aa1d 100644 --- a/数据库/结构优化/common.sql +++ b/数据库/结构优化/common.sql @@ -152,6 +152,9 @@ alter table wvp_stream_push alter table wvp_stream_push drop column alive_second; +alter table wvp_stream_proxy + drop column src_url; +