diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index 717e021c..e9ecb3cc 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -128,4 +128,6 @@ public interface IStreamProxyService { * 添加拉流代理 */ void add(StreamProxy param, GeneralCallback callback); + + void delById(int gbId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index dd3480a3..9a4e0b2d 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -30,6 +30,7 @@ import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; +import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,7 +160,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { logger.info("[拉流代理] 输出地址为:{}", dstUrl); param.setMediaServerId(mediaInfo.getId()); // 更新 - StreamProxy streamProxyInDb = videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream()); + StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream()); if (streamProxyInDb != null) { if (streamProxyInDb.getCommonGbChannelId() == 0 && !ObjectUtils.isEmpty(param.getGbId()) ) { // 新增通用通道 @@ -426,31 +427,55 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override public PageInfo getAll(Integer page, Integer count) { - return videoManagerStorager.queryStreamProxyList(page, count); + PageHelper.startPage(page, count); + List all = streamProxyMapper.selectAll(); + return new PageInfo<>(all); } @Override public void del(String app, String stream) { - StreamProxy streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream); - if (streamProxyItem != null) { - if (streamProxyItem.getCommonGbChannelId() > 0) { - commonGbChannelService.deleteById(streamProxyItem.getCommonGbChannelId()); - } - videoManagerStorager.deleteStreamProxy(app, stream); - redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream); - JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem); - if (jsonObject != null && jsonObject.getInteger("code") == 0) { - logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", app, stream); - }else { - logger.info("[移除代理]: 代理: {}/{}, 从zlm移除失败", app, stream); - } + StreamProxy streamProxyItem = streamProxyMapper.selectOne(app, stream); + if (streamProxyItem == null) { + return; + } + if (streamProxyItem.getCommonGbChannelId() > 0) { + commonGbChannelService.deleteById(streamProxyItem.getCommonGbChannelId()); + } + streamProxyMapper.delById(streamProxyItem.getId()); + redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream); + + JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem); + if (jsonObject != null && jsonObject.getInteger("code") == 0) { + logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", app, stream); + }else { + logger.info("[移除代理]: 代理: {}/{}, 从zlm移除失败", app, stream); + } + } + + @Override + public void delById(int id) { + StreamProxy streamProxyItem = streamProxyMapper.selectOneById(id); + if (streamProxyItem == null) { + return; + } + if (streamProxyItem.getCommonGbChannelId() > 0) { + commonGbChannelService.deleteById(streamProxyItem.getCommonGbChannelId()); + } + streamProxyMapper.delById(id); + redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", streamProxyItem.getApp(), streamProxyItem.getStream()); + + JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem); + if (jsonObject != null && jsonObject.getInteger("code") == 0) { + logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", streamProxyItem.getApp(), streamProxyItem.getStream()); + }else { + logger.info("[移除代理]: 代理: {}/{}, 从zlm移除失败", streamProxyItem.getApp(), streamProxyItem.getStream()); } } @Override public boolean start(String app, String stream) { boolean result = false; - StreamProxy streamProxy = videoManagerStorager.queryStreamProxy(app, stream); + StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream); if (streamProxy != null && !streamProxy.isEnable() ) { JSONObject jsonObject = addStreamProxyToZlm(streamProxy); if (jsonObject == null) { @@ -473,7 +498,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override public boolean stop(String app, String stream) { boolean result = false; - StreamProxy streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream); + StreamProxy streamProxyDto = streamProxyMapper.selectOne(app, stream); if (streamProxyDto != null && streamProxyDto.isEnable()) { JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto); if (jsonObject != null && jsonObject.getInteger("code") == 0) { @@ -504,7 +529,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override public StreamProxy getStreamProxyByAppAndStream(String app, String streamId) { - return videoManagerStorager.getStreamProxyByAppAndStream(app, streamId); + return streamProxyMapper.selectOne(app, streamId); } @Override @@ -528,7 +553,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { syncPullStream(mediaServerId); // 恢复流代理, 只查找这个这个流媒体 - List streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer( + List streamProxyListForEnable = streamProxyMapper.selectForEnableInMediaServer( mediaServerId, true); for (StreamProxy streamProxyDto : streamProxyListForEnable) { logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java index b76f83c7..2e73ef74 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java @@ -1,9 +1,6 @@ package com.genersoft.iot.vmp.storager; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; -import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo; import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend; import com.github.pagehelper.PageInfo; @@ -145,39 +142,6 @@ public interface IVideoManagerStorage { */ public MobilePosition queryLatestPosition(String deviceId); - - /** - * 移除代理流 - * @param app - * @param stream - * @return - */ - public int deleteStreamProxy(String app, String stream); - - /** - * 按照是app和stream获取代理流 - * @param app - * @param stream - * @return - */ - public StreamProxy queryStreamProxy(String app, String stream); - - /** - * 获取代理流 - * @param page - * @param count - * @return - */ - PageInfo queryStreamProxyList(Integer page, Integer count); - - /** - * 根据媒体ID获取启用/不启用的代理列表 - * @param id 媒体ID - * @param enable 启用/不启用 - * @return - */ - List getStreamProxyListForEnableInMediaServer(String id, boolean enable); - /** * 通道上线 * @param channelId 通道ID @@ -190,14 +154,6 @@ public interface IVideoManagerStorage { */ void deviceChannelOffline(String deviceId, String channelId); - /** - * 通过app与stream获取StreamProxy - * @param app - * @param streamId - * @return - */ - StreamProxy getStreamProxyByAppAndStream(String app, String streamId); - void updateChannelPosition(DeviceChannel deviceChannel); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index ecb412b1..c2d47506 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -14,10 +14,12 @@ public interface StreamProxyMapper { @Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, src_url, dst_url, " + "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, stream_key, " + - "enable_remove_none_reader, enable_disable_none_reader, create_time, longitude, latitude, common_gb_channel_id) VALUES " + + "enable_remove_none_reader, enable_disable_none_reader, create_time, longitude, latitude, " + + "common_gb_channel_id, gb_id) VALUES " + "(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " + "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " + - "#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} , #{longitude} , #{latitude}, #{commonGbChannelId} )") + "#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} , #{longitude} , #{latitude}, " + + "#{commonGbChannelId}, #{gbId})") @Options(useGeneratedKeys=true, keyProperty="id", keyColumn="id") int add(StreamProxy streamProxy); @@ -46,13 +48,13 @@ public interface StreamProxyMapper { @Delete("DELETE FROM wvp_stream_proxy WHERE app=#{app} AND stream=#{stream}") int del(String app, String stream); - @Select("SELECT st.* FROM wvp_stream_proxy st order by st.create_time desc") + @Select("SELECT * FROM wvp_stream_proxy order by create_time desc") List selectAll(); @Select("SELECT st.* FROM wvp_stream_proxy st WHERE st.enable=#{enable} order by st.create_time desc") List selectForEnable(boolean enable); - @Select("SELECT st.* from wvp_stream_proxy st WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc") + @Select("SELECT st.* from wvp_stream_proxy st WHERE st.app=#{app} AND st.stream=#{stream}") StreamProxy selectOne(@Param("app") String app, @Param("stream") String stream); @Select("SELECT st.* FROM wvp_stream_proxy st " + @@ -100,4 +102,10 @@ public interface StreamProxyMapper { ""}) void updateStreamGPS(@Param("gpsMsgInfoList") List gpsMsgInfoList); + @Select("SELECT * from wvp_stream_proxy WHERE id=#{id}") + StreamProxy selectOneById(@Param("id") int id); + + @Delete("delete from wvp_stream_proxy WHERE id=#{id}") + void delById(int id); + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index dc862055..81ec19b8 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -234,72 +234,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { return deviceMobilePositionMapper.queryLatestPositionByDevice(deviceId); } - /** - * 移除代理流 - * @param app - * @param stream - * @return - */ - @Override - public int deleteStreamProxy(String app, String stream) { - return streamProxyMapper.del(app, stream); - } - - /** - * 分页查询代理流列表 - * @param page - * @param count - * @return - */ - @Override - public PageInfo queryStreamProxyList(Integer page, Integer count) { - PageHelper.startPage(page, count); - List all = streamProxyMapper.selectAll(); - return new PageInfo<>(all); - } - - /** - * 按照是app和stream获取代理流 - * @param app - * @param stream - * @return - */ - @Override - public StreamProxy queryStreamProxy(String app, String stream){ - return streamProxyMapper.selectOne(app, stream); - } - - @Override - public List getStreamProxyListForEnableInMediaServer(String id, boolean enable) { - return streamProxyMapper.selectForEnableInMediaServer(id, enable); - } - - @Override - public StreamProxy getStreamProxyByAppAndStream(String app, String streamId) { - return streamProxyMapper.selectOne(app, streamId); - } - - - private DeviceChannel getDeviceChannelByCatalog(PlatformCatalog catalog) { - ParentPlatform platform = platformMapper.getParentPlatByServerGBId(catalog.getPlatformId()); - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setChannelId(catalog.getId()); - deviceChannel.setName(catalog.getName()); - deviceChannel.setDeviceId(platform.getDeviceGBId()); - deviceChannel.setManufacture("wvp-pro"); - deviceChannel.setStatus(true); - deviceChannel.setParental(1); - - deviceChannel.setRegisterWay(1); - deviceChannel.setParentId(catalog.getParentId()); - deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId()); - - deviceChannel.setModel("live"); - deviceChannel.setOwner("wvp-pro"); - deviceChannel.setSecrecy("0"); - return deviceChannel; - } - @Override public void updateChannelPosition(DeviceChannel deviceChannel) { if (deviceChannel.getChannelId().equals(deviceChannel.getDeviceId())) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java index 32608fc4..70d9b73c 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java @@ -191,6 +191,20 @@ public class StreamProxyController { } } + @DeleteMapping(value = "/del/id") + @ResponseBody + @Operation(summary = "使用ID移除代理", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "app", description = "应用名", required = true) + @Parameter(name = "stream", description = "流id", required = true) + public void del(@RequestBody StreamProxy proxy){ + logger.info("移除代理: " + proxy.getGbId()); + if (ObjectUtils.isEmpty(proxy.getGbId())) { + throw new ControllerException(ErrorCode.ERROR400.getCode(), "缺少ID"); + }else { + streamProxyService.delById(proxy.getId()); + } + } + @GetMapping(value = "/start") @ResponseBody @Operation(summary = "启用代理", security = @SecurityRequirement(name = JwtUtils.HEADER))