支持拉流代理检索,播放

结构优化
648540858 2024-01-14 00:01:34 +08:00
parent 8805966f89
commit be011a7c85
5 changed files with 168 additions and 80 deletions

View File

@ -26,7 +26,7 @@ public interface IStreamProxyService {
* @param count
* @return
*/
PageInfo<StreamProxy> getAll(Integer page, Integer count);
PageInfo<StreamProxy> getAll(String query, Boolean online, String mediaServerId, Integer page, Integer count);
/**
@ -109,4 +109,10 @@ public interface IStreamProxyService {
*
*/
void edit(StreamProxy param, GeneralCallback<StreamInfo> callback);
/**
*
*/
void getStreamProxyById(Integer id, GeneralCallback<StreamInfo> callback);
}

View File

@ -34,9 +34,11 @@ import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.util.CollectionUtils;
@ -97,6 +99,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Autowired
TransactionDefinition transactionDefinition;
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override
@Transactional
@ -164,16 +170,22 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
if (callback != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}
});
if (param.isEnable()) {
startProxy(param, mediaInfo, (code, msg, data) -> {
if (code != ErrorCode.SUCCESS.getCode()) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
if (callback != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
}
param.setStatus(true);
streamProxyMapper.update(param);
}else {
callback.run(code, msg, null);
if (callback != null) {
callback.run(code, msg, null);
}
param.setEnable(false);
// 直接移除
if (param.isEnableRemoveNoneReader()) {
@ -186,7 +198,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
} else{
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
if (callback != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}
}
}
@ -211,24 +225,24 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
if (!param.isEnable()) {
param.setStatus(false);
addProxyToDb(param);
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
return;
}
startProxy(param, mediaInfo, (code, msg, data) -> {
callback.run(code, msg, data);
if (code == ErrorCode.SUCCESS.getCode()) {
param.setStatus(true);
} else {
if (param.isEnableRemoveNoneReader()) {
return;
addProxyToDb(param);
if (callback != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null);
}
taskExecutor.execute(()->{
startProxy(param, mediaInfo, (code, msg, data) -> {
if (code == ErrorCode.SUCCESS.getCode()) {
param.setStatus(true);
} else {
if (param.isEnableRemoveNoneReader()) {
return;
}
param.setProxyError(msg);
param.setStatus(false);
}
param.setProxyError(msg);
param.setStatus(false);
}
addProxyToDb(param);
addProxyToDb(param);
});
});
}
@ -251,52 +265,54 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
proxyParamHandler(param);
param.setMediaServerId(mediaInfo.getId());
// 国标编号发生变化,修改通用通道国标变化,通用通道应发送删除再发送添加命令通知上级
// 类型变化,启用->启用:需要重新拉起视频流, 启用->未启用: 停止旧的视频流, 未启用->启用:发起新的视频流
// 流地址发生变化。停止旧的,拉起新的
// ffmpeg类型下目标流地址变化停止旧的拉起新的
// 节点变化: 停止旧的,拉起新的
// ffmpeg命令模板变化 停止旧的,拉起新的
boolean stopOldProxy = !streamProxyInDb.getType().equals(param.getType())
|| !streamProxyInDb.getUrl().equals(param.getUrl())
|| !streamProxyInDb.getMediaServerId().equals(param.getMediaServerId())
|| (streamProxyInDb.isEnable() && !param.isEnable())
|| (streamProxyInDb.getType().equals("ffmpeg") && (
streamProxyInDb.getDstUrl().equals(param.getDstUrl())
|| streamProxyInDb.getFfmpegCmdKey().equals(param.getFfmpegCmdKey())
));
updateProxyToDb(param);
if (callback != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null);
}
taskExecutor.execute(()->{
// 国标编号发生变化,修改通用通道国标变化,通用通道应发送删除再发送添加命令通知上级
// 类型变化,启用->启用:需要重新拉起视频流, 启用->未启用: 停止旧的视频流, 未启用->启用:发起新的视频流
// 流地址发生变化。停止旧的,拉起新的
// ffmpeg类型下目标流地址变化停止旧的拉起新的
// 节点变化: 停止旧的,拉起新的
// ffmpeg命令模板变化 停止旧的,拉起新的
boolean stopOldProxy = !streamProxyInDb.getType().equals(param.getType())
|| !streamProxyInDb.getUrl().equals(param.getUrl())
|| !streamProxyInDb.getMediaServerId().equals(param.getMediaServerId())
|| (streamProxyInDb.isEnable() && !param.isEnable())
|| (streamProxyInDb.getType().equals("ffmpeg") && (
streamProxyInDb.getDstUrl().equals(param.getDstUrl())
|| streamProxyInDb.getFfmpegCmdKey().equals(param.getFfmpegCmdKey())
));
// 如果是开启代理这是开启代理结束后的回调
final GeneralCallback<StreamInfo> startProxyCallback = (code, msg, data) -> {
if (code == ErrorCode.SUCCESS.getCode()) {
param.setStatus(true);
} else {
param.setStatus(false);
if (param.isEnableRemoveNoneReader()) {
return;
// 如果是开启代理这是开启代理结束后的回调
final GeneralCallback<StreamInfo> startProxyCallback = (code, msg, data) -> {
if (code == ErrorCode.SUCCESS.getCode()) {
param.setStatus(true);
} else {
param.setStatus(false);
if (param.isEnableRemoveNoneReader()) {
return;
}
param.setProxyError(msg);
}
param.setProxyError(msg);
}
updateProxyToDb(param);
callback.run(code, msg, null);
};
if(stopOldProxy) {
stopProxy(param, mediaInfo, (code, msg, data) -> {
updateProxyToDb(param);
};
if(stopOldProxy) {
stopProxy(param, mediaInfo, (code, msg, data) -> {
if (param.isEnable()) {
startProxy(param, mediaInfo, startProxyCallback);
}
});
}else {
if (param.isEnable()) {
startProxy(param, mediaInfo, startProxyCallback);
}else {
callback.run(code, msg, null);
param.setStatus(false);
updateProxyToDb(param);
}
});
}else {
if (param.isEnable()) {
startProxy(param, mediaInfo, startProxyCallback);
}else {
param.setStatus(false);
updateProxyToDb(param);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null);
}
}
});
}
public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback<StreamInfo> callback) {
@ -316,7 +332,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
logger.info("[开始拉流代理] 已拉起,直接返回 {}/{}", streamProxy.getApp(), streamProxy.getStream());
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
if (callback != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}
}
return;
}
@ -329,13 +347,17 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null);
logger.info("[开始拉流代理] 成功: {}/{}", streamProxy.getApp(), streamProxy.getStream());
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
if (callback != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}
});
dynamicTask.startDelay(delayTalkKey, ()->{
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(delayTalkKey);
callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null);
if (callback != null) {
callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null);
}
streamProxy.setProxyError("启用超时");
}, 10000);
JSONObject result;
@ -348,24 +370,32 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), streamProxy.getRtpType());
}
if (result == null) {
callback.run(ErrorCode.ERROR100.getCode(), "接口调用失败", null);
if (callback != null) {
callback.run(ErrorCode.ERROR100.getCode(), "接口调用失败", null);
}
return;
}
if (result.getInteger("code") != 0) {
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(delayTalkKey);
callback.run(result.getInteger("code"), result.getString("msg"), null);
if (callback != null) {
callback.run(result.getInteger("code"), result.getString("msg"), null);
}
}else {
JSONObject data = result.getJSONObject("data");
if (data == null) {
logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result );
callback.run(result.getInteger("code"), result.getString("msg"), null);
if (callback != null) {
callback.run(result.getInteger("code"), result.getString("msg"), null);
}
return;
}
String key = data.getString("key");
if (key == null) {
logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result );
callback.run(ErrorCode.ERROR100.getCode(), "获取代理流结果中的KEY失败", null);
if (callback != null) {
callback.run(ErrorCode.ERROR100.getCode(), "获取代理流结果中的KEY失败", null);
}
return;
}
streamProxy.setStreamKey(key);
@ -392,7 +422,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
redisTemplate.delete(key);
}
logger.info("[停止拉流代理] 成功 {}/{}", streamProxy.getApp(), streamProxy.getStream());
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null);
if (callback != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null);
}
}
public void proxyParamHandler(StreamProxy param) {
@ -497,9 +529,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
@Override
public PageInfo<StreamProxy> getAll(Integer page, Integer count) {
public PageInfo<StreamProxy> getAll(String query, Boolean online, String mediaServerId, Integer page, Integer count) {
PageHelper.startPage(page, count);
List<StreamProxy> all = streamProxyMapper.selectAll();
List<StreamProxy> all = streamProxyMapper.selectAll(query, online, mediaServerId);
return new PageInfo<>(all);
}
@ -546,13 +578,17 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
public void start(String app, String stream, GeneralCallback<StreamInfo> callback) {
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
if (streamProxy == null || !streamProxy.isEnable()){
callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null);
if (callback != null) {
callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null);
}
return;
}
String mediaServerId = streamProxy.getMediaServerId();
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem == null) {
callback.run(ErrorCode.ERROR100.getCode(), "使用的媒体节点不存在", null);
if (callback != null) {
callback.run(ErrorCode.ERROR100.getCode(), "使用的媒体节点不存在", null);
}
return;
}
startProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
@ -563,7 +599,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
streamProxy.setUpdateTime(DateUtil.getNow());
updateProxyToDb(streamProxy);
callback.run(code, msg, data);
if (callback != null) {
callback.run(code, msg, data);
}
});
}
@ -572,20 +610,26 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
public void stop(String app, String stream, GeneralCallback<StreamInfo> callback) {
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
if (streamProxy == null || !streamProxy.isEnable()){
callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null);
if (callback != null) {
callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null);
}
return;
}
String mediaServerId = streamProxy.getMediaServerId();
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem == null) {
callback.run(ErrorCode.ERROR100.getCode(), "使用的媒体节点不存在", null);
if (callback != null) {
callback.run(ErrorCode.ERROR100.getCode(), "使用的媒体节点不存在", null);
}
return;
}
stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> {
streamProxy.setStatus(false);
streamProxy.setUpdateTime(DateUtil.getNow());
updateProxyToDb(streamProxy);
callback.run(code, msg, data);
if (callback != null) {
callback.run(code, msg, data);
}
});
}
@ -820,4 +864,17 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
return streamProxyMapper.selectForEnable(true);
}
@Override
public void getStreamProxyById(Integer id, GeneralCallback<StreamInfo> callback) {
assert id != null;
StreamProxy streamProxy = streamProxyMapper.selectOneById(id);
assert streamProxy != null;
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(
streamProxy.getApp(), streamProxy.getStream(), streamProxy.getMediaServerId(), false);
if (streamInfo == null) {
callback.run(ErrorCode.ERROR100.getCode(), "地址获取失败", null);
}else {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}
}
}

