diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java index d31fca5e..f4ea79d0 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java @@ -2,9 +2,14 @@ package com.genersoft.iot.vmp.streamProxy.service.impl; import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; @@ -23,6 +28,7 @@ import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import javax.sip.message.Response; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** @@ -39,6 +45,15 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { @Autowired private IMediaServerService mediaServerService; + @Autowired + private HookSubscribe subscribe; + + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private UserSetting userSetting; + private ConcurrentHashMap> callbackMap = new ConcurrentHashMap<>(); private ConcurrentHashMap streamInfoMap = new ConcurrentHashMap<>(); @@ -96,9 +111,25 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { if (record != null) { streamProxy.setEnableMp4(record); } + StreamInfo streamInfo = startProxy(streamProxy); - if (streamInfo != null && callback != null) { - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + if (callback != null) { + // 设置流超时的定时任务 + String timeOutTaskKey = UUID.randomUUID().toString(); + Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), streamInfo.getMediaServer().getId()); + dynamicTask.startDelay(timeOutTaskKey, () -> { + // 收流超时 + subscribe.removeSubscribe(rtpHook); + callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), streamInfo); + }, userSetting.getPlayTimeout()); + + // 开启流到来的监听 + subscribe.addSubscribe(rtpHook, (hookData) -> { + dynamicTask.stop(timeOutTaskKey); + // hook响应 + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + subscribe.removeSubscribe(rtpHook); + }); } return streamInfo; }