修复拉流代理

结构优化
648540858 2023-12-11 14:52:52 +08:00
parent 6678ac8605
commit b7af59b2ce
14 changed files with 223 additions and 177 deletions

View File

@ -3,11 +3,13 @@ package com.genersoft.iot.vmp.common;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.Gb28181CodeType;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
import com.genersoft.iot.vmp.service.bean.CommonGbChannelType;
import com.genersoft.iot.vmp.service.impl.CommonGbChannelServiceImpl;
import com.genersoft.iot.vmp.utils.DateUtil;
import io.swagger.v3.oas.annotations.media.Schema;
import org.apache.commons.lang3.math.NumberUtils;
import org.checkerframework.checker.units.qual.C;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -611,6 +613,17 @@ public class CommonGbChannel {
this.createTime = createTime;
}
public static CommonGbChannel getInstance(StreamProxy streamProxy) {
CommonGbChannel commonGbChannel = new CommonGbChannel();
commonGbChannel.setType(CommonGbChannelType.PROXY);
commonGbChannel.setCommonGbDeviceID(streamProxy.getGbId());
commonGbChannel.setCommonGbName(streamProxy.getName());
commonGbChannel.setCommonGbLongitude(streamProxy.getLongitude());
commonGbChannel.setCommonGbLatitude(streamProxy.getLatitude());
commonGbChannel.setCreateTime(DateUtil.getNow());
commonGbChannel.setUpdateTime(DateUtil.getNow());
return commonGbChannel;
}
public static CommonGbChannel getInstance(List<String> syncKeys, DeviceChannel deviceChannel){
CommonGbChannel commonGbChannel = new CommonGbChannel();

View File

@ -161,7 +161,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
MediaServerItem mediaServerItem = null;
StreamPush streamPushItem = null;
StreamProxyItem proxyByAppAndStream =null;
StreamProxy proxyByAppAndStream =null;
// 不是通道可能是直播流
if (channel != null && gbStream == null) {
// 通道存在发100TRYING

View File

@ -20,7 +20,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@ -199,7 +199,7 @@ public class ZLMHttpHookListener {
}
// 推流鉴权的处理
if (!"rtp".equals(param.getApp())) {
StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
StreamProxy stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (stream != null) {
HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
result.setEnable_audio(stream.isEnableAudio());
@ -539,7 +539,7 @@ public class ZLMHttpHookListener {
} else {
// 非国标流 推流/拉流代理
// 拉流代理
StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyItem != null) {
if (streamProxyItem.isEnableRemoveNoneReader()) {
// 无人观看自动移除
@ -678,7 +678,7 @@ public class ZLMHttpHookListener {
} else {
// 拉流代理
StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
StreamProxy streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
streamProxyService.start(param.getApp(), param.getStream());
}

View File

@ -6,7 +6,10 @@ import io.swagger.v3.oas.annotations.media.Schema;
* @author lin
*/
@Schema(description = "拉流代理的信息")
public class StreamProxyItem {
public class StreamProxy {
@Schema(description = "ID")
private int id;
@Schema(description = "类型")
private String type;
@ -43,12 +46,44 @@ public class StreamProxyItem {
@Schema(description = "拉流代理时zlm返回的key用于停止拉流代理")
private String streamKey;
@Schema(description = "国标ID")
private String gbId;
@Schema(description = "名称")
private String name;
@Schema(description = "经度")
private double longitude;
@Schema(description = "纬度")
private double latitude;
@Schema(description = "状态")
private boolean status;
@Schema(description = "创建时间")
private String createTime;
@Schema(description = "更新时间")
private String updateTime;
/**
* ID
*/
@Schema(description = "国标通用信息ID")
private int commonGbChannelId;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getCreateTime() {
return createTime;
}
public String getUpdateTime() {
return updateTime;
}
public String getType() {
return type;
}
@ -189,4 +224,60 @@ public class StreamProxyItem {
public void setCommonGbChannelId(int commonGbChannelId) {
this.commonGbChannelId = commonGbChannelId;
}
public String getGbId() {
return gbId;
}
public void setGbId(String gbId) {
this.gbId = gbId;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getLongitude() {
return longitude;
}
public void setLongitude(double longitude) {
this.longitude = longitude;
}
public double getLatitude() {
return latitude;
}
public void setLatitude(double latitude) {
this.latitude = latitude;
}
public boolean isStatus() {
return status;
}
public void setStatus(boolean status) {
this.status = status;
}
public String isCreateTime() {
return createTime;
}
public void setCreateTime(String createTime) {
this.createTime = createTime;
}
public String isUpdateTime() {
return updateTime;
}
public void setUpdateTime(String updateTime) {
this.updateTime = updateTime;
}
}

View File

@ -82,4 +82,7 @@ public interface ICommonGbChannelService {
void batchDelete(List<Integer> allCommonChannelsForDelete);
void deleteById(int commonGbChannelId);
void deleteByIdList(List<Integer> commonChannelIdList);
}

View File

@ -4,7 +4,7 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.github.pagehelper.PageInfo;
@ -14,21 +14,21 @@ public interface IStreamProxyService {
*
* @param param
*/
void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback);
void save(StreamProxy param, GeneralCallback<StreamInfo> callback);
/**
* zlm
* @param param
* @return
*/
JSONObject addStreamProxyToZlm(StreamProxyItem param);
JSONObject addStreamProxyToZlm(StreamProxy param);
/**
* zlm
* @param param
* @return
*/
JSONObject removeStreamProxyFromZlm(StreamProxyItem param);
JSONObject removeStreamProxyFromZlm(StreamProxy param);
/**
*
@ -36,7 +36,7 @@ public interface IStreamProxyService {
* @param count
* @return
*/
PageInfo<StreamProxyItem> getAll(Integer page, Integer count);
PageInfo<StreamProxy> getAll(Integer page, Integer count);
/**
*
@ -81,7 +81,7 @@ public interface IStreamProxyService {
* appstreamstreamProxy
* @return
*/
StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId);
StreamProxy getStreamProxyByAppAndStream(String app, String streamId);
/**
@ -103,7 +103,7 @@ public interface IStreamProxyService {
/**
*
*/
boolean updateStreamProxy(StreamProxyItem streamProxyItem);
boolean updateStreamProxy(StreamProxy streamProxyItem);
/**
*

View File

@ -765,4 +765,9 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
}
// TODO 向国标级联发送catalog
}
@Override
public void deleteById(int commonGbChannelId) {
}
}

View File

@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.service.IDeviceAlarmService;
import com.genersoft.iot.vmp.storager.dao.DeviceAlarmMapper;
import com.github.pagehelper.PageHelper;

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonGbChannel;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
@ -15,17 +16,15 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.ICommonGbChannelService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -38,12 +37,10 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
/**
*
@ -78,10 +75,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
private UserSetting userSetting;
@Autowired
private GbStreamMapper gbStreamMapper;
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
private ICommonGbChannelService commonGbChannelService;
@Autowired
private EventPublisher eventPublisher;
@ -89,9 +83,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Autowired
private ParentPlatformMapper parentPlatformMapper;
@Autowired
private IGbStreamService gbStreamService;
@Autowired
private IMediaServerService mediaServerService;
@ -109,7 +100,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback) {
@Transactional
public void save(StreamProxy param, GeneralCallback<StreamInfo> callback) {
MediaServerItem mediaInfo;
if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){
mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null);
@ -155,16 +147,30 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
param.setDstUrl(dstUrl);
logger.info("[拉流代理] 输出地址为:{}", dstUrl);
param.setMediaServerId(mediaInfo.getId());
boolean saveResult;
// 更新
if (videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream()) != null) {
saveResult = updateStreamProxy(param);
StreamProxy streamProxyInDb = videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream());
if (streamProxyInDb != null) {
if (streamProxyInDb.getCommonGbChannelId() == 0 && param.getGbId() != null ) {
// 新增通用通道
CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param);
commonGbChannelService.add(commonGbChannel);
param.setCommonGbChannelId(commonGbChannel.getCommonGbId());
}
if (streamProxyInDb.getCommonGbChannelId() > 0 && param.getGbId() == null ) {
// 移除通用通道
commonGbChannelService.deleteById(streamProxyInDb.getCommonGbChannelId());
}
param.setUpdateTime(DateUtil.getNow());
streamProxyMapper.update(param);
}else { // 新增
saveResult = addStreamProxy(param);
}
if (!saveResult) {
callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null);
return;
// 新增通用通道
CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param);
commonGbChannelService.add(commonGbChannel);
param.setCommonGbChannelId(commonGbChannel.getCommonGbId());
param.setCreateTime(DateUtil.getNow());
param.setUpdateTime(DateUtil.getNow());
streamProxyMapper.add(param);
}
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
@ -232,79 +238,19 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
return null;
}
/**
*
* @param streamProxyItem
* @return
*/
private boolean addStreamProxy(StreamProxyItem streamProxyItem) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
boolean result = false;
streamProxyItem.setStreamType("proxy");
streamProxyItem.setStatus(true);
String now = DateUtil.getNow();
streamProxyItem.setCreateTime(now);
try {
if (streamProxyMapper.add(streamProxyItem) > 0) {
if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) {
if (gbStreamMapper.add(streamProxyItem) < 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
}
}else {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
result = true;
dataSourceTransactionManager.commit(transactionStatus); //手动提交
}catch (Exception e) {
logger.error("向数据库添加流代理失败:", e);
dataSourceTransactionManager.rollback(transactionStatus);
}
return result;
}
/**
*
* @param streamProxyItem
* @return
*/
@Override
public boolean updateStreamProxy(StreamProxyItem streamProxyItem) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
boolean result = false;
streamProxyItem.setStreamType("proxy");
try {
if (streamProxyMapper.update(streamProxyItem) > 0) {
if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) {
if (gbStreamMapper.updateByAppAndStream(streamProxyItem) == 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
}
} else {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
dataSourceTransactionManager.commit(transactionStatus); //手动提交
result = true;
}catch (Exception e) {
logger.error("未处理的异常 ", e);
dataSourceTransactionManager.rollback(transactionStatus);
}
return result;
public boolean updateStreamProxy(StreamProxy streamProxyItem) {
streamProxyItem.setCreateTime(DateUtil.getNow());
return streamProxyMapper.update(streamProxyItem) > 0;
}
@Override
public JSONObject addStreamProxyToZlm(StreamProxyItem param) {
public JSONObject addStreamProxyToZlm(StreamProxy param) {
JSONObject result = null;
MediaServerItem mediaServerItem = null;
if (param.getMediaServerId() == null) {
@ -347,7 +293,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
@Override
public JSONObject removeStreamProxyFromZlm(StreamProxyItem param) {
public JSONObject removeStreamProxyFromZlm(StreamProxy param) {
if (param ==null) {
return null;
}
@ -362,19 +308,17 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
@Override
public PageInfo<StreamProxyItem> getAll(Integer page, Integer count) {
public PageInfo<StreamProxy> getAll(Integer page, Integer count) {
return videoManagerStorager.queryStreamProxyList(page, count);
}
@Override
public void del(String app, String stream) {
StreamProxyItem streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream);
StreamProxy streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream);
if (streamProxyItem != null) {
gbStreamService.sendCatalogMsg(streamProxyItem, CatalogEvent.DEL);
// 如果关联了国标那么移除关联
platformGbStreamMapper.delByAppAndStream(app, stream);
gbStreamMapper.del(app, stream);
if (streamProxyItem.getCommonGbChannelId() > 0) {
commonGbChannelService.deleteById(streamProxyItem.getCommonGbChannelId());
}
videoManagerStorager.deleteStreamProxy(app, stream);
redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
@ -389,7 +333,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public boolean start(String app, String stream) {
boolean result = false;
StreamProxyItem streamProxy = videoManagerStorager.queryStreamProxy(app, stream);
StreamProxy streamProxy = videoManagerStorager.queryStreamProxy(app, stream);
if (streamProxy != null && !streamProxy.isEnable() ) {
JSONObject jsonObject = addStreamProxyToZlm(streamProxy);
if (jsonObject == null) {
@ -410,7 +354,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public boolean stop(String app, String stream) {
boolean result = false;
StreamProxyItem streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
StreamProxy streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
if (streamProxyDto != null && streamProxyDto.isEnable()) {
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto);
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
@ -440,16 +384,24 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) {
public StreamProxy getStreamProxyByAppAndStream(String app, String streamId) {
return videoManagerStorager.getStreamProxyByAppAndStream(app, streamId);
}
@Override
public void zlmServerOnline(String mediaServerId) {
// 移除开启了无人观看自动移除的流
List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
if (streamProxyItemList.size() > 0) {
gbStreamMapper.batchDel(streamProxyItemList);
List<StreamProxy> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
List<Integer> commonChannelIdList = new ArrayList<>();
if (!streamProxyItemList.isEmpty()) {
streamProxyItemList.stream().forEach(streamProxy -> {
if (streamProxy.getCommonGbChannelId() > 0) {
commonChannelIdList.add(streamProxy.getCommonGbChannelId());
}
});
}
if (!commonChannelIdList.isEmpty()) {
commonGbChannelService.deleteByIdList(commonChannelIdList);
}
streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
@ -457,9 +409,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
// syncPullStream(mediaServerId);
// 恢复流代理, 只查找这个这个流媒体
List<StreamProxyItem> streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer(
List<StreamProxy> streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer(
mediaServerId, true);
for (StreamProxyItem streamProxyDto : streamProxyListForEnable) {
for (StreamProxy streamProxyDto : streamProxyListForEnable) {
logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
JSONObject jsonObject = addStreamProxyToZlm(streamProxyDto);
if (jsonObject == null) {
@ -475,9 +427,17 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public void zlmServerOffline(String mediaServerId) {
// 移除开启了无人观看自动移除的流
List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
if (streamProxyItemList.size() > 0) {
gbStreamMapper.batchDel(streamProxyItemList);
List<StreamProxy> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
List<Integer> commonChannelIdList = new ArrayList<>();
if (!streamProxyItemList.isEmpty()) {
streamProxyItemList.stream().forEach(streamProxy -> {
if (streamProxy.getCommonGbChannelId() > 0) {
commonChannelIdList.add(streamProxy.getCommonGbChannelId());
}
});
}
if (!commonChannelIdList.isEmpty()) {
commonGbChannelService.deleteByIdList(commonChannelIdList);
}
streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
// 其他的流设置离线

View File

@ -1,7 +1,7 @@
package com.genersoft.iot.vmp.storager;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
@ -172,7 +172,7 @@ public interface IVideoManagerStorage {
* @param stream
* @return
*/
public StreamProxyItem queryStreamProxy(String app, String stream);
public StreamProxy queryStreamProxy(String app, String stream);
/**
*
@ -180,7 +180,7 @@ public interface IVideoManagerStorage {
* @param count
* @return
*/
PageInfo<StreamProxyItem> queryStreamProxyList(Integer page, Integer count);
PageInfo<StreamProxy> queryStreamProxyList(Integer page, Integer count);
/**
* ID
@ -215,7 +215,7 @@ public interface IVideoManagerStorage {
* @param enable /
* @return
*/
List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean enable);
List<StreamProxy> getStreamProxyListForEnableInMediaServer(String id, boolean enable);
/**
* 线
@ -235,7 +235,7 @@ public interface IVideoManagerStorage {
* @param streamId
* @return
*/
StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId);
StreamProxy getStreamProxyByAppAndStream(String app, String streamId);
int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfo);

View File

@ -1,6 +1,6 @@
package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
@ -16,7 +16,7 @@ public interface StreamProxyMapper {
"(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " +
"#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " +
"#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} )")
int add(StreamProxyItem streamProxyDto);
int add(StreamProxy streamProxyDto);
@Update("UPDATE wvp_stream_proxy " +
"SET type=#{type}, " +
@ -38,29 +38,29 @@ public interface StreamProxyMapper {
"enable_disable_none_reader=#{enableDisableNoneReader}, " +
"enable_mp4=#{enableMp4} " +
"WHERE app=#{app} AND stream=#{stream}")
int update(StreamProxyItem streamProxyDto);
int update(StreamProxy streamProxyDto);
@Delete("DELETE FROM wvp_stream_proxy WHERE app=#{app} AND stream=#{stream}")
int del(String app, String stream);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream order by st.create_time desc")
List<StreamProxyItem> selectAll();
List<StreamProxy> selectAll();
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude, 'proxy' as streamType FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc")
List<StreamProxyItem> selectForEnable(boolean enable);
List<StreamProxy> selectForEnable(boolean enable);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc")
StreamProxyItem selectOne(@Param("app") String app, @Param("stream") String stream);
StreamProxy selectOne(@Param("app") String app, @Param("stream") String stream);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st " +
"LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " +
"WHERE st.enable=#{enable} and st.media_server_id= #{id} order by st.create_time desc")
List<StreamProxyItem> selectForEnableInMediaServer( @Param("id") String id, @Param("enable") boolean enable);
List<StreamProxy> selectForEnableInMediaServer(@Param("id") String id, @Param("enable") boolean enable);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st " +
"LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " +
"WHERE st.media_server_id= #{id} order by st.create_time desc")
List<StreamProxyItem> selectInMediaServer(String id);
List<StreamProxy> selectInMediaServer(String id);
@Update("UPDATE wvp_stream_proxy " +
"SET status=#{status} " +
@ -76,7 +76,7 @@ public interface StreamProxyMapper {
void deleteAutoRemoveItemByMediaServerId(String mediaServerId);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable_remove_none_reader=true AND st.media_server_id=#{mediaServerId} order by st.create_time desc")
List<StreamProxyItem> selectAutoRemoveItemByMediaServerId(String mediaServerId);
List<StreamProxy> selectAutoRemoveItemByMediaServerId(String mediaServerId);
@Select("select count(1) as total, sum(status) as online from wvp_stream_proxy")
ResourceBaseInfo getOverview();

View File

@ -4,11 +4,9 @@ import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo;
@ -23,13 +21,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* -jdbc
@ -312,9 +305,9 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
* @return
*/
@Override
public PageInfo<StreamProxyItem> queryStreamProxyList(Integer page, Integer count) {
public PageInfo<StreamProxy> queryStreamProxyList(Integer page, Integer count) {
PageHelper.startPage(page, count);
List<StreamProxyItem> all = streamProxyMapper.selectAll();
List<StreamProxy> all = streamProxyMapper.selectAll();
return new PageInfo<>(all);
}
@ -346,7 +339,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
* @return
*/
@Override
public StreamProxyItem queryStreamProxy(String app, String stream){
public StreamProxy queryStreamProxy(String app, String stream){
return streamProxyMapper.selectOne(app, stream);
}
@ -368,24 +361,15 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
}
@Override
public List<StreamProxyItem> getStreamProxyListForEnableInMediaServer(String id, boolean enable) {
public List<StreamProxy> getStreamProxyListForEnableInMediaServer(String id, boolean enable) {
return streamProxyMapper.selectForEnableInMediaServer(id, enable);
}
@Override
public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) {
public StreamProxy getStreamProxyByAppAndStream(String app, String streamId) {
return streamProxyMapper.selectOne(app, streamId);
}
private PlatformCatalog getTopCatalog(String id, String platformId) {
PlatformCatalog catalog = catalogMapper.selectByPlatFormAndCatalogId(platformId, id);
if (catalog.getParentId().equals(platformId)) {
return catalog;
}else {
return getTopCatalog(catalog.getParentId(), platformId);
}
}
@Override
public int updateStreamGPS(List<GPSMsgInfo> gpsMsgInfos) {
return gbStreamMapper.updateStreamGPS(gpsMsgInfos);

View File

@ -7,7 +7,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -59,10 +59,10 @@ public class StreamProxyController {
@Parameter(name = "online", description = "是否在线")
@GetMapping(value = "/list")
@ResponseBody
public PageInfo<StreamProxyItem> list(@RequestParam(required = false)Integer page,
@RequestParam(required = false)Integer count,
@RequestParam(required = false)String query,
@RequestParam(required = false)Boolean online ){
public PageInfo<StreamProxy> list(@RequestParam(required = false)Integer page,
@RequestParam(required = false)Integer count,
@RequestParam(required = false)String query,
@RequestParam(required = false)Boolean online ){
return streamProxyService.getAll(page, count);
}
@ -72,7 +72,7 @@ public class StreamProxyController {
@Parameter(name = "stream", description = "流Id")
@GetMapping(value = "/one")
@ResponseBody
public StreamProxyItem one(String app, String stream){
public StreamProxy one(String app, String stream){
return streamProxyService.getStreamProxyByAppAndStream(app, stream);
}
@ -82,7 +82,7 @@ public class StreamProxyController {
})
@PostMapping(value = "/save")
@ResponseBody
public DeferredResult<Object> save(@RequestBody StreamProxyItem param){
public DeferredResult<Object> save(@RequestBody StreamProxy param){
logger.info("添加代理: " + JSONObject.toJSONString(param));
if (ObjectUtils.isEmpty(param.getMediaServerId())) {
param.setMediaServerId("auto");
@ -96,35 +96,27 @@ public class StreamProxyController {
if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbId(null);
}
StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyItem != null) {
streamProxyService.del(param.getApp(), param.getStream());
}
RequestMessage requestMessage = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream();
requestMessage.setKey(key);
String uuid = UUID.randomUUID().toString();
requestMessage.setId(uuid);
DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
// 录像查询以channelId作为deviceId查询
resultHolder.put(key, uuid, result);
result.onTimeout(()->{
WVPResult<StreamInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("超时");
requestMessage.setData(wvpResult);
resultHolder.invokeAllResult(requestMessage);
result.setResult(wvpResult);
});
streamProxyService.save(param, (code, msg, streamInfo) -> {
logger.info("[拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg);
if (code == ErrorCode.SUCCESS.getCode()) {
requestMessage.setData(new StreamContent(streamInfo));
result.setResult(new StreamContent(streamInfo));
}else {
requestMessage.setData(WVPResult.fail(code, msg));
result.setResult(WVPResult.fail(code, msg));
}
resultHolder.invokeAllResult(requestMessage);
});
return result;
}

View File

@ -163,7 +163,6 @@ public class StreamPushController {
//获取文件流
InputStream inputStream = null;
try {
String name = file.getName();
inputStream = file.getInputStream();
} catch (IOException e) {
logger.error("未处理的异常 ", e);