优化方法
parent
8db0192a27
commit
fc2f9d8cca
|
@ -128,4 +128,6 @@ public interface IStreamProxyService {
|
|||
* 添加拉流代理
|
||||
*/
|
||||
void add(StreamProxy param, GeneralCallback<StreamInfo> callback);
|
||||
|
||||
void delById(int gbId);
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
|
|||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
|
||||
import com.github.pagehelper.PageHelper;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -159,7 +160,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
|||
logger.info("[拉流代理] 输出地址为:{}", dstUrl);
|
||||
param.setMediaServerId(mediaInfo.getId());
|
||||
// 更新
|
||||
StreamProxy streamProxyInDb = videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream());
|
||||
StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream());
|
||||
if (streamProxyInDb != null) {
|
||||
if (streamProxyInDb.getCommonGbChannelId() == 0 && !ObjectUtils.isEmpty(param.getGbId()) ) {
|
||||
// 新增通用通道
|
||||
|
@ -426,31 +427,55 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
|||
|
||||
@Override
|
||||
public PageInfo<StreamProxy> getAll(Integer page, Integer count) {
|
||||
return videoManagerStorager.queryStreamProxyList(page, count);
|
||||
PageHelper.startPage(page, count);
|
||||
List<StreamProxy> all = streamProxyMapper.selectAll();
|
||||
return new PageInfo<>(all);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void del(String app, String stream) {
|
||||
StreamProxy streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream);
|
||||
if (streamProxyItem != null) {
|
||||
if (streamProxyItem.getCommonGbChannelId() > 0) {
|
||||
commonGbChannelService.deleteById(streamProxyItem.getCommonGbChannelId());
|
||||
}
|
||||
videoManagerStorager.deleteStreamProxy(app, stream);
|
||||
redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
|
||||
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
|
||||
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
|
||||
logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", app, stream);
|
||||
}else {
|
||||
logger.info("[移除代理]: 代理: {}/{}, 从zlm移除失败", app, stream);
|
||||
}
|
||||
StreamProxy streamProxyItem = streamProxyMapper.selectOne(app, stream);
|
||||
if (streamProxyItem == null) {
|
||||
return;
|
||||
}
|
||||
if (streamProxyItem.getCommonGbChannelId() > 0) {
|
||||
commonGbChannelService.deleteById(streamProxyItem.getCommonGbChannelId());
|
||||
}
|
||||
streamProxyMapper.delById(streamProxyItem.getId());
|
||||
redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
|
||||
|
||||
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
|
||||
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
|
||||
logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", app, stream);
|
||||
}else {
|
||||
logger.info("[移除代理]: 代理: {}/{}, 从zlm移除失败", app, stream);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delById(int id) {
|
||||
StreamProxy streamProxyItem = streamProxyMapper.selectOneById(id);
|
||||
if (streamProxyItem == null) {
|
||||
return;
|
||||
}
|
||||
if (streamProxyItem.getCommonGbChannelId() > 0) {
|
||||
commonGbChannelService.deleteById(streamProxyItem.getCommonGbChannelId());
|
||||
}
|
||||
streamProxyMapper.delById(id);
|
||||
redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", streamProxyItem.getApp(), streamProxyItem.getStream());
|
||||
|
||||
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
|
||||
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
|
||||
logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", streamProxyItem.getApp(), streamProxyItem.getStream());
|
||||
}else {
|
||||
logger.info("[移除代理]: 代理: {}/{}, 从zlm移除失败", streamProxyItem.getApp(), streamProxyItem.getStream());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean start(String app, String stream) {
|
||||
boolean result = false;
|
||||
StreamProxy streamProxy = videoManagerStorager.queryStreamProxy(app, stream);
|
||||
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
|
||||
if (streamProxy != null && !streamProxy.isEnable() ) {
|
||||
JSONObject jsonObject = addStreamProxyToZlm(streamProxy);
|
||||
if (jsonObject == null) {
|
||||
|
@ -473,7 +498,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
|||
@Override
|
||||
public boolean stop(String app, String stream) {
|
||||
boolean result = false;
|
||||
StreamProxy streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
|
||||
StreamProxy streamProxyDto = streamProxyMapper.selectOne(app, stream);
|
||||
if (streamProxyDto != null && streamProxyDto.isEnable()) {
|
||||
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto);
|
||||
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
|
||||
|
@ -504,7 +529,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
|||
|
||||
@Override
|
||||
public StreamProxy getStreamProxyByAppAndStream(String app, String streamId) {
|
||||
return videoManagerStorager.getStreamProxyByAppAndStream(app, streamId);
|
||||
return streamProxyMapper.selectOne(app, streamId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -528,7 +553,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
|||
syncPullStream(mediaServerId);
|
||||
|
||||
// 恢复流代理, 只查找这个这个流媒体
|
||||
List<StreamProxy> streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer(
|
||||
List<StreamProxy> streamProxyListForEnable = streamProxyMapper.selectForEnableInMediaServer(
|
||||
mediaServerId, true);
|
||||
for (StreamProxy streamProxyDto : streamProxyListForEnable) {
|
||||
logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
|
||||
|
|
|
@ -1,9 +1,6 @@
|
|||
package com.genersoft.iot.vmp.storager;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
|
||||
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
||||
import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo;
|
||||
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
|
||||
|
@ -145,39 +142,6 @@ public interface IVideoManagerStorage {
|
|||
*/
|
||||
public MobilePosition queryLatestPosition(String deviceId);
|
||||
|
||||
|
||||
/**
|
||||
* 移除代理流
|
||||
* @param app
|
||||
* @param stream
|
||||
* @return
|
||||
*/
|
||||
public int deleteStreamProxy(String app, String stream);
|
||||
|
||||
/**
|
||||
* 按照是app和stream获取代理流
|
||||
* @param app
|
||||
* @param stream
|
||||
* @return
|
||||
*/
|
||||
public StreamProxy queryStreamProxy(String app, String stream);
|
||||
|
||||
/**
|
||||
* 获取代理流
|
||||
* @param page
|
||||
* @param count
|
||||
* @return
|
||||
*/
|
||||
PageInfo<StreamProxy> queryStreamProxyList(Integer page, Integer count);
|
||||
|
||||
/**
|
||||
* 根据媒体ID获取启用/不启用的代理列表
|
||||
* @param id 媒体ID
|
||||
* @param enable 启用/不启用
|
||||
* @return
|
||||
*/
|
||||
List<StreamProxy> getStreamProxyListForEnableInMediaServer(String id, boolean enable);
|
||||
|
||||
/**
|
||||
* 通道上线
|
||||
* @param channelId 通道ID
|
||||
|
@ -190,14 +154,6 @@ public interface IVideoManagerStorage {
|
|||
*/
|
||||
void deviceChannelOffline(String deviceId, String channelId);
|
||||
|
||||
/**
|
||||
* 通过app与stream获取StreamProxy
|
||||
* @param app
|
||||
* @param streamId
|
||||
* @return
|
||||
*/
|
||||
StreamProxy getStreamProxyByAppAndStream(String app, String streamId);
|
||||
|
||||
|
||||
void updateChannelPosition(DeviceChannel deviceChannel);
|
||||
|
||||
|
|
|
@ -14,10 +14,12 @@ public interface StreamProxyMapper {
|
|||
|
||||
@Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, src_url, dst_url, " +
|
||||
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, stream_key, " +
|
||||
"enable_remove_none_reader, enable_disable_none_reader, create_time, longitude, latitude, common_gb_channel_id) VALUES " +
|
||||
"enable_remove_none_reader, enable_disable_none_reader, create_time, longitude, latitude, " +
|
||||
"common_gb_channel_id, gb_id) VALUES " +
|
||||
"(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " +
|
||||
"#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " +
|
||||
"#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} , #{longitude} , #{latitude}, #{commonGbChannelId} )")
|
||||
"#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} , #{longitude} , #{latitude}, " +
|
||||
"#{commonGbChannelId}, #{gbId})")
|
||||
@Options(useGeneratedKeys=true, keyProperty="id", keyColumn="id")
|
||||
int add(StreamProxy streamProxy);
|
||||
|
||||
|
@ -46,13 +48,13 @@ public interface StreamProxyMapper {
|
|||
@Delete("DELETE FROM wvp_stream_proxy WHERE app=#{app} AND stream=#{stream}")
|
||||
int del(String app, String stream);
|
||||
|
||||
@Select("SELECT st.* FROM wvp_stream_proxy st order by st.create_time desc")
|
||||
@Select("SELECT * FROM wvp_stream_proxy order by create_time desc")
|
||||
List<StreamProxy> selectAll();
|
||||
|
||||
@Select("SELECT st.* FROM wvp_stream_proxy st WHERE st.enable=#{enable} order by st.create_time desc")
|
||||
List<StreamProxy> selectForEnable(boolean enable);
|
||||
|
||||
@Select("SELECT st.* from wvp_stream_proxy st WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc")
|
||||
@Select("SELECT st.* from wvp_stream_proxy st WHERE st.app=#{app} AND st.stream=#{stream}")
|
||||
StreamProxy selectOne(@Param("app") String app, @Param("stream") String stream);
|
||||
|
||||
@Select("SELECT st.* FROM wvp_stream_proxy st " +
|
||||
|
@ -100,4 +102,10 @@ public interface StreamProxyMapper {
|
|||
"</script>"})
|
||||
void updateStreamGPS(@Param("gpsMsgInfoList") List<GPSMsgInfo> gpsMsgInfoList);
|
||||
|
||||
@Select("SELECT * from wvp_stream_proxy WHERE id=#{id}")
|
||||
StreamProxy selectOneById(@Param("id") int id);
|
||||
|
||||
@Delete("delete from wvp_stream_proxy WHERE id=#{id}")
|
||||
void delById(int id);
|
||||
|
||||
}
|
||||
|
|
|
@ -234,72 +234,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
|
|||
return deviceMobilePositionMapper.queryLatestPositionByDevice(deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 移除代理流
|
||||
* @param app
|
||||
* @param stream
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public int deleteStreamProxy(String app, String stream) {
|
||||
return streamProxyMapper.del(app, stream);
|
||||
}
|
||||
|
||||
/**
|
||||
* 分页查询代理流列表
|
||||
* @param page
|
||||
* @param count
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public PageInfo<StreamProxy> queryStreamProxyList(Integer page, Integer count) {
|
||||
PageHelper.startPage(page, count);
|
||||
List<StreamProxy> all = streamProxyMapper.selectAll();
|
||||
return new PageInfo<>(all);
|
||||
}
|
||||
|
||||
/**
|
||||
* 按照是app和stream获取代理流
|
||||
* @param app
|
||||
* @param stream
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public StreamProxy queryStreamProxy(String app, String stream){
|
||||
return streamProxyMapper.selectOne(app, stream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<StreamProxy> getStreamProxyListForEnableInMediaServer(String id, boolean enable) {
|
||||
return streamProxyMapper.selectForEnableInMediaServer(id, enable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamProxy getStreamProxyByAppAndStream(String app, String streamId) {
|
||||
return streamProxyMapper.selectOne(app, streamId);
|
||||
}
|
||||
|
||||
|
||||
private DeviceChannel getDeviceChannelByCatalog(PlatformCatalog catalog) {
|
||||
ParentPlatform platform = platformMapper.getParentPlatByServerGBId(catalog.getPlatformId());
|
||||
DeviceChannel deviceChannel = new DeviceChannel();
|
||||
deviceChannel.setChannelId(catalog.getId());
|
||||
deviceChannel.setName(catalog.getName());
|
||||
deviceChannel.setDeviceId(platform.getDeviceGBId());
|
||||
deviceChannel.setManufacture("wvp-pro");
|
||||
deviceChannel.setStatus(true);
|
||||
deviceChannel.setParental(1);
|
||||
|
||||
deviceChannel.setRegisterWay(1);
|
||||
deviceChannel.setParentId(catalog.getParentId());
|
||||
deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId());
|
||||
|
||||
deviceChannel.setModel("live");
|
||||
deviceChannel.setOwner("wvp-pro");
|
||||
deviceChannel.setSecrecy("0");
|
||||
return deviceChannel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateChannelPosition(DeviceChannel deviceChannel) {
|
||||
if (deviceChannel.getChannelId().equals(deviceChannel.getDeviceId())) {
|
||||
|
|
|
@ -191,6 +191,20 @@ public class StreamProxyController {
|
|||
}
|
||||
}
|
||||
|
||||
@DeleteMapping(value = "/del/id")
|
||||
@ResponseBody
|
||||
@Operation(summary = "使用ID移除代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
|
||||
@Parameter(name = "app", description = "应用名", required = true)
|
||||
@Parameter(name = "stream", description = "流id", required = true)
|
||||
public void del(@RequestBody StreamProxy proxy){
|
||||
logger.info("移除代理: " + proxy.getGbId());
|
||||
if (ObjectUtils.isEmpty(proxy.getGbId())) {
|
||||
throw new ControllerException(ErrorCode.ERROR400.getCode(), "缺少ID");
|
||||
}else {
|
||||
streamProxyService.delById(proxy.getId());
|
||||
}
|
||||
}
|
||||
|
||||
@GetMapping(value = "/start")
|
||||
@ResponseBody
|
||||
@Operation(summary = "启用代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
|
||||
|
|
Loading…
Reference in New Issue