From be011a7c85d20e8bbc327f69dfdd9363296bba26 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Sun, 14 Jan 2024 00:01:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=8B=89=E6=B5=81=E4=BB=A3?= =?UTF-8?q?=E7=90=86=E6=A3=80=E7=B4=A2=EF=BC=8C=E6=92=AD=E6=94=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/service/IStreamProxyService.java | 8 +- .../service/impl/StreamProxyServiceImpl.java | 207 +++++++++++------- .../vmp/storager/dao/DeviceChannelMapper.java | 2 +- .../vmp/storager/dao/StreamProxyMapper.java | 13 +- .../streamProxy/StreamProxyController.java | 18 +- 5 files changed, 168 insertions(+), 80 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index 4be38e9af..9665ebc15 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -26,7 +26,7 @@ public interface IStreamProxyService { * @param count * @return */ - PageInfo getAll(Integer page, Integer count); + PageInfo getAll(String query, Boolean online, String mediaServerId, Integer page, Integer count); /** @@ -109,4 +109,10 @@ public interface IStreamProxyService { * 编辑拉流代理 */ void edit(StreamProxy param, GeneralCallback callback); + + /** + * 获取播放地址 + */ + void getStreamProxyById(Integer id, GeneralCallback callback); + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 090c84ff7..3c38b343a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -34,9 +34,11 @@ import com.github.pagehelper.PageInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.util.CollectionUtils; @@ -97,6 +99,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired TransactionDefinition transactionDefinition; + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + @Override @Transactional @@ -164,16 +170,22 @@ public class StreamProxyServiceImpl implements IStreamProxyService { hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + if (callback != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + } }); if (param.isEnable()) { startProxy(param, mediaInfo, (code, msg, data) -> { if (code != ErrorCode.SUCCESS.getCode()) { - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data); + if (callback != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data); + } param.setStatus(true); streamProxyMapper.update(param); }else { - callback.run(code, msg, null); + if (callback != null) { + callback.run(code, msg, null); + } param.setEnable(false); // 直接移除 if (param.isEnableRemoveNoneReader()) { @@ -186,7 +198,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } else{ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + if (callback != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + } } } @@ -211,24 +225,24 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } if (!param.isEnable()) { param.setStatus(false); - addProxyToDb(param); - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( - mediaInfo, param.getApp(), param.getStream(), null, null); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - return; } - startProxy(param, mediaInfo, (code, msg, data) -> { - callback.run(code, msg, data); - if (code == ErrorCode.SUCCESS.getCode()) { - param.setStatus(true); - } else { - if (param.isEnableRemoveNoneReader()) { - return; + addProxyToDb(param); + if (callback != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null); + } + taskExecutor.execute(()->{ + startProxy(param, mediaInfo, (code, msg, data) -> { + if (code == ErrorCode.SUCCESS.getCode()) { + param.setStatus(true); + } else { + if (param.isEnableRemoveNoneReader()) { + return; + } + param.setProxyError(msg); + param.setStatus(false); } - param.setProxyError(msg); - param.setStatus(false); - } - addProxyToDb(param); + addProxyToDb(param); + }); }); } @@ -251,52 +265,54 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } proxyParamHandler(param); param.setMediaServerId(mediaInfo.getId()); - // 国标编号发生变化,修改通用通道国标变化,通用通道应发送删除再发送添加命令通知上级 - // 类型变化,启用->启用:需要重新拉起视频流, 启用->未启用: 停止旧的视频流, 未启用->启用:发起新的视频流 - // 流地址发生变化。停止旧的,拉起新的 - // ffmpeg类型下,目标流地址变化,停止旧的,拉起新的 - // 节点变化: 停止旧的,拉起新的 - // ffmpeg命令模板变化: 停止旧的,拉起新的 - boolean stopOldProxy = !streamProxyInDb.getType().equals(param.getType()) - || !streamProxyInDb.getUrl().equals(param.getUrl()) - || !streamProxyInDb.getMediaServerId().equals(param.getMediaServerId()) - || (streamProxyInDb.isEnable() && !param.isEnable()) - || (streamProxyInDb.getType().equals("ffmpeg") && ( - streamProxyInDb.getDstUrl().equals(param.getDstUrl()) - || streamProxyInDb.getFfmpegCmdKey().equals(param.getFfmpegCmdKey()) - )); + updateProxyToDb(param); + if (callback != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null); + } + taskExecutor.execute(()->{ + // 国标编号发生变化,修改通用通道国标变化,通用通道应发送删除再发送添加命令通知上级 + // 类型变化,启用->启用:需要重新拉起视频流, 启用->未启用: 停止旧的视频流, 未启用->启用:发起新的视频流 + // 流地址发生变化。停止旧的,拉起新的 + // ffmpeg类型下,目标流地址变化,停止旧的,拉起新的 + // 节点变化: 停止旧的,拉起新的 + // ffmpeg命令模板变化: 停止旧的,拉起新的 + boolean stopOldProxy = !streamProxyInDb.getType().equals(param.getType()) + || !streamProxyInDb.getUrl().equals(param.getUrl()) + || !streamProxyInDb.getMediaServerId().equals(param.getMediaServerId()) + || (streamProxyInDb.isEnable() && !param.isEnable()) + || (streamProxyInDb.getType().equals("ffmpeg") && ( + streamProxyInDb.getDstUrl().equals(param.getDstUrl()) + || streamProxyInDb.getFfmpegCmdKey().equals(param.getFfmpegCmdKey()) + )); - // 如果是开启代理这是开启代理结束后的回调 - final GeneralCallback startProxyCallback = (code, msg, data) -> { - if (code == ErrorCode.SUCCESS.getCode()) { - param.setStatus(true); - } else { - param.setStatus(false); - if (param.isEnableRemoveNoneReader()) { - return; + // 如果是开启代理这是开启代理结束后的回调 + final GeneralCallback startProxyCallback = (code, msg, data) -> { + if (code == ErrorCode.SUCCESS.getCode()) { + param.setStatus(true); + } else { + param.setStatus(false); + if (param.isEnableRemoveNoneReader()) { + return; + } + param.setProxyError(msg); } - param.setProxyError(msg); - } - updateProxyToDb(param); - callback.run(code, msg, null); - }; - if(stopOldProxy) { - stopProxy(param, mediaInfo, (code, msg, data) -> { + updateProxyToDb(param); + }; + if(stopOldProxy) { + stopProxy(param, mediaInfo, (code, msg, data) -> { + if (param.isEnable()) { + startProxy(param, mediaInfo, startProxyCallback); + } + }); + }else { if (param.isEnable()) { startProxy(param, mediaInfo, startProxyCallback); }else { - callback.run(code, msg, null); + param.setStatus(false); + updateProxyToDb(param); } - }); - }else { - if (param.isEnable()) { - startProxy(param, mediaInfo, startProxyCallback); - }else { - param.setStatus(false); - updateProxyToDb(param); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null); } - } + }); } public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback) { @@ -316,7 +332,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null); logger.info("[开始拉流代理] 已拉起,直接返回 {}/{}", streamProxy.getApp(), streamProxy.getStream()); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + if (callback != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + } } return; } @@ -329,13 +347,17 @@ public class StreamProxyServiceImpl implements IStreamProxyService { StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null); logger.info("[开始拉流代理] 成功: {}/{}", streamProxy.getApp(), streamProxy.getStream()); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + if (callback != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + } }); dynamicTask.startDelay(delayTalkKey, ()->{ hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); dynamicTask.stop(delayTalkKey); - callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null); + if (callback != null) { + callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null); + } streamProxy.setProxyError("启用超时"); }, 10000); JSONObject result; @@ -348,24 +370,32 @@ public class StreamProxyServiceImpl implements IStreamProxyService { streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), streamProxy.getRtpType()); } if (result == null) { - callback.run(ErrorCode.ERROR100.getCode(), "接口调用失败", null); + if (callback != null) { + callback.run(ErrorCode.ERROR100.getCode(), "接口调用失败", null); + } return; } if (result.getInteger("code") != 0) { hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); dynamicTask.stop(delayTalkKey); - callback.run(result.getInteger("code"), result.getString("msg"), null); + if (callback != null) { + callback.run(result.getInteger("code"), result.getString("msg"), null); + } }else { JSONObject data = result.getJSONObject("data"); if (data == null) { logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result ); - callback.run(result.getInteger("code"), result.getString("msg"), null); + if (callback != null) { + callback.run(result.getInteger("code"), result.getString("msg"), null); + } return; } String key = data.getString("key"); if (key == null) { logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result ); - callback.run(ErrorCode.ERROR100.getCode(), "获取代理流结果中的KEY失败", null); + if (callback != null) { + callback.run(ErrorCode.ERROR100.getCode(), "获取代理流结果中的KEY失败", null); + } return; } streamProxy.setStreamKey(key); @@ -392,7 +422,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { redisTemplate.delete(key); } logger.info("[停止拉流代理] 成功 {}/{}", streamProxy.getApp(), streamProxy.getStream()); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null); + if (callback != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null); + } } public void proxyParamHandler(StreamProxy param) { @@ -497,9 +529,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } @Override - public PageInfo getAll(Integer page, Integer count) { + public PageInfo getAll(String query, Boolean online, String mediaServerId, Integer page, Integer count) { PageHelper.startPage(page, count); - List all = streamProxyMapper.selectAll(); + List all = streamProxyMapper.selectAll(query, online, mediaServerId); return new PageInfo<>(all); } @@ -546,13 +578,17 @@ public class StreamProxyServiceImpl implements IStreamProxyService { public void start(String app, String stream, GeneralCallback callback) { StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream); if (streamProxy == null || !streamProxy.isEnable()){ - callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null); + if (callback != null) { + callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null); + } return; } String mediaServerId = streamProxy.getMediaServerId(); MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem == null) { - callback.run(ErrorCode.ERROR100.getCode(), "使用的媒体节点不存在", null); + if (callback != null) { + callback.run(ErrorCode.ERROR100.getCode(), "使用的媒体节点不存在", null); + } return; } startProxy(streamProxy, mediaServerItem, (code, msg, data) -> { @@ -563,7 +599,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } streamProxy.setUpdateTime(DateUtil.getNow()); updateProxyToDb(streamProxy); - callback.run(code, msg, data); + if (callback != null) { + callback.run(code, msg, data); + } }); } @@ -572,20 +610,26 @@ public class StreamProxyServiceImpl implements IStreamProxyService { public void stop(String app, String stream, GeneralCallback callback) { StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream); if (streamProxy == null || !streamProxy.isEnable()){ - callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null); + if (callback != null) { + callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null); + } return; } String mediaServerId = streamProxy.getMediaServerId(); MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem == null) { - callback.run(ErrorCode.ERROR100.getCode(), "使用的媒体节点不存在", null); + if (callback != null) { + callback.run(ErrorCode.ERROR100.getCode(), "使用的媒体节点不存在", null); + } return; } stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> { streamProxy.setStatus(false); streamProxy.setUpdateTime(DateUtil.getNow()); updateProxyToDb(streamProxy); - callback.run(code, msg, data); + if (callback != null) { + callback.run(code, msg, data); + } }); } @@ -820,4 +864,17 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return streamProxyMapper.selectForEnable(true); } + @Override + public void getStreamProxyById(Integer id, GeneralCallback callback) { + assert id != null; + StreamProxy streamProxy = streamProxyMapper.selectOneById(id); + assert streamProxy != null; + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck( + streamProxy.getApp(), streamProxy.getStream(), streamProxy.getMediaServerId(), false); + if (streamInfo == null) { + callback.run(ErrorCode.ERROR100.getCode(), "地址获取失败", null); + }else { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 50de508c7..2fd34559f 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -107,7 +107,7 @@ public interface DeviceChannelMapper { "wvp_device_channel dc " + "WHERE " + "dc.device_id = #{deviceId} " + -" AND (dc.channel_id LIKE concat('%',#{query},'%') OR dc.name LIKE concat('%',#{query},'%') OR dc.name LIKE concat('%',#{query},'%')) " + + " AND (dc.channel_id LIKE concat('%',#{query},'%') OR dc.name LIKE concat('%',#{query},'%') OR dc.name LIKE concat('%',#{query},'%')) " + " AND (dc.parent_id=#{parentChannelId} OR dc.civil_code = #{parentChannelId}) " + " AND dc.status= true" + " AND dc.status= false" + diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index 016652119..5128aaa25 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -47,8 +47,17 @@ public interface StreamProxyMapper { @Delete("DELETE FROM wvp_stream_proxy WHERE app=#{app} AND stream=#{stream}") int del(String app, String stream); - @Select("SELECT * FROM wvp_stream_proxy order by create_time desc") - List selectAll(); + @Select(" " ) + List selectAll(@Param("query") String query, + @Param("online") Boolean online, + @Param("mediaServerId") String mediaServerId); @Select("SELECT st.* FROM wvp_stream_proxy st WHERE st.enable=#{enable} order by st.create_time desc") List selectForEnable(boolean enable); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java index b2ca1d3ee..abd8b5eed 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java @@ -56,14 +56,16 @@ public class StreamProxyController { @Parameter(name = "count", description = "每页查询数量") @Parameter(name = "query", description = "查询内容") @Parameter(name = "online", description = "是否在线") + @Parameter(name = "mediaServerId", description = "节点ID") @GetMapping(value = "/list") @ResponseBody public PageInfo list(@RequestParam(required = false)Integer page, @RequestParam(required = false)Integer count, @RequestParam(required = false)String query, + @RequestParam(required = false)String mediaServerId, @RequestParam(required = false)Boolean online ){ - return streamProxyService.getAll(page, count); + return streamProxyService.getAll(query, online, mediaServerId, page, count); } @Operation(summary = "查询流代理", security = @SecurityRequirement(name = JwtUtils.HEADER)) @@ -277,4 +279,18 @@ public class StreamProxyController { }); return result; } + + @GetMapping(value = "/stream") + @ResponseBody + @Operation(summary = "获取代理播放地址", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "id", description = "ID", required = true) + public DeferredResult> getStream(Integer id){ + logger.info("获取代理播放地址: " + id ); + DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + streamProxyService.getStreamProxyById(id, (code, msg, data) -> { + WVPResult wvpResult = new WVPResult<>(code, msg, new StreamContent(data)); + result.setResult(wvpResult); + }); + return result; + } }