From fc90cd7951600ce5173f71c3e28d78e69b4db4ae Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 19 Dec 2022 14:20:22 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E4=BC=98=E5=8C=96tcp=E4=B8=BB=E5=8A=A8?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E7=9A=84=E8=AF=AD=E9=9F=B3=E5=AF=B9=E8=AE=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/UserSetting.java | 10 + .../request/SIPRequestProcessorParent.java | 19 +- .../request/impl/AckRequestProcessor.java | 177 +++++++----------- .../request/impl/InviteRequestProcessor.java | 51 +++-- .../cmd/CatalogResponseMessageHandler.java | 1 + .../vmp/media/zlm/ZLMHttpHookListener.java | 6 +- .../iot/vmp/media/zlm/ZLMRESTfulUtils.java | 2 +- .../iot/vmp/service/IPlayService.java | 12 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 114 ++++++++++- .../impl/VideoManagerStorageImpl.java | 7 - src/main/resources/all-application.yml | 2 + 11 files changed, 247 insertions(+), 154 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index a2d3054e..57f32451 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -43,6 +43,8 @@ public class UserSetting { private Boolean syncChannelOnDeviceOnline = Boolean.FALSE; + private Boolean pushStreamAfterAck = Boolean.FALSE; + private String serverId = "000000"; private String thirdPartyGBIdReg = "[\\s\\S]*"; @@ -196,4 +198,12 @@ public class UserSetting { public void setSyncChannelOnDeviceOnline(Boolean syncChannelOnDeviceOnline) { this.syncChannelOnDeviceOnline = syncChannelOnDeviceOnline; } + + public Boolean getPushStreamAfterAck() { + return pushStreamAfterAck; + } + + public void setPushStreamAfterAck(Boolean pushStreamAfterAck) { + this.pushStreamAfterAck = pushStreamAfterAck; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java index ddb51699..f3dfa966 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java @@ -1,14 +1,10 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; -import gov.nist.javax.sip.stack.SIPServerTransactionImpl; import org.apache.commons.lang3.ArrayUtils; import org.dom4j.Document; import org.dom4j.DocumentException; @@ -17,14 +13,14 @@ import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.security.core.parameters.P; import javax.sip.*; import javax.sip.address.Address; import javax.sip.address.AddressFactory; import javax.sip.address.SipURI; -import javax.sip.header.*; +import javax.sip.header.ContentTypeHeader; +import javax.sip.header.ExpiresHeader; +import javax.sip.header.HeaderFactory; import javax.sip.message.MessageFactory; import javax.sip.message.Request; import javax.sip.message.Response; @@ -157,7 +153,10 @@ public abstract class SIPRequestProcessorParent { responseAckExtraParam.content = sdp; responseAckExtraParam.sipURI = sipURI; - return responseAck(request, Response.OK, null, responseAckExtraParam); + SIPResponse sipResponse = responseAck(request, Response.OK, null, responseAckExtraParam); + + + return sipResponse; } /** @@ -190,7 +189,8 @@ public abstract class SIPRequestProcessorParent { reader.setEncoding(charset); // 对海康出现的未转义字符做处理。 String[] destStrArray = new String[]{"<",">","&","'","""}; - char despChar = '&'; // 或许可扩展兼容其他字符 + // 或许可扩展兼容其他字符 + char despChar = '&'; byte destBye = (byte) despChar; List result = new ArrayList<>(); byte[] rawContent = request.getRawContent(); @@ -220,4 +220,5 @@ public abstract class SIPRequestProcessorParent { return xml.getRootElement(); } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index c5220a9e..68bc38b0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -1,17 +1,11 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; -import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; @@ -19,8 +13,8 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -31,15 +25,12 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; -import javax.sip.SipException; import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; -import java.text.ParseException; import java.util.HashMap; import java.util.Map; @@ -50,7 +41,7 @@ import java.util.Map; @Component public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { - private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); + private final Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); private final String method = "ACK"; @Autowired @@ -77,32 +68,21 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IMediaServerService mediaServerService; - @Autowired - private ZlmHttpHookSubscribe subscribe; - @Autowired private DynamicTask dynamicTask; - @Autowired - private ISIPCommander cmder; - - @Autowired - private IDeviceService deviceService; - - @Autowired - private ISIPCommanderForPlatform commanderForPlatform; - - @Autowired - private AudioBroadcastManager audioBroadcastManager; - @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; + @Autowired + private UserSetting userSetting; + + @Autowired + private IPlayService playService; + /** * 处理 ACK请求 - * - * @param evt */ @Override public void process(RequestEvent evt) { @@ -110,100 +90,73 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); logger.info("[收到ACK]: platformGbId->{}", platformGbId); - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId); - // 取消设置的超时任务 - dynamicTask.stop(callIdHeader.getCallId()); - String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); - if (sendRtpItem == null) { - logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); - return; - } - String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); - Map param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStreamId()); - param.put("ssrc", sendRtpItem.getSsrc()); - param.put("src_port", sendRtpItem.getLocalPort()); - param.put("pt", sendRtpItem.getPt()); - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - if (!sendRtpItem.isTcp()) { - // udp模式下开启rtcp保活 - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); - } + if (userSetting.getPushStreamAfterAck()) { + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId); + // 取消设置的超时任务 + dynamicTask.stop(callIdHeader.getCallId()); + String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); + if (sendRtpItem == null) { + logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); + return; + } + String isUdp = sendRtpItem.isTcp() ? "0" : "1"; + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + param.put("is_udp", isUdp); + if (!sendRtpItem.isTcp()) { + // udp模式下开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } - if (mediaInfo == null) { - RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), - sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); - redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, json, param, callIdHeader); - }); - } else { - // 如果是非严格模式,需要关闭端口占用 - JSONObject startSendRtpStreamResult = null; - if (sendRtpItem.getLocalPort() != 0) { - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); - hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); - if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { + if (mediaInfo == null) { + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), + sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { + playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader); + }); + } else { + // 如果是非严格模式,需要关闭端口占用 + JSONObject startSendRtpStreamResult = null; + if (sendRtpItem.getLocalPort() != 0) { + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); + if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + } + }else { if (sendRtpItem.isTcpActive()) { startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); }else { - param.put("is_udp", is_Udp); param.put("dst_url", sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); } } - }else { - if (sendRtpItem.isTcpActive()) { - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); - }else { - param.put("is_udp", is_Udp); - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } - } - if (startSendRtpStreamResult != null) { - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); - } - } - } - private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, - JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { - if (jsonObject == null) { - logger.error("RTP推流失败: 请检查ZLM服务"); - } else if (jsonObject.getInteger("code") == 0) { - logger.info("调用ZLM推流接口, 结果: {}", jsonObject); - logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); - } else { - logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); - if (sendRtpItem.isOnlyAudio()) { - Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - if (audioBroadcastCatch != null) { - try { - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); - } catch (SipException | ParseException | InvalidArgumentException | - SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage()); - } - } - }else { - // 向上级平台 - try { - commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + if (startSendRtpStreamResult != null) { + playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); } } } + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index e31995c3..f33f5df1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -439,18 +439,23 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements try { // 超时未收到Ack应该回复bye,当前等待时间为10秒 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("Ack 等待超时"); - mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); - // 回复bye - try { - cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); - } - }, 60 * 1000); - responseSdpAck(request, content.toString(), platform); + if (userSetting.getPushStreamAfterAck()) { + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("Ack 等待超时"); + mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); + // 回复bye + try { + cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + }, 60 * 1000); + } + SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform); + if (!userSetting.getPushStreamAfterAck()) { + playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader()); + } } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { @@ -878,7 +883,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements content.append("f=\r\n"); try { - return responseSdpAck(request, content.toString(), platform); + SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform); + if (!userSetting.getPushStreamAfterAck()) { + playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader()); + } + return sipResponse; } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { @@ -968,7 +977,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements return; } String addressStr = sdp.getOrigin().getAddress(); - logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", requesterId, addressStr, port, ssrc); + logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}, {}", requesterId, addressStr, port, ssrc, + mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP"); MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device); if (mediaServerItem == null) { @@ -993,10 +1003,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } - sendRtpItem.setTcp(mediaTransmissionTCP); - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } + String app = "broadcast"; String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId(); @@ -1011,6 +1018,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setUsePs(false); sendRtpItem.setRtcp(false); sendRtpItem.setOnlyAudio(true); + sendRtpItem.setTcp(mediaTransmissionTCP); + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream); @@ -1083,6 +1095,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse); audioBroadcastManager.update(audioBroadcastCatch); + // 开启发流,大华在收到200OK后就会开始建立连接 + if (!userSetting.getPushStreamAfterAck()) { + playService.startPushStream(sendRtpItem, sipResponse, parentPlatform, request.getCallIdHeader()); + } + } catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) { logger.error("[命令发送失败] 语音对讲 回复200OK(SDP): {}", e.getMessage()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 761481be..2129ee12 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class); + private final String cmdType = "Catalog"; @Autowired diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 88d7e141..6f40ef8c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -319,7 +319,7 @@ public class ZLMHttpHookListener { }); if ("rtsp".equals(param.getSchema())){ - logger.info("on_stream_changed:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream()); + logger.info("流变化:注册->{}, app->{}, stream->{}", param.isRegist(), param.getApp(), param.getStream()); if (param.isRegist()) { mediaServerService.addCount(param.getMediaServerId()); }else { @@ -399,7 +399,11 @@ public class ZLMHttpHookListener { } } + }else { + logger.info("[语音对讲] 未找到通道:{}", channelId); } + }else{ + logger.info("[语音对讲] 未找到设备:{}", deviceId); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 3eb61f54..fc7d90af 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -36,7 +36,7 @@ public class ZLMRESTfulUtils { // 设置连接超时时间 httpClientBuilder.connectTimeout(5,TimeUnit.SECONDS); // 设置读取超时时间 - httpClientBuilder.readTimeout(5,TimeUnit.SECONDS); + httpClientBuilder.readTimeout(15,TimeUnit.SECONDS); // 设置连接池 httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES)); if (logger.isDebugEnabled()) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 4ab2f4a5..39c5ffad 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -3,9 +3,7 @@ package com.genersoft.iot.vmp.service; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.exception.ServiceException; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -15,11 +13,14 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import gov.nist.javax.sip.message.SIPResponse; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.SipException; +import javax.sip.header.CallIdHeader; import java.text.ParseException; +import java.util.Map; /** * 点播处理 @@ -61,4 +62,9 @@ public interface IPlayService { void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; + + void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader); + + void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map param, CallIdHeader callIdHeader); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 32f0364f..d4328a7c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -24,16 +24,15 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; -import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; -import com.genersoft.iot.vmp.service.bean.PlayBackCallback; -import com.genersoft.iot.vmp.service.bean.PlayBackResult; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.bean.*; +import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -42,6 +41,7 @@ import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; +import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -54,6 +54,7 @@ import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; +import javax.sip.header.CallIdHeader; import java.math.BigDecimal; import java.math.RoundingMode; import java.text.ParseException; @@ -119,11 +120,20 @@ public class PlayServiceImpl implements IPlayService { @Autowired private ZlmHttpHookSubscribe subscribe; + @Autowired + private ISIPCommanderForPlatform commanderForPlatform; + @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; + @Autowired + private RedisGbPlayMsgListener redisGbPlayMsgListener; + + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + @Override public void play(MediaServerItem mediaServerItem, String deviceId, String channelId, @@ -1179,4 +1189,100 @@ public class PlayServiceImpl implements IPlayService { Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); cmder.playResumeCmd(device, streamInfo); } + + @Override + public void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader) { + + // 开始发流 + // 取消设置的超时任务 +// String channelId = request.getCallIdHeader().getCallId(); + + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + param.put("is_udp", is_Udp); + if (!sendRtpItem.isTcp()) { + // udp模式下开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + + if (mediaInfo == null) { + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), + sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { + startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader); + }); + } else { + // 如果是非严格模式,需要关闭端口占用 + JSONObject startSendRtpStreamResult = null; + if (sendRtpItem.getLocalPort() != 0) { + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); + if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + } + }else { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + } + if (startSendRtpStreamResult != null) { + startSendRtpStreamHand(sendRtpItem, platform, startSendRtpStreamResult, param, callIdHeader); + } + } + } + + @Override + public void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { + if (jsonObject == null) { + logger.error("RTP推流失败: 请检查ZLM服务"); + } else if (jsonObject.getInteger("code") == 0) { + logger.info("调用ZLM推流接口, 结果: {}", jsonObject); + logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + } else { + logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); + if (sendRtpItem.isOnlyAudio()) { + Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null) { + try { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); + } catch (SipException | ParseException | InvalidArgumentException | + SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage()); + } + } + }else { + // 向上级平台 + try { + commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 377b20f2..e8997cba 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; -import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -90,12 +89,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { @Autowired private PlatformGbStreamMapper platformGbStreamMapper; - @Autowired - private IGbStreamService gbStreamService; - - @Autowired - private ParentPlatformMapper parentPlatformMapper; - /** * 根据设备ID判断设备是否存在 * diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index ba150fb9..bd2038d3 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -195,6 +195,8 @@ user-settings: gb-send-stream-strict: false # 设备上线时是否自动同步通道 sync-channel-on-device-online: false + # 收到ack消息后开始发流,默认false, 回复200ok后直接开始发流 + push-stream-after-ack: false # 关闭在线文档(生产环境建议关闭) springdoc: From 28f4f688dd37cbce7f7e2ff4a939d0701f2a8bbe Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 19 Dec 2022 15:39:37 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=AF=AD=E9=9F=B3?= =?UTF-8?q?=E9=81=87=E5=88=B0=E9=94=99=E8=AF=AF=E6=97=B6=E4=B8=BB=E5=8A=A8?= =?UTF-8?q?=E7=BB=88=E6=AD=A2=E5=AF=B9=E8=AE=B2=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 1 + .../session/AudioBroadcastManager.java | 4 -- .../callback/DeferredResultHolder.java | 2 - .../request/impl/InviteRequestProcessor.java | 13 ++++- .../cmd/BroadcastResponseMessageHandler.java | 52 ++++++++++--------- .../iot/vmp/service/impl/PlayServiceImpl.java | 5 +- 6 files changed, 42 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 47e51baa..3db903bb 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -69,6 +69,7 @@ public class VideoManagerConstants { public static final String SYSTEM_INFO_NET_PREFIX = "VMP_SYSTEM_INFO_NET_"; public static final String SYSTEM_INFO_DISK_PREFIX = "VMP_SYSTEM_INFO_DISK_"; + public static final String BROADCAST_WAITE_INVITE = "task_broadcast_waite_invite_"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java index 7186fad7..072d0cbc 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java @@ -23,10 +23,6 @@ public class AudioBroadcastManager { public static Map data = new ConcurrentHashMap<>(); - public void add(AudioBroadcastCatch audioBroadcastCatch) { - this.update(audioBroadcastCatch); - } - public void update(AudioBroadcastCatch audioBroadcastCatch) { if (SipUtils.isFrontEnd(audioBroadcastCatch.getDeviceId())) { data.put(audioBroadcastCatch.getDeviceId(), audioBroadcastCatch); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java index a351445e..43d41865 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java @@ -49,8 +49,6 @@ public class DeferredResultHolder { public static final String CALLBACK_CMD_ALARM = "CALLBACK_ALARM"; - public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST"; - private Map> map = new ConcurrentHashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index f33f5df1..8f306413 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; @@ -914,11 +915,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); - + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + request.getCallIdHeader().getCallId(); + dynamicTask.stop(key); try { responseAck(request, Response.TRYING); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + return; } String contentString = new String(request.getRawContent()); // jainSip不支持y=字段, 移除移除以解析。 @@ -973,6 +977,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 不支持的媒体格式: {}", e.getMessage()); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + return; } return; } @@ -987,6 +993,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 未找到可用的zlm: {}", e.getMessage()); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); } return; } @@ -1000,6 +1007,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + return; } return; } @@ -1034,11 +1043,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.GONE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 语音通话 回复410失败, {}", e.getMessage()); + return; } playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); } } catch (SdpException e) { logger.error("[SDP解析异常]", e); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); } } else { logger.warn("来自无效设备/平台的请求"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java index 56fb7894..7306ce38 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java @@ -1,17 +1,16 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; -import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatchStatus; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; -import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.service.IPlayService; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.Element; import org.slf4j.Logger; @@ -38,11 +37,14 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i private ResponseMessageHandler responseMessageHandler; @Autowired - private DeferredResultHolder deferredResultHolder; + private DynamicTask dynamicTask; @Autowired private AudioBroadcastManager audioBroadcastManager; + @Autowired + private IPlayService playService; + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -50,33 +52,33 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { + + String channelId = getText(rootElement, "DeviceID"); + SIPRequest request = (SIPRequest) evt.getRequest(); try { - String channelId = getText(rootElement, "DeviceID"); - String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId() + channelId; - - // 此处是对本平台发出Broadcast指令的应答 - JSONObject json = new JSONObject(); - XmlUtil.node2Json(rootElement, json); - if (logger.isDebugEnabled()) { - logger.debug(json.toJSONString()); - } - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(json); - deferredResultHolder.invokeAllResult(msg); - - if (!audioBroadcastManager.exit(device.getDeviceId(), channelId)) { // 回复410 responseAck((SIPRequest) evt.getRequest(), Response.GONE); return; } - logger.info("收到语音广播的回复:{}/{}", device.getDeviceId(), channelId ); - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); - audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); - audioBroadcastManager.update(audioBroadcastCatch); + String result = getText(rootElement, "Result"); + logger.info("[语音广播]回复:{}, {}/{}", result, device.getDeviceId(), channelId ); + // 回复200 OK - responseAck((SIPRequest) evt.getRequest(), Response.OK); + responseAck(request, Response.OK); + if (result.equalsIgnoreCase("OK")) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); + audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); + audioBroadcastManager.update(audioBroadcastCatch); + // 等待invite消息, 超时则结束 + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + request.getCallIdHeader().getCallId(); + dynamicTask.startDelay(key, ()->{ + logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId); + playService.stopAudioBroadcast(device.getDeviceId(), channelId); + }, 2000); + }else { + playService.stopAudioBroadcast(device.getDeviceId(), channelId); + } } catch (ParseException | SipException | InvalidArgumentException e) { logger.error("[命令发送失败] 国标级联 语音喊话: {}", e.getMessage()); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index d4328a7c..31c71e70 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1045,8 +1045,7 @@ public class PlayServiceImpl implements IPlayService { event.call("语音广播已经开启"); return; } else { - audioBroadcastManager.del(deviceChannel.getDeviceId(), channelId); - redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId, sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + stopAudioBroadcast(device.getDeviceId(), channelId); } } } @@ -1055,7 +1054,7 @@ public class PlayServiceImpl implements IPlayService { cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { // 发送成功 AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready); - audioBroadcastManager.add(audioBroadcastCatch); + audioBroadcastManager.update(audioBroadcastCatch); }, eventResultForError -> { // 发送失败 logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg); From 49e937c39637c897cccf430de85af33942887a29 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 19 Dec 2022 15:51:59 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dinvite=E8=B6=85=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E7=9A=84=E8=AE=BE=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transmit/event/request/impl/InviteRequestProcessor.java | 2 +- .../message/response/cmd/BroadcastResponseMessageHandler.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 8f306413..c9b979f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -915,7 +915,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); - String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + request.getCallIdHeader().getCallId(); + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + audioBroadcastCatch.getChannelId(); dynamicTask.stop(key); try { responseAck(request, Response.TRYING); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java index 7306ce38..6e5c42e1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java @@ -71,7 +71,7 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); audioBroadcastManager.update(audioBroadcastCatch); // 等待invite消息, 超时则结束 - String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + request.getCallIdHeader().getCallId(); + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + channelId; dynamicTask.startDelay(key, ()->{ logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId); playService.stopAudioBroadcast(device.getDeviceId(), channelId); From 16f3b0553d5ea1dd891b56b93e3e13da950cf4dd Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 19 Dec 2022 18:00:28 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E8=AE=BE=E5=A4=87=E7=A6=BB=E7=BA=BF?= =?UTF-8?q?=E6=B8=85=E7=A9=BA=E8=AF=AD=E9=9F=B3=E5=AF=B9=E8=AE=B2=E9=80=9A?= =?UTF-8?q?=E9=81=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../session/AudioBroadcastManager.java | 15 +++++++ .../vmp/service/impl/DeviceServiceImpl.java | 30 +++++++++++-- .../iot/vmp/service/impl/PlayServiceImpl.java | 45 ++++++++++--------- 3 files changed, 66 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java index 072d0cbc..5acbf8eb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java @@ -83,4 +83,19 @@ public class AudioBroadcastManager { return audioBroadcastCatch; } + + public List get(String deviceId) { + List audioBroadcastCatchList= new ArrayList<>(); + if (SipUtils.isFrontEnd(deviceId)) { + audioBroadcastCatchList.add(data.get(deviceId)); + }else { + for (String key : data.keySet()) { + if (key.startsWith(deviceId)) { + audioBroadcastCatchList.add(data.get(key)); + } + } + } + + return audioBroadcastCatchList; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index d25e537e..eb208d32 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -3,12 +3,15 @@ package com.genersoft.iot.vmp.service.impl; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -32,9 +35,7 @@ import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeUnit; /** @@ -89,6 +90,12 @@ public class DeviceServiceImpl implements IDeviceService { @Autowired private IMediaServerService mediaServerService; + @Autowired + private AudioBroadcastManager audioBroadcastManager; + + @Autowired + private ZLMRESTfulUtils zlmresTfulUtils; + @Override public void online(Device device) { logger.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); @@ -183,6 +190,23 @@ public class DeviceServiceImpl implements IDeviceService { // 移除订阅 removeCatalogSubscribe(device); removeMobilePositionSubscribe(device); + List audioBroadcastCatches = audioBroadcastManager.get(deviceId); + if (audioBroadcastCatches.size() > 0) { + for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) { + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); + if (sendRtpItem != null) { + redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map param = new HashMap<>(); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStreamId()); + zlmresTfulUtils.stopSendRtp(mediaInfo, param); + } + + audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId()); + } + } } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 31c71e70..870b6a33 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -58,10 +58,7 @@ import javax.sip.header.CallIdHeader; import java.math.BigDecimal; import java.math.RoundingMode; import java.text.ParseException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Service @@ -1067,25 +1064,31 @@ public class PlayServiceImpl implements IPlayService { @Override public void stopAudioBroadcast(String deviceId, String channelId) { - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId); - if (audioBroadcastCatch != null) { + List audioBroadcastCatchList = new ArrayList<>(); + if (channelId == null) { + audioBroadcastCatchList.addAll(audioBroadcastManager.get(deviceId)); + }else { + audioBroadcastCatchList.add(audioBroadcastManager.get(deviceId, channelId)); + } + if (audioBroadcastCatchList.size() > 0) { + for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatchList) { + Device device = deviceService.getDevice(deviceId); + if (device == null || audioBroadcastCatch == null ) { + return; + } + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); + if (sendRtpItem != null) { + redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map param = new HashMap<>(); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStreamId()); + zlmresTfulUtils.stopSendRtp(mediaInfo, param); + } - Device device = deviceService.getDevice(deviceId); - if (device == null) { - return; + audioBroadcastManager.del(deviceId, channelId); } - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); - if (sendRtpItem != null) { - redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Map param = new HashMap<>(); - param.put("vhost", "__defaultVhost__"); - param.put("app", sendRtpItem.getApp()); - param.put("stream", sendRtpItem.getStreamId()); - zlmresTfulUtils.stopSendRtp(mediaInfo, param); - } - - audioBroadcastManager.del(deviceId, channelId); } }