修复旧版本拉流代理的兼容接口
parent
0c642f3b58
commit
c4076ed393
|
@ -37,7 +37,9 @@ public class SipSubscribe {
|
||||||
// @Scheduled(fixedRate= 100 * 60 * 60 )
|
// @Scheduled(fixedRate= 100 * 60 * 60 )
|
||||||
@Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次
|
@Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次
|
||||||
public void execute(){
|
public void execute(){
|
||||||
log.info("[定时任务] 清理过期的SIP订阅信息");
|
if(log.isDebugEnabled()){
|
||||||
|
log.info("[定时任务] 清理过期的SIP订阅信息");
|
||||||
|
}
|
||||||
|
|
||||||
Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
|
Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
|
||||||
|
|
||||||
|
@ -53,10 +55,12 @@ public class SipSubscribe {
|
||||||
errorTimeSubscribes.remove(key);
|
errorTimeSubscribes.remove(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.debug("okTimeSubscribes.size:{}",okTimeSubscribes.size());
|
if(log.isDebugEnabled()){
|
||||||
log.debug("okSubscribes.size:{}",okSubscribes.size());
|
log.debug("okTimeSubscribes.size:{}",okTimeSubscribes.size());
|
||||||
log.debug("errorTimeSubscribes.size:{}",errorTimeSubscribes.size());
|
log.debug("okSubscribes.size:{}",okSubscribes.size());
|
||||||
log.debug("errorSubscribes.size:{}",errorSubscribes.size());
|
log.debug("errorTimeSubscribes.size:{}",errorTimeSubscribes.size());
|
||||||
|
log.debug("errorSubscribes.size:{}",errorSubscribes.size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface Event { void response(EventResult eventResult);
|
public interface Event { void response(EventResult eventResult);
|
||||||
|
|
|
@ -16,6 +16,9 @@ public class StreamProxyParam {
|
||||||
@Schema(description = "应用名")
|
@Schema(description = "应用名")
|
||||||
private String app;
|
private String app;
|
||||||
|
|
||||||
|
@Schema(description = "名称")
|
||||||
|
private String name;
|
||||||
|
|
||||||
@Schema(description = "流ID")
|
@Schema(description = "流ID")
|
||||||
private String stream;
|
private String stream;
|
||||||
|
|
||||||
|
@ -64,7 +67,7 @@ public class StreamProxyParam {
|
||||||
streamProxy.setEnableRemoveNoneReader(enableRemoveNoneReader);
|
streamProxy.setEnableRemoveNoneReader(enableRemoveNoneReader);
|
||||||
streamProxy.setEnableDisableNoneReader(enableDisableNoneReader);
|
streamProxy.setEnableDisableNoneReader(enableDisableNoneReader);
|
||||||
streamProxy.setFfmpegCmdKey(ffmpegCmdKey);
|
streamProxy.setFfmpegCmdKey(ffmpegCmdKey);
|
||||||
|
streamProxy.setGbName(name);
|
||||||
return streamProxy;
|
return streamProxy;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,7 +87,7 @@ public interface IStreamProxyService {
|
||||||
*/
|
*/
|
||||||
ResourceBaseInfo getOverview();
|
ResourceBaseInfo getOverview();
|
||||||
|
|
||||||
StreamInfo add(StreamProxy streamProxy);
|
void add(StreamProxy streamProxy);
|
||||||
|
|
||||||
StreamProxy getStreamProxy(int id);
|
StreamProxy getStreamProxy(int id);
|
||||||
|
|
||||||
|
|
|
@ -140,21 +140,35 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
public StreamInfo save(StreamProxyParam param) {
|
public StreamInfo save(StreamProxyParam param) {
|
||||||
// 兼容旧接口
|
// 兼容旧接口
|
||||||
StreamProxy streamProxyInDb = getStreamProxyByAppAndStream(param.getApp(), param.getStream());
|
StreamProxy streamProxyInDb = getStreamProxyByAppAndStream(param.getApp(), param.getStream());
|
||||||
if (streamProxyInDb != null && streamProxyInDb.getPulling()) {
|
if (streamProxyInDb != null && streamProxyInDb.getPulling() != null && streamProxyInDb.getPulling()) {
|
||||||
playService.stopProxy(streamProxyInDb);
|
playService.stopProxy(streamProxyInDb);
|
||||||
}
|
}
|
||||||
if (streamProxyInDb == null){
|
if (param.getMediaServerId().equals("auto")) {
|
||||||
return add(param.buildStreamProxy());
|
param.setMediaServerId(null);
|
||||||
}else {
|
}
|
||||||
playService.stopProxy(streamProxyInDb);
|
StreamProxy streamProxy = param.buildStreamProxy();
|
||||||
|
|
||||||
|
if (streamProxyInDb == null) {
|
||||||
|
add(streamProxy);
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
playService.stopProxy(streamProxyInDb);
|
||||||
|
} catch (ControllerException ignored) {
|
||||||
|
}
|
||||||
streamProxyMapper.delete(streamProxyInDb.getId());
|
streamProxyMapper.delete(streamProxyInDb.getId());
|
||||||
return add(param.buildStreamProxy());
|
add(streamProxy);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (param.isEnable()) {
|
||||||
|
return playService.startProxy(streamProxy);
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional
|
@Transactional
|
||||||
public StreamInfo add(StreamProxy streamProxy) {
|
public void add(StreamProxy streamProxy) {
|
||||||
StreamProxy streamProxyInDb = streamProxyMapper.selectOneByAppAndStream(streamProxy.getApp(), streamProxy.getStream());
|
StreamProxy streamProxyInDb = streamProxyMapper.selectOneByAppAndStream(streamProxy.getApp(), streamProxy.getStream());
|
||||||
if (streamProxyInDb != null) {
|
if (streamProxyInDb != null) {
|
||||||
throw new ControllerException(ErrorCode.ERROR100.getCode(), "APP+STREAM已经存在");
|
throw new ControllerException(ErrorCode.ERROR100.getCode(), "APP+STREAM已经存在");
|
||||||
|
@ -166,7 +180,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
streamProxy.setUpdateTime(DateUtil.getNow());
|
streamProxy.setUpdateTime(DateUtil.getNow());
|
||||||
streamProxyMapper.add(streamProxy);
|
streamProxyMapper.add(streamProxy);
|
||||||
streamProxy.setStreamProxyId(streamProxy.getId());
|
streamProxy.setStreamProxyId(streamProxy.getId());
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -183,7 +196,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
if (streamProxy.getPulling() != null && streamProxy.getPulling()) {
|
if (streamProxy.getPulling() != null && streamProxy.getPulling()) {
|
||||||
playService.stopProxy(streamProxy);
|
playService.stopProxy(streamProxy);
|
||||||
}
|
}
|
||||||
if(streamProxy.getGbId() > 0) {
|
if (streamProxy.getGbId() > 0) {
|
||||||
gbChannelService.delete(streamProxy.getGbId());
|
gbChannelService.delete(streamProxy.getGbId());
|
||||||
}
|
}
|
||||||
streamProxyMapper.delete(streamProxy.getId());
|
streamProxyMapper.delete(streamProxy.getId());
|
||||||
|
@ -206,14 +219,14 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
public boolean update(StreamProxy streamProxy) {
|
public boolean update(StreamProxy streamProxy) {
|
||||||
streamProxy.setUpdateTime(DateUtil.getNow());
|
streamProxy.setUpdateTime(DateUtil.getNow());
|
||||||
StreamProxy streamProxyInDb = streamProxyMapper.select(streamProxy.getId());
|
StreamProxy streamProxyInDb = streamProxyMapper.select(streamProxy.getId());
|
||||||
if (streamProxyInDb == null) {
|
if (streamProxyInDb == null) {
|
||||||
throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在");
|
throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在");
|
||||||
}
|
}
|
||||||
int updateResult = streamProxyMapper.update(streamProxy);
|
int updateResult = streamProxyMapper.update(streamProxy);
|
||||||
if (updateResult > 0 && !ObjectUtils.isEmpty(streamProxy.getGbDeviceId())) {
|
if (updateResult > 0 && !ObjectUtils.isEmpty(streamProxy.getGbDeviceId())) {
|
||||||
if (streamProxy.getGbId() > 0) {
|
if (streamProxy.getGbId() > 0) {
|
||||||
gbChannelService.update(streamProxy.buildCommonGBChannel());
|
gbChannelService.update(streamProxy.buildCommonGBChannel());
|
||||||
}else {
|
} else {
|
||||||
gbChannelService.add(streamProxy.buildCommonGBChannel());
|
gbChannelService.add(streamProxy.buildCommonGBChannel());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -231,7 +244,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PageInfo<StreamProxy> getAll(Integer page, Integer count, String query, Boolean pulling,String mediaServerId) {
|
public PageInfo<StreamProxy> getAll(Integer page, Integer count, String query, Boolean pulling, String mediaServerId) {
|
||||||
PageHelper.startPage(page, count);
|
PageHelper.startPage(page, count);
|
||||||
List<StreamProxy> all = streamProxyMapper.selectAll(query, pulling, mediaServerId);
|
List<StreamProxy> all = streamProxyMapper.selectAll(query, pulling, mediaServerId);
|
||||||
return new PageInfo<>(all);
|
return new PageInfo<>(all);
|
||||||
|
@ -258,7 +271,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, String> getFFmpegCMDs(MediaServer mediaServer) {
|
public Map<String, String> getFFmpegCMDs(MediaServer mediaServer) {
|
||||||
return mediaServerService.getFFmpegCMDs(mediaServer);
|
return mediaServerService.getFFmpegCMDs(mediaServer);
|
||||||
|
@ -281,7 +293,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
redisCatchStorage.removeStream(mediaServerId, "pull");
|
redisCatchStorage.removeStream(mediaServerId, "pull");
|
||||||
|
|
||||||
List<StreamProxy> streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true);
|
List<StreamProxy> streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true);
|
||||||
if (streamProxies.isEmpty()){
|
if (streamProxies.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Map<String, StreamProxy> streamProxyMapForDb = new HashMap<>();
|
Map<String, StreamProxy> streamProxyMapForDb = new HashMap<>();
|
||||||
|
@ -300,7 +312,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (streamInfo.getOriginType() == OriginType.PULL.ordinal()
|
if (streamInfo.getOriginType() == OriginType.PULL.ordinal()
|
||||||
|| streamInfo.getOriginType() == OriginType.FFMPEG_PULL.ordinal() ) {
|
|| streamInfo.getOriginType() == OriginType.FFMPEG_PULL.ordinal()) {
|
||||||
if (streamProxyMapForDb.get(key) != null) {
|
if (streamProxyMapForDb.get(key) != null) {
|
||||||
redisCatchStorage.addStream(mediaServer, "pull", streamInfo.getApp(), streamInfo.getStream(), streamInfo.getMediaInfo());
|
redisCatchStorage.addStream(mediaServer, "pull", streamInfo.getApp(), streamInfo.getStream(), streamInfo.getMediaInfo());
|
||||||
if ("OFF".equalsIgnoreCase(streamProxy.getGbStatus()) && streamProxy.getGbId() > 0) {
|
if ("OFF".equalsIgnoreCase(streamProxy.getGbStatus()) && streamProxy.getGbId() > 0) {
|
||||||
|
@ -353,7 +365,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
// 清理redis相关的缓存
|
// 清理redis相关的缓存
|
||||||
redisCatchStorage.removeStream(mediaServerId, "pull");
|
redisCatchStorage.removeStream(mediaServerId, "pull");
|
||||||
|
|
||||||
if (streamProxies.isEmpty()){
|
if (streamProxies.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
List<StreamProxy> streamProxiesForRemove = new ArrayList<>();
|
List<StreamProxy> streamProxiesForRemove = new ArrayList<>();
|
||||||
|
@ -402,11 +414,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
streamProxy.setUpdateTime(DateUtil.getNow());
|
streamProxy.setUpdateTime(DateUtil.getNow());
|
||||||
streamProxyMapper.update(streamProxy);
|
streamProxyMapper.update(streamProxy);
|
||||||
streamProxy.setGbStatus(status?"ON":"OFF");
|
streamProxy.setGbStatus(status ? "ON" : "OFF");
|
||||||
if (streamProxy.getGbId() > 0) {
|
if (streamProxy.getGbId() > 0) {
|
||||||
if (status) {
|
if (status) {
|
||||||
gbChannelService.online(streamProxy.buildCommonGBChannel());
|
gbChannelService.online(streamProxy.buildCommonGBChannel());
|
||||||
}else {
|
} else {
|
||||||
gbChannelService.offline(streamProxy.buildCommonGBChannel());
|
gbChannelService.offline(streamProxy.buildCommonGBChannel());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue