[国标级联] 修复点播未开始的拉流代理通道时推流失败的BUG

2.7.3-20250312
lin 2025-03-05 16:04:05 +08:00
parent 85ecc4c201
commit e080534b15
1 changed files with 33 additions and 2 deletions

View File

@ -2,9 +2,14 @@ package com.genersoft.iot.vmp.streamProxy.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
@ -23,6 +28,7 @@ import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import javax.sip.message.Response;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -39,6 +45,15 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private HookSubscribe subscribe;
@Autowired
private DynamicTask dynamicTask;
@Autowired
private UserSetting userSetting;
private ConcurrentHashMap<Integer, ErrorCallback<StreamInfo>> callbackMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<Integer, StreamInfo> streamInfoMap = new ConcurrentHashMap<>();
@ -96,9 +111,25 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
if (record != null) {
streamProxy.setEnableMp4(record);
}
StreamInfo streamInfo = startProxy(streamProxy);
if (streamInfo != null && callback != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
if (callback != null) {
// 设置流超时的定时任务
String timeOutTaskKey = UUID.randomUUID().toString();
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), streamInfo.getMediaServer().getId());
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 收流超时
subscribe.removeSubscribe(rtpHook);
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), streamInfo);
}, userSetting.getPlayTimeout());
// 开启流到来的监听
subscribe.addSubscribe(rtpHook, (hookData) -> {
dynamicTask.stop(timeOutTaskKey);
// hook响应
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
subscribe.removeSubscribe(rtpHook);
});
}
return streamInfo;
}