临时提交
parent
481c88f517
commit
6290e94733
|
@ -13,6 +13,7 @@ import org.springframework.stereotype.Component;
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
|
import java.net.URLEncoder;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
|
@ -146,14 +146,6 @@ public class StreamProxy {
|
||||||
this.rtpType = rtpType;
|
this.rtpType = rtpType;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isEnable() {
|
|
||||||
return enable;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setEnable(boolean enable) {
|
|
||||||
this.enable = enable;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isEnableAudio() {
|
public boolean isEnableAudio() {
|
||||||
return enableAudio;
|
return enableAudio;
|
||||||
}
|
}
|
||||||
|
@ -257,4 +249,20 @@ public class StreamProxy {
|
||||||
public void setProxyError(String proxyError) {
|
public void setProxyError(String proxyError) {
|
||||||
this.proxyError = proxyError;
|
this.proxyError = proxyError;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isPulling() {
|
||||||
|
return pulling;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPulling(boolean pulling) {
|
||||||
|
this.pulling = pulling;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPullTime() {
|
||||||
|
return pullTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPullTime(String pullTime) {
|
||||||
|
this.pullTime = pullTime;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,11 +73,6 @@ public interface IStreamProxyService {
|
||||||
*/
|
*/
|
||||||
void updateStreamGPS(List<GPSMsgInfo> gpsMsgInfoList);
|
void updateStreamGPS(List<GPSMsgInfo> gpsMsgInfoList);
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取所有启用的拉流代理
|
|
||||||
*/
|
|
||||||
List<StreamProxy> getAllForEnable();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 添加拉流代理
|
* 添加拉流代理
|
||||||
*/
|
*/
|
||||||
|
@ -98,4 +93,8 @@ public interface IStreamProxyService {
|
||||||
*/
|
*/
|
||||||
void getStreamProxyById(Integer id, GeneralCallback<StreamInfo> callback);
|
void getStreamProxyById(Integer id, GeneralCallback<StreamInfo> callback);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 播放代理流
|
||||||
|
*/
|
||||||
|
void play(Integer id, GeneralCallback<StreamInfo> callback);
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,6 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
|
||||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
import com.genersoft.iot.vmp.conf.exception.ControllerException;
|
import com.genersoft.iot.vmp.conf.exception.ControllerException;
|
||||||
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
|
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
|
||||||
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
|
|
||||||
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
|
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
|
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
|
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
|
||||||
|
@ -24,7 +23,6 @@ import com.genersoft.iot.vmp.service.IMediaService;
|
||||||
import com.genersoft.iot.vmp.service.IStreamProxyService;
|
import com.genersoft.iot.vmp.service.IStreamProxyService;
|
||||||
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
|
||||||
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
|
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
|
||||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||||
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||||
|
@ -37,11 +35,9 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
|
||||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.TransactionDefinition;
|
import org.springframework.transaction.TransactionDefinition;
|
||||||
import org.springframework.util.CollectionUtils;
|
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import org.springframework.util.ObjectUtils;
|
import org.springframework.util.ObjectUtils;
|
||||||
|
|
||||||
|
@ -51,8 +47,6 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.function.Function;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -166,6 +160,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}else { // 新增
|
}else { // 新增
|
||||||
addProxyToDb(param);
|
addProxyToDb(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
|
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
|
||||||
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
||||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||||
|
@ -174,19 +169,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if (param.isEnable()) {
|
|
||||||
startProxy(param, mediaInfo, (code, msg, data) -> {
|
startProxy(param, mediaInfo, (code, msg, data) -> {
|
||||||
if (code != ErrorCode.SUCCESS.getCode()) {
|
if (code != ErrorCode.SUCCESS.getCode()) {
|
||||||
if (callback != null) {
|
if (callback != null) {
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
|
||||||
}
|
}
|
||||||
param.setStatus(true);
|
param.setPulling(true);
|
||||||
streamProxyMapper.update(param);
|
streamProxyMapper.update(param);
|
||||||
}else {
|
}else {
|
||||||
if (callback != null) {
|
if (callback != null) {
|
||||||
callback.run(code, msg, null);
|
callback.run(code, msg, null);
|
||||||
}
|
}
|
||||||
param.setEnable(false);
|
param.setPulling(false);
|
||||||
// 直接移除
|
// 直接移除
|
||||||
if (param.isEnableRemoveNoneReader()) {
|
if (param.isEnableRemoveNoneReader()) {
|
||||||
delProxyFromDb(param);
|
delProxyFromDb(param);
|
||||||
|
@ -195,55 +189,20 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else{
|
|
||||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
|
||||||
mediaInfo, param.getApp(), param.getStream(), null, null);
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional
|
@Transactional
|
||||||
public void add(StreamProxy param, GeneralCallback<StreamInfo> callback) {
|
public void add(StreamProxy param, GeneralCallback<StreamInfo> callback) {
|
||||||
MediaServerItem mediaInfo;
|
|
||||||
if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){
|
|
||||||
mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null);
|
|
||||||
}else {
|
|
||||||
mediaInfo = mediaServerService.getOne(param.getMediaServerId());
|
|
||||||
}
|
|
||||||
if (mediaInfo == null) {
|
|
||||||
logger.warn("[添加拉流代理] 未找到在线的ZLM...");
|
|
||||||
throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到可用的ZLM");
|
|
||||||
}
|
|
||||||
proxyParamHandler(param);
|
proxyParamHandler(param);
|
||||||
param.setMediaServerId(mediaInfo.getId());
|
|
||||||
StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream());
|
StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream());
|
||||||
if (streamProxyInDb != null) {
|
if (streamProxyInDb != null) {
|
||||||
throw new ControllerException(ErrorCode.ERROR100.getCode(), "app/stream已经存在");
|
throw new ControllerException(ErrorCode.ERROR100.getCode(), "app/stream已经存在");
|
||||||
}
|
}
|
||||||
if (!param.isEnable()) {
|
|
||||||
param.setStatus(false);
|
|
||||||
}
|
|
||||||
addProxyToDb(param);
|
addProxyToDb(param);
|
||||||
if (callback != null) {
|
if (callback != null) {
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null);
|
||||||
}
|
}
|
||||||
taskExecutor.execute(()->{
|
|
||||||
startProxy(param, mediaInfo, (code, msg, data) -> {
|
|
||||||
if (code == ErrorCode.SUCCESS.getCode()) {
|
|
||||||
param.setStatus(true);
|
|
||||||
} else {
|
|
||||||
if (param.isEnableRemoveNoneReader()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
param.setProxyError(msg);
|
|
||||||
param.setStatus(false);
|
|
||||||
}
|
|
||||||
// updateStatusById(param.get);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -279,7 +238,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
boolean stopOldProxy = !streamProxyInDb.getType().equals(param.getType())
|
boolean stopOldProxy = !streamProxyInDb.getType().equals(param.getType())
|
||||||
|| !streamProxyInDb.getUrl().equals(param.getUrl())
|
|| !streamProxyInDb.getUrl().equals(param.getUrl())
|
||||||
|| !streamProxyInDb.getMediaServerId().equals(param.getMediaServerId())
|
|| !streamProxyInDb.getMediaServerId().equals(param.getMediaServerId())
|
||||||
|| (streamProxyInDb.isEnable() && !param.isEnable())
|
|
||||||
|| (streamProxyInDb.getType().equals("ffmpeg") && (
|
|| (streamProxyInDb.getType().equals("ffmpeg") && (
|
||||||
streamProxyInDb.getDstUrl().equals(param.getDstUrl())
|
streamProxyInDb.getDstUrl().equals(param.getDstUrl())
|
||||||
|| streamProxyInDb.getFfmpegCmdKey().equals(param.getFfmpegCmdKey())
|
|| streamProxyInDb.getFfmpegCmdKey().equals(param.getFfmpegCmdKey())
|
||||||
|
@ -288,9 +246,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
// 如果是开启代理这是开启代理结束后的回调
|
// 如果是开启代理这是开启代理结束后的回调
|
||||||
final GeneralCallback<StreamInfo> startProxyCallback = (code, msg, data) -> {
|
final GeneralCallback<StreamInfo> startProxyCallback = (code, msg, data) -> {
|
||||||
if (code == ErrorCode.SUCCESS.getCode()) {
|
if (code == ErrorCode.SUCCESS.getCode()) {
|
||||||
param.setStatus(true);
|
param.setPulling(true);
|
||||||
} else {
|
} else {
|
||||||
param.setStatus(false);
|
param.setPulling(false);
|
||||||
if (param.isEnableRemoveNoneReader()) {
|
if (param.isEnableRemoveNoneReader()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -300,17 +258,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
};
|
};
|
||||||
if(stopOldProxy) {
|
if(stopOldProxy) {
|
||||||
stopProxy(param, mediaInfo, (code, msg, data) -> {
|
stopProxy(param, mediaInfo, (code, msg, data) -> {
|
||||||
if (param.isEnable()) {
|
if (param.isPulling()) {
|
||||||
startProxy(param, mediaInfo, startProxyCallback);
|
startProxy(param, mediaInfo, startProxyCallback);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}else {
|
|
||||||
if (param.isEnable()) {
|
|
||||||
startProxy(param, mediaInfo, startProxyCallback);
|
|
||||||
}else {
|
|
||||||
param.setStatus(false);
|
|
||||||
updateProxyToDb(param);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -335,15 +286,15 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
if (callback != null) {
|
if (callback != null) {
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
String delayTalkKey = UUID.randomUUID().toString();
|
String delayTalkKey = UUID.randomUUID().toString();
|
||||||
|
|
||||||
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(streamProxy.getApp(), streamProxy.getStream(), true, "rtsp", mediaInfo.getId());
|
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(streamProxy.getApp(), streamProxy.getStream(), true, "rtsp", mediaInfo.getId());
|
||||||
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
||||||
dynamicTask.stop(delayTalkKey);
|
dynamicTask.stop(delayTalkKey);
|
||||||
streamProxy.setStatus(true);
|
streamProxy.setPulling(true);
|
||||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||||
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
|
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
|
||||||
logger.info("[开始拉流代理] 成功: {}/{}", streamProxy.getApp(), streamProxy.getStream());
|
logger.info("[开始拉流代理] 成功: {}/{}", streamProxy.getApp(), streamProxy.getStream());
|
||||||
|
@ -558,7 +509,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
if (streamProxy == null) {
|
if (streamProxy == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (streamProxy.isEnable()) {
|
if (streamProxy.isPulling()) {
|
||||||
String mediaServerId = streamProxy.getMediaServerId();
|
String mediaServerId = streamProxy.getMediaServerId();
|
||||||
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
|
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
|
||||||
if (mediaServerItem != null) {
|
if (mediaServerItem != null) {
|
||||||
|
@ -581,9 +532,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
@Transactional
|
@Transactional
|
||||||
public void start(String app, String stream, GeneralCallback<StreamInfo> callback) {
|
public void start(String app, String stream, GeneralCallback<StreamInfo> callback) {
|
||||||
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
|
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
|
||||||
if (streamProxy == null || !streamProxy.isEnable()){
|
if (streamProxy == null ){
|
||||||
if (callback != null) {
|
if (callback != null) {
|
||||||
callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null);
|
callback.run(ErrorCode.ERROR100.getCode(), "代理不存在", null);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -597,9 +548,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
startProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
|
startProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
|
||||||
if (code == ErrorCode.SUCCESS.getCode()) {
|
if (code == ErrorCode.SUCCESS.getCode()) {
|
||||||
streamProxy.setStatus(true);
|
streamProxy.setPulling(true);
|
||||||
}else {
|
}else {
|
||||||
streamProxy.setStatus(false);
|
streamProxy.setPulling(false);
|
||||||
}
|
}
|
||||||
streamProxy.setUpdateTime(DateUtil.getNow());
|
streamProxy.setUpdateTime(DateUtil.getNow());
|
||||||
updateProxyToDb(streamProxy);
|
updateProxyToDb(streamProxy);
|
||||||
|
@ -613,9 +564,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
@Transactional
|
@Transactional
|
||||||
public void stop(String app, String stream, GeneralCallback<StreamInfo> callback) {
|
public void stop(String app, String stream, GeneralCallback<StreamInfo> callback) {
|
||||||
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
|
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
|
||||||
if (streamProxy == null || !streamProxy.isEnable()){
|
if (streamProxy == null){
|
||||||
if (callback != null) {
|
if (callback != null) {
|
||||||
callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null);
|
callback.run(ErrorCode.ERROR100.getCode(), "代理不存在", null);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -628,7 +579,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
|
stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
|
||||||
streamProxy.setStatus(false);
|
streamProxy.setPulling(false);
|
||||||
streamProxy.setUpdateTime(DateUtil.getNow());
|
streamProxy.setUpdateTime(DateUtil.getNow());
|
||||||
updateProxyToDb(streamProxy);
|
updateProxyToDb(streamProxy);
|
||||||
if (callback != null) {
|
if (callback != null) {
|
||||||
|
@ -680,31 +631,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
|
|
||||||
// 移除拉流代理生成的流信息
|
// 移除拉流代理生成的流信息
|
||||||
syncPullStream(mediaServerId);
|
syncPullStream(mediaServerId);
|
||||||
|
|
||||||
// 恢复流代理, 只查找这个这个流媒体
|
|
||||||
List<StreamProxy> streamProxyListForEnable = streamProxyMapper.selectForEnableInMediaServer(
|
|
||||||
mediaServerId, true);
|
|
||||||
for (StreamProxy streamProxyDto : streamProxyListForEnable) {
|
|
||||||
logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
|
|
||||||
MediaServerItem mediaServerItem = mediaServerService.getOne(streamProxyDto.getMediaServerId());
|
|
||||||
startProxy(streamProxyDto, mediaServerItem, (code, msg, data) -> {
|
|
||||||
if (code == ErrorCode.ERROR100.getCode()) {
|
|
||||||
if (!streamProxyDto.isStatus()) {
|
|
||||||
updateStatusById(streamProxyDto, true);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (streamProxyDto.isStatus()) {
|
|
||||||
updateStatusById(streamProxyDto, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transactional
|
@Transactional
|
||||||
public void updateStatusById(StreamProxy streamProxy, boolean status) {
|
public void updateStatusById(StreamProxy streamProxy, boolean status) {
|
||||||
streamProxyMapper.updateStatusById(streamProxy.getId(), status);
|
streamProxyMapper.updatePullingById(streamProxy.getId(), status);
|
||||||
if (streamProxy.getCommonGbChannelId() > 0) {
|
if (streamProxy.getCommonGbChannelId() > 0) {
|
||||||
List<Integer> ids = new ArrayList<>();
|
List<Integer> ids = new ArrayList<>();
|
||||||
ids.add(streamProxy.getCommonGbChannelId());
|
ids.add(streamProxy.getCommonGbChannelId());
|
||||||
|
@ -733,7 +664,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
|
streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
|
||||||
// 其他的流设置离线
|
// 其他的流设置离线
|
||||||
streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
|
streamProxyMapper.updatePullingStatusByMediaServerId(mediaServerId, false);
|
||||||
String type = "PULL";
|
String type = "PULL";
|
||||||
|
|
||||||
// 发送redis消息
|
// 发送redis消息
|
||||||
|
@ -808,7 +739,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
public ResourceBaseInfo getOverview() {
|
public ResourceBaseInfo getOverview() {
|
||||||
|
|
||||||
int total = streamProxyMapper.getAllCount();
|
int total = streamProxyMapper.getAllCount();
|
||||||
int online = streamProxyMapper.getOnline();
|
int online = streamProxyMapper.getPulline();
|
||||||
|
|
||||||
return new ResourceBaseInfo(total, online);
|
return new ResourceBaseInfo(total, online);
|
||||||
}
|
}
|
||||||
|
@ -860,11 +791,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
streamProxyMapper.updateStreamGPS(gpsMsgInfoList);
|
streamProxyMapper.updateStreamGPS(gpsMsgInfoList);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<StreamProxy> getAllForEnable() {
|
|
||||||
return streamProxyMapper.selectForEnable(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void getStreamProxyById(Integer id, GeneralCallback<StreamInfo> callback) {
|
public void getStreamProxyById(Integer id, GeneralCallback<StreamInfo> callback) {
|
||||||
assert id != null;
|
assert id != null;
|
||||||
|
@ -878,4 +804,21 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void play(Integer id, GeneralCallback<StreamInfo> callback) {
|
||||||
|
StreamProxy streamProxy = streamProxyMapper.selectOneById(id);
|
||||||
|
assert streamProxy != null;
|
||||||
|
String mediaServerId = streamProxy.getMediaServerId();
|
||||||
|
MediaServerItem mediaServerItem;
|
||||||
|
if (ObjectUtils.isEmpty(mediaServerId) || mediaServerId.equals("auto")) {
|
||||||
|
mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
|
||||||
|
}else {
|
||||||
|
mediaServerItem = mediaServerService.getOne(mediaServerId);
|
||||||
|
}
|
||||||
|
if (mediaServerItem == null && callback != null) {
|
||||||
|
callback.run(ErrorCode.ERROR100.getCode(), "未找到可用的节点", null);
|
||||||
|
}
|
||||||
|
startProxy(streamProxy, mediaServerItem, callback);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -248,7 +248,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||||
streamPushMapper.deleteWithoutGBId(mediaServerId);
|
streamPushMapper.deleteWithoutGBId(mediaServerId);
|
||||||
// 其他的流设置未启用
|
// 其他的流设置未启用
|
||||||
streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
|
streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
|
||||||
streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
|
streamProxyMapper.updatePullingStatusByMediaServerId(mediaServerId, false);
|
||||||
// 发送流停止消息
|
// 发送流停止消息
|
||||||
String type = "PUSH";
|
String type = "PUSH";
|
||||||
// 发送redis消息
|
// 发送redis消息
|
||||||
|
|
|
@ -13,11 +13,11 @@ import java.util.List;
|
||||||
public interface StreamProxyMapper {
|
public interface StreamProxyMapper {
|
||||||
|
|
||||||
@Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, dst_url, " +
|
@Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, dst_url, " +
|
||||||
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, stream_key, " +
|
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, pulling, stream_key, " +
|
||||||
"enable_remove_none_reader, enable_disable_none_reader, create_time, longitude, latitude, " +
|
"enable_remove_none_reader, enable_disable_none_reader, create_time, longitude, latitude, " +
|
||||||
"common_gb_channel_id, gb_id) VALUES " +
|
"common_gb_channel_id, gb_id) VALUES " +
|
||||||
"(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{dstUrl}, " +
|
"(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{dstUrl}, " +
|
||||||
"#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " +
|
"#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{pulling}, #{streamKey}, " +
|
||||||
"#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} , #{longitude} , #{latitude}, " +
|
"#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} , #{longitude} , #{latitude}, " +
|
||||||
"#{commonGbChannelId}, #{gbId})")
|
"#{commonGbChannelId}, #{gbId})")
|
||||||
@Options(useGeneratedKeys=true, keyProperty="id", keyColumn="id")
|
@Options(useGeneratedKeys=true, keyProperty="id", keyColumn="id")
|
||||||
|
@ -35,8 +35,7 @@ public interface StreamProxyMapper {
|
||||||
"ffmpeg_cmd_key=#{ffmpegCmdKey}, " +
|
"ffmpeg_cmd_key=#{ffmpegCmdKey}, " +
|
||||||
"rtp_type=#{rtpType}, " +
|
"rtp_type=#{rtpType}, " +
|
||||||
"enable_audio=#{enableAudio}, " +
|
"enable_audio=#{enableAudio}, " +
|
||||||
"enable=#{enable}, " +
|
"pulling=#{pulling}, " +
|
||||||
"status=#{status}, " +
|
|
||||||
"stream_key=#{streamKey}, " +
|
"stream_key=#{streamKey}, " +
|
||||||
"enable_remove_none_reader=#{enableRemoveNoneReader}, " +
|
"enable_remove_none_reader=#{enableRemoveNoneReader}, " +
|
||||||
"enable_disable_none_reader=#{enableDisableNoneReader}, " +
|
"enable_disable_none_reader=#{enableDisableNoneReader}, " +
|
||||||
|
@ -51,42 +50,26 @@ public interface StreamProxyMapper {
|
||||||
"SELECT * FROM wvp_stream_proxy where 1 = 1 " +
|
"SELECT * FROM wvp_stream_proxy where 1 = 1 " +
|
||||||
" <if test='query != null'> AND (app LIKE '%${query}%' OR stream LIKE '%${query}%' OR name LIKE '%${query}%')</if> " +
|
" <if test='query != null'> AND (app LIKE '%${query}%' OR stream LIKE '%${query}%' OR name LIKE '%${query}%')</if> " +
|
||||||
" <if test='mediaServerId != null'> AND media_server_id=#{mediaServerId}</if> " +
|
" <if test='mediaServerId != null'> AND media_server_id=#{mediaServerId}</if> " +
|
||||||
" <if test='online == true' > AND status=true</if>" +
|
" <if test='pulling == true' > AND pulling=true</if>" +
|
||||||
" <if test='online == false' > AND status=false</if>" +
|
" <if test='pulling == false' > AND pulling=false</if>" +
|
||||||
"order by create_time desc"+
|
"order by create_time desc"+
|
||||||
" </script>" )
|
" </script>" )
|
||||||
List<StreamProxy> selectAll(@Param("query") String query,
|
List<StreamProxy> selectAll(@Param("query") String query,
|
||||||
@Param("online") Boolean online,
|
@Param("pulling") Boolean pulling,
|
||||||
@Param("mediaServerId") String mediaServerId);
|
@Param("mediaServerId") String mediaServerId);
|
||||||
|
|
||||||
@Select("SELECT st.* FROM wvp_stream_proxy st WHERE st.enable=#{enable} order by st.create_time desc")
|
|
||||||
List<StreamProxy> selectForEnable(boolean enable);
|
|
||||||
|
|
||||||
@Select("SELECT st.* from wvp_stream_proxy st WHERE st.app=#{app} AND st.stream=#{stream}")
|
@Select("SELECT st.* from wvp_stream_proxy st WHERE st.app=#{app} AND st.stream=#{stream}")
|
||||||
StreamProxy selectOne(@Param("app") String app, @Param("stream") String stream);
|
StreamProxy selectOne(@Param("app") String app, @Param("stream") String stream);
|
||||||
|
|
||||||
@Select("SELECT st.* FROM wvp_stream_proxy st " +
|
|
||||||
"WHERE st.enable=#{enable} and st.media_server_id= #{id} order by st.create_time desc")
|
|
||||||
List<StreamProxy> selectForEnableInMediaServer(@Param("id") String id, @Param("enable") boolean enable);
|
|
||||||
|
|
||||||
@Select("SELECT st.* FROM wvp_stream_proxy st " +
|
|
||||||
"WHERE st.media_server_id= #{id} order by st.create_time desc")
|
|
||||||
List<StreamProxy> selectInMediaServer(String id);
|
|
||||||
|
|
||||||
@Update("UPDATE wvp_stream_proxy " +
|
@Update("UPDATE wvp_stream_proxy " +
|
||||||
"SET status=#{status} " +
|
"SET pulling=#{pulling} " +
|
||||||
"WHERE media_server_id=#{mediaServerId}")
|
"WHERE media_server_id=#{mediaServerId}")
|
||||||
void updateStatusByMediaServerId(@Param("mediaServerId") String mediaServerId, @Param("status") boolean status);
|
void updatePullingStatusByMediaServerId(@Param("mediaServerId") String mediaServerId, @Param("pulling") boolean pulling);
|
||||||
|
|
||||||
@Update("UPDATE wvp_stream_proxy " +
|
@Update("UPDATE wvp_stream_proxy " +
|
||||||
"SET status=#{status} " +
|
"SET pulling=#{pulling} " +
|
||||||
"WHERE app=#{app} AND stream=#{stream}")
|
|
||||||
int updateStatus(@Param("app") String app, @Param("stream") String stream, @Param("status") boolean status);
|
|
||||||
|
|
||||||
@Update("UPDATE wvp_stream_proxy " +
|
|
||||||
"SET status=#{status} " +
|
|
||||||
"WHERE id=#{id}")
|
"WHERE id=#{id}")
|
||||||
int updateStatusById(@Param("id") int id, @Param("status") boolean status);
|
int updatePullingById(@Param("id") int id, @Param("pulling") boolean pulling);
|
||||||
|
|
||||||
@Delete("DELETE FROM wvp_stream_proxy WHERE enable_remove_none_reader=true AND media_server_id=#{mediaServerId}")
|
@Delete("DELETE FROM wvp_stream_proxy WHERE enable_remove_none_reader=true AND media_server_id=#{mediaServerId}")
|
||||||
void deleteAutoRemoveItemByMediaServerId(String mediaServerId);
|
void deleteAutoRemoveItemByMediaServerId(String mediaServerId);
|
||||||
|
@ -94,15 +77,15 @@ public interface StreamProxyMapper {
|
||||||
@Select("SELECT st.* FROM wvp_stream_proxy st WHERE st.enable_remove_none_reader=true AND st.media_server_id=#{mediaServerId} order by st.create_time desc")
|
@Select("SELECT st.* FROM wvp_stream_proxy st WHERE st.enable_remove_none_reader=true AND st.media_server_id=#{mediaServerId} order by st.create_time desc")
|
||||||
List<StreamProxy> selectAutoRemoveItemByMediaServerId(String mediaServerId);
|
List<StreamProxy> selectAutoRemoveItemByMediaServerId(String mediaServerId);
|
||||||
|
|
||||||
@Select("select count(1) as total, sum(status) as online from wvp_stream_proxy")
|
@Select("select count(1) as total, sum(pulling) as online from wvp_stream_proxy")
|
||||||
ResourceBaseInfo getOverview();
|
ResourceBaseInfo getOverview();
|
||||||
|
|
||||||
@Select("select count(1) from wvp_stream_proxy")
|
@Select("select count(1) from wvp_stream_proxy")
|
||||||
|
|
||||||
int getAllCount();
|
int getAllCount();
|
||||||
|
|
||||||
@Select("select count(1) from wvp_stream_proxy where status = true")
|
@Select("select count(1) from wvp_stream_proxy where pulling = true")
|
||||||
int getOnline();
|
int getPulline();
|
||||||
|
|
||||||
|
|
||||||
@Update({"<script>" +
|
@Update({"<script>" +
|
||||||
|
|
|
@ -280,14 +280,14 @@ public class StreamProxyController {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping(value = "/stream")
|
@GetMapping(value = "/play")
|
||||||
@ResponseBody
|
@ResponseBody
|
||||||
@Operation(summary = "获取代理播放地址", security = @SecurityRequirement(name = JwtUtils.HEADER))
|
@Operation(summary = "播放代理流", security = @SecurityRequirement(name = JwtUtils.HEADER))
|
||||||
@Parameter(name = "id", description = "ID", required = true)
|
@Parameter(name = "id", description = "ID", required = true)
|
||||||
public DeferredResult<WVPResult<StreamContent>> getStream(Integer id){
|
public DeferredResult<WVPResult<StreamContent>> getStream(Integer id){
|
||||||
logger.info("获取代理播放地址: " + id );
|
logger.info("播放代理流: " + id );
|
||||||
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
|
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
|
||||||
streamProxyService.getStreamProxyById(id, (code, msg, data) -> {
|
streamProxyService.play(id, (code, msg, data) -> {
|
||||||
WVPResult<StreamContent> wvpResult = new WVPResult<>(code, msg, new StreamContent(data));
|
WVPResult<StreamContent> wvpResult = new WVPResult<>(code, msg, new StreamContent(data));
|
||||||
result.setResult(wvpResult);
|
result.setResult(wvpResult);
|
||||||
});
|
});
|
||||||
|
|
|
@ -240,11 +240,11 @@ create table wvp_stream_proxy (
|
||||||
media_server_id character varying(50),
|
media_server_id character varying(50),
|
||||||
enable_audio bool default false,
|
enable_audio bool default false,
|
||||||
enable_mp4 bool default false,
|
enable_mp4 bool default false,
|
||||||
enable bool default false,
|
pulling boolean default false,
|
||||||
status boolean,
|
|
||||||
enable_remove_none_reader bool default false,
|
enable_remove_none_reader bool default false,
|
||||||
create_time character varying(50),
|
create_time character varying(50),
|
||||||
name character varying(255),
|
name character varying(255),
|
||||||
|
pull_time character varying(50) default null,
|
||||||
update_time character varying(50),
|
update_time character varying(50),
|
||||||
stream_key character varying(255),
|
stream_key character varying(255),
|
||||||
enable_disable_none_reader bool default false,
|
enable_disable_none_reader bool default false,
|
||||||
|
|
|
@ -208,12 +208,12 @@ create table wvp_stream_proxy (
|
||||||
media_server_id character varying(50),
|
media_server_id character varying(50),
|
||||||
enable_audio bool default false,
|
enable_audio bool default false,
|
||||||
enable_mp4 bool default false,
|
enable_mp4 bool default false,
|
||||||
enable bool default false,
|
pulling boolean default false,
|
||||||
status boolean,
|
|
||||||
enable_remove_none_reader bool default false,
|
enable_remove_none_reader bool default false,
|
||||||
create_time character varying(50),
|
create_time character varying(50),
|
||||||
name character varying(255),
|
name character varying(255),
|
||||||
update_time character varying(50),
|
update_time character varying(50),
|
||||||
|
pull_time character varying(50) default null,
|
||||||
stream_key character varying(255),
|
stream_key character varying(255),
|
||||||
enable_disable_none_reader bool default false,
|
enable_disable_none_reader bool default false,
|
||||||
common_gb_channel_id integer,
|
common_gb_channel_id integer,
|
||||||
|
|
|
@ -138,11 +138,20 @@ alter table wvp_stream_proxy
|
||||||
add latitude double precision;
|
add latitude double precision;
|
||||||
|
|
||||||
alter table wvp_stream_proxy
|
alter table wvp_stream_proxy
|
||||||
add status bool default false;
|
add pulling bool default false;
|
||||||
|
|
||||||
|
alter table wvp_stream_proxy
|
||||||
|
add pulling bool default false;
|
||||||
|
|
||||||
|
alter table wvp_stream_proxy
|
||||||
|
add pull_time varchar(255) default NULL;
|
||||||
|
|
||||||
alter table wvp_stream_proxy
|
alter table wvp_stream_proxy
|
||||||
add proxy_error varchar(255) default NULL;
|
add proxy_error varchar(255) default NULL;
|
||||||
|
|
||||||
|
alter table wvp_stream_proxy
|
||||||
|
drop column enable;
|
||||||
|
|
||||||
alter table wvp_device
|
alter table wvp_device
|
||||||
drop column auto_sync_channel;
|
drop column auto_sync_channel;
|
||||||
|
|
||||||
|
|
|
@ -240,13 +240,13 @@ create table wvp_stream_proxy (
|
||||||
media_server_id character varying(50),
|
media_server_id character varying(50),
|
||||||
enable_audio bool default false,
|
enable_audio bool default false,
|
||||||
enable_mp4 bool default false,
|
enable_mp4 bool default false,
|
||||||
enable bool default false,
|
pulling boolean default false,
|
||||||
status boolean,
|
|
||||||
enable_remove_none_reader bool default false,
|
enable_remove_none_reader bool default false,
|
||||||
create_time character varying(50),
|
create_time character varying(50),
|
||||||
name character varying(255),
|
name character varying(255),
|
||||||
proxy_error character varying(255) default null,
|
proxy_error character varying(255) default null,
|
||||||
update_time character varying(50),
|
update_time character varying(50),
|
||||||
|
pull_time character varying(50) default null,
|
||||||
stream_key character varying(255),
|
stream_key character varying(255),
|
||||||
enable_disable_none_reader bool default false,
|
enable_disable_none_reader bool default false,
|
||||||
constraint uk_stream_proxy_app_stream unique (app, stream)
|
constraint uk_stream_proxy_app_stream unique (app, stream)
|
||||||
|
|
|
@ -208,13 +208,13 @@ create table wvp_stream_proxy (
|
||||||
media_server_id character varying(50),
|
media_server_id character varying(50),
|
||||||
enable_audio bool default false,
|
enable_audio bool default false,
|
||||||
enable_mp4 bool default false,
|
enable_mp4 bool default false,
|
||||||
enable bool default false,
|
pulling boolean default false,
|
||||||
status boolean,
|
|
||||||
enable_remove_none_reader bool default false,
|
enable_remove_none_reader bool default false,
|
||||||
create_time character varying(50),
|
create_time character varying(50),
|
||||||
name character varying(255) default null,
|
name character varying(255) default null,
|
||||||
proxy_error character varying(255) default null,
|
proxy_error character varying(255) default null,
|
||||||
update_time character varying(50),
|
update_time character varying(50),
|
||||||
|
pull_time character varying(50) default null,
|
||||||
stream_key character varying(255),
|
stream_key character varying(255),
|
||||||
enable_disable_none_reader bool default false,
|
enable_disable_none_reader bool default false,
|
||||||
common_gb_channel_id integer,
|
common_gb_channel_id integer,
|
||||||
|
|
Loading…
Reference in New Issue