优化合并对讲broadcast级联模式

结构优化
648540858 2023-03-21 10:27:07 +08:00
parent 4362a5b499
commit 6b1d966255
9 changed files with 127 additions and 91 deletions

View File

@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.2</version>
<version>2.7.9</version>
</parent>
<groupId>com.genersoft</groupId>

View File

@ -790,11 +790,11 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
content.append("t=0 0\r\n");
if ("TCP-PASSIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) {
content.append("m=video " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n");
content.append("m=audio " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n");
} else if ("TCP-ACTIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) {
content.append("m=video " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n");
content.append("m=audio " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n");
} else if ("UDP".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) {
content.append("m=video " + ssrcInfo.getPort() + " RTP/AVP 8 96\r\n");
content.append("m=audio " + ssrcInfo.getPort() + " RTP/AVP 8 96\r\n");
}
content.append("a=recvonly\r\n");
@ -817,12 +817,12 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> {
streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
subscribe.removeSubscribe(hookSubscribe);
errorEvent.response(e);
}), e -> {
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
streamSession.put(platform.getServerGBId(), channelId, callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play);
streamSession.put(platform.getServerGBId(), channelId, callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.broadcast);
okEvent.response(e);
});
}

View File

@ -102,7 +102,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
}
String isUdp = sendRtpItem.isTcp() ? "0" : "1";
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
logger.info("收到ACKrtp/{}开始向上级推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(),
logger.info("收到ACKrtp/{}开始级推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__");

View File

@ -2,7 +2,10 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@ -24,9 +27,10 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.*;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
@ -87,13 +91,15 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Override
public void process(RequestEvent evt) {
// TODO 此处需要重构
SIPRequest request =(SIPRequest) evt.getRequest();
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
responseAck(request, Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[回复BYE信息失败]{}", e.getMessage());
}
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, request.getCallIdHeader().getCallId());
if (sendRtpItem != null){
logger.info("[收到bye] {}/{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId());
@ -115,7 +121,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[收到bye] 停止推流:{}", streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null);
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), request.getCallIdHeader().getCallId(), null);
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
@ -159,7 +165,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
}
SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
if (ssrcTransactionForPlay != null){
if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){
if (ssrcTransactionForPlay.getCallId().equals(request.getCallIdHeader().getCallId())){
// 释放ssrc
MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId());
if (mediaServerItem != null) {
@ -168,7 +174,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream());
}
}
SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null);
SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, request.getCallIdHeader().getCallId(), null);
if (ssrcTransactionForPlayBack != null) {
// 释放ssrc
MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId());
@ -178,5 +184,32 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream());
}
}
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, request.getCallIdHeader().getCallId(), null);
if (ssrcTransaction != null) {
// 释放ssrc
MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc());
}
switch (ssrcTransaction.getType()) {
// case play:
// break;
// case talk:
// break;
// case playback:
// break;
// case download:
// break;
case broadcast:
String deviceId = ssrcTransaction.getDeviceId();
String channelId1 = ssrcTransaction.getChannelId();
// 如果是
break;
default:
break;
}
streamSession.remove(device.getDeviceId(), channelId, ssrcTransaction.getStream());
}
}
}

View File

@ -102,6 +102,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
DeviceChannel deviceChannel = storage.queryChannelInParentPlatform(platform.getServerGBId(), targetId);
if (deviceChannel == null) {
logger.warn("[国标级联 语音喊话] 未找到通道 platform: {}, channel: {}", platform.getServerGBId(), targetId);
responseAck(request, Response.NOT_FOUND, "TargetID not found");
return;
}
@ -123,6 +124,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
commanderForPlatform.broadcastResultCmd(platform, deviceChannel, sn, true, eventResult->{
logger.info("[国标级联] 语音喊话 回复失败 platform {} 错误:{}/{}", platform.getServerGBId(), eventResult.statusCode, eventResult.msg);
}, eventResult->{
// 消息发送成功, 向上级发送invite获取推流
try {
platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, response)->{
@ -132,7 +134,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId);
if (broadcastCatch != null ) {
if (playService.audioBroadcastInUse(device, targetId)) {
logger.info("[国标级联] 语音喊话 设备正在使用中 platform {} channel: {}",
logger.info("[国标级联] 语音喊话 设备正在使用中 platform {} channel: {}",
platform.getServerGBId(), deviceChannel.getChannelId());
// 查看语音通道已经建立且已经占用 回复BYE
try {

View File

@ -259,11 +259,11 @@ public class ZLMRTPServerFactory {
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
(MediaServerItem mediaServerItem, JSONObject response)->{
logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
logger.info("[保持端口] {}->监听端口到期继续保持监听", ssrc);
keepPort(serverItem, ssrc);
});
}
logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
logger.info("[保持端口] {}->监听端口: {}", ssrc, localPort);
return localPort;
}
@ -271,7 +271,7 @@ public class ZLMRTPServerFactory {
*
*/
public boolean releasePort(MediaServerItem serverItem, String ssrc) {
logger.info("[上级点播] {}->释放监听端口", ssrc);
logger.info("[保持端口] {}->释放监听端口", ssrc);
boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc);
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
@ -357,7 +357,7 @@ public class ZLMRTPServerFactory {
public JSONObject startSendRtp(MediaServerItem mediaInfo, SendRtpItem sendRtpItem) {
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
logger.info("rtp/{}开始向上级推流, 目标={}:{}SSRC={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc());
logger.info("rtp/{}开始推流, 目标={}:{}SSRC={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc());
Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());

View File

@ -373,7 +373,7 @@ public class PlatformServiceImpl implements IPlatformService {
errorEvent.response(new SipSubscribe.EventResult(-1, "端口监听失败"));
return;
}
logger.info("[国标级联] 发起语音喊话 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}",
logger.info("[国标级联] 语音喊话发起Invite消息 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}",
platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck);
String timeOutTaskKey = UUID.randomUUID().toString();
@ -396,6 +396,7 @@ public class PlatformServiceImpl implements IPlatformService {
}
}, userSetting.getPlayTimeout());
commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{
logger.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId);
dynamicTask.stop(timeOutTaskKey);
// hook响应
playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId);

View File

@ -1183,7 +1183,7 @@ public class PlayServiceImpl implements IPlayService {
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
logger.info("收到ACKrtp/{}开始向上级推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(),
logger.info("收到ACKrtp/{}开始推流, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
Map<String, Object> param = new HashMap<>(12);
param.put("vhost", "__defaultVhost__");

View File

@ -181,7 +181,7 @@
style="font-size: 1.875rem;"></i></div>
<div style="position: absolute; left: 7.25rem; top: 3.25rem; font-size: 1.875rem;"
@mousedown="ptzCamera('zoomout')" @mouseup="ptzCamera('stop')"><i
class="el-icon-zoom-out control-zoom-btn"></i></div>
class="el-icon-zoom-out control-zoom-btn"></i></div>
<div class="contro-speed" style="position: absolute; left: 4px; top: 7rem; width: 9rem;">
<el-slider v-model="controSpeed" :max="255"></el-slider>
</div>
@ -300,7 +300,8 @@
</el-tab-pane>
<el-tab-pane label="语音对讲" name="broadcast">
<div style="padding: 0 10px">
<el-switch v-model="broadcastMode" :disabled="broadcastStatus !== -1" active-color="#409EFF" active-text=""
<el-switch v-model="broadcastMode" :disabled="broadcastStatus !== -1" active-color="#409EFF"
active-text="喊话"
inactive-text="对讲"></el-switch>
</div>
<div class="trank" style="text-align: center;">
@ -565,20 +566,20 @@ export default {
this.tracks = []
let _this = this;
this.$copyText(data).then(
function (e) {
_this.$message({
showClose: true,
message: '复制成功',
type: 'success'
});
},
function (e) {
_this.$message({
showClose: true,
message: '复制失败,请手动复制',
type: 'error'
});
}
function (e) {
_this.$message({
showClose: true,
message: '复制成功',
type: 'success'
});
},
function (e) {
_this.$message({
showClose: true,
message: '复制失败,请手动复制',
type: 'error'
});
}
);
},
ptzCamera: function (command) {
@ -654,55 +655,54 @@ export default {
this.$axios({
method: 'get',
url: '/api/play/broadcast/' + this.deviceId + '/' + this.channelId + "?timeout=30&broadcastMode=" + this.broadcastMode
}).then( (res)=> {
}).then((res) => {
if (res.data.code === 0) {
let streamInfo = res.data.data.streamInfo;
if (document.location.protocol.includes("https")) {
this.startBroadcast(streamInfo.rtcs)
}else {
} else {
this.startBroadcast(streamInfo.rtc)
}
}else {
this.$message({
showClose: true,
message: res.data.msg,
type: "error",
});
}
});
}else if (this.broadcastStatus === 1) {
this.broadcastStatus = -1;
this.broadcastRtc.close()
}
},
startBroadcast(url){
// Key
this.$axios({
method: 'post',
url: '/api/user/userInfo',
}).then( (res)=> {
if (res.data.code !== 0) {
this.$message({
showClose: true,
message: "获取推流鉴权Key失败",
type: "error",
});
this.broadcastStatus = -1;
}else {
let pushKey = res.data.data.pushKey;
// KEY
url += "&sign=" + crypto.createHash('md5').update(pushKey, "utf8").digest('hex')
console.log("开始语音喊话: " + url)
this.broadcastRtc = new ZLMRTCClient.Endpoint({
debug: true, //
zlmsdpUrl: url, //
simulecast: false,
useCamera: false,
audioEnable: true,
videoEnable: false,
recvOnly: false,
})
} else {
this.$message({
showClose: true,
message: res.data.msg,
type: "error",
});
}
});
} else if (this.broadcastStatus === 1) {
this.broadcastStatus = -1;
this.broadcastRtc.close()
}
},
startBroadcast(url) {
// Key
this.$axios({
method: 'post',
url: '/api/user/userInfo',
}).then((res) => {
if (res.data.code !== 0) {
this.$message({
showClose: true,
message: "获取推流鉴权Key失败",
type: "error",
});
this.broadcastStatus = -1;
} else {
let pushKey = res.data.data.pushKey;
// KEY
url += "&sign=" + crypto.createHash('md5').update(pushKey, "utf8").digest('hex')
console.log("开始语音喊话: " + url)
this.broadcastRtc = new ZLMRTCClient.Endpoint({
debug: true, //
zlmsdpUrl: url, //
simulecast: false,
useCamera: false,
audioEnable: true,
videoEnable: false,
recvOnly: false,
})
// webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_REMOTE_STREAMS,(e)=>{//
// console.error('',e.streams)
@ -715,15 +715,15 @@ export default {
// // this.eventcallbacK("LOCAL STREAM", "")
// });
this.broadcastRtc.on(ZLMRTCClient.Events.WEBRTC_NOT_SUPPORT,(e)=>{//
console.error('不支持webrtc',e)
this.$message({
showClose: true,
message: '不支持webrtc, 无法进行语音喊话',
type: 'error'
});
this.broadcastStatus = -1;
});
this.broadcastRtc.on(ZLMRTCClient.Events.WEBRTC_NOT_SUPPORT, (e) => {//
console.error('不支持webrtc', e)
this.$message({
showClose: true,
message: '不支持webrtc, 无法进行语音喊话',
type: 'error'
});
this.broadcastStatus = -1;
});
this.broadcastRtc.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR, (e) => {// ICE
console.error('ICE 协商出错')