From 7fd34cfda51657549667be84369c7f6794eeb6a3 Mon Sep 17 00:00:00 2001 From: wanghui <13282027326@163.com> Date: Fri, 21 Jul 2023 16:20:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=94=AF=E6=8C=81=E7=BA=A7=E8=81=94tcp?= =?UTF-8?q?=E4=B8=BB=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/impl/AckRequestProcessor.java | 5 ++- .../request/impl/InviteRequestProcessor.java | 45 ++++++++++++++++++- .../iot/vmp/media/zlm/ZLMRESTfulUtils.java | 4 ++ .../iot/vmp/media/zlm/ZLMServerFactory.java | 7 +++ 4 files changed, 59 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 07946eb8..61a60041 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -130,7 +130,10 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In // 开启rtcp保活 param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); } - + // tcp主动时,此时是级联下级平台,在回复200ok时,本地已经请求zlm开启监听,跳过下面步骤 + if (sendRtpItem.isTcpActive()) { + return; + } if (mediaInfo == null) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index df87e463..397a4a0f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; @@ -47,6 +49,8 @@ import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.Vector; @@ -396,6 +400,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); content.append("f=\r\n"); + try { // 超时未收到Ack应该回复bye,当前等待时间为10秒 dynamicTask.startDelay(callIdHeader.getCallId(), () -> { @@ -409,7 +414,33 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } }, 60 * 1000); responseSdpAck(request, content.toString(), platform); - + // tcp主动模式,回复sdp后开启监听 + if (sendRtpItem.isTcpActive()) { + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + if (!sendRtpItem.isTcpActive()) { + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + } + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + param.put("is_udp", is_Udp); + param.put("src_port", localPort); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + if (!sendRtpItem.isTcp()) { + // 开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param); + if (startSendRtpStreamResult != null) { + startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader); + } + } } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 国标级联 回复SdpAck", e); } @@ -543,6 +574,18 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } + private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { + if (jsonObject == null) { + logger.error("下级TCP被动启动监听失败: 请检查ZLM服务"); + } else if (jsonObject.getInteger("code") == 0) { + logger.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject); + logger.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + } else { + logger.error("启动监听TCP被动推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); + } + } + /** * 安排推流 */ diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 054e5fcc..e58f2aeb 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -289,6 +289,10 @@ public class ZLMRESTfulUtils { return sendPost(mediaServerItem, "startSendRtp",param, null); } + public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Map param) { + return sendPost(mediaServerItem, "startSendRtpPassive",param, null); + } + public JSONObject stopSendRtp(MediaServerItem mediaServerItem, Map param) { return sendPost(mediaServerItem, "stopSendRtp",param, null); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index 34acf152..c190de18 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -231,6 +231,13 @@ public class ZLMServerFactory { return zlmresTfulUtils.startSendRtp(mediaServerItem, param); } + /** + * 调用zlm RESTFUL API —— startSendRtpPassive + */ + public JSONObject startSendRtpStreamForPassive(MediaServerItem mediaServerItem, Mapparam) { + return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param); + } + /** * 查询待转推的流是否就绪 */