优化拉流代理逻辑
parent
591fcc6b3d
commit
4e60d7ed55
|
@ -18,18 +18,9 @@ public class VideoManagerConstants {
|
||||||
|
|
||||||
public static final String DEVICE_PREFIX = "VMP_DEVICE_";
|
public static final String DEVICE_PREFIX = "VMP_DEVICE_";
|
||||||
|
|
||||||
// 设备同步完成
|
|
||||||
public static final String DEVICE_SYNC_PREFIX = "VMP_DEVICE_SYNC_";
|
|
||||||
|
|
||||||
public static final String CACHEKEY_PREFIX = "VMP_CHANNEL_";
|
|
||||||
|
|
||||||
public static final String KEEPLIVEKEY_PREFIX = "VMP_KEEPALIVE_";
|
|
||||||
|
|
||||||
// TODO 此处多了一个_,暂不修改
|
// TODO 此处多了一个_,暂不修改
|
||||||
public static final String INVITE_PREFIX = "VMP_INVITE";
|
public static final String INVITE_PREFIX = "VMP_INVITE";
|
||||||
public static final String PLAYER_PREFIX = "VMP_INVITE_PLAY_";
|
|
||||||
public static final String PLAY_BLACK_PREFIX = "VMP_INVITE_PLAYBACK_";
|
|
||||||
public static final String DOWNLOAD_PREFIX = "VMP_INVITE_DOWNLOAD_";
|
|
||||||
|
|
||||||
public static final String PLATFORM_KEEPALIVE_PREFIX = "VMP_PLATFORM_KEEPALIVE_";
|
public static final String PLATFORM_KEEPALIVE_PREFIX = "VMP_PLATFORM_KEEPALIVE_";
|
||||||
|
|
||||||
|
@ -41,16 +32,6 @@ public class VideoManagerConstants {
|
||||||
|
|
||||||
public static final String PLATFORM_SEND_RTP_INFO_PREFIX = "VMP_PLATFORM_SEND_RTP_INFO_";
|
public static final String PLATFORM_SEND_RTP_INFO_PREFIX = "VMP_PLATFORM_SEND_RTP_INFO_";
|
||||||
|
|
||||||
public static final String EVENT_ONLINE_REGISTER = "1";
|
|
||||||
|
|
||||||
public static final String EVENT_ONLINE_MESSAGE = "3";
|
|
||||||
|
|
||||||
public static final String EVENT_OUTLINE_UNREGISTER = "1";
|
|
||||||
|
|
||||||
public static final String EVENT_OUTLINE_TIMEOUT = "2";
|
|
||||||
|
|
||||||
public static final String MEDIA_SSRC_USED_PREFIX = "VMP_MEDIA_USED_SSRC_";
|
|
||||||
|
|
||||||
public static final String MEDIA_TRANSACTION_USED_PREFIX = "VMP_MEDIA_TRANSACTION_";
|
public static final String MEDIA_TRANSACTION_USED_PREFIX = "VMP_MEDIA_TRANSACTION_";
|
||||||
|
|
||||||
public static final String MEDIA_STREAM_AUTHORITY = "VMP_MEDIA_STREAM_AUTHORITY_";
|
public static final String MEDIA_STREAM_AUTHORITY = "VMP_MEDIA_STREAM_AUTHORITY_";
|
||||||
|
|
|
@ -41,4 +41,14 @@ public interface IMediaService {
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr, String callId);
|
StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr, String callId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 查看流是否已经注册
|
||||||
|
*/
|
||||||
|
boolean isReady(MediaServerItem mediaInfo, String app, String stream);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 关闭zlm的流
|
||||||
|
*/
|
||||||
|
boolean closeStream(MediaServerItem mediaInfo, String app, String stream);
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,4 +110,15 @@ public class MediaServiceImpl implements IMediaService {
|
||||||
return streamInfoResult;
|
return streamInfoResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isReady(MediaServerItem mediaInfo, String app, String stream) {
|
||||||
|
JSONObject jsonObject = zlmresTfulUtils.getMediaInfo(mediaInfo, app, "rtsp", stream);
|
||||||
|
return jsonObject != null && jsonObject.getInteger("code") == 0 && jsonObject.getBoolean("online");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean closeStream(MediaServerItem mediaInfo, String app, String stream) {
|
||||||
|
JSONObject jsonObject = zlmresTfulUtils.closeStreams(mediaInfo, app, stream);
|
||||||
|
return jsonObject != null && jsonObject.getInteger("code") == 0 && jsonObject.getInteger("count_hit") > 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,11 +5,10 @@ import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.genersoft.iot.vmp.common.CommonGbChannel;
|
import com.genersoft.iot.vmp.common.CommonGbChannel;
|
||||||
import com.genersoft.iot.vmp.common.GeneralCallback;
|
import com.genersoft.iot.vmp.common.GeneralCallback;
|
||||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||||
|
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||||
import com.genersoft.iot.vmp.conf.DynamicTask;
|
import com.genersoft.iot.vmp.conf.DynamicTask;
|
||||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
import com.genersoft.iot.vmp.conf.exception.ControllerException;
|
import com.genersoft.iot.vmp.conf.exception.ControllerException;
|
||||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
|
|
||||||
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
|
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
|
||||||
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
|
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
|
||||||
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
|
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
|
||||||
|
@ -18,6 +17,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
|
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
|
||||||
import com.genersoft.iot.vmp.service.ICommonGbChannelService;
|
import com.genersoft.iot.vmp.service.ICommonGbChannelService;
|
||||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||||
import com.genersoft.iot.vmp.service.IMediaService;
|
import com.genersoft.iot.vmp.service.IMediaService;
|
||||||
|
@ -25,7 +25,6 @@ import com.genersoft.iot.vmp.service.IStreamProxyService;
|
||||||
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||||
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
|
|
||||||
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
|
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
|
||||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||||
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||||
|
@ -35,11 +34,11 @@ import com.github.pagehelper.PageInfo;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.TransactionDefinition;
|
import org.springframework.transaction.TransactionDefinition;
|
||||||
import org.springframework.transaction.TransactionStatus;
|
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import org.springframework.util.ObjectUtils;
|
import org.springframework.util.ObjectUtils;
|
||||||
|
@ -63,7 +62,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
private final static Logger logger = LoggerFactory.getLogger(StreamProxyServiceImpl.class);
|
private final static Logger logger = LoggerFactory.getLogger(StreamProxyServiceImpl.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private IVideoManagerStorage videoManagerStorager;
|
private RedisTemplate<Object, Object> redisTemplate;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private IMediaService mediaService;
|
private IMediaService mediaService;
|
||||||
|
@ -162,28 +161,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
// 更新
|
// 更新
|
||||||
StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream());
|
StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream());
|
||||||
if (streamProxyInDb != null) {
|
if (streamProxyInDb != null) {
|
||||||
if (streamProxyInDb.getCommonGbChannelId() == 0 && !ObjectUtils.isEmpty(param.getGbId()) ) {
|
param.setId(streamProxyInDb.getId());
|
||||||
// 新增通用通道
|
updateProxyToDb(param);
|
||||||
CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param);
|
|
||||||
commonGbChannelService.add(commonGbChannel);
|
|
||||||
param.setCommonGbChannelId(commonGbChannel.getCommonGbId());
|
|
||||||
}
|
|
||||||
if (streamProxyInDb.getCommonGbChannelId() > 0 && ObjectUtils.isEmpty(param.getGbId()) ) {
|
|
||||||
// 移除通用通道
|
|
||||||
commonGbChannelService.deleteById(streamProxyInDb.getCommonGbChannelId());
|
|
||||||
}
|
|
||||||
param.setUpdateTime(DateUtil.getNow());
|
|
||||||
streamProxyMapper.update(param);
|
|
||||||
}else { // 新增
|
}else { // 新增
|
||||||
if (!ObjectUtils.isEmpty(param.getGbId())) {
|
addProxyToDb(param);
|
||||||
// 新增通用通道
|
|
||||||
CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param);
|
|
||||||
commonGbChannelService.add(commonGbChannel);
|
|
||||||
param.setCommonGbChannelId(commonGbChannel.getCommonGbId());
|
|
||||||
}
|
|
||||||
param.setCreateTime(DateUtil.getNow());
|
|
||||||
param.setUpdateTime(DateUtil.getNow());
|
|
||||||
streamProxyMapper.add(param);
|
|
||||||
}
|
}
|
||||||
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
|
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
|
||||||
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
||||||
|
@ -192,40 +173,23 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
});
|
});
|
||||||
if (param.isEnable()) {
|
if (param.isEnable()) {
|
||||||
String talkKey = UUID.randomUUID().toString();
|
startProxy(param, mediaInfo, (code, msg, data) -> {
|
||||||
String delayTalkKey = UUID.randomUUID().toString();
|
if (code != ErrorCode.SUCCESS.getCode()) {
|
||||||
dynamicTask.startDelay(delayTalkKey, ()->{
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
|
||||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
|
param.setStatus(true);
|
||||||
if (streamInfo != null) {
|
streamProxyMapper.update(param);
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
|
||||||
}else {
|
}else {
|
||||||
dynamicTask.stop(talkKey);
|
callback.run(code, msg, null);
|
||||||
callback.run(ErrorCode.ERROR100.getCode(), "超时", null);
|
param.setEnable(false);
|
||||||
|
// 直接移除
|
||||||
|
if (param.isEnableRemoveNoneReader()) {
|
||||||
|
del(param.getApp(), param.getStream());
|
||||||
|
}else {
|
||||||
|
updateStreamProxy(param);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}, 7000);
|
});
|
||||||
JSONObject jsonObject = addStreamProxyToZlm(param);
|
} else{
|
||||||
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
|
|
||||||
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
|
|
||||||
dynamicTask.stop(talkKey);
|
|
||||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
|
||||||
mediaInfo, param.getApp(), param.getStream(), null, null);
|
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
|
||||||
}else {
|
|
||||||
param.setEnable(false);
|
|
||||||
// 直接移除
|
|
||||||
if (param.isEnableRemoveNoneReader()) {
|
|
||||||
del(param.getApp(), param.getStream());
|
|
||||||
}else {
|
|
||||||
updateStreamProxy(param);
|
|
||||||
}
|
|
||||||
if (jsonObject == null){
|
|
||||||
callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null);
|
|
||||||
}else {
|
|
||||||
callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else{
|
|
||||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||||
mediaInfo, param.getApp(), param.getStream(), null, null);
|
mediaInfo, param.getApp(), param.getStream(), null, null);
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
|
@ -276,12 +240,22 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
if (!param.isEnable()) {
|
if (!param.isEnable()) {
|
||||||
param.setStatus(false);
|
param.setStatus(false);
|
||||||
saveProxyToDb(param);
|
addProxyToDb(param);
|
||||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||||
mediaInfo, param.getApp(), param.getStream(), null, null);
|
mediaInfo, param.getApp(), param.getStream(), null, null);
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
startProxy(param, mediaInfo, (code, msg, data) -> {
|
||||||
|
callback.run(code, msg, data);
|
||||||
|
if (code == ErrorCode.SUCCESS.getCode()) {
|
||||||
|
param.setStatus(true);
|
||||||
|
addProxyToDb(param);
|
||||||
|
} else {
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
String talkKey = UUID.randomUUID().toString();
|
String talkKey = UUID.randomUUID().toString();
|
||||||
String delayTalkKey = UUID.randomUUID().toString();
|
String delayTalkKey = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
@ -289,7 +263,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
||||||
dynamicTask.stop(talkKey);
|
dynamicTask.stop(talkKey);
|
||||||
param.setStatus(true);
|
param.setStatus(true);
|
||||||
saveProxyToDb(param);
|
addProxyToDb(param);
|
||||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||||
mediaInfo, param.getApp(), param.getStream(), null, null);
|
mediaInfo, param.getApp(), param.getStream(), null, null);
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
|
@ -304,7 +278,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
param.setProxyError("启用超时");
|
param.setProxyError("启用超时");
|
||||||
param.setStatus(false);
|
param.setStatus(false);
|
||||||
saveProxyToDb(param);
|
addProxyToDb(param);
|
||||||
}, 10000);
|
}, 10000);
|
||||||
JSONObject jsonObject = addStreamProxyToZlm(param);
|
JSONObject jsonObject = addStreamProxyToZlm(param);
|
||||||
if (jsonObject != null && jsonObject.getInteger("code") != 0) {
|
if (jsonObject != null && jsonObject.getInteger("code") != 0) {
|
||||||
|
@ -316,7 +290,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
param.setProxyError("启用失败: " + jsonObject.getString("msg"));
|
param.setProxyError("启用失败: " + jsonObject.getString("msg"));
|
||||||
param.setStatus(false);
|
param.setStatus(false);
|
||||||
saveProxyToDb(param);
|
addProxyToDb(param);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -390,7 +364,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
if (!param.isEnable()) {
|
if (!param.isEnable()) {
|
||||||
param.setStatus(false);
|
param.setStatus(false);
|
||||||
saveProxyToDb(param);
|
addProxyToDb(param);
|
||||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||||
mediaInfo, param.getApp(), param.getStream(), null, null);
|
mediaInfo, param.getApp(), param.getStream(), null, null);
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
|
@ -403,7 +377,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
||||||
dynamicTask.stop(talkKey);
|
dynamicTask.stop(talkKey);
|
||||||
param.setStatus(true);
|
param.setStatus(true);
|
||||||
saveProxyToDb(param);
|
addProxyToDb(param);
|
||||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||||
mediaInfo, param.getApp(), param.getStream(), null, null);
|
mediaInfo, param.getApp(), param.getStream(), null, null);
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
|
@ -418,7 +392,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
param.setProxyError("启用超时");
|
param.setProxyError("启用超时");
|
||||||
param.setStatus(false);
|
param.setStatus(false);
|
||||||
saveProxyToDb(param);
|
addProxyToDb(param);
|
||||||
}, 10000);
|
}, 10000);
|
||||||
JSONObject jsonObject = addStreamProxyToZlm(param);
|
JSONObject jsonObject = addStreamProxyToZlm(param);
|
||||||
if (jsonObject != null && jsonObject.getInteger("code") != 0) {
|
if (jsonObject != null && jsonObject.getInteger("code") != 0) {
|
||||||
|
@ -430,11 +404,29 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
param.setProxyError("启用失败: " + jsonObject.getString("msg"));
|
param.setProxyError("启用失败: " + jsonObject.getString("msg"));
|
||||||
param.setStatus(false);
|
param.setStatus(false);
|
||||||
saveProxyToDb(param);
|
addProxyToDb(param);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback) {
|
public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback) {
|
||||||
|
// 检测是否在线
|
||||||
|
boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
|
||||||
|
if (ready) {
|
||||||
|
// 检查redis内容是否正确
|
||||||
|
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_"
|
||||||
|
+ OriginType.PULL + "_" + streamProxy.getApp() + "_" + streamProxy.getStream() + "_"
|
||||||
|
+ mediaInfo.getId();
|
||||||
|
|
||||||
|
if (redisTemplate.opsForValue().get(key) == null) {
|
||||||
|
logger.info("[拉起代理] 发现redis的流信息不存在,但是流存在。关闭流");
|
||||||
|
mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
|
||||||
|
}else {
|
||||||
|
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||||
|
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
|
||||||
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
String talkKey = UUID.randomUUID().toString();
|
String talkKey = UUID.randomUUID().toString();
|
||||||
String delayTalkKey = UUID.randomUUID().toString();
|
String delayTalkKey = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
@ -442,7 +434,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
||||||
dynamicTask.stop(talkKey);
|
dynamicTask.stop(talkKey);
|
||||||
streamProxy.setStatus(true);
|
streamProxy.setStatus(true);
|
||||||
saveProxyToDb(streamProxy);
|
|
||||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||||
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
|
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
|
@ -452,33 +443,60 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
|
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
|
||||||
dynamicTask.stop(talkKey);
|
dynamicTask.stop(talkKey);
|
||||||
callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null);
|
callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null);
|
||||||
if (streamProxy.isEnableRemoveNoneReader()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
streamProxy.setProxyError("启用超时");
|
streamProxy.setProxyError("启用超时");
|
||||||
streamProxy.setStatus(false);
|
|
||||||
saveProxyToDb(streamProxy);
|
|
||||||
}, 10000);
|
}, 10000);
|
||||||
JSONObject jsonObject = addStreamProxyToZlm(streamProxy);
|
JSONObject result;
|
||||||
if (jsonObject != null && jsonObject.getInteger("code") != 0) {
|
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){
|
||||||
|
result = zlmresTfulUtils.addFFmpegSource(mediaInfo, streamProxy.getSrcUrl().trim(), streamProxy.getDstUrl(),
|
||||||
|
streamProxy.getTimeoutMs() + "", streamProxy.isEnableAudio(), streamProxy.isEnableMp4(),
|
||||||
|
streamProxy.getFfmpegCmdKey());
|
||||||
|
}else {
|
||||||
|
result = zlmresTfulUtils.addStreamProxy(mediaInfo, streamProxy.getApp(), streamProxy.getStream(), streamProxy.getUrl().trim(),
|
||||||
|
streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), streamProxy.getRtpType());
|
||||||
|
}
|
||||||
|
if (result == null) {
|
||||||
|
callback.run(ErrorCode.ERROR100.getCode(), "接口调用失败", null);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (result.getInteger("code") != 0) {
|
||||||
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
|
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
|
||||||
dynamicTask.stop(talkKey);
|
dynamicTask.stop(talkKey);
|
||||||
callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null);
|
callback.run(result.getInteger("code"), result.getString("msg"), null);
|
||||||
if (streamProxy.isEnableRemoveNoneReader()) {
|
}else {
|
||||||
|
JSONObject data = result.getJSONObject("data");
|
||||||
|
if (data == null) {
|
||||||
|
logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result );
|
||||||
|
callback.run(result.getInteger("code"), result.getString("msg"), null);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
streamProxy.setProxyError("启用失败: " + jsonObject.getString("msg"));
|
String key = data.getString("key");
|
||||||
streamProxy.setStatus(false);
|
if (key == null) {
|
||||||
saveProxyToDb(streamProxy);
|
logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result );
|
||||||
|
callback.run(ErrorCode.ERROR100.getCode(), "获取代理流结果中的KEY失败", null);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
streamProxy.setStreamKey(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stopProxy(StreamProxy streamProxy, GeneralCallback<StreamInfo> callback) {
|
public void stopProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback) {
|
||||||
|
boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
|
||||||
|
if (ready) {
|
||||||
|
mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
|
||||||
|
}
|
||||||
|
// 检查redis内容是否正确
|
||||||
|
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_"
|
||||||
|
+ OriginType.PULL + "_" + streamProxy.getApp() + "_" + streamProxy.getStream() + "_"
|
||||||
|
+ mediaInfo.getId();
|
||||||
|
|
||||||
|
if (redisTemplate.opsForValue().get(key) == null) {
|
||||||
|
redisTemplate.delete(key);
|
||||||
|
}
|
||||||
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void saveProxyToDb(StreamProxy param) {
|
private void addProxyToDb(StreamProxy param) {
|
||||||
// 未启用的数据可以直接保存了
|
// 未启用的数据可以直接保存了
|
||||||
if (!ObjectUtils.isEmpty(param.getGbId())) {
|
if (!ObjectUtils.isEmpty(param.getGbId())) {
|
||||||
CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param);
|
CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param);
|
||||||
|
@ -496,6 +514,39 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
public void updateProxyToDb(StreamProxy param) {
|
||||||
|
if (param.getId() <= 0) {
|
||||||
|
logger.error("[更新代理存储到数据库] 错误, 缺少ID");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
StreamProxy streamProxyInDb = streamProxyMapper.selectOneById(param.getId());
|
||||||
|
if (streamProxyInDb == null) {
|
||||||
|
logger.error("[更新代理存储到数据库] 错误,ID: {} 不在数据库中", param.getId());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!ObjectUtils.isEmpty(streamProxyInDb.getGbId().trim()) && ObjectUtils.isEmpty(param.getGbId().trim())) {
|
||||||
|
// 国标ID已经移除
|
||||||
|
if (streamProxyInDb.getCommonGbChannelId() > 0) {
|
||||||
|
commonGbChannelService.deleteById(streamProxyInDb.getCommonGbChannelId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!ObjectUtils.isEmpty(param.getGbId().trim()) && ObjectUtils.isEmpty(streamProxyInDb.getGbId().trim())) {
|
||||||
|
CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param);
|
||||||
|
// 国标ID已经添加
|
||||||
|
if (commonGbChannelService.add(commonGbChannel) > 0) {
|
||||||
|
param.setCommonGbChannelId(commonGbChannel.getCommonGbId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
param.setUpdateTime(DateUtil.getNow());
|
||||||
|
param.setStatus(streamProxyInDb.isStatus());
|
||||||
|
int addStreamProxyResult = streamProxyMapper.add(param);
|
||||||
|
if (addStreamProxyResult <= 0) {
|
||||||
|
throw new ControllerException(ErrorCode.ERROR100.getCode(), "添加拉流代理失败");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private String getSchemaFromFFmpegCmd(String ffmpegCmd) {
|
private String getSchemaFromFFmpegCmd(String ffmpegCmd) {
|
||||||
ffmpegCmd = ffmpegCmd.replaceAll(" + ", " ");
|
ffmpegCmd = ffmpegCmd.replaceAll(" + ", " ");
|
||||||
String[] paramArray = ffmpegCmd.split(" ");
|
String[] paramArray = ffmpegCmd.split(" ");
|
||||||
|
@ -776,7 +827,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId);
|
MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId);
|
||||||
if (mediaServer != null) {
|
if (mediaServer != null) {
|
||||||
List<OnStreamChangedHookParam> allPullStream = redisCatchStorage.getStreams(mediaServerId, "PULL");
|
List<OnStreamChangedHookParam> allPullStream = redisCatchStorage.getStreams(mediaServerId, "PULL");
|
||||||
if (allPullStream.size() > 0) {
|
if (!allPullStream.isEmpty()) {
|
||||||
zlmresTfulUtils.getMediaList(mediaServer, jsonObject->{
|
zlmresTfulUtils.getMediaList(mediaServer, jsonObject->{
|
||||||
Map<String, StreamInfo> stringStreamInfoMap = new HashMap<>();
|
Map<String, StreamInfo> stringStreamInfoMap = new HashMap<>();
|
||||||
if (jsonObject.getInteger("code") == 0) {
|
if (jsonObject.getInteger("code") == 0) {
|
||||||
|
|
Loading…
Reference in New Issue