From 3140672e6321f90374dd0d543898a9f1bd6f34b0 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 10 Sep 2024 16:50:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=9B=BD=E6=A0=87=E5=BD=95?= =?UTF-8?q?=E5=83=8F=E4=B8=8B=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/gb28181/bean/OpenRTPServerResult.java | 12 ++ .../iot/vmp/gb28181/service/IPlayService.java | 1 - .../gb28181/service/impl/PlayServiceImpl.java | 155 ++++++++++-------- .../transmit/cmd/impl/SIPCommander.java | 24 +-- .../notify/cmd/AlarmNotifyMessageHandler.java | 9 +- .../vmp/media/event/hook/HookSubscribe.java | 10 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 4 +- .../iot/vmp/media/zlm/ZLMRESTfulUtils.java | 4 +- .../iot/vmp/media/zlm/ZLMServerFactory.java | 2 - .../vmp/service/IReceiveRtpServerService.java | 4 +- .../service/impl/RtpServerServiceImpl.java | 15 +- .../iot/vmp/vmanager/TestController.java | 28 ++++ .../src/components/dialog/recordDownload.vue | 7 +- 13 files changed, 163 insertions(+), 112 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/OpenRTPServerResult.java create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/OpenRTPServerResult.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/OpenRTPServerResult.java new file mode 100644 index 000000000..aa6044443 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/OpenRTPServerResult.java @@ -0,0 +1,12 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import com.genersoft.iot.vmp.media.event.hook.HookData; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import lombok.Data; + +@Data +public class OpenRTPServerResult { + + private SSRCInfo ssrcInfo; + private HookData hookData; +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java index e4c5befed..e085d1b68 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java @@ -31,7 +31,6 @@ public interface IPlayService { void zlmServerOffline(String mediaServerId); void download(Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); - void download(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); StreamInfo getDownLoadInfo(Device device, DeviceChannel channel, String stream); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index 5d550b789..7b7597233 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -355,10 +355,11 @@ public class PlayServiceImpl implements IPlayService { rtpServerParam.setTcpMode(tcpMode); rtpServerParam.setOnlyAuto(false); rtpServerParam.setDisableAudio(!channel.isHasAudio()); - SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, hookData) -> { - if (code == InviteErrorCode.SUCCESS.getCode()) { + SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> { + + if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) { // hook响应 - StreamInfo streamInfo = onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), device, channel); + StreamInfo streamInfo = onPublishHandlerForPlay(result.getHookData().getMediaServer(), result.getHookData().getMediaInfo(), device, channel); if (streamInfo == null){ if (callback != null) { callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), @@ -379,7 +380,7 @@ public class PlayServiceImpl implements IPlayService { log.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channel.getDeviceId(), channel.getStreamIdentification()); - snapOnPlay(hookData.getMediaServer(), device.getDeviceId(), channel.getDeviceId(), streamId); + snapOnPlay(result.getHookData().getMediaServer(), device.getDeviceId(), channel.getDeviceId(), streamId); }else { if (callback != null) { callback.run(code, msg, null); @@ -401,7 +402,6 @@ public class PlayServiceImpl implements IPlayService { if (ssrcInfo == null || ssrcInfo.getPort() <= 0) { log.info("[点播端口/SSRC]获取失败,deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channel.getDeviceId(), ssrcInfo); callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "获取端口或者ssrc失败", null); - sessionManager.removeByStream(streamId); inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), @@ -743,10 +743,10 @@ public class PlayServiceImpl implements IPlayService { rtpServerParam.setTcpMode(tcpMode); rtpServerParam.setOnlyAuto(false); rtpServerParam.setDisableAudio(!channel.isHasAudio()); - SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, hookData) -> { - if (code == InviteErrorCode.SUCCESS.getCode()) { + SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> { + if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) { // hook响应 - StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), device, channel, startTime, endTime); + StreamInfo streamInfo = onPublishHandlerForPlayback(result.getHookData().getMediaServer(), result.getHookData().getMediaInfo(), device, channel, startTime, endTime); if (streamInfo == null) { log.warn("设备回放API调用失败!"); callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), @@ -778,7 +778,6 @@ public class PlayServiceImpl implements IPlayService { if (callback != null) { callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "获取端口或者ssrc失败", null); } - sessionManager.removeByStream(stream); inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), @@ -924,70 +923,92 @@ public class PlayServiceImpl implements IPlayService { null); return; } - int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); - // 录像下载不使用固定流地址,固定流地址会导致如果开始时间与结束时间一致时文件错误的叠加在一起 - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false,!channel.isHasAudio(), false, tcpMode); - download(newMediaServerItem, ssrcInfo, device, channel, startTime, endTime, downloadSpeed, callback); + + download(newMediaServerItem, device, channel, startTime, endTime, downloadSpeed, callback); } - @Override - public void download(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback callback) { - if (mediaServerItem == null || ssrcInfo == null) { + private void download(MediaServer mediaServerItem, Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback callback) { + if (mediaServerItem == null ) { callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), null); return; } - log.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channel.getDeviceId(), downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); + + int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); + // 录像下载不使用固定流地址,固定流地址会导致如果开始时间与结束时间一致时文件错误的叠加在一起 + RTPServerParam rtpServerParam = new RTPServerParam(); + rtpServerParam.setMediaServerItem(mediaServerItem); + rtpServerParam.setSsrcCheck(device.isSsrcCheck()); + rtpServerParam.setPlayback(true); + rtpServerParam.setPort(0); + rtpServerParam.setTcpMode(tcpMode); + rtpServerParam.setOnlyAuto(false); + rtpServerParam.setDisableAudio(!channel.isHasAudio()); + SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> { + if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) { + // hook响应 + StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItem, result.getHookData().getMediaInfo(), device, channel, startTime, endTime); + if (streamInfo == null) { + log.warn("[录像下载] 获取流地址信息失败"); + callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); + return; + } + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + log.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channel, startTime, endTime); + }else { + if (callback != null) { + callback.run(code, msg, null); + } + inviteStreamService.call(InviteSessionType.DOWNLOAD, channel.getId(), null, code, msg, null); + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.DOWNLOAD, channel.getId()); + if (result != null && result.getSsrcInfo() != null) { + SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(result.getSsrcInfo().getStream()); + if (ssrcTransaction != null) { + try { + cmder.streamByeCmd(device, channel.getDeviceId(), ssrcTransaction.getStream(), null); + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + log.error("[录像下载] 发送BYE失败 {}", e.getMessage()); + } finally { + sessionManager.removeByStream(ssrcTransaction.getStream()); + } + } + } + } + }); + if (ssrcInfo == null || ssrcInfo.getPort() <= 0) { + log.info("[录像下载端口/SSRC]获取失败,deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channel.getDeviceId(), ssrcInfo); + if (callback != null) { + callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "获取端口或者ssrc失败", null); + } + inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, + InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), + InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), + null); + return; + } + log.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}({}), SSRC校验:{}", + device.getDeviceId(), channel.getDeviceId(), downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), + ssrcInfo.getSsrc(), String.format("%08x", Long.parseLong(ssrcInfo.getSsrc())).toUpperCase(), + device.isSsrcCheck()); + // 初始化redis中的invite消息状态 InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD, InviteSessionStatus.ready); + inviteStreamService.updateInviteInfo(inviteInfo); - String downLoadTimeOutTaskKey = UUID.randomUUID().toString(); - dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> { - log.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", device, channel)); - inviteStreamService.removeInviteInfo(inviteInfo); - callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), - InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); - - // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 - try { - cmder.streamByeCmd(device, channel.getDeviceId(), ssrcInfo.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException e) { - log.error("[录像流]录像下载请求超时, 发送BYE失败 {}", e.getMessage()); - } catch (SsrcTransactionNotFoundException e) { - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - sessionManager.removeByStream(ssrcInfo.getStream()); - } - }, userSetting.getPlayTimeout()); - - SipSubscribe.Event errorEvent = event -> { - dynamicTask.stop(downLoadTimeOutTaskKey); - callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), - String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg), null); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - sessionManager.removeByStream(ssrcInfo.getStream()); - inviteStreamService.removeInviteInfo(inviteInfo); - }; - HookSubscribe.Event hookEvent = (hookData) -> { - log.info("[录像下载]收到订阅消息: " + hookData); - dynamicTask.stop(downLoadTimeOutTaskKey); - StreamInfo streamInfo = onPublishHandlerForDownload(hookData.getMediaServer(), hookData.getMediaInfo(), device, channel, startTime, endTime); - if (streamInfo == null) { - log.warn("[录像下载] 获取流地址信息失败"); - callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); - return; - } - callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - log.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channel, startTime, endTime); - }; try { cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channel, startTime, endTime, downloadSpeed, - errorEvent, eventResult ->{ + eventResult -> { + // 对方返回错误 + callback.run(InviteErrorCode.FAIL.getCode(), String.format("录像下载失败, 错误码: %s, %s", eventResult.statusCode, eventResult.msg), null); + receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); + sessionManager.removeByStream(ssrcInfo.getStream()); + inviteStreamService.removeInviteInfo(inviteInfo); + }, eventResult ->{ // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel, callback, inviteInfo, InviteSessionType.DOWNLOAD); @@ -1002,9 +1023,11 @@ public class PlayServiceImpl implements IPlayService { DownloadFileInfo downloadFileInfo = CloudRecordUtils.getDownloadFilePath(mediaServerItem, filePath); InviteInfo inviteInfoForNew = inviteStreamService.getInviteInfo(inviteInfo.getType() , inviteInfo.getChannelId(), inviteInfo.getStream()); - inviteInfoForNew.getStreamInfo().setDownLoadFilePath(downloadFileInfo); - // 不可以马上移除会导致后续接口拿不到下载地址 - inviteStreamService.updateInviteInfo(inviteInfoForNew, 60*15L); + if (inviteInfoForNew != null && inviteInfoForNew.getStreamInfo() != null) { + inviteInfoForNew.getStreamInfo().setDownLoadFilePath(downloadFileInfo); + // 不可以马上移除会导致后续接口拿不到下载地址 + inviteStreamService.updateInviteInfo(inviteInfoForNew, 60*15L); + } }; Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()); // 设置过期时间,下载失败时自动处理订阅数据 @@ -1013,12 +1036,10 @@ public class PlayServiceImpl implements IPlayService { }); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 录像下载: {}", e.getMessage()); - - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(); - eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent; - eventResult.statusCode = -1; - eventResult.msg = "命令发送失败"; - errorEvent.response(eventResult); + callback.run(InviteErrorCode.FAIL.getCode(),e.getMessage(), null); + receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); + sessionManager.removeByStream(ssrcInfo.getStream()); + inviteStreamService.removeInviteInfo(inviteInfo); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 1b0c0ab69..49a1abd19 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -456,7 +456,7 @@ public class SIPCommander implements ISIPCommander { String startTime, String endTime, int downloadSpeed, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException { - log.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort()); + log.info("[发送-请求历史媒体下载-命令] 流ID: {},节点为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort()); String sdpIp; if (!ObjectUtils.isEmpty(device.getSdpIp())) { sdpIp = device.getSdpIp(); @@ -524,27 +524,8 @@ public class SIPCommander implements ISIPCommander { content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc log.debug("此时请求下载信令的ssrc===>{}",ssrcInfo.getSsrc()); - Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()); // 添加订阅 CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport()); - String callId= newCallIdHeader.getCallId(); - subscribe.addSubscribe(rtpHook, (hookData) -> { - log.debug("sipc 添加订阅===callId {}",callId); - subscribe.removeSubscribe(rtpHook); - // 添加流注销的订阅,注销了后向设备发送bye - Hook departureHook = Hook.getInstance(HookType.on_media_departure, "rtp", ssrcInfo.getStream(), mediaServerItem.getId()); - subscribe.addSubscribe(departureHook, - (departureHookData) -> { - log.info("[录像]下载结束, 发送BYE"); - try { - streamByeCmd(device, channel.getDeviceId(), ssrcInfo.getStream(), callId); - } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { - log.error("[录像]下载结束, 发送BYE失败 {}", e.getMessage()); - } - }); - }); - Request request = headerProvider.createPlaybackInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc()); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { @@ -653,9 +634,6 @@ public class SIPCommander implements ISIPCommander { } log.info("[发送BYE] 设备: device: {}, channel: {}, callId: {}", device.getDeviceId(), channelId, ssrcTransaction.getCallId()); - mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); - - mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream()); sessionManager.removeByCallId(ssrcTransaction.getCallId()); Request byteRequest = headerProvider.createByteRequest(device, channelId, ssrcTransaction.getSipTransactionInfo()); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), byteRequest, null, okEvent); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java index 8bab63b4a..9cf74a542 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java @@ -85,7 +85,6 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { - log.info("[收到报警通知]设备:{}", device.getDeviceId()); boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); // 回复200 OK @@ -96,7 +95,9 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme } if (isEmpty) { taskExecutor.execute(() -> { - log.info("[处理报警通知]待处理数量:{}", taskQueue.size() ); + if (log.isDebugEnabled()) { + log.info("[处理报警通知]待处理数量:{}", taskQueue.size() ); + } while (!taskQueue.isEmpty()) { try { SipMsgInfo sipMsgInfo = taskQueue.poll(); @@ -161,7 +162,9 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType")); } } - log.info("[收到报警通知]内容:{}", JSON.toJSONString(deviceAlarm)); + if (log.isDebugEnabled()) { + log.debug("[收到报警通知]设备:{}, 内容:{}", device.getDeviceId(), JSON.toJSONString(deviceAlarm)); + } // 作者自用判断,其他小伙伴需要此消息可以自行修改,但是不要提在pr里 if (DeviceAlarmMethod.Other.getVal() == Integer.parseInt(deviceAlarm.getAlarmMethod())) { // 发送给平台的报警信息。 发送redis通知 diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java index ee2f5677b..b88fdd443 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java @@ -6,6 +6,8 @@ import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -21,6 +23,7 @@ public class HookSubscribe { */ private final long subscribeExpire = 5 * 60 * 1000; + @FunctionalInterface public interface Event{ void response(HookData data); @@ -75,12 +78,11 @@ public class HookSubscribe { if (hookSubscribeEvent != null) { HookData data = HookData.getInstance(event); hookSubscribeEvent.response(data); - }else { - } } public void addSubscribe(Hook hook, HookSubscribe.Event event) { + System.out.println("add==" + hook.toString()); if (hook.getExpireTime() == null) { hook.setExpireTime(System.currentTimeMillis() + subscribeExpire); } @@ -106,4 +108,8 @@ public class HookSubscribe { } } } + + public List getAll() { + return new ArrayList<>(allHook.values()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 7829d8ee7..fc0b60ec6 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -82,7 +82,9 @@ public class ZLMHttpHookListener { log.info("[ZLM HOOK] 播放鉴权 失败:{}->{}", param.getMediaServerId(), param); return new HookResult(401, "Unauthorized"); } - log.info("[ZLM HOOK] 播放鉴权成功:{}->{}", param.getMediaServerId(), param); + if (log.isDebugEnabled()){ + log.debug("[ZLM HOOK] 播放鉴权成功:{}->{}", param.getMediaServerId(), param); + } return HookResult.SUCCESS(); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 62454f24d..9125858cd 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -179,7 +179,9 @@ public class ZLMRESTfulUtils { Request request = new Request.Builder() .url(httpBuilder.build()) .build(); - log.info(request.toString()); + if (log.isDebugEnabled()){ + log.debug(request.toString()); + } try { OkHttpClient client = getClient(); Response response = client.newCall(request).execute(); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index f74916f29..b487f7ab5 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -39,7 +39,6 @@ public class ZLMServerFactory { int result = -1; // 查询此rtp server 是否已经存在 JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId); - log.info(JSONObject.toJSONString(rtpInfo)); if(rtpInfo.getInteger("code") == 0){ if (rtpInfo.getBoolean("exist")) { result = rtpInfo.getInteger("local_port"); @@ -87,7 +86,6 @@ public class ZLMServerFactory { } JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param); - log.info(JSONObject.toJSONString(openRtpServerResultJson)); if (openRtpServerResultJson != null) { if (openRtpServerResultJson.getInteger("code") == 0) { result= openRtpServerResultJson.getInteger("port"); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java index 06620367a..caf2b9044 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java @@ -1,13 +1,13 @@ package com.genersoft.iot.vmp.service; +import com.genersoft.iot.vmp.gb28181.bean.OpenRTPServerResult; import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.media.event.hook.HookData; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.RTPServerParam; import com.genersoft.iot.vmp.service.bean.SSRCInfo; public interface IReceiveRtpServerService { - SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback); + SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback); void closeRTPServer(MediaServer mediaServer, SSRCInfo ssrcInfo); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java index 351ffb1ea..7321e94b8 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java @@ -2,11 +2,11 @@ package com.genersoft.iot.vmp.service.impl; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.OpenRTPServerResult; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; -import com.genersoft.iot.vmp.media.event.hook.HookData; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; @@ -66,7 +66,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { } @Override - public SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback) { + public SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback) { if (callback == null) { log.warn("[开启RTP收流] 失败,回调为NULL"); return null; @@ -118,7 +118,8 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { String timeOutTaskKey = UUID.randomUUID().toString(); SSRCInfo ssrcInfo = new SSRCInfo(rtpServerPort, ssrc, streamId, timeOutTaskKey); - + OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult(); + openRTPServerResult.setSsrcInfo(ssrcInfo); Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", streamId, rtpServerParam.getMediaServerItem().getId()); dynamicTask.startDelay(timeOutTaskKey, () -> { @@ -130,21 +131,23 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { // 关闭收流端口 mediaServerService.closeRTPServer(rtpServerParam.getMediaServerItem(), streamId); subscribe.removeSubscribe(rtpHook); - callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); + callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), openRTPServerResult); }, userSetting.getPlayTimeout()); - // 开启流到来的监听 subscribe.addSubscribe(rtpHook, (hookData) -> { dynamicTask.stop(timeOutTaskKey); // hook响应 - callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), hookData); + openRTPServerResult.setHookData(hookData); + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), openRTPServerResult); subscribe.removeSubscribe(rtpHook); }); + return ssrcInfo; } @Override public void closeRTPServer(MediaServer mediaServer, SSRCInfo ssrcInfo) { + System.out.println(4444); if (mediaServer == null) { return; } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java new file mode 100644 index 000000000..bfac5c731 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java @@ -0,0 +1,28 @@ +package com.genersoft.iot.vmp.vmanager; + +import com.genersoft.iot.vmp.conf.security.JwtUtils; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.security.SecurityRequirement; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +@RestController +@RequestMapping("/api/test") +public class TestController { + + @Autowired + private HookSubscribe subscribe; + + + @GetMapping("/hook/list") + @Operation(summary = "查询角色", security = @SecurityRequirement(name = JwtUtils.HEADER)) + public List all(){ + return subscribe.getAll(); + } +} diff --git a/web_src/src/components/dialog/recordDownload.vue b/web_src/src/components/dialog/recordDownload.vue index c78cd1ce3..ef4334fca 100755 --- a/web_src/src/components/dialog/recordDownload.vue +++ b/web_src/src/components/dialog/recordDownload.vue @@ -59,13 +59,12 @@ export default { if (!this.getProgressRun) { return; } - if (this.percentage == 100 ) { - + if (this.downloadFile) { return; } setTimeout( ()=>{ if (!this.showDialog) return; - this.getProgress(this.getProgressTimer()) + this.getProgress(this.getProgressTimer) }, 5000) }, getProgress: function (callback){ @@ -75,7 +74,7 @@ export default { }).then((res)=> { if (res.data.code === 0) { this.streamInfo = res.data.data; - if (parseFloat(res.data.progress) == 1) { + if (parseFloat(res.data.progress) === 1) { this.percentage = 100; }else { this.percentage = (parseFloat(res.data.data.progress)*100).toFixed(1);