View File

@ -107,7 +107,7 @@ public interface DeviceChannelMapper {
"wvp_device_channel dc " +
"WHERE " +
"dc.device_id = #{deviceId} " +
" <if test='query != null'> AND (dc.channel_id LIKE concat('%',#{query},'%') OR dc.name LIKE concat('%',#{query},'%') OR dc.name LIKE concat('%',#{query},'%'))</if> " +
" <if test='query != null'> AND (dc.channel_id LIKE concat('%',#{query},'%') OR dc.name LIKE concat('%',#{query},'%') OR dc.name LIKE concat('%',#{query},'%'))</if> " +
" <if test='parentChannelId != null'> AND (dc.parent_id=#{parentChannelId} OR dc.civil_code = #{parentChannelId}) </if> " +
" <if test='online == true' > AND dc.status= true</if>" +
" <if test='online == false' > AND dc.status= false</if>" +

View File

@ -47,8 +47,17 @@ public interface StreamProxyMapper {
@Delete("DELETE FROM wvp_stream_proxy WHERE app=#{app} AND stream=#{stream}")
int del(String app, String stream);
@Select("SELECT * FROM wvp_stream_proxy order by create_time desc")
List<StreamProxy> selectAll();
@Select(" <script>" +
"SELECT * FROM wvp_stream_proxy where 1 = 1 " +
" <if test='query != null'> AND (app LIKE '%${query}%' OR stream LIKE '%${query}%' OR name LIKE '%${query}%')</if> " +
" <if test='mediaServerId != null'> AND media_server_id=#{mediaServerId}</if> " +
" <if test='online == true' > AND status=true</if>" +
" <if test='online == false' > AND status=false</if>" +
"order by create_time desc"+
" </script>" )
List<StreamProxy> selectAll(@Param("query") String query,
@Param("online") Boolean online,
@Param("mediaServerId") String mediaServerId);
@Select("SELECT st.* FROM wvp_stream_proxy st WHERE st.enable=#{enable} order by st.create_time desc")
List<StreamProxy> selectForEnable(boolean enable);

View File

@ -56,14 +56,16 @@ public class StreamProxyController {
@Parameter(name = "count", description = "每页查询数量")
@Parameter(name = "query", description = "查询内容")
@Parameter(name = "online", description = "是否在线")
@Parameter(name = "mediaServerId", description = "节点ID")
@GetMapping(value = "/list")
@ResponseBody
public PageInfo<StreamProxy> list(@RequestParam(required = false)Integer page,
@RequestParam(required = false)Integer count,
@RequestParam(required = false)String query,
@RequestParam(required = false)String mediaServerId,
@RequestParam(required = false)Boolean online ){
return streamProxyService.getAll(page, count);
return streamProxyService.getAll(query, online, mediaServerId, page, count);
}
@Operation(summary = "查询流代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@ -277,4 +279,18 @@ public class StreamProxyController {
});
return result;
}
@GetMapping(value = "/stream")
@ResponseBody
@Operation(summary = "获取代理播放地址", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "id", description = "ID", required = true)
public DeferredResult<WVPResult<StreamContent>> getStream(Integer id){
logger.info("获取代理播放地址: " + id );
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
streamProxyService.getStreamProxyById(id, (code, msg, data) -> {
WVPResult<StreamContent> wvpResult = new WVPResult<>(code, msg, new StreamContent(data));
result.setResult(wvpResult);
});
return result;
}
}