From fc90cd7951600ce5173f71c3e28d78e69b4db4ae Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 19 Dec 2022 14:20:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96tcp=E4=B8=BB=E5=8A=A8?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E7=9A=84=E8=AF=AD=E9=9F=B3=E5=AF=B9=E8=AE=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/UserSetting.java | 10 + .../request/SIPRequestProcessorParent.java | 19 +- .../request/impl/AckRequestProcessor.java | 177 +++++++----------- .../request/impl/InviteRequestProcessor.java | 51 +++-- .../cmd/CatalogResponseMessageHandler.java | 1 + .../vmp/media/zlm/ZLMHttpHookListener.java | 6 +- .../iot/vmp/media/zlm/ZLMRESTfulUtils.java | 2 +- .../iot/vmp/service/IPlayService.java | 12 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 114 ++++++++++- .../impl/VideoManagerStorageImpl.java | 7 - src/main/resources/all-application.yml | 2 + 11 files changed, 247 insertions(+), 154 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index a2d3054e..57f32451 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -43,6 +43,8 @@ public class UserSetting { private Boolean syncChannelOnDeviceOnline = Boolean.FALSE; + private Boolean pushStreamAfterAck = Boolean.FALSE; + private String serverId = "000000"; private String thirdPartyGBIdReg = "[\\s\\S]*"; @@ -196,4 +198,12 @@ public class UserSetting { public void setSyncChannelOnDeviceOnline(Boolean syncChannelOnDeviceOnline) { this.syncChannelOnDeviceOnline = syncChannelOnDeviceOnline; } + + public Boolean getPushStreamAfterAck() { + return pushStreamAfterAck; + } + + public void setPushStreamAfterAck(Boolean pushStreamAfterAck) { + this.pushStreamAfterAck = pushStreamAfterAck; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java index ddb51699..f3dfa966 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java @@ -1,14 +1,10 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; -import gov.nist.javax.sip.stack.SIPServerTransactionImpl; import org.apache.commons.lang3.ArrayUtils; import org.dom4j.Document; import org.dom4j.DocumentException; @@ -17,14 +13,14 @@ import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.security.core.parameters.P; import javax.sip.*; import javax.sip.address.Address; import javax.sip.address.AddressFactory; import javax.sip.address.SipURI; -import javax.sip.header.*; +import javax.sip.header.ContentTypeHeader; +import javax.sip.header.ExpiresHeader; +import javax.sip.header.HeaderFactory; import javax.sip.message.MessageFactory; import javax.sip.message.Request; import javax.sip.message.Response; @@ -157,7 +153,10 @@ public abstract class SIPRequestProcessorParent { responseAckExtraParam.content = sdp; responseAckExtraParam.sipURI = sipURI; - return responseAck(request, Response.OK, null, responseAckExtraParam); + SIPResponse sipResponse = responseAck(request, Response.OK, null, responseAckExtraParam); + + + return sipResponse; } /** @@ -190,7 +189,8 @@ public abstract class SIPRequestProcessorParent { reader.setEncoding(charset); // 对海康出现的未转义字符做处理。 String[] destStrArray = new String[]{"<",">","&","'","""}; - char despChar = '&'; // 或许可扩展兼容其他字符 + // 或许可扩展兼容其他字符 + char despChar = '&'; byte destBye = (byte) despChar; List result = new ArrayList<>(); byte[] rawContent = request.getRawContent(); @@ -220,4 +220,5 @@ public abstract class SIPRequestProcessorParent { return xml.getRootElement(); } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index c5220a9e..68bc38b0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -1,17 +1,11 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; -import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; @@ -19,8 +13,8 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -31,15 +25,12 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; -import javax.sip.SipException; import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; -import java.text.ParseException; import java.util.HashMap; import java.util.Map; @@ -50,7 +41,7 @@ import java.util.Map; @Component public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { - private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); + private final Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); private final String method = "ACK"; @Autowired @@ -77,32 +68,21 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IMediaServerService mediaServerService; - @Autowired - private ZlmHttpHookSubscribe subscribe; - @Autowired private DynamicTask dynamicTask; - @Autowired - private ISIPCommander cmder; - - @Autowired - private IDeviceService deviceService; - - @Autowired - private ISIPCommanderForPlatform commanderForPlatform; - - @Autowired - private AudioBroadcastManager audioBroadcastManager; - @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; + @Autowired + private UserSetting userSetting; + + @Autowired + private IPlayService playService; + /** * 处理 ACK请求 - * - * @param evt */ @Override public void process(RequestEvent evt) { @@ -110,100 +90,73 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); logger.info("[收到ACK]: platformGbId->{}", platformGbId); - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId); - // 取消设置的超时任务 - dynamicTask.stop(callIdHeader.getCallId()); - String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); - if (sendRtpItem == null) { - logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); - return; - } - String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); - Map param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStreamId()); - param.put("ssrc", sendRtpItem.getSsrc()); - param.put("src_port", sendRtpItem.getLocalPort()); - param.put("pt", sendRtpItem.getPt()); - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - if (!sendRtpItem.isTcp()) { - // udp模式下开启rtcp保活 - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); - } + if (userSetting.getPushStreamAfterAck()) { + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId); + // 取消设置的超时任务 + dynamicTask.stop(callIdHeader.getCallId()); + String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); + if (sendRtpItem == null) { + logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); + return; + } + String isUdp = sendRtpItem.isTcp() ? "0" : "1"; + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + param.put("is_udp", isUdp); + if (!sendRtpItem.isTcp()) { + // udp模式下开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } - if (mediaInfo == null) { - RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), - sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); - redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, json, param, callIdHeader); - }); - } else { - // 如果是非严格模式,需要关闭端口占用 - JSONObject startSendRtpStreamResult = null; - if (sendRtpItem.getLocalPort() != 0) { - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); - hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); - if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { + if (mediaInfo == null) { + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), + sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { + playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader); + }); + } else { + // 如果是非严格模式,需要关闭端口占用 + JSONObject startSendRtpStreamResult = null; + if (sendRtpItem.getLocalPort() != 0) { + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); + if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + } + }else { if (sendRtpItem.isTcpActive()) { startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); }else { - param.put("is_udp", is_Udp); param.put("dst_url", sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); } } - }else { - if (sendRtpItem.isTcpActive()) { - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); - }else { - param.put("is_udp", is_Udp); - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } - } - if (startSendRtpStreamResult != null) { - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); - } - } - } - private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, - JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { - if (jsonObject == null) { - logger.error("RTP推流失败: 请检查ZLM服务"); - } else if (jsonObject.getInteger("code") == 0) { - logger.info("调用ZLM推流接口, 结果: {}", jsonObject); - logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); - } else { - logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); - if (sendRtpItem.isOnlyAudio()) { - Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - if (audioBroadcastCatch != null) { - try { - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); - } catch (SipException | ParseException | InvalidArgumentException | - SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage()); - } - } - }else { - // 向上级平台 - try { - commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + if (startSendRtpStreamResult != null) { + playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); } } } + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index e31995c3..f33f5df1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -439,18 +439,23 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements try { // 超时未收到Ack应该回复bye,当前等待时间为10秒 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("Ack 等待超时"); - mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); - // 回复bye - try { - cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); - } - }, 60 * 1000); - responseSdpAck(request, content.toString(), platform); + if (userSetting.getPushStreamAfterAck()) { + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("Ack 等待超时"); + mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); + // 回复bye + try { + cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + }, 60 * 1000); + } + SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform); + if (!userSetting.getPushStreamAfterAck()) { + playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader()); + } } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { @@ -878,7 +883,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements content.append("f=\r\n"); try { - return responseSdpAck(request, content.toString(), platform); + SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform); + if (!userSetting.getPushStreamAfterAck()) { + playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader()); + } + return sipResponse; } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { @@ -968,7 +977,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements return; } String addressStr = sdp.getOrigin().getAddress(); - logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", requesterId, addressStr, port, ssrc); + logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}, {}", requesterId, addressStr, port, ssrc, + mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP"); MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device); if (mediaServerItem == null) { @@ -993,10 +1003,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } - sendRtpItem.setTcp(mediaTransmissionTCP); - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } + String app = "broadcast"; String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId(); @@ -1011,6 +1018,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setUsePs(false); sendRtpItem.setRtcp(false); sendRtpItem.setOnlyAudio(true); + sendRtpItem.setTcp(mediaTransmissionTCP); + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream); @@ -1083,6 +1095,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse); audioBroadcastManager.update(audioBroadcastCatch); + // 开启发流,大华在收到200OK后就会开始建立连接 + if (!userSetting.getPushStreamAfterAck()) { + playService.startPushStream(sendRtpItem, sipResponse, parentPlatform, request.getCallIdHeader()); + } + } catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) { logger.error("[命令发送失败] 语音对讲 回复200OK(SDP): {}", e.getMessage()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 761481be..2129ee12 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class); + private final String cmdType = "Catalog"; @Autowired diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 88d7e141..6f40ef8c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -319,7 +319,7 @@ public class ZLMHttpHookListener { }); if ("rtsp".equals(param.getSchema())){ - logger.info("on_stream_changed:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream()); + logger.info("流变化:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream()); if (param.isRegist()) { mediaServerService.addCount(param.getMediaServerId()); }else { @@ -399,7 +399,11 @@ public class ZLMHttpHookListener { } } + }else { + logger.info("[语音对讲] 未找到通道:{}", channelId); } + }else{ + logger.info("[语音对讲] 未找到设备:{}", deviceId); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 3eb61f54..fc7d90af 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -36,7 +36,7 @@ public class ZLMRESTfulUtils { // 设置连接超时时间 httpClientBuilder.connectTimeout(5,TimeUnit.SECONDS); // 设置读取超时时间 - httpClientBuilder.readTimeout(5,TimeUnit.SECONDS); + httpClientBuilder.readTimeout(15,TimeUnit.SECONDS); // 设置连接池 httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES)); if (logger.isDebugEnabled()) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 4ab2f4a5..39c5ffad 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -3,9 +3,7 @@ package com.genersoft.iot.vmp.service; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.exception.ServiceException; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -15,11 +13,14 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import gov.nist.javax.sip.message.SIPResponse; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.SipException; +import javax.sip.header.CallIdHeader; import java.text.ParseException; +import java.util.Map; /** * 点播处理 @@ -61,4 +62,9 @@ public interface IPlayService { void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; + + void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader); + + void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map param, CallIdHeader callIdHeader); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 32f0364f..d4328a7c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -24,16 +24,15 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; -import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; -import com.genersoft.iot.vmp.service.bean.PlayBackCallback; -import com.genersoft.iot.vmp.service.bean.PlayBackResult; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.bean.*; +import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -42,6 +41,7 @@ import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; +import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -54,6 +54,7 @@ import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; +import javax.sip.header.CallIdHeader; import java.math.BigDecimal; import java.math.RoundingMode; import java.text.ParseException; @@ -119,11 +120,20 @@ public class PlayServiceImpl implements IPlayService { @Autowired private ZlmHttpHookSubscribe subscribe; + @Autowired + private ISIPCommanderForPlatform commanderForPlatform; + @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; + @Autowired + private RedisGbPlayMsgListener redisGbPlayMsgListener; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + @Override public void play(MediaServerItem mediaServerItem, String deviceId, String channelId, @@ -1179,4 +1189,100 @@ public class PlayServiceImpl implements IPlayService { Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); cmder.playResumeCmd(device, streamInfo); } + + @Override + public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) { + + // 开始发流 + // 取消设置的超时任务 +// String channelId = request.getCallIdHeader().getCallId(); + + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + param.put("is_udp", is_Udp); + if (!sendRtpItem.isTcp()) { + // udp模式下开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + + if (mediaInfo == null) { + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), + sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { + startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader); + }); + } else { + // 如果是非严格模式,需要关闭端口占用 + JSONObject startSendRtpStreamResult = null; + if (sendRtpItem.getLocalPort() != 0) { + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); + if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + } + }else { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + } + if (startSendRtpStreamResult != null) { + startSendRtpStreamHand(sendRtpItem, platform, startSendRtpStreamResult, param, callIdHeader); + } + } + } + + @Override + public void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { + if (jsonObject == null) { + logger.error("RTP推流失败: 请检查ZLM服务"); + } else if (jsonObject.getInteger("code") == 0) { + logger.info("调用ZLM推流接口, 结果: {}", jsonObject); + logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + } else { + logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); + if (sendRtpItem.isOnlyAudio()) { + Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null) { + try { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); + } catch (SipException | ParseException | InvalidArgumentException | + SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage()); + } + } + }else { + // 向上级平台 + try { + commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 377b20f2..e8997cba 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; -import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -90,12 +89,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { @Autowired private PlatformGbStreamMapper platformGbStreamMapper; - @Autowired - private IGbStreamService gbStreamService; - - @Autowired - private ParentPlatformMapper parentPlatformMapper; - /** * 根据设备ID判断设备是否存在 * diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index ba150fb9..bd2038d3 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -195,6 +195,8 @@ user-settings: gb-send-stream-strict: false # 设备上线时是否自动同步通道 sync-channel-on-device-online: false + # 收到ack消息后开始发流,默认false, 回复200ok后直接开始发流 + push-stream-after-ack: false # 关闭在线文档(生产环境建议关闭) springdoc: