From 6b1d966255db75070f42b1e6a430ddff4299b248 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 21 Mar 2023 10:27:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=90=88=E5=B9=B6=E5=AF=B9?= =?UTF-8?q?=E8=AE=B2broadcast=E7=BA=A7=E8=81=94=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- .../cmd/impl/SIPCommanderFroPlatform.java | 10 +- .../request/impl/AckRequestProcessor.java | 2 +- .../request/impl/ByeRequestProcessor.java | 51 +++++-- .../cmd/BroadcastNotifyMessageHandler.java | 4 +- .../vmp/media/zlm/ZLMRTPServerFactory.java | 8 +- .../vmp/service/impl/PlatformServiceImpl.java | 3 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 2 +- .../src/components/dialog/devicePlayer.vue | 136 +++++++++--------- 9 files changed, 127 insertions(+), 91 deletions(-) diff --git a/pom.xml b/pom.xml index cbcbb111..75d29f2b 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ org.springframework.boot spring-boot-starter-parent - 2.7.2 + 2.7.9 com.genersoft diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 9809a9f9..1bc03c72 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -790,11 +790,11 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { content.append("t=0 0\r\n"); if ("TCP-PASSIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { - content.append("m=video " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n"); + content.append("m=audio " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n"); } else if ("TCP-ACTIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { - content.append("m=video " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n"); + content.append("m=audio " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n"); } else if ("UDP".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { - content.append("m=video " + ssrcInfo.getPort() + " RTP/AVP 8 96\r\n"); + content.append("m=audio " + ssrcInfo.getPort() + " RTP/AVP 8 96\r\n"); } content.append("a=recvonly\r\n"); @@ -817,12 +817,12 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> { streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + subscribe.removeSubscribe(hookSubscribe); errorEvent.response(e); }), e -> { - // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 ResponseEvent responseEvent = (ResponseEvent) e.event; SIPResponse response = (SIPResponse) responseEvent.getResponse(); - streamSession.put(platform.getServerGBId(), channelId, callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play); + streamSession.put(platform.getServerGBId(), channelId, callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.broadcast); okEvent.response(e); }); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 12bddb14..0562262e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -102,7 +102,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In } String isUdp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(), + logger.info("收到ACK,rtp/{}开始级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); Map param = new HashMap<>(12); param.put("vhost","__defaultVhost__"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 5758d23e..86c2c78e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -2,7 +2,10 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; @@ -24,9 +27,10 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.RequestEvent; +import javax.sip.SipException; import javax.sip.address.SipURI; -import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; @@ -87,13 +91,15 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Override public void process(RequestEvent evt) { + // TODO 此处需要重构 + SIPRequest request =(SIPRequest) evt.getRequest(); try { - responseAck((SIPRequest) evt.getRequest(), Response.OK); + responseAck(request, Response.OK); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[回复BYE信息失败],{}", e.getMessage()); } - CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); + + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, request.getCallIdHeader().getCallId()); if (sendRtpItem != null){ logger.info("[收到bye] {}/{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId()); @@ -115,7 +121,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In param.put("ssrc",sendRtpItem.getSsrc()); logger.info("[收到bye] 停止推流:{}", streamId); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), request.getCallIdHeader().getCallId(), null); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); @@ -159,7 +165,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In } SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); if (ssrcTransactionForPlay != null){ - if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){ + if (ssrcTransactionForPlay.getCallId().equals(request.getCallIdHeader().getCallId())){ // 释放ssrc MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId()); if (mediaServerItem != null) { @@ -168,7 +174,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream()); } } - SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null); + SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, request.getCallIdHeader().getCallId(), null); if (ssrcTransactionForPlayBack != null) { // 释放ssrc MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId()); @@ -178,5 +184,32 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream()); } } + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, request.getCallIdHeader().getCallId(), null); + if (ssrcTransaction != null) { + // 释放ssrc + MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); + if (mediaServerItem != null) { + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc()); + } + + switch (ssrcTransaction.getType()) { +// case play: +// break; +// case talk: +// break; +// case playback: +// break; +// case download: +// break; + case broadcast: + String deviceId = ssrcTransaction.getDeviceId(); + String channelId1 = ssrcTransaction.getChannelId(); + // 如果是 + break; + default: + break; + } + streamSession.remove(device.getDeviceId(), channelId, ssrcTransaction.getStream()); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java index 240f24f4..ab54d158 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java @@ -102,6 +102,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp DeviceChannel deviceChannel = storage.queryChannelInParentPlatform(platform.getServerGBId(), targetId); if (deviceChannel == null) { + logger.warn("[国标级联 语音喊话] 未找到通道 platform: {}, channel: {}", platform.getServerGBId(), targetId); responseAck(request, Response.NOT_FOUND, "TargetID not found"); return; } @@ -123,6 +124,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp commanderForPlatform.broadcastResultCmd(platform, deviceChannel, sn, true, eventResult->{ logger.info("[国标级联] 语音喊话 回复失败 platform: {}, 错误:{}/{}", platform.getServerGBId(), eventResult.statusCode, eventResult.msg); }, eventResult->{ + // 消息发送成功, 向上级发送invite,获取推流 try { platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, response)->{ @@ -132,7 +134,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId); if (broadcastCatch != null ) { if (playService.audioBroadcastInUse(device, targetId)) { - logger.info("[国标级联] 语音喊话 设备正正在使用中 platform: {}, channel: {}", + logger.info("[国标级联] 语音喊话 设备正在使用中 platform: {}, channel: {}", platform.getServerGBId(), deviceChannel.getChannelId()); // 查看语音通道已经建立且已经占用 回复BYE try { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index c8dcdfe2..bb391ad1 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -259,11 +259,11 @@ public class ZLMRTPServerFactory { // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, (MediaServerItem mediaServerItem, JSONObject response)->{ - logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc); + logger.info("[保持端口] {}->监听端口到期继续保持监听", ssrc); keepPort(serverItem, ssrc); }); } - logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort); + logger.info("[保持端口] {}->监听端口: {}", ssrc, localPort); return localPort; } @@ -271,7 +271,7 @@ public class ZLMRTPServerFactory { * 释放保持的端口 */ public boolean releasePort(MediaServerItem serverItem, String ssrc) { - logger.info("[上级点播] {}->释放监听端口", ssrc); + logger.info("[保持端口] {}->释放监听端口", ssrc); boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc); HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 @@ -357,7 +357,7 @@ public class ZLMRTPServerFactory { public JSONObject startSendRtp(MediaServerItem mediaInfo, SendRtpItem sendRtpItem) { String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; - logger.info("rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); + logger.info("rtp/{}开始推流, 目标={}:{},SSRC={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); Map param = new HashMap<>(12); param.put("vhost","__defaultVhost__"); param.put("app",sendRtpItem.getApp()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index 1a788d27..e9b31caa 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -373,7 +373,7 @@ public class PlatformServiceImpl implements IPlatformService { errorEvent.response(new SipSubscribe.EventResult(-1, "端口监听失败")); return; } - logger.info("[国标级联] 发起语音喊话 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", + logger.info("[国标级联] 语音喊话,发起Invite消息 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck); String timeOutTaskKey = UUID.randomUUID().toString(); @@ -396,6 +396,7 @@ public class PlatformServiceImpl implements IPlatformService { } }, userSetting.getPlayTimeout()); commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{ + logger.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId); dynamicTask.stop(timeOutTaskKey); // hook响应 playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index eea990d8..8d8e011a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1183,7 +1183,7 @@ public class PlayServiceImpl implements IPlayService { String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(), + logger.info("收到ACK,rtp/{}开始推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); Map param = new HashMap<>(12); param.put("vhost", "__defaultVhost__"); diff --git a/web_src/src/components/dialog/devicePlayer.vue b/web_src/src/components/dialog/devicePlayer.vue index 94e5b6f7..8b10c330 100644 --- a/web_src/src/components/dialog/devicePlayer.vue +++ b/web_src/src/components/dialog/devicePlayer.vue @@ -181,7 +181,7 @@ style="font-size: 1.875rem;">
+ class="el-icon-zoom-out control-zoom-btn">
@@ -300,7 +300,8 @@
-
@@ -565,20 +566,20 @@ export default { this.tracks = [] let _this = this; this.$copyText(data).then( - function (e) { - _this.$message({ - showClose: true, - message: '复制成功', - type: 'success' - }); - }, - function (e) { - _this.$message({ - showClose: true, - message: '复制失败,请手动复制', - type: 'error' - }); - } + function (e) { + _this.$message({ + showClose: true, + message: '复制成功', + type: 'success' + }); + }, + function (e) { + _this.$message({ + showClose: true, + message: '复制失败,请手动复制', + type: 'error' + }); + } ); }, ptzCamera: function (command) { @@ -654,55 +655,54 @@ export default { this.$axios({ method: 'get', url: '/api/play/broadcast/' + this.deviceId + '/' + this.channelId + "?timeout=30&broadcastMode=" + this.broadcastMode - }).then( (res)=> { + }).then((res) => { if (res.data.code === 0) { let streamInfo = res.data.data.streamInfo; if (document.location.protocol.includes("https")) { this.startBroadcast(streamInfo.rtcs) - }else { + } else { this.startBroadcast(streamInfo.rtc) } - - }else { - this.$message({ - showClose: true, - message: res.data.msg, - type: "error", - }); - } - }); - }else if (this.broadcastStatus === 1) { - this.broadcastStatus = -1; - this.broadcastRtc.close() - } - }, - startBroadcast(url){ - // 获取推流鉴权Key - this.$axios({ - method: 'post', - url: '/api/user/userInfo', - }).then( (res)=> { - if (res.data.code !== 0) { - this.$message({ - showClose: true, - message: "获取推流鉴权Key失败", - type: "error", - }); - this.broadcastStatus = -1; - }else { - let pushKey = res.data.data.pushKey; - // 获取推流鉴权KEY - url += "&sign=" + crypto.createHash('md5').update(pushKey, "utf8").digest('hex') - console.log("开始语音喊话: " + url) - this.broadcastRtc = new ZLMRTCClient.Endpoint({ - debug: true, // 是否打印日志 - zlmsdpUrl: url, //流地址 - simulecast: false, - useCamera: false, - audioEnable: true, - videoEnable: false, - recvOnly: false, - }) + } else { + this.$message({ + showClose: true, + message: res.data.msg, + type: "error", + }); + } + }); + } else if (this.broadcastStatus === 1) { + this.broadcastStatus = -1; + this.broadcastRtc.close() + } + }, + startBroadcast(url) { + // 获取推流鉴权Key + this.$axios({ + method: 'post', + url: '/api/user/userInfo', + }).then((res) => { + if (res.data.code !== 0) { + this.$message({ + showClose: true, + message: "获取推流鉴权Key失败", + type: "error", + }); + this.broadcastStatus = -1; + } else { + let pushKey = res.data.data.pushKey; + // 获取推流鉴权KEY + url += "&sign=" + crypto.createHash('md5').update(pushKey, "utf8").digest('hex') + console.log("开始语音喊话: " + url) + this.broadcastRtc = new ZLMRTCClient.Endpoint({ + debug: true, // 是否打印日志 + zlmsdpUrl: url, //流地址 + simulecast: false, + useCamera: false, + audioEnable: true, + videoEnable: false, + recvOnly: false, + }) // webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_REMOTE_STREAMS,(e)=>{//获取到了远端流,可以播放 // console.error('播放成功',e.streams) @@ -715,15 +715,15 @@ export default { // // this.eventcallbacK("LOCAL STREAM", "获取到了本地流") // }); - this.broadcastRtc.on(ZLMRTCClient.Events.WEBRTC_NOT_SUPPORT,(e)=>{// 获取到了本地流 - console.error('不支持webrtc',e) - this.$message({ - showClose: true, - message: '不支持webrtc, 无法进行语音喊话', - type: 'error' - }); - this.broadcastStatus = -1; - }); + this.broadcastRtc.on(ZLMRTCClient.Events.WEBRTC_NOT_SUPPORT, (e) => {// 获取到了本地流 + console.error('不支持webrtc', e) + this.$message({ + showClose: true, + message: '不支持webrtc, 无法进行语音喊话', + type: 'error' + }); + this.broadcastStatus = -1; + }); this.broadcastRtc.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR, (e) => {// ICE 协商出错 console.error('ICE 协商出错')