拉流代理优化未完成
parent
bc0ee2bf13
commit
d9d8aaca6e
|
@ -76,10 +76,8 @@ public class GB28181ResourceServiceImpl implements IResourceService {
|
||||||
MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device);
|
MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device);
|
||||||
playService.play(mediaServerItem, channel.getDeviceId(), channel.getChannelId(), null, (code, msg, data) -> {
|
playService.play(mediaServerItem, channel.getDeviceId(), channel.getChannelId(), null, (code, msg, data) -> {
|
||||||
if (code == InviteErrorCode.SUCCESS.getCode()) {
|
if (code == InviteErrorCode.SUCCESS.getCode()) {
|
||||||
if (data != null) {
|
StreamInfo streamInfo = (StreamInfo)data;
|
||||||
StreamInfo streamInfo = (StreamInfo)data;
|
callback.call(commonGbChannel, mediaServerItem, ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
callback.call(commonGbChannel, mediaServerItem, ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
|
||||||
}
|
|
||||||
}else {
|
}else {
|
||||||
callback.call(commonGbChannel, null, code, msg, null);
|
callback.call(commonGbChannel, null, code, msg, null);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,21 @@
|
||||||
|
package com.genersoft.iot.vmp.service;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.genersoft.iot.vmp.common.GeneralCallback;
|
||||||
|
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
|
||||||
|
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
||||||
|
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
|
||||||
|
import com.github.pagehelper.PageInfo;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public interface IStreamProxyPlayService {
|
||||||
|
|
||||||
|
|
||||||
|
void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback);
|
||||||
|
|
||||||
|
|
||||||
|
void stopProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback);
|
||||||
|
}
|
|
@ -18,7 +18,6 @@ public interface IStreamProxyService {
|
||||||
*/
|
*/
|
||||||
void save(StreamProxy param, GeneralCallback<StreamInfo> callback);
|
void save(StreamProxy param, GeneralCallback<StreamInfo> callback);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 分页查询
|
* 分页查询
|
||||||
*/
|
*/
|
||||||
|
@ -97,4 +96,9 @@ public interface IStreamProxyService {
|
||||||
* 播放代理流
|
* 播放代理流
|
||||||
*/
|
*/
|
||||||
void play(Integer id, GeneralCallback<StreamInfo> callback);
|
void play(Integer id, GeneralCallback<StreamInfo> callback);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据通用通道查询拉流代理
|
||||||
|
*/
|
||||||
|
StreamProxy getStreamProxyByCommonGbChannelId(int commonGbId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,195 @@
|
||||||
|
package com.genersoft.iot.vmp.service.impl;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.genersoft.iot.vmp.common.GeneralCallback;
|
||||||
|
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||||
|
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||||
|
import com.genersoft.iot.vmp.conf.DynamicTask;
|
||||||
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
|
||||||
|
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||||
|
import com.genersoft.iot.vmp.service.IMediaService;
|
||||||
|
import com.genersoft.iot.vmp.service.IStreamProxyPlayService;
|
||||||
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
|
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
|
||||||
|
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
|
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.transaction.TransactionDefinition;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
|
||||||
|
|
||||||
|
private final static Logger logger = LoggerFactory.getLogger(StreamProxyPlayServiceImpl.class);
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RedisTemplate<Object, Object> redisTemplate;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IMediaService mediaService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ZLMRESTfulUtils zlmresTfulUtils;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private UserSetting userSetting;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ZlmHttpHookSubscribe hookSubscribe;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private DynamicTask dynamicTask;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
DataSourceTransactionManager dataSourceTransactionManager;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
TransactionDefinition transactionDefinition;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
IRedisCatchStorage redisCatchStorage;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
IMediaServerService mediaServerService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
StreamProxyMapper streamProxyMapper;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback) {
|
||||||
|
logger.info("[开始拉流代理] {}/{}", streamProxy.getApp(), streamProxy.getStream());
|
||||||
|
|
||||||
|
OnStreamChangedHookParam streamChangedHookParam = redisCatchStorage.getProxyStreamInfo(streamProxy.getApp(), streamProxy.getStream(), null);
|
||||||
|
if (streamChangedHookParam != null) {
|
||||||
|
MediaServerItem serverItemInCatch = mediaServerService.getOne(streamChangedHookParam.getMediaServerId());
|
||||||
|
if (serverItemInCatch != null) {
|
||||||
|
// 检测是否在线
|
||||||
|
boolean ready = mediaService.isReady(serverItemInCatch, streamProxy.getApp(), streamProxy.getStream());
|
||||||
|
if (ready) {
|
||||||
|
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||||
|
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
|
||||||
|
logger.info("[开始拉流代理] 已拉起,直接返回 {}/{}", streamProxy.getApp(), streamProxy.getStream());
|
||||||
|
if (callback != null) {
|
||||||
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}else {
|
||||||
|
redisCatchStorage.removeStream(streamChangedHookParam.getMediaServerId(), "PULL", streamChangedHookParam.getApp(),
|
||||||
|
streamChangedHookParam.getStream());
|
||||||
|
}
|
||||||
|
}else {
|
||||||
|
redisCatchStorage.removeStream(streamChangedHookParam.getMediaServerId(), "PULL", streamChangedHookParam.getApp(),
|
||||||
|
streamChangedHookParam.getStream());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (streamProxy.getStreamKey() != null) {
|
||||||
|
zlmresTfulUtils.delStreamProxy(mediaInfo, streamProxy.getStreamKey());
|
||||||
|
}
|
||||||
|
|
||||||
|
String delayTalkKey = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(streamProxy.getApp(), streamProxy.getStream(), true, "rtsp", mediaInfo.getId());
|
||||||
|
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
||||||
|
dynamicTask.stop(delayTalkKey);
|
||||||
|
streamProxy.setPulling(true);
|
||||||
|
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
||||||
|
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
|
||||||
|
logger.info("[开始拉流代理] 成功: {}/{}", streamProxy.getApp(), streamProxy.getStream());
|
||||||
|
if (callback != null) {
|
||||||
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
|
}
|
||||||
|
streamProxyMapper.update(streamProxy);
|
||||||
|
});
|
||||||
|
|
||||||
|
dynamicTask.startDelay(delayTalkKey, ()->{
|
||||||
|
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
|
||||||
|
dynamicTask.stop(delayTalkKey);
|
||||||
|
if (callback != null) {
|
||||||
|
callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null);
|
||||||
|
}
|
||||||
|
streamProxy.setProxyError("启用超时");
|
||||||
|
streamProxyMapper.update(streamProxy);
|
||||||
|
}, 10000);
|
||||||
|
JSONObject result;
|
||||||
|
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){
|
||||||
|
result = zlmresTfulUtils.addFFmpegSource(mediaInfo, streamProxy.getUrl().trim(), streamProxy.getDstUrl(),
|
||||||
|
streamProxy.getTimeoutMs() + "", streamProxy.isEnableAudio(), streamProxy.isEnableMp4(),
|
||||||
|
streamProxy.getFfmpegCmdKey());
|
||||||
|
}else {
|
||||||
|
result = zlmresTfulUtils.addStreamProxy(mediaInfo, streamProxy.getApp(), streamProxy.getStream(), streamProxy.getUrl().trim(),
|
||||||
|
streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), streamProxy.getRtpType());
|
||||||
|
}
|
||||||
|
if (result == null) {
|
||||||
|
if (callback != null) {
|
||||||
|
callback.run(ErrorCode.ERROR100.getCode(), "接口调用失败", null);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (result.getInteger("code") != 0) {
|
||||||
|
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
|
||||||
|
dynamicTask.stop(delayTalkKey);
|
||||||
|
if (callback != null) {
|
||||||
|
callback.run(result.getInteger("code"), result.getString("msg"), null);
|
||||||
|
}
|
||||||
|
}else {
|
||||||
|
JSONObject data = result.getJSONObject("data");
|
||||||
|
if (data == null) {
|
||||||
|
logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result );
|
||||||
|
if (callback != null) {
|
||||||
|
callback.run(result.getInteger("code"), result.getString("msg"), null);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String key = data.getString("key");
|
||||||
|
if (key == null) {
|
||||||
|
logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result );
|
||||||
|
if (callback != null) {
|
||||||
|
callback.run(ErrorCode.ERROR100.getCode(), "获取代理流结果中的KEY失败", null);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
streamProxy.setStreamKey(key);
|
||||||
|
streamProxyMapper.update(streamProxy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stopProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback) {
|
||||||
|
logger.info("[停止拉流代理] {}/{}", streamProxy.getApp(), streamProxy.getStream());
|
||||||
|
boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
|
||||||
|
if (ready) {
|
||||||
|
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){
|
||||||
|
zlmresTfulUtils.delFFmpegSource(mediaInfo, streamProxy.getStreamKey());
|
||||||
|
}else {
|
||||||
|
zlmresTfulUtils.delStreamProxy(mediaInfo, streamProxy.getStreamKey());
|
||||||
|
}
|
||||||
|
mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
|
||||||
|
}
|
||||||
|
// 检查redis内容是否正确
|
||||||
|
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_"
|
||||||
|
+ OriginType.PULL + "_" + streamProxy.getApp() + "_" + streamProxy.getStream() + "_"
|
||||||
|
+ mediaInfo.getId();
|
||||||
|
|
||||||
|
if (redisTemplate.opsForValue().get(key) == null) {
|
||||||
|
redisTemplate.delete(key);
|
||||||
|
}
|
||||||
|
logger.info("[停止拉流代理] 成功 {}/{}", streamProxy.getApp(), streamProxy.getStream());
|
||||||
|
if (callback != null) {
|
||||||
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,13 +1,35 @@
|
||||||
package com.genersoft.iot.vmp.service.impl;
|
package com.genersoft.iot.vmp.service.impl;
|
||||||
|
|
||||||
import com.genersoft.iot.vmp.common.CommonGbChannel;
|
import com.genersoft.iot.vmp.common.CommonGbChannel;
|
||||||
import com.genersoft.iot.vmp.service.IResourcePlayCallback;
|
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||||
import com.genersoft.iot.vmp.service.IResourceService;
|
import com.genersoft.iot.vmp.gb28181.GB28181ResourceServiceImpl;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
|
||||||
|
import com.genersoft.iot.vmp.service.*;
|
||||||
import com.genersoft.iot.vmp.service.bean.CommonGbChannelType;
|
import com.genersoft.iot.vmp.service.bean.CommonGbChannelType;
|
||||||
|
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
|
||||||
|
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
|
||||||
|
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.util.ObjectUtils;
|
||||||
|
|
||||||
@Service(CommonGbChannelType.PROXY)
|
@Service(CommonGbChannelType.PROXY)
|
||||||
public class StreamProxyResourceServiceImpl implements IResourceService {
|
public class StreamProxyResourceServiceImpl implements IResourceService {
|
||||||
|
|
||||||
|
private final Logger logger = LoggerFactory.getLogger(StreamProxyResourceServiceImpl.class);
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private StreamProxyMapper streamProxyMapper;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IStreamProxyPlayService streamProxyPlayService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IMediaServerService mediaServerService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean deleteChannel(CommonGbChannel commonGbChannel) {
|
public boolean deleteChannel(CommonGbChannel commonGbChannel) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -15,7 +37,36 @@ public class StreamProxyResourceServiceImpl implements IResourceService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void startPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) {
|
public void startPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) {
|
||||||
|
assert callback != null;
|
||||||
|
if (!CommonGbChannelType.PROXY.equals(commonGbChannel.getType())) {
|
||||||
|
logger.warn("[资源类-拉流代理] 收到播放通道: {} 时发现类型不为proxy", commonGbChannel.getCommonGbId());
|
||||||
|
callback.call(commonGbChannel, null, ErrorCode.ERROR100.getCode(), "数据关系错误", null);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
StreamProxy streamProxy = streamProxyMapper.selectOneByByCommonGbChannelId(commonGbChannel.getCommonGbId());
|
||||||
|
if (streamProxy == null) {
|
||||||
|
logger.warn("[资源类-拉流代理] 收到播放通道: {} 时未找到国标通道", commonGbChannel.getCommonGbId());
|
||||||
|
callback.call(commonGbChannel, null, ErrorCode.ERROR100.getCode(), "未找到通道", null);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String mediaServerId = streamProxy.getMediaServerId();
|
||||||
|
MediaServerItem mediaServerItem;
|
||||||
|
if (ObjectUtils.isEmpty(mediaServerId) || mediaServerId.equals("auto")) {
|
||||||
|
mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
|
||||||
|
}else {
|
||||||
|
mediaServerItem = mediaServerService.getOne(mediaServerId);
|
||||||
|
}
|
||||||
|
if (mediaServerItem == null) {
|
||||||
|
callback.call(commonGbChannel, null, ErrorCode.ERROR100.getCode(), "未找到可用的节点", null);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
streamProxyPlayService.startProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
|
||||||
|
if (code == InviteErrorCode.SUCCESS.getCode()) {
|
||||||
|
callback.call(commonGbChannel, mediaServerItem, ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
|
||||||
|
}else {
|
||||||
|
callback.call(commonGbChannel, null, code, msg, null);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -17,10 +17,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
|
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
|
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
|
||||||
import com.genersoft.iot.vmp.service.ICommonGbChannelService;
|
import com.genersoft.iot.vmp.service.*;
|
||||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
|
||||||
import com.genersoft.iot.vmp.service.IMediaService;
|
|
||||||
import com.genersoft.iot.vmp.service.IStreamProxyService;
|
|
||||||
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
|
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
|
||||||
|
@ -85,7 +82,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
private ZlmHttpHookSubscribe hookSubscribe;
|
private ZlmHttpHookSubscribe hookSubscribe;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private DynamicTask dynamicTask;
|
private IStreamProxyPlayService streamProxyPlayService;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
DataSourceTransactionManager dataSourceTransactionManager;
|
DataSourceTransactionManager dataSourceTransactionManager;
|
||||||
|
@ -169,7 +166,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
startProxy(param, mediaInfo, (code, msg, data) -> {
|
streamProxyPlayService.startProxy(param, mediaInfo, (code, msg, data) -> {
|
||||||
if (code != ErrorCode.SUCCESS.getCode()) {
|
if (code != ErrorCode.SUCCESS.getCode()) {
|
||||||
if (callback != null) {
|
if (callback != null) {
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
|
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
|
||||||
|
@ -257,127 +254,15 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
updateProxyToDb(param);
|
updateProxyToDb(param);
|
||||||
};
|
};
|
||||||
if(stopOldProxy) {
|
if(stopOldProxy) {
|
||||||
stopProxy(param, mediaInfo, (code, msg, data) -> {
|
streamProxyPlayService.stopProxy(param, mediaInfo, (code, msg, data) -> {
|
||||||
if (param.isPulling()) {
|
if (param.isPulling()) {
|
||||||
startProxy(param, mediaInfo, startProxyCallback);
|
streamProxyPlayService.startProxy(param, mediaInfo, callback);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback) {
|
|
||||||
logger.info("[开始拉流代理] {}/{}", streamProxy.getApp(), streamProxy.getStream());
|
|
||||||
// 检测是否在线
|
|
||||||
boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
|
|
||||||
if (ready) {
|
|
||||||
// 检查redis内容是否正确
|
|
||||||
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_"
|
|
||||||
+ OriginType.PULL + "_" + streamProxy.getApp() + "_" + streamProxy.getStream() + "_"
|
|
||||||
+ mediaInfo.getId();
|
|
||||||
|
|
||||||
if (redisTemplate.opsForValue().get(key) == null) {
|
|
||||||
logger.info("[拉起代理] 发现redis的流信息不存在,但是流存在。关闭流");
|
|
||||||
mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
|
|
||||||
}else {
|
|
||||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
|
||||||
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
|
|
||||||
logger.info("[开始拉流代理] 已拉起,直接返回 {}/{}", streamProxy.getApp(), streamProxy.getStream());
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
String delayTalkKey = UUID.randomUUID().toString();
|
|
||||||
|
|
||||||
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(streamProxy.getApp(), streamProxy.getStream(), true, "rtsp", mediaInfo.getId());
|
|
||||||
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
|
|
||||||
dynamicTask.stop(delayTalkKey);
|
|
||||||
streamProxy.setPulling(true);
|
|
||||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
|
|
||||||
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
|
|
||||||
logger.info("[开始拉流代理] 成功: {}/{}", streamProxy.getApp(), streamProxy.getStream());
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
dynamicTask.startDelay(delayTalkKey, ()->{
|
|
||||||
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
|
|
||||||
dynamicTask.stop(delayTalkKey);
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null);
|
|
||||||
}
|
|
||||||
streamProxy.setProxyError("启用超时");
|
|
||||||
}, 10000);
|
|
||||||
JSONObject result;
|
|
||||||
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){
|
|
||||||
result = zlmresTfulUtils.addFFmpegSource(mediaInfo, streamProxy.getUrl().trim(), streamProxy.getDstUrl(),
|
|
||||||
streamProxy.getTimeoutMs() + "", streamProxy.isEnableAudio(), streamProxy.isEnableMp4(),
|
|
||||||
streamProxy.getFfmpegCmdKey());
|
|
||||||
}else {
|
|
||||||
result = zlmresTfulUtils.addStreamProxy(mediaInfo, streamProxy.getApp(), streamProxy.getStream(), streamProxy.getUrl().trim(),
|
|
||||||
streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), streamProxy.getRtpType());
|
|
||||||
}
|
|
||||||
if (result == null) {
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(ErrorCode.ERROR100.getCode(), "接口调用失败", null);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (result.getInteger("code") != 0) {
|
|
||||||
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
|
|
||||||
dynamicTask.stop(delayTalkKey);
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(result.getInteger("code"), result.getString("msg"), null);
|
|
||||||
}
|
|
||||||
}else {
|
|
||||||
JSONObject data = result.getJSONObject("data");
|
|
||||||
if (data == null) {
|
|
||||||
logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result );
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(result.getInteger("code"), result.getString("msg"), null);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
String key = data.getString("key");
|
|
||||||
if (key == null) {
|
|
||||||
logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result );
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(ErrorCode.ERROR100.getCode(), "获取代理流结果中的KEY失败", null);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
streamProxy.setStreamKey(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stopProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback) {
|
|
||||||
logger.info("[停止拉流代理] {}/{}", streamProxy.getApp(), streamProxy.getStream());
|
|
||||||
boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
|
|
||||||
if (ready) {
|
|
||||||
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){
|
|
||||||
zlmresTfulUtils.delFFmpegSource(mediaInfo, streamProxy.getStreamKey());
|
|
||||||
}else {
|
|
||||||
zlmresTfulUtils.delStreamProxy(mediaInfo, streamProxy.getStreamKey());
|
|
||||||
}
|
|
||||||
mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
|
|
||||||
}
|
|
||||||
// 检查redis内容是否正确
|
|
||||||
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_"
|
|
||||||
+ OriginType.PULL + "_" + streamProxy.getApp() + "_" + streamProxy.getStream() + "_"
|
|
||||||
+ mediaInfo.getId();
|
|
||||||
|
|
||||||
if (redisTemplate.opsForValue().get(key) == null) {
|
|
||||||
redisTemplate.delete(key);
|
|
||||||
}
|
|
||||||
logger.info("[停止拉流代理] 成功 {}/{}", streamProxy.getApp(), streamProxy.getStream());
|
|
||||||
if (callback != null) {
|
|
||||||
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void proxyParamHandler(StreamProxy param) {
|
public void proxyParamHandler(StreamProxy param) {
|
||||||
if ("ffmpeg".equalsIgnoreCase(param.getType())) {
|
if ("ffmpeg".equalsIgnoreCase(param.getType())) {
|
||||||
if (ObjectUtils.isEmpty(param.getDstUrl())) {
|
if (ObjectUtils.isEmpty(param.getDstUrl())) {
|
||||||
|
@ -515,7 +400,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
if (mediaServerItem != null) {
|
if (mediaServerItem != null) {
|
||||||
boolean ready = mediaService.isReady(mediaServerItem, streamProxy.getApp(), streamProxy.getStream());
|
boolean ready = mediaService.isReady(mediaServerItem, streamProxy.getApp(), streamProxy.getStream());
|
||||||
if (ready) {
|
if (ready) {
|
||||||
stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
|
streamProxyPlayService.stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
|
||||||
if (code == ErrorCode.SUCCESS.getCode()) {
|
if (code == ErrorCode.SUCCESS.getCode()) {
|
||||||
logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", streamProxy.getApp(), streamProxy.getStream());
|
logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", streamProxy.getApp(), streamProxy.getStream());
|
||||||
}else {
|
}else {
|
||||||
|
@ -546,7 +431,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
startProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
|
streamProxyPlayService.startProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
|
||||||
if (code == ErrorCode.SUCCESS.getCode()) {
|
if (code == ErrorCode.SUCCESS.getCode()) {
|
||||||
streamProxy.setPulling(true);
|
streamProxy.setPulling(true);
|
||||||
}else {
|
}else {
|
||||||
|
@ -578,7 +463,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
|
streamProxyPlayService.stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
|
||||||
streamProxy.setPulling(false);
|
streamProxy.setPulling(false);
|
||||||
streamProxy.setUpdateTime(DateUtil.getNow());
|
streamProxy.setUpdateTime(DateUtil.getNow());
|
||||||
updateProxyToDb(streamProxy);
|
updateProxyToDb(streamProxy);
|
||||||
|
@ -818,7 +703,13 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||||
}
|
}
|
||||||
if (mediaServerItem == null && callback != null) {
|
if (mediaServerItem == null && callback != null) {
|
||||||
callback.run(ErrorCode.ERROR100.getCode(), "未找到可用的节点", null);
|
callback.run(ErrorCode.ERROR100.getCode(), "未找到可用的节点", null);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
startProxy(streamProxy, mediaServerItem, callback);
|
streamProxyPlayService.startProxy(streamProxy, mediaServerItem, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StreamProxy getStreamProxyByCommonGbChannelId(int commonGbId) {
|
||||||
|
return streamProxyMapper.selectOneByByCommonGbChannelId(commonGbId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,6 +134,8 @@ public interface IRedisCatchStorage {
|
||||||
|
|
||||||
OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId);
|
OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId);
|
||||||
|
|
||||||
|
OnStreamChangedHookParam getProxyStreamInfo(String app, String streamId, String mediaServerId);
|
||||||
|
|
||||||
void addCpuInfo(double cpuInfo);
|
void addCpuInfo(double cpuInfo);
|
||||||
|
|
||||||
void addMemInfo(double memInfo);
|
void addMemInfo(double memInfo);
|
||||||
|
|
|
@ -104,4 +104,6 @@ public interface StreamProxyMapper {
|
||||||
@Delete("delete from wvp_stream_proxy WHERE id=#{id}")
|
@Delete("delete from wvp_stream_proxy WHERE id=#{id}")
|
||||||
void delById(int id);
|
void delById(int id);
|
||||||
|
|
||||||
|
@Select("SELECT * from wvp_stream_proxy WHERE common_gb_channel_id=#{commonGbId}")
|
||||||
|
StreamProxy selectOneByByCommonGbChannelId(@Param("commonGbId") int commonGbId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -477,6 +477,20 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OnStreamChangedHookParam getProxyStreamInfo(String app, String streamId, String mediaServerId) {
|
||||||
|
String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_PULL_" + app + "_" + streamId + "_" + mediaServerId;
|
||||||
|
|
||||||
|
OnStreamChangedHookParam result = null;
|
||||||
|
List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
|
||||||
|
if (keys.size() > 0) {
|
||||||
|
String key = (String) keys.get(0);
|
||||||
|
result = JsonUtil.redisJsonToObject(redisTemplate, key, OnStreamChangedHookParam.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addCpuInfo(double cpuInfo) {
|
public void addCpuInfo(double cpuInfo) {
|
||||||
String key = VideoManagerConstants.SYSTEM_INFO_CPU_PREFIX + userSetting.getServerId();
|
String key = VideoManagerConstants.SYSTEM_INFO_CPU_PREFIX + userSetting.getServerId();
|
||||||
|
|
Loading…
Reference in New Issue