From 74431b1e983b2f9c22d31eef596b2412f8f81641 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 2 Jun 2023 15:46:07 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=82=B9=E6=92=AD?= =?UTF-8?q?=E6=B5=81=E7=A8=8B=E4=B8=ADssrc=E7=9A=84=E9=87=8A=E6=94=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/session/SSRCFactory.java | 24 +++------ .../iot/vmp/service/impl/PlayServiceImpl.java | 52 ++++++------------- 2 files changed, 24 insertions(+), 52 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java index ec8e0ba6..657bb2fa 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.session; import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.conf.UserSetting; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; @@ -31,10 +32,13 @@ public class SSRCFactory { @Autowired private SipConfig sipConfig; + @Autowired + private UserSetting userSetting; + public void initMediaServerSSRC(String mediaServerId, Set usedSet) { String ssrcPrefix = sipConfig.getDomain().substring(3, 8); - String redisKey = SSRC_INFO_KEY + mediaServerId; + String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; List ssrcList = new ArrayList<>(); for (int i = 1; i < MAX_STREAM_COUNT; i++) { String ssrc = String.format("%s%04d", ssrcPrefix, i); @@ -77,7 +81,7 @@ public class SSRCFactory { return; } String sn = ssrc.substring(1); - String redisKey = SSRC_INFO_KEY + mediaServerId; + String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; redisTemplate.opsForSet().add(redisKey, sn); } @@ -86,7 +90,7 @@ public class SSRCFactory { */ private String getSN(String mediaServerId) { String sn = null; - String redisKey = SSRC_INFO_KEY + mediaServerId; + String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; Long size = redisTemplate.opsForSet().size(redisKey); if (size == null || size == 0) { throw new RuntimeException("ssrc已经用完"); @@ -113,20 +117,8 @@ public class SSRCFactory { * @param mediaServerId 流媒体服务ID */ public boolean hasMediaServerSSRC(String mediaServerId) { - String redisKey = SSRC_INFO_KEY + mediaServerId; + String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; return redisTemplate.opsForSet().members(redisKey) != null; } - /** - * 查询ssrc是否可用 - * - * @param mediaServerId - * @param ssrc - * @return - */ - public boolean checkSsrc(String mediaServerId, String ssrc) { - String sn = ssrc.substring(1); - String redisKey = SSRC_INFO_KEY + mediaServerId; - return redisTemplate.opsForSet().isMember(redisKey, sn) != null; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 51574392..2fbe6bbc 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -228,7 +228,14 @@ public class PlayServiceImpl implements IPlayService { ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) { - logger.info("[点播开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); + logger.info("\r\n" + + "[点播开始] \r\n" + + "deviceId : {}, \r\n" + + "channelId : {},\r\n" + + "收流端口 :{}, \r\n" + + "收流模式 :{}, \r\n" + + "SSRC : {}, \r\n" + + "SSRC校验 :{} ", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); // 超时处理 String timeOutTaskKey = UUID.randomUUID().toString(); dynamicTask.startDelay(timeOutTaskKey, () -> { @@ -254,7 +261,7 @@ public class PlayServiceImpl implements IPlayService { }, userSetting.getPlayTimeout()); //端口获取失败的ssrcInfo 没有必要发送点播指令 if (ssrcInfo.getPort() <= 0) { - logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); + logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, JSON.toJSONString(ssrcInfo)); dynamicTask.stop(timeOutTaskKey); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); @@ -327,17 +334,8 @@ public class PlayServiceImpl implements IPlayService { if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) { - // ssrc 不可用 - // 释放ssrc - ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - event.msg = "下级自定义了ssrc,但是此ssrc不可用"; - event.statusCode = 400; - errorEvent.response(event); - return; - } - + // 释放不被使用的ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); // 单端口模式streamId也有变化,需要重新设置监听 if (!mediaServerItem.isRtpEnable()) { // 添加订阅 @@ -352,6 +350,7 @@ public class PlayServiceImpl implements IPlayService { hookEvent.response(mediaServerItemInUse, response); }); } + // 关闭rtp server mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream(), result->{ if (result) { @@ -367,8 +366,6 @@ public class PlayServiceImpl implements IPlayService { } dynamicTask.stop(timeOutTaskKey); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); event.msg = "下级自定义了ssrc,重新设置收流信息失败"; @@ -590,17 +587,8 @@ public class PlayServiceImpl implements IPlayService { if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) { - // ssrc 不可用 - // 释放ssrc - dynamicTask.stop(playBackTimeOutTaskKey); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用"; - eventResult.statusCode = 400; - errorEvent.response(eventResult); - return; - } + // 释放不被使用的ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); // 单端口模式streamId也有变化,需要重新设置监听 if (!mediaServerItem.isRtpEnable()) { @@ -752,16 +740,8 @@ public class PlayServiceImpl implements IPlayService { if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { logger.info("[录像下载] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) { - // ssrc 不可用 - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用"; - eventResult.statusCode = 400; - errorEvent.response(eventResult); - return; - } + // 释放不被使用的ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); // 单端口模式streamId也有变化,需要重新设置监听 if (!mediaServerItem.isRtpEnable()) { From 97b04c4dba581bc51791944a876979449855a648 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 7 Jun 2023 20:15:08 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=BD=95=E5=83=8F?= =?UTF-8?q?=E4=B8=8B=E8=BD=BD=EF=BC=8C=E4=BC=98=E5=8C=96=E6=8B=89=E6=B5=81?= =?UTF-8?q?=E4=BB=A3=E7=90=86=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/GeneralCallback.java | 5 +++ .../callback/DeferredResultHolder.java | 2 + .../transmit/cmd/impl/SIPCommander.java | 4 +- .../vmp/media/zlm/ZLMRTPServerFactory.java | 4 +- .../iot/vmp/service/IStreamProxyService.java | 3 +- .../service/impl/StreamProxyServiceImpl.java | 45 +++++++++++++------ .../streamProxy/StreamProxyController.java | 44 +++++++++++++++++- src/main/resources/application.yml | 2 +- 8 files changed, 89 insertions(+), 20 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java diff --git a/src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java b/src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java new file mode 100644 index 00000000..df07fac5 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java @@ -0,0 +1,5 @@ +package com.genersoft.iot.vmp.common; + +public interface GeneralCallback{ + void run(int code, String msg, T data); +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java index a351445e..467481e7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java @@ -39,6 +39,8 @@ public class DeferredResultHolder { public static final String CALLBACK_CMD_DOWNLOAD = "CALLBACK_DOWNLOAD"; + public static final String CALLBACK_CMD_PROXY = "CALLBACK_PROXY"; + public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP"; public static final String UPLOAD_FILE_CHANNEL = "UPLOAD_FILE_CHANNEL"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 4f0dc11a..beec9e90 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -447,7 +447,7 @@ public class SIPCommander implements ISIPCommander { } subscribe.removeSubscribe(hookSubscribe); }); - Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc()); + Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc()); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { ResponseEvent responseEvent = (ResponseEvent) event.event; @@ -566,7 +566,7 @@ public class SIPCommander implements ISIPCommander { }); }); - Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc()); + Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc()); if (inviteStreamCallback != null) { inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null,callId, "rtp", ssrcInfo.getStream())); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 99576c4a..68f1bee4 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -6,7 +6,9 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index de9613eb..576ad3c6 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; @@ -13,7 +14,7 @@ public interface IStreamProxyService { * 保存视频代理 * @param param */ - StreamInfo save(StreamProxyItem param); + void save(StreamProxyItem param, GeneralCallback callback); /** * 添加视频代理到zlm diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 02f4f914..68e1122f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -2,12 +2,16 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; @@ -85,6 +89,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired private IMediaServerService mediaServerService; + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + @Autowired DataSourceTransactionManager dataSourceTransactionManager; @@ -93,7 +100,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override - public StreamInfo save(StreamProxyItem param) { + public void save(StreamProxyItem param, GeneralCallback callback) { MediaServerItem mediaInfo; if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){ mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null); @@ -107,7 +114,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(), param.getStream() ); param.setDst_url(dstUrl); - StringBuffer resultMsg = new StringBuffer(); param.setMediaServerId(mediaInfo.getId()); boolean saveResult; // 更新 @@ -117,14 +123,25 @@ public class StreamProxyServiceImpl implements IStreamProxyService { saveResult = addStreamProxy(param); } if (!saveResult) { - throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败"); + callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null); + return; } - StreamInfo resultForStreamInfo = null; - resultMsg.append("保存成功"); + + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, param.getApp(), param.getStream(), null, null); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + }); + if (param.isEnable()) { JSONObject jsonObject = addStreamProxyToZlm(param); - if (jsonObject == null || jsonObject.getInteger("code") != 0) { - resultMsg.append(", 但是启用失败,请检查流地址是否可用"); + if (jsonObject != null && jsonObject.getInteger("code") == 0) { + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, param.getApp(), param.getStream(), null, null); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + }else { param.setEnable(false); // 直接移除 if (param.isEnable_remove_none_reader()) { @@ -132,14 +149,15 @@ public class StreamProxyServiceImpl implements IStreamProxyService { }else { updateStreamProxy(param); } - - }else { - resultForStreamInfo = mediaService.getStreamInfoByAppAndStream( - mediaInfo, param.getApp(), param.getStream(), null, null); - + if (jsonObject == null){ + callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); + return; + }else { + callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null); + return; + } } } - return resultForStreamInfo; } /** @@ -233,6 +251,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrc_url(), param.getDst_url(), param.getTimeout_ms() + "", param.isEnable_audio(), param.isEnable_mp4(), param.getFfmpeg_cmd_key()); + System.out.println(result); } return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java index 4a8522b6..0689f423 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java @@ -1,13 +1,18 @@ package com.genersoft.iot.vmp.vmanager.streamProxy; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageInfo; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -18,6 +23,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; +import org.springframework.web.context.request.async.DeferredResult; + +import java.util.UUID; @SuppressWarnings("rawtypes") /** @@ -37,6 +45,12 @@ public class StreamProxyController { @Autowired private IStreamProxyService streamProxyService; + @Autowired + private DeferredResultHolder resultHolder; + + @Autowired + private UserSetting userSetting; + @Operation(summary = "分页查询流代理") @Parameter(name = "page", description = "当前页") @@ -58,7 +72,7 @@ public class StreamProxyController { }) @PostMapping(value = "/save") @ResponseBody - public StreamContent save(@RequestBody StreamProxyItem param){ + public DeferredResult save(@RequestBody StreamProxyItem param){ logger.info("添加代理: " + JSONObject.toJSONString(param)); if (ObjectUtils.isEmpty(param.getMediaServerId())) { param.setMediaServerId("auto"); @@ -69,7 +83,33 @@ public class StreamProxyController { if (ObjectUtils.isEmpty(param.getGbId())) { param.setGbId(null); } - return new StreamContent(streamProxyService.save(param)); + + RequestMessage requestMessage = new RequestMessage(); + String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream(); + requestMessage.setKey(key); + String uuid = UUID.randomUUID().toString(); + requestMessage.setId(uuid); + DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + // 录像查询以channelId作为deviceId查询 + resultHolder.put(key, uuid, result); + result.onTimeout(()->{ + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(ErrorCode.ERROR100.getCode()); + wvpResult.setMsg("超时"); + requestMessage.setData(wvpResult); + resultHolder.invokeAllResult(requestMessage); + }); + + streamProxyService.save(param, (code, msg, streamInfo) -> { + logger.info("[拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg); + if (code == ErrorCode.SUCCESS.getCode()) { + requestMessage.setData(new StreamContent(streamInfo)); + }else { + requestMessage.setData(WVPResult.fail(code, msg)); + } + resultHolder.invokeAllResult(requestMessage); + }); + return result; } @GetMapping(value = "/ffmpeg_cmd/list") diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3f478442..c160a2ff 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,4 +2,4 @@ spring: application: name: wvp profiles: - active: local \ No newline at end of file + active: local268 \ No newline at end of file