From 4b827f3897600e97023ded3df83a2f2551131d53 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 13 Dec 2022 11:57:07 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BA=A7=E8=81=94=E8=AF=AD=E9=9F=B3=E5=AF=B9?= =?UTF-8?q?=E8=AE=B2=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/README.md | 4 +- doc/_content/theory/broadcast_cascade.md | 46 ++++ doc/_sidebar.md | 1 + .../genersoft/iot/vmp/conf/UserSetting.java | 10 + .../vmp/gb28181/bean/AudioBroadcastCatch.java | 83 +++++++- .../vmp/gb28181/bean/InviteStreamType.java | 2 +- .../iot/vmp/gb28181/bean/ParentPlatform.java | 6 +- .../iot/vmp/gb28181/event/SipSubscribe.java | 6 +- .../session/AudioBroadcastManager.java | 4 - .../callback/DeferredResultHolder.java | 2 - .../cmd/ISIPCommanderForPlatform.java | 79 +++++-- .../cmd/SIPRequestHeaderPlarformProvider.java | 88 +++++++- .../transmit/cmd/impl/SIPCommander.java | 6 +- .../cmd/impl/SIPCommanderFroPlatform.java | 123 +++++++++++ .../request/impl/AckRequestProcessor.java | 59 +----- .../request/impl/InviteRequestProcessor.java | 7 +- .../cmd/BroadcastNotifyMessageHandler.java | 200 ++++++++++++++++++ .../cmd/BroadcastResponseMessageHandler.java | 20 +- .../impl/InviteResponseProcessor.java | 22 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 91 +++----- .../vmp/media/zlm/ZLMRTPServerFactory.java | 48 +++++ .../iot/vmp/service/IPlatformService.java | 28 +++ .../iot/vmp/service/IPlayService.java | 9 +- .../vmp/service/impl/PlatformServiceImpl.java | 155 ++++++++++++++ .../iot/vmp/service/impl/PlayServiceImpl.java | 50 +++-- .../vmanager/gb28181/play/PlayController.java | 2 +- src/main/resources/all-application.yml | 4 +- .../src/components/dialog/devicePlayer.vue | 8 +- 28 files changed, 940 insertions(+), 223 deletions(-) create mode 100644 doc/_content/theory/broadcast_cascade.md create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java diff --git a/doc/README.md b/doc/README.md index 0fb5b86d..2bbc1844 100644 --- a/doc/README.md +++ b/doc/README.md @@ -52,7 +52,7 @@ - [X] 报警订阅 - [X] 目录订阅 - [ ] 语音广播 -- [ ] 语音对讲 +- [ ] 语音喊话 **作为下级平台** - [X] 注册 @@ -91,7 +91,7 @@ - [ ] 报警订阅 - [X] 目录订阅 - [ ] 语音广播 -- [ ] 语音对讲 +- [ ] 语音喊话 diff --git a/doc/_content/theory/broadcast_cascade.md b/doc/_content/theory/broadcast_cascade.md new file mode 100644 index 00000000..e59b8c25 --- /dev/null +++ b/doc/_content/theory/broadcast_cascade.md @@ -0,0 +1,46 @@ + + +# 点播流程 +> 以下为WVP-PRO级联语音喊话流程。 + +```plantuml +@startuml +"上级平台" -> "下级平台": 1. 发起语音喊话请求 +"上级平台" <-- "下级平台": 2. 200OK +"上级平台" <- "下级平台": 3. 回复Result OK +"上级平台" --> "下级平台": 4. 200OK + +"下级平台" -> "设备": 5. 发起语音喊话请求 +"下级平台" <-- "设备": 6. 200OK +"下级平台" <- "设备": 7. 回复Result OK +"下级平台" --> "设备": 8. 200OK + +"下级平台" <- "设备": 9. invite(broadcast) +"下级平台" --> "设备": 10. 100 trying +"下级平台" --> "设备": 11. 200OK SDP +"下级平台" <-- "设备": 12. ack + +"上级平台" <- "下级平台": 13. invite(broadcast) +"上级平台" --> "下级平台": 14. 100 trying +"上级平台" --> "下级平台": 15. 200OK SDP +"上级平台" <-- "下级平台": 16. ack + +"上级平台" -> "下级平台": 17. 推送RTP +"下级平台" -> "设备": 18. 推送RTP + +@enduml +``` + + +## 注册流程描述如下: +1. 用户从网页或调用接口发起点播请求; +2. WVP-PRO向摄像机发送Invite消息,消息头域中携带 Subject字段,表明点播的视频源ID、发送方媒体流序列号、ZLMediaKit接收流使用的IP、端口号、 + 接收端媒体流序列号等参数,SDP消息体中 s字段为“Play”代表实时点播,y字段描述SSRC值,f字段描述媒体参数。 +3. 摄像机向WVP-PRO回复200OK,消息体中描述了媒体流发送者发送媒体流的IP、端口、媒体格式、SSRC字段等内容。 +4. WVP-PRO向设备回复Ack, 会话建立成功。 +5. 设备向ZLMediaKit发送实时流。 +6. ZLMediaKit向WVP-PRO发送流改变事件。 +7. WVP-PRO向WEB用户回复播放地址。 +8. ZLMediaKit向WVP发送流无人观看事件。 +9. WVP-PRO向设备回复Bye, 结束会话。 +10. 设备回复200OK,会话结束成功。 diff --git a/doc/_sidebar.md b/doc/_sidebar.md index 3b10bae8..05101c19 100644 --- a/doc/_sidebar.md +++ b/doc/_sidebar.md @@ -19,6 +19,7 @@ * [树形结构](_content/theory/channel_tree.md) * [注册流程](_content/theory/register.md) * [点播流程](_content/theory/play.md) + * [级联语音喊话流程](_content/theory/broadcast_cascade.md) * **必备技巧** * [抓包](_content/skill/tcpdump.md) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index a2d3054e..581ea6f8 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -47,6 +47,8 @@ public class UserSetting { private String thirdPartyGBIdReg = "[\\s\\S]*"; + private String broadcastForPlatform = "UDP"; + private List interfaceAuthenticationExcludes = new ArrayList<>(); public Boolean getSavePositionHistory() { @@ -196,4 +198,12 @@ public class UserSetting { public void setSyncChannelOnDeviceOnline(Boolean syncChannelOnDeviceOnline) { this.syncChannelOnDeviceOnline = syncChannelOnDeviceOnline; } + + public String getBroadcastForPlatform() { + return broadcastForPlatform; + } + + public void setBroadcastForPlatform(String broadcastForPlatform) { + this.broadcastForPlatform = broadcastForPlatform; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java index 88db8071..d9e89fa2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java @@ -1,6 +1,8 @@ package com.genersoft.iot.vmp.gb28181.bean; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import gov.nist.javax.sip.message.SIPResponse; /** @@ -10,10 +12,24 @@ import gov.nist.javax.sip.message.SIPResponse; public class AudioBroadcastCatch { - public AudioBroadcastCatch(String deviceId, String channelId, AudioBroadcastCatchStatus status) { + public AudioBroadcastCatch( + String deviceId, + String channelId, + MediaServerItem mediaServerItem, + String app, + String stream, + AudioBroadcastEvent event, + AudioBroadcastCatchStatus status, + boolean isFromPlatform + ) { this.deviceId = deviceId; this.channelId = channelId; this.status = status; + this.event = event; + this.isFromPlatform = isFromPlatform; + this.app = app; + this.stream = stream; + this.mediaServerItem = mediaServerItem; } public AudioBroadcastCatch() { @@ -29,6 +45,26 @@ public class AudioBroadcastCatch { */ private String channelId; + /** + * 流媒体信息 + */ + private MediaServerItem mediaServerItem; + + /** + * 关联的流APP + */ + private String app; + + /** + * 关联的流STREAM + */ + private String stream; + + /** + * 是否是级联语音喊话 + */ + private boolean isFromPlatform; + /** * 语音广播状态 */ @@ -39,6 +75,11 @@ public class AudioBroadcastCatch { */ private SipTransactionInfo sipTransactionInfo; + /** + * 请求结果回调 + */ + private AudioBroadcastEvent event; + public String getDeviceId() { return deviceId; @@ -75,4 +116,44 @@ public class AudioBroadcastCatch { public void setSipTransactionInfoByRequset(SIPResponse response) { this.sipTransactionInfo = new SipTransactionInfo(response, false); } + + public AudioBroadcastEvent getEvent() { + return event; + } + + public void setEvent(AudioBroadcastEvent event) { + this.event = event; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public boolean isFromPlatform() { + return isFromPlatform; + } + + public void setFromPlatform(boolean fromPlatform) { + isFromPlatform = fromPlatform; + } + + public MediaServerItem getMediaServerItem() { + return mediaServerItem; + } + + public void setMediaServerItem(MediaServerItem mediaServerItem) { + this.mediaServerItem = mediaServerItem; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java index a3098fb7..dde7639e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java @@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.gb28181.bean; public enum InviteStreamType { - PLAY,PLAYBACK,PUSH,PROXY,CLOUD_RECORD_PUSH,CLOUD_RECORD_PROXY,TALK + PLAY,PLAYBACK,PUSH,PROXY,CLOUD_RECORD_PUSH,CLOUD_RECORD_PROXY,BROADCAST,TALK } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java index b056cc71..262ac559 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java @@ -66,7 +66,7 @@ public class ParentPlatform { * 设备端口 */ @Schema(description = "设备端口") - private String devicePort; + private int devicePort; /** * SIP认证用户名(默认使用设备国标编号) @@ -261,11 +261,11 @@ public class ParentPlatform { this.deviceIp = deviceIp; } - public String getDevicePort() { + public int getDevicePort() { return devicePort; } - public void setDevicePort(String devicePort) { + public void setDevicePort(int devicePort) { this.devicePort = devicePort; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index e7409bf4..5cc9cb92 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.gb28181.event; -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent; import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; @@ -87,6 +86,11 @@ public class SipSubscribe { public String callId; public EventObject event; + public EventResult(int statusCode, String msg) { + this.statusCode = statusCode; + this.msg = msg; + } + public EventResult(EventObject event) { this.event = event; if (event instanceof ResponseEvent) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java index 7186fad7..072d0cbc 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java @@ -23,10 +23,6 @@ public class AudioBroadcastManager { public static Map data = new ConcurrentHashMap<>(); - public void add(AudioBroadcastCatch audioBroadcastCatch) { - this.update(audioBroadcastCatch); - } - public void update(AudioBroadcastCatch audioBroadcastCatch) { if (SipUtils.isFrontEnd(audioBroadcastCatch.getDeviceId())) { data.put(audioBroadcastCatch.getDeviceId(), audioBroadcastCatch); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java index a351445e..43d41865 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java @@ -49,8 +49,6 @@ public class DeferredResultHolder { public static final String CALLBACK_CMD_ALARM = "CALLBACK_ALARM"; - public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST"; - private Map> map = new ConcurrentHashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index 13a36d7b..f0d453b7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -1,8 +1,12 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import javax.sip.InvalidArgumentException; import javax.sip.SipException; @@ -14,77 +18,98 @@ public interface ISIPCommanderForPlatform { /** * 向上级平台注册 + * * @param parentPlatform * @return */ - void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException; - void register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException; + void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) + throws InvalidArgumentException, ParseException, SipException; + + void register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, + SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) + throws SipException, InvalidArgumentException, ParseException; /** * 向上级平台注销 + * * @param parentPlatform * @return */ - void unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException; + void unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) + throws InvalidArgumentException, ParseException, SipException; /** * 向上级平发送心跳信息 + * * @param parentPlatform * @return callId(作为接受回复的判定) */ - String keepalive(ParentPlatform parentPlatform,SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws SipException, InvalidArgumentException, ParseException; + String keepalive(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) + throws SipException, InvalidArgumentException, ParseException; /** * 向上级回复通道信息 - * @param channel 通道信息 + * + * @param channel 通道信息 * @param parentPlatform 平台信息 * @param sn * @param fromTag * @param size * @return */ - void catalogQuery(DeviceChannel channel, ParentPlatform parentPlatform, String sn, String fromTag, int size) throws SipException, InvalidArgumentException, ParseException; - void catalogQuery(List channels, ParentPlatform parentPlatform, String sn, String fromTag) throws InvalidArgumentException, ParseException, SipException; + void catalogQuery(DeviceChannel channel, ParentPlatform parentPlatform, String sn, String fromTag, int size) + throws SipException, InvalidArgumentException, ParseException; + + void catalogQuery(List channels, ParentPlatform parentPlatform, String sn, String fromTag) + throws InvalidArgumentException, ParseException, SipException; /** * 向上级回复DeviceInfo查询信息 + * * @param parentPlatform 平台信息 * @param sn * @param fromTag * @return */ - void deviceInfoResponse(ParentPlatform parentPlatform, String sn, String fromTag) throws SipException, InvalidArgumentException, ParseException; + void deviceInfoResponse(ParentPlatform parentPlatform, String sn, String fromTag) + throws SipException, InvalidArgumentException, ParseException; /** * 向上级回复DeviceStatus查询信息 + * * @param parentPlatform 平台信息 * @param sn * @param fromTag * @return */ - void deviceStatusResponse(ParentPlatform parentPlatform, String sn, String fromTag) throws SipException, InvalidArgumentException, ParseException; + void deviceStatusResponse(ParentPlatform parentPlatform, String sn, String fromTag) + throws SipException, InvalidArgumentException, ParseException; /** * 向上级回复移动位置订阅消息 + * * @param parentPlatform 平台信息 - * @param gpsMsgInfo GPS信息 - * @param subscribeInfo 订阅相关的信息 + * @param gpsMsgInfo GPS信息 + * @param subscribeInfo 订阅相关的信息 * @return */ - void sendNotifyMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) throws InvalidArgumentException, ParseException, NoSuchFieldException, SipException, IllegalAccessException; + void sendNotifyMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) + throws InvalidArgumentException, ParseException, NoSuchFieldException, SipException, IllegalAccessException; /** * 向上级回复报警消息 + * * @param parentPlatform 平台信息 - * @param deviceAlarm 报警信息信息 + * @param deviceAlarm 报警信息信息 * @return */ void sendAlarmMessage(ParentPlatform parentPlatform, DeviceAlarm deviceAlarm) throws SipException, InvalidArgumentException, ParseException; /** * 回复catalog事件-增加/更新 + * * @param parentPlatform * @param deviceChannels */ @@ -92,22 +117,28 @@ public interface ISIPCommanderForPlatform { /** * 回复catalog事件-删除 + * * @param parentPlatform * @param deviceChannels */ - void sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List deviceChannels, SubscribeInfo subscribeInfo, Integer index) throws InvalidArgumentException, ParseException, NoSuchFieldException, SipException, IllegalAccessException; + void sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List deviceChannels, + SubscribeInfo subscribeInfo, Integer index) throws InvalidArgumentException, + ParseException, NoSuchFieldException, SipException, IllegalAccessException; /** * 回复recordInfo - * @param deviceChannel 通道信息 + * + * @param deviceChannel 通道信息 * @param parentPlatform 平台信息 - * @param fromTag fromTag - * @param recordInfo 录像信息 + * @param fromTag fromTag + * @param recordInfo 录像信息 */ - void recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo) throws SipException, InvalidArgumentException, ParseException; + void recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo) + throws SipException, InvalidArgumentException, ParseException; /** * 录像播放推送完成时发送MediaStatus消息 + * * @param platform * @param sendRtpItem * @return @@ -116,9 +147,19 @@ public interface ISIPCommanderForPlatform { /** * 向发起点播的上级回复bye + * * @param platform 平台信息 - * @param callId callId + * @param callId callId */ void streamByeCmd(ParentPlatform platform, String callId) throws SipException, InvalidArgumentException, ParseException; + void streamByeCmd(ParentPlatform platform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException; + + void streamByeCmd(ParentPlatform platform, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; + + void broadcastInviteCmd(ParentPlatform platform, String channelId, MediaServerItem mediaServerItem, + SSRCInfo ssrcInfo, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, + SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException; + + void broadcastResultCmd(ParentPlatform platform, DeviceChannel deviceChannel, String sn, boolean result, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index 0fe11c01..e80f4279 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -14,7 +15,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.DigestUtils; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.PeerUnavailableException; import javax.sip.address.Address; import javax.sip.address.SipURI; import javax.sip.header.*; @@ -22,7 +24,6 @@ import javax.sip.message.Request; import javax.validation.constraints.NotNull; import java.text.ParseException; import java.util.ArrayList; -import java.util.List; import java.util.UUID; /** @@ -175,7 +176,7 @@ public class SIPRequestHeaderPlarformProvider { SipURI requestURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), serverAddress); // via ArrayList viaHeaders = new ArrayList(); - ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()), + ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), parentPlatform.getDevicePort(), parentPlatform.getTransport(), viaTag); viaHeader.setRPort(); viaHeaders.add(viaHeader); @@ -212,7 +213,7 @@ public class SIPRequestHeaderPlarformProvider { SipURI requestURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort()); // via ArrayList viaHeaders = new ArrayList<>(); - ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()), + ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), parentPlatform.getDevicePort(), parentPlatform.getTransport(), SipUtils.getNewViaTag()); viaHeader.setRPort(); viaHeaders.add(viaHeader); @@ -272,7 +273,7 @@ public class SIPRequestHeaderPlarformProvider { SipURI requestURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(platform.getServerGBId(), platform.getServerIP()+ ":" + platform.getServerPort()); // via ArrayList viaHeaders = new ArrayList<>(); - ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(platform.getDeviceIp(), Integer.parseInt(platform.getDevicePort()), + ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(platform.getDeviceIp(), platform.getDevicePort(), platform.getTransport(), SipUtils.getNewViaTag()); viaHeader.setRPort(); viaHeaders.add(viaHeader); @@ -306,6 +307,83 @@ public class SIPRequestHeaderPlarformProvider { .createSipURI(platform.getDeviceGBId(), sipAddress)); request.addHeader(sipLayer.getSipFactory().createHeaderFactory().createContactHeader(concatAddress)); + return request; + } + + public Request createInviteRequest(ParentPlatform platform, String channelId, String toString, String viaTag, String fromTag, Object content, String ssrc, CallIdHeader callIdHeader, String transport) throws PeerUnavailableException, ParseException, InvalidArgumentException { + Request request = null; + //请求行 + String deviceHostAddress = platform.getDeviceIp() + ":" + platform.getDevicePort(); + SipURI requestLine = sipLayer.getSipFactory().createAddressFactory().createSipURI(channelId, deviceHostAddress); + //via + ArrayList viaHeaders = new ArrayList(); + HeaderFactory headerFactory = sipLayer.getSipFactory().createHeaderFactory(); + ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(platform.getDeviceIp()), platform.getDevicePort(), platform.getTransport(), viaTag); + viaHeader.setRPort(); + viaHeaders.add(viaHeader); + + //from + SipURI fromSipURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getDomain()); + Address fromAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(fromSipURI); + FromHeader fromHeader = sipLayer.getSipFactory().createHeaderFactory().createFromHeader(fromAddress, fromTag); //必须要有标记,否则无法创建会话,无法回应ack + //to + SipURI toSipURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(channelId, deviceHostAddress); + Address toAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(toSipURI); + ToHeader toHeader = sipLayer.getSipFactory().createHeaderFactory().createToHeader(toAddress,null); + + //Forwards + MaxForwardsHeader maxForwards = sipLayer.getSipFactory().createHeaderFactory().createMaxForwardsHeader(70); + + //ceq + CSeqHeader cSeqHeader = sipLayer.getSipFactory().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE); + request = sipLayer.getSipFactory().createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards); + + request.addHeader(SipUtils.createUserAgentHeader(sipLayer.getSipFactory(), gitUtil)); + + Address concatAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(sipLayer.getSipFactory().createAddressFactory().createSipURI(sipConfig.getId(), sipLayer.getLocalIp(platform.getDeviceIp())+":"+ deviceHostAddress)); + // Address concatAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(sipLayer.getSipFactory().createAddressFactory().createSipURI(sipConfig.getId(), device.getHost().getIp()+":"+device.getHost().getPort())); + request.addHeader(sipLayer.getSipFactory().createHeaderFactory().createContactHeader(concatAddress)); + // Subject + SubjectHeader subjectHeader = sipLayer.getSipFactory().createHeaderFactory().createSubjectHeader(String.format("%s:%s,%s:%s", channelId, ssrc, sipConfig.getId(), 0)); + request.addHeader(subjectHeader); + ContentTypeHeader contentTypeHeader = sipLayer.getSipFactory().createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); + request.setContent(content, contentTypeHeader); + return request; + } + + public Request createByteRequest(ParentPlatform platform, String channelId, SipTransactionInfo transactionInfo) throws PeerUnavailableException, ParseException, InvalidArgumentException { + String deviceHostAddress = platform.getDeviceIp() + ":" + platform.getDevicePort(); + Request request = null; + SipURI requestLine = sipLayer.getSipFactory().createAddressFactory().createSipURI(channelId, deviceHostAddress); + + // via + ArrayList viaHeaders = new ArrayList(); + ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(platform.getDeviceIp()), platform.getDevicePort(), platform.getTransport(), SipUtils.getNewViaTag()); + viaHeaders.add(viaHeader); + //from + SipURI fromSipURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(sipConfig.getId(),sipConfig.getDomain()); + Address fromAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(fromSipURI); + FromHeader fromHeader = sipLayer.getSipFactory().createHeaderFactory().createFromHeader(fromAddress, transactionInfo.isFromServer()?transactionInfo.getFromTag():transactionInfo.getToTag()); + //to + SipURI toSipURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(channelId, deviceHostAddress); + Address toAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(toSipURI); + ToHeader toHeader = sipLayer.getSipFactory().createHeaderFactory().createToHeader(toAddress,transactionInfo.isFromServer()?transactionInfo.getToTag():transactionInfo.getFromTag()); + + //Forwards + MaxForwardsHeader maxForwards = sipLayer.getSipFactory().createHeaderFactory().createMaxForwardsHeader(70); + + //ceq + CSeqHeader cSeqHeader = sipLayer.getSipFactory().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.BYE); + CallIdHeader callIdHeader = sipLayer.getSipFactory().createHeaderFactory().createCallIdHeader(transactionInfo.getCallId()); + request = sipLayer.getSipFactory().createMessageFactory().createRequest(requestLine, Request.BYE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards); + + request.addHeader(SipUtils.createUserAgentHeader(sipLayer.getSipFactory(), gitUtil)); + + Address concatAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(sipLayer.getSipFactory().createAddressFactory().createSipURI(sipConfig.getId(), sipLayer.getLocalIp(platform.getDeviceIp())+":"+ platform.getDevicePort())); + request.addHeader(sipLayer.getSipFactory().createHeaderFactory().createContactHeader(concatAddress)); + + request.addHeader(SipUtils.createUserAgentHeader(sipLayer.getSipFactory(), gitUtil)); + return request; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 29f2becc..425dd931 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -590,12 +590,12 @@ public class SIPCommander implements ISIPCommander { return; } if (!mediaServerItem.isRtpEnable()) { - // 单端口暂不支持语音对讲 - logger.info("[语音对讲] 单端口暂不支持此操作"); + // 单端口暂不支持语音喊话 + logger.info("[语音喊话] 单端口暂不支持此操作"); return; } - logger.info("[语音对讲] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); + logger.info("[语音喊话] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); subscribe.addSubscribe(hookSubscribeForStreamChange, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { if (event != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 08cc3c3f..d0d72bfe 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -1,22 +1,31 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; import com.genersoft.iot.vmp.utils.DateUtil; import gov.nist.javax.sip.message.MessageFactoryImpl; import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -26,6 +35,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; import javax.sip.SipException; import javax.sip.header.CallIdHeader; import javax.sip.header.WWWAuthenticateHeader; @@ -61,6 +71,16 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @Autowired private SIPSender sipSender; + @Autowired + private ZlmHttpHookSubscribe subscribe; + + @Autowired + private UserSetting userSetting; + + + @Autowired + private VideoStreamSessionManager streamSession; + @Override public void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException { register(parentPlatform, null, null, errorEvent, okEvent, false, true); @@ -645,4 +665,107 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } sipSender.transmitRequest(platform.getDeviceIp(),byeRequest); } + + @Override + public void streamByeCmd(ParentPlatform platform, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException { + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(platform.getServerGBId(), channelId, callId, stream); + if (ssrcTransaction == null) { + throw new SsrcTransactionNotFoundException(platform.getServerGBId(), channelId, callId, stream); + } + + mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); + mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream()); + streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); + + Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channelId, ssrcTransaction.getSipTransactionInfo()); + sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), byteRequest, null, okEvent); + } + + @Override + public void broadcastResultCmd(ParentPlatform platform, DeviceChannel deviceChannel, String sn, boolean result, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException { + if (platform == null || deviceChannel == null) { + return; + } + String characterSet = platform.getCharacterSet(); + StringBuffer mediaStatusXml = new StringBuffer(200); + mediaStatusXml.append("\r\n"); + mediaStatusXml.append("\r\n"); + mediaStatusXml.append("Broadcast\r\n"); + mediaStatusXml.append("" + sn + "\r\n"); + mediaStatusXml.append("" + deviceChannel.getChannelId() + "\r\n"); + mediaStatusXml.append("" + (result?"OK":"ERROR") + "\r\n"); + mediaStatusXml.append("\r\n"); + + CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(platform.getDeviceIp(), platform.getTransport()); + + SIPRequest messageRequest = (SIPRequest)headerProviderPlatformProvider.createMessageRequest(platform, mediaStatusXml.toString(), + SipUtils.getNewFromTag(), SipUtils.getNewViaTag(), callIdHeader); + + sipSender.transmitRequest(platform.getDeviceIp(),messageRequest, errorEvent, okEvent); + } + + @Override + public void broadcastInviteCmd(ParentPlatform platform, String channelId, MediaServerItem mediaServerItem, + SSRCInfo ssrcInfo, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, + SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException { + String stream = ssrcInfo.getStream(); + + if (platform == null) { + return; + } + + logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + if (event != null) { + event.response(mediaServerItemInUse, json); + subscribe.removeSubscribe(hookSubscribe); + } + }); + String sdpIp = mediaServerItem.getSdpIp(); + + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n"); + content.append("s=Play\r\n"); + content.append("c=IN IP4 " + sdpIp + "\r\n"); + content.append("t=0 0\r\n"); + + if ("TCP-PASSIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("m=video " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n"); + } else if ("TCP-ACTIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("m=video " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n"); + } else if ("UDP".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("m=video " + ssrcInfo.getPort() + " RTP/AVP 8 96\r\n"); + } + + content.append("a=recvonly\r\n"); + content.append("a=rtpmap:8 PCMA/8000\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + if ("TCP-PASSIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("a=setup:passive\r\n"); + content.append("a=connection:new\r\n"); + }else if ("TCP-ACTIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("a=setup:active\r\n"); + content.append("a=connection:new\r\n"); + } + + content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc + CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(platform.getDeviceIp()), platform.getTransport()); + + Request request = headerProviderPlatformProvider.createInviteRequest(platform, channelId, + content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(), + callIdHeader ,platform.getTransport()); + sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> { + streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + errorEvent.response(e); + }), e -> { + // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 + ResponseEvent responseEvent = (ResponseEvent) e.event; + SIPResponse response = (SIPResponse) responseEvent.getResponse(); + streamSession.put(platform.getServerGBId(), channelId, callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play); + okEvent.response(e); + }); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index c5220a9e..10b0daa7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -16,8 +16,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -40,8 +38,6 @@ import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; import java.text.ParseException; -import java.util.HashMap; -import java.util.Map; /** * SIP命令类型: ACK请求 @@ -123,68 +119,31 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); - Map param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStreamId()); - param.put("ssrc", sendRtpItem.getSsrc()); - param.put("src_port", sendRtpItem.getLocalPort()); - param.put("pt", sendRtpItem.getPt()); - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - if (!sendRtpItem.isTcp()) { - // udp模式下开启rtcp保活 - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); - } - if (mediaInfo == null) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, json, param, callIdHeader); + startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, json, callIdHeader); }); - } else { - // 如果是非严格模式,需要关闭端口占用 - JSONObject startSendRtpStreamResult = null; - if (sendRtpItem.getLocalPort() != 0) { - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); - hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); - if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { - if (sendRtpItem.isTcpActive()) { - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); - }else { - param.put("is_udp", is_Udp); - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } - } - }else { - if (sendRtpItem.isTcpActive()) { - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); - }else { - param.put("is_udp", is_Udp); - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } - } + }else { + JSONObject startSendRtpStreamResult = zlmrtpServerFactory.startSendRtp(mediaInfo, sendRtpItem); if (startSendRtpStreamResult != null) { - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); + startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, callIdHeader); } } } + private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, - JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { + JSONObject jsonObject, CallIdHeader callIdHeader) { if (jsonObject == null) { logger.error("RTP推流失败: 请检查ZLM服务"); } else if (jsonObject.getInteger("code") == 0) { logger.info("调用ZLM推流接口, 结果: {}", jsonObject); - logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getIp(), sendRtpItem.getPort()); } else { - logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); + logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(sendRtpItem)); if (sendRtpItem.isOnlyAudio()) { Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); @@ -193,7 +152,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); } catch (SipException | ParseException | InvalidArgumentException | SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage()); + logger.error("[命令发送失败] 停止语音喊话: {}", e.getMessage()); } } }else { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index e31995c3..6e188a58 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -201,7 +201,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements MediaServerItem mediaServerItem = null; StreamPushItem streamPushItem = null; - StreamProxyItem proxyByAppAndStream =null; + StreamProxyItem proxyByAppAndStream = null; // 不是通道可能是直播流 if (channel != null && gbStream == null) { // 通道存在,发100,TRYING @@ -1001,7 +1001,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId(); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); - sendRtpItem.setPlayType(InviteStreamType.TALK); + sendRtpItem.setPlayType(InviteStreamType.BROADCAST); sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setPlatformId(requesterId); sendRtpItem.setStatus(1); @@ -1013,6 +1013,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setOnlyAudio(true); redisCatchStorage.updateSendRTPSever(sendRtpItem); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream); if (streamReady) { sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, ssrc); @@ -1084,7 +1085,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements audioBroadcastManager.update(audioBroadcastCatch); } catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) { - logger.error("[命令发送失败] 语音对讲 回复200OK(SDP): {}", e.getMessage()); + logger.error("[命令发送失败] 语音喊话 回复200OK(SDP): {}", e.getMessage()); } return sipResponse; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java new file mode 100644 index 00000000..353aaf2b --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java @@ -0,0 +1,200 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd; + +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IDeviceService; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IPlatformService; +import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import gov.nist.javax.sip.message.SIPRequest; +import org.dom4j.Element; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sip.InvalidArgumentException; +import javax.sip.RequestEvent; +import javax.sip.SipException; +import javax.sip.message.Response; +import java.text.ParseException; + +/** + * 状态信息(心跳)报送 + */ +@Component +public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { + + private Logger logger = LoggerFactory.getLogger(BroadcastNotifyMessageHandler.class); + private final static String cmdType = "Broadcast"; + + @Autowired + private NotifyMessageHandler notifyMessageHandler; + + @Autowired + private IVideoManagerStorage storage; + + @Autowired + private ISIPCommanderForPlatform commanderForPlatform; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private IPlayService playService; + + @Autowired + private IDeviceService deviceService; + + @Autowired + private IPlatformService platformService; + + @Autowired + private AudioBroadcastManager audioBroadcastManager; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Override + public void afterPropertiesSet() throws Exception { + notifyMessageHandler.addHandler(cmdType, this); + } + + @Override + public void handForDevice(RequestEvent evt, Device device, Element element) { + + } + + @Override + public void handForPlatform(RequestEvent evt, ParentPlatform platform, Element rootElement) { + // 来自上级平台的语音喊话请求 + SIPRequest request = (SIPRequest) evt.getRequest(); + try { + Element snElement = rootElement.element("SN"); + if (snElement == null) { + responseAck(request, Response.BAD_REQUEST, "sn must not null"); + return; + } + String sn = snElement.getText(); + Element targetIDElement = rootElement.element("TargetID"); + if (targetIDElement == null) { + responseAck(request, Response.BAD_REQUEST, "TargetID must not null"); + return; + } + String targetId = targetIDElement.getText(); + + + logger.info("[国标级联 语音喊话] platform: {}, channel: {}", platform.getServerGBId(), targetId); + + DeviceChannel deviceChannel = storage.queryChannelInParentPlatform(platform.getServerGBId(), targetId); + if (deviceChannel == null) { + responseAck(request, Response.NOT_FOUND, "TargetID not found"); + return; + } + // 向下级发送语音的喊话请求 + Device device = deviceService.getDevice(deviceChannel.getDeviceId()); + if (device == null) { + responseAck(request, Response.NOT_FOUND, "device not found"); + return; + } + responseAck(request, Response.OK); + + // 查看语音通道是否已经建立并且已经在使用 + if (playService.audioBroadcastInUse(device, targetId)) { + commanderForPlatform.broadcastResultCmd(platform, deviceChannel, sn, false,null, null); + return; + } + + MediaServerItem mediaServerForMinimumLoad = mediaServerService.getMediaServerForMinimumLoad(); + commanderForPlatform.broadcastResultCmd(platform, deviceChannel, sn, true, eventResult->{ + logger.info("[国标级联] 语音喊话 回复失败 platform: {}, 错误:{}/{}", platform.getServerGBId(), eventResult.statusCode, eventResult.msg); + }, eventResult->{ + // 消息发送成功, 向上级发送invite,获取推流 + try { + platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, response)->{ + // 上级平台推流成功 + String app = response.getString("app"); + String stream = response.getString("stream"); + AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId); + if (broadcastCatch != null ) { + if (playService.audioBroadcastInUse(device, targetId)) { + logger.info("[国标级联] 语音喊话 设备正正在使用中 platform: {}, channel: {}", + platform.getServerGBId(), deviceChannel.getChannelId()); + // 查看语音通道已经建立且已经占用 回复BYE + try { + platformService.stopBroadcast(platform, deviceChannel.getChannelId(), stream); + } catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | + SipException e) { + logger.info("[消息发送失败] 国标级联 语音喊话 platform: {}, channel: {}", platform.getServerGBId(), deviceChannel.getChannelId()); + } + }else { + // 查看语音通道已经建立但是未占用 + broadcastCatch.setApp(app); + broadcastCatch.setStream(stream); + broadcastCatch.setMediaServerItem(mediaServerItem); + audioBroadcastManager.update(broadcastCatch); + // 推流到设备 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, stream, null); + if (sendRtpItem == null) { + logger.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, stream); + logger.info("[国标级联] 语音喊话 重新开始,channelId: {}, stream: {}", targetId, stream); + try { + playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> { + logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId); + }); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.info("[消息发送失败] 国标级联 语音喊话 platform: {}", platform.getServerGBId()); + } + }else { + // 发流 + JSONObject jsonObject = zlmrtpServerFactory.startSendRtp(mediaServerItem, sendRtpItem); + if (jsonObject != null && jsonObject.getInteger("code") == 0 ) { + logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId); + }else { + logger.info("[语音喊话] 推流失败, 结果: {}", jsonObject); + } + } + } + }else { + try { + playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> { + logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId); + }); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.info("[消息发送失败] 国标级联 语音喊话 platform: {}", platform.getServerGBId()); + } + } + + }, eventResultForBroadcastInvite -> { + // 收到错误 + logger.info("[国标级联-语音喊话] 与下级通道建立失败 device: {}, channel: {}, 错误:{}/{}", device.getDeviceId(), + targetId, eventResultForBroadcastInvite.statusCode, eventResultForBroadcastInvite.msg); + }, (code, msg)->{ + // 超时 + logger.info("[国标级联-语音喊话] 与下级通道建立超时 device: {}, channel: {}, 错误:{}/{}", device.getDeviceId(), + targetId, code, msg); + }); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.info("[消息发送失败] 国标级联 语音喊话 invite消息 platform: {}", platform.getServerGBId()); + } + }); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.info("[消息发送失败] 国标级联 语音喊话 platform: {}", platform.getServerGBId()); + } + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java index 56fb7894..2cf20726 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java @@ -1,17 +1,14 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatchStatus; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; -import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.Element; import org.slf4j.Logger; @@ -52,26 +49,13 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i public void handForDevice(RequestEvent evt, Device device, Element rootElement) { try { String channelId = getText(rootElement, "DeviceID"); - String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId() + channelId; - - // 此处是对本平台发出Broadcast指令的应答 - JSONObject json = new JSONObject(); - XmlUtil.node2Json(rootElement, json); - if (logger.isDebugEnabled()) { - logger.debug(json.toJSONString()); - } - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(json); - deferredResultHolder.invokeAllResult(msg); - - if (!audioBroadcastManager.exit(device.getDeviceId(), channelId)) { // 回复410 responseAck((SIPRequest) evt.getRequest(), Response.GONE); return; } - logger.info("收到语音广播的回复:{}/{}", device.getDeviceId(), channelId ); + String result = getText(rootElement, "Result"); + logger.info("收到语音广播的回复 {}:{}/{}", result, device.getDeviceId(), channelId ); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); audioBroadcastManager.update(audioBroadcastCatch); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java index d0ba97eb..16fa6d83 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java @@ -1,39 +1,24 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.SipLayer; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; -import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; -import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.service.IDeviceService; -import com.genersoft.iot.vmp.utils.GitUtil; import gov.nist.javax.sip.ResponseEventExt; -import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.message.SIPResponse; -import gov.nist.javax.sip.stack.SIPClientTransaction; -import gov.nist.javax.sip.stack.SIPDialog; -import gov.nist.javax.sip.stack.SIPTransaction; -import gov.nist.javax.sip.stack.SIPTransactionImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import javax.sdp.SdpFactory; import javax.sdp.SdpParseException; import javax.sdp.SessionDescription; -import javax.sip.*; -import javax.sip.address.Address; +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; import javax.sip.address.SipURI; -import javax.sip.header.CSeqHeader; -import javax.sip.header.UserAgentHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; @@ -104,6 +89,7 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract { } else { sdp = SdpFactory.getInstance().createSessionDescription(contentString); } + // 查看是否是来自设备的,此是回复 SipURI requestUri = sipLayer.getSipFactory().createAddressFactory().createSipURI(sdp.getOrigin().getUsername(), event.getRemoteIpAddress() + ":" + event.getRemotePort()); Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 88d7e141..51ff7adc 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -287,15 +287,19 @@ public class ZLMHttpHookListener { logger.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); } + JSONObject ret = new JSONObject(); + ret.put("code", 0); + ret.put("msg", "success"); JSONObject json = (JSONObject) JSON.toJSON(param); + MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); + if (mediaInfo == null) { + return ret; + } taskExecutor.execute(()-> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); - if (subscribe != null) { - MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); - if (mediaInfo != null) { - subscribe.response(mediaInfo, json); - } + if (subscribe != null ) { + subscribe.response(mediaInfo, json); } // 流消失移除redis play List tracks = param.getTracks(); @@ -343,7 +347,7 @@ public class ZLMHttpHookListener { } } }else if ("broadcast".equals(param.getApp())){ - // 语音对讲推流 stream需要满足格式deviceId_channelId + // 语音喊话推流 stream需要满足格式deviceId_channelId if (param.isRegist() && param.getStream().indexOf("_") > 0) { String[] streamArray = param.getStream().split("_"); if (streamArray.length == 2) { @@ -359,53 +363,38 @@ public class ZLMHttpHookListener { if (sendRtpItem == null) { // TODO 可能数据错误,重新开启语音通道 }else { - String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); - Map sendParam = new HashMap<>(12); - sendParam.put("vhost","__defaultVhost__"); - sendParam.put("app",sendRtpItem.getApp()); - sendParam.put("stream",sendRtpItem.getStreamId()); - sendParam.put("ssrc", sendRtpItem.getSsrc()); - sendParam.put("src_port", sendRtpItem.getLocalPort()); - sendParam.put("pt", sendRtpItem.getPt()); - sendParam.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - sendParam.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - - JSONObject jsonObject; - if (sendRtpItem.isTcpActive()) { - jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, sendParam); - } else { - sendParam.put("is_udp", is_Udp); - sendParam.put("dst_url", sendRtpItem.getIp()); - sendParam.put("dst_port", sendRtpItem.getPort()); - jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, sendParam); - } - if (jsonObject != null && jsonObject.getInteger("code") == 0) { - logger.info("[语音对讲] 自动推流成功, device: {}, channel: {}", deviceId, channelId); + JSONObject jsonObject = zlmrtpServerFactory.startSendRtp(mediaInfo, sendRtpItem); + if (jsonObject != null && jsonObject.getInteger("code") == 0 ) { + logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), channelId); }else { - logger.info("[语音对讲] 推流失败, 结果: {}", jsonObject); + logger.info("[语音喊话] 推流失败, 结果: {}", jsonObject); } } }else { - // 开启语音对讲通道 + // 开启语音喊话通道 try { - playService.audioBroadcastCmd(device, channelId, 60, (msg)->{ - logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); + playService.audioBroadcastCmd(device, channelId, mediaInfo, param.getApp(), param.getStream(),60, false, (msg)->{ + logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", deviceId, channelId); }); } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[命令发送失败] 语音对讲: {}", e.getMessage()); + logger.error("[命令发送失败] 语音喊话: {}", e.getMessage()); } } + }else { + logger.info("[语音喊话] 推流指向的·通道{}未找到", channelId); } + }else { + logger.info("[语音喊话] 推流指向的·设备{}未找到", deviceId); } + }else { + logger.info("[语音喊话] 推流格式有误, 格式为: broadcast/设备编号_通道编号 "); } } }else if ("talk".equals(param.getApp())){ - // 语音对讲推流 stream需要满足格式deviceId_channelId + // 语音喊话推流 stream需要满足格式deviceId_channelId if (param.isRegist() && param.getStream().indexOf("_") > 0) { String[] streamArray = param.getStream().split("_"); if (streamArray.length == 2) { @@ -421,33 +410,11 @@ public class ZLMHttpHookListener { if (sendRtpItem == null) { // TODO 可能数据错误,重新开启语音通道 }else { - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); logger.info("rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); - Map sendParam = new HashMap<>(12); - sendParam.put("vhost","__defaultVhost__"); - sendParam.put("app",sendRtpItem.getApp()); - sendParam.put("stream",sendRtpItem.getStreamId()); - sendParam.put("ssrc", sendRtpItem.getSsrc()); - sendParam.put("src_port", sendRtpItem.getLocalPort()); - sendParam.put("pt", sendRtpItem.getPt()); - sendParam.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - sendParam.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - - JSONObject jsonObject; - if (sendRtpItem.isTcpActive()) { - jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, sendParam); - } else { - sendParam.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); - sendParam.put("dst_url", sendRtpItem.getIp()); - sendParam.put("dst_port", sendRtpItem.getPort()); - jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, sendParam); - } - if (jsonObject != null && jsonObject.getInteger("code") == 0) { - logger.info("[语音对讲] 自动推流成功, device: {}, channel: {}", deviceId, channelId); - } + zlmrtpServerFactory.startSendRtp(mediaInfo, sendRtpItem); } }else { - // 开启语音对讲通道 + // 开启语音喊话通道 MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); playService.talk(mediaServerItem, device, channelId, (mediaServer, jsonObject)->{ System.out.println("开始推流"); @@ -549,9 +516,7 @@ public class ZLMHttpHookListener { } } - JSONObject ret = new JSONObject(); - ret.put("code", 0); - ret.put("msg", "success"); + return ret; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 4a6a94a7..c7d9966b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -348,4 +348,52 @@ public class ZLMRTPServerFactory { return result; } + public JSONObject startSendRtp(MediaServerItem mediaInfo, SendRtpItem sendRtpItem) { + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + logger.info("rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + if (!sendRtpItem.isTcp()) { + // udp模式下开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + + if (mediaInfo == null) { + return null; + } + // 如果是非严格模式,需要关闭端口占用 + JSONObject startSendRtpStreamResult = null; + if (sendRtpItem.getLocalPort() != 0) { + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); + if (releasePort(mediaInfo, sendRtpItem.getSsrc())) { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = startSendRtpPassive(mediaInfo, param); + System.out.println(JSON.toJSON(param)); + }else { + param.put("is_udp", is_Udp); + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = startSendRtpStream(mediaInfo, param); + } + } + }else { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = startSendRtpPassive(mediaInfo, param); + }else { + param.put("is_udp", is_Udp); + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = startSendRtpStream(mediaInfo, param); + } + } + return startSendRtpStreamResult; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java index 17f8b37f..be8a8f5f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java @@ -1,8 +1,17 @@ package com.genersoft.iot.vmp.service; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; import com.github.pagehelper.PageInfo; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; + /** * 国标平台的业务类 * @author lin @@ -48,4 +57,23 @@ public interface IPlatformService { * @param platformId 平台 */ void sendNotifyMobilePosition(String platformId); + + /** + * 向上级发送语音喊话的消息 + * @param platform 平台 + * @param channelId 通道 + * @param hookEvent hook事件 + * @param errorEvent 信令错误事件 + * @param timeoutCallback 超时事件 + */ + void broadcastInvite(ParentPlatform platform, String channelId, MediaServerItem mediaServerItem, ZlmHttpHookSubscribe.Event hookEvent, + SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException; + + /** + * 语音喊话回复BYE + * @param platform 平台 + * @param channelId 通道 + * @param stream 流信息 + */ + void stopBroadcast(ParentPlatform platform, String channelId, String stream )throws InvalidArgumentException, ParseException, SsrcTransactionNotFoundException, SipException; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 4ab2f4a5..3f7e13de 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -53,10 +53,13 @@ public interface IPlayService { void zlmServerOnline(String mediaServerId); - AudioBroadcastResult audioBroadcast(Device device, String channelId); - void stopAudioBroadcast(String deviceId, String channelId); + AudioBroadcastResult audioBroadcastInfo(Device device, String channelId); - void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException; + boolean audioBroadcastCmd(Device device, String channelId, MediaServerItem mediaServerItem, String app, String stream, int timeout, boolean isFromPlatform, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException; + + boolean audioBroadcastInUse(Device device, String channelId); + + void stopAudioBroadcast(String deviceId, String channelId); void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index fbc507a6..5b0f67b1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -1,15 +1,25 @@ package com.genersoft.iot.vmp.service.impl; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlatformService; +import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; @@ -21,11 +31,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; import javax.sip.SipException; import java.text.ParseException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; /** * @author lin @@ -65,6 +77,16 @@ public class PlatformServiceImpl implements IPlatformService { @Autowired private UserSetting userSetting; + @Autowired + private ZlmHttpHookSubscribe subscribe; + + @Autowired + private VideoStreamSessionManager streamSession; + + + @Autowired + private IPlayService playService; + @Override @@ -295,4 +317,137 @@ public class PlatformServiceImpl implements IPlatformService { } } } + + @Override + public void broadcastInvite(ParentPlatform platform, String channelId, MediaServerItem mediaServerItem, ZlmHttpHookSubscribe.Event hookEvent, + SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException { + + if (mediaServerItem == null) { + logger.info("[国标级联] 语音喊话未找到可用的zlm. platform: {}", platform.getServerGBId()); + return; + } + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId); + if (streamInfo != null) { + // 如果zlm不存在这个流,则删除数据即可 + MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(streamInfo.getMediaServerId()); + if (mediaServerItemForStreamInfo != null) { + Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemForStreamInfo, streamInfo.getApp(), streamInfo.getStream()); + if (!ready) { + // 错误存在于redis中的数据 + redisCatchStorage.stopPlay(streamInfo); + }else { + // 流确实尚在推流,直接回调结果 + JSONObject json = new JSONObject(); + json.put("app", streamInfo.getApp()); + json.put("stream", streamInfo.getStream()); + hookEvent.response(mediaServerItemForStreamInfo, json); + return; + } + } + } + + String streamId = null; + if (mediaServerItem.isRtpEnable()) { + streamId = String.format("%s_%s", platform.getServerGBId(), channelId); + } + // 默认不进行SSRC校验, TODO 后续可改为配置 + boolean ssrcCheck = false; + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrcCheck, false); + if (ssrcInfo == null || ssrcInfo.getPort() < 0) { + logger.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel: {}", platform.getServerGBId(), channelId); + errorEvent.response(new SipSubscribe.EventResult(-1, "端口监听失败")); + return; + } + logger.info("[国标级联] 发起语音喊话 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", + platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck); + + String timeOutTaskKey = UUID.randomUUID().toString(); + dynamicTask.startDelay(timeOutTaskKey, () -> { + // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 + if (redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId) == null) { + logger.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc()); + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 + try { + commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null); + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + logger.error("[点播超时], 发送BYE失败 {}", e.getMessage()); + } finally { + timeoutCallback.run(1, "收流超时"); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + } + } + }, userSetting.getPlayTimeout()); + commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{ + dynamicTask.stop(timeOutTaskKey); + // hook响应 + playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId); + // 收到流 + if (hookEvent != null) { + hookEvent.response(mediaServerItem, response); + } + }, event -> { + // 收到200OK 检测ssrc是否有变化,防止上级自定义了ssrc + ResponseEvent responseEvent = (ResponseEvent) event.event; + String contentString = new String(responseEvent.getResponse().getRawContent()); + // 获取ssrc + int ssrcIndex = contentString.indexOf("y="); + // 检查是否有y字段 + if (ssrcIndex >= 0) { + //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 + String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 + if (ssrcInfo.getSsrc().equals(ssrcInResponse) || ssrcCheck) { + return; + } + logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); + if (!mediaServerItem.isRtpEnable()) { + logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); + + if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) { + // ssrc 不可用 + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); + event.msg = "下级自定义了ssrc,但是此ssrc不可用"; + event.statusCode = 400; + errorEvent.response(event); + return; + } + + // 单端口模式streamId也有变化,需要重新设置监听 + if (!mediaServerItem.isRtpEnable()) { + // 添加订阅 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); + subscribe.removeSubscribe(hookSubscribe); + hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { + logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); + dynamicTask.stop(timeOutTaskKey); + // hook响应 + playService.onPublishHandlerForPlay(mediaServerItemInUse, response, platform.getServerGBId(), channelId); + hookEvent.response(mediaServerItemInUse, response); + }); + } + // 关闭rtp server + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + // 重新开启ssrc server + mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort()); + + } + } + }, eventResult -> { + // 收到错误回复 + if (errorEvent != null) { + errorEvent.response(eventResult); + } + }); + } + + @Override + public void stopBroadcast(ParentPlatform platform, String channelId, String stream) throws InvalidArgumentException, ParseException, SsrcTransactionNotFoundException, SipException { + commanderForPlatform.streamByeCmd(platform, channelId, stream, null, null); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 32f0364f..5e62981a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -987,7 +987,7 @@ public class PlayServiceImpl implements IPlayService { } @Override - public AudioBroadcastResult audioBroadcast(Device device, String channelId) { + public AudioBroadcastResult audioBroadcastInfo(Device device, String channelId) { if (device == null || channelId == null) { return null; } @@ -1012,46 +1012,51 @@ public class PlayServiceImpl implements IPlayService { } @Override - public void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException { + public boolean audioBroadcastCmd(Device device, String channelId, MediaServerItem mediaServerItem, String app, String stream, int timeout, boolean isFromPlatform, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException { if (device == null || channelId == null) { - return; + return false; } logger.info("[语音喊话] device: {}, channel: {}", device.getDeviceId(), channelId); DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId); if (deviceChannel == null) { logger.warn("开启语音广播的时候未找到通道: {}", channelId); event.call("开启语音广播的时候未找到通道"); - return; + return false; } // 查询通道使用状态 - if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); - if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { - // 查询流是否存在,不存在则认为是异常状态 - MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId()); - if (streamReady) { - logger.warn("语音广播已经开启: {}", channelId); - event.call("语音广播已经开启"); - return; - } else { - audioBroadcastManager.del(deviceChannel.getDeviceId(), channelId); - redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId, sendRtpItem.getCallId(), sendRtpItem.getStreamId()); - } - } + if (audioBroadcastInUse(device, channelId)) { + return false; } // 发送通知 cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { // 发送成功 - AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready); - audioBroadcastManager.add(audioBroadcastCatch); + AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, mediaServerItem, app, stream, event, AudioBroadcastCatchStatus.Ready, isFromPlatform); + audioBroadcastManager.update(audioBroadcastCatch); }, eventResultForError -> { // 发送失败 logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg); event.call("语音广播发送失败"); stopAudioBroadcast(device.getDeviceId(), channelId); }); + return true; + } + + @Override + public boolean audioBroadcastInUse(Device device, String channelId) { + if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { + // 查询流是否存在,不存在则认为是异常状态 + MediaServerItem mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStreamId()); + if (streamReady) { + logger.warn("语音广播通道使用中: {}", channelId); + return true; + } + } + } + return false; } @@ -1075,6 +1080,9 @@ public class PlayServiceImpl implements IPlayService { param.put("stream", sendRtpItem.getStreamId()); zlmresTfulUtils.stopSendRtp(mediaInfo, param); } + if (audioBroadcastCatch.isFromPlatform()) { + // TODO 向上级发送BYE结束语音喊话 + } audioBroadcastManager.del(deviceId, channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index 19b2adc6..0747b9ad 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -267,7 +267,7 @@ public class PlayController { throw new ControllerException(ErrorCode.ERROR400.getCode(), "未找到通道: " + channelId); } - return playService.audioBroadcast(device, channelId); + return playService.audioBroadcastInfo(device, channelId); } diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index ba150fb9..92c2acb2 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -168,7 +168,7 @@ user-settings: # 保存移动位置历史轨迹:true:保留历史数据,false:仅保留最后的位置(默认) save-position-history: false # 点播等待超时时间,单位:毫秒 - play-timeout: 3000 + play-timeout: 18000 # 上级点播等待超时时间,单位:毫秒 platform-play-timeout: 60000 # 是否开启接口鉴权 @@ -195,6 +195,8 @@ user-settings: gb-send-stream-strict: false # 设备上线时是否自动同步通道 sync-channel-on-device-online: false + # 国标级联语音喊话发流模式 * UDP:udp传输 TCP-ACTIVE:tcp主动模式 TCP-PASSIVE:tcp被动模式 + broadcast-for-platform: UDP # 关闭在线文档(生产环境建议关闭) springdoc: diff --git a/web_src/src/components/dialog/devicePlayer.vue b/web_src/src/components/dialog/devicePlayer.vue index 89b3e07b..cd58f412 100644 --- a/web_src/src/components/dialog/devicePlayer.vue +++ b/web_src/src/components/dialog/devicePlayer.vue @@ -279,7 +279,7 @@ - +

@@ -854,7 +854,7 @@ export default { if (this.broadcastStatus == -1) { // 默认状态, 开始 this.broadcastStatus = 0 - // 发起语音对讲 + // 发起语音喊话 this.$axios({ method: 'get', url: '/api/play/broadcast/' + this.deviceId + '/' + this.channelId + "?timeout=30" @@ -897,7 +897,7 @@ export default { let pushKey = res.data.data.pushKey; // 获取推流鉴权KEY url += "&sign=" + crypto.createHash('md5').update(pushKey, "utf8").digest('hex') - console.log("开始语音对讲: " + url) + console.log("开始语音喊话: " + url) this.broadcastRtc = new ZLMRTCClient.Endpoint({ debug: true, // 是否打印日志 zlmsdpUrl: url, //流地址 @@ -923,7 +923,7 @@ export default { console.error('不支持webrtc',e) this.$message({ showClose: true, - message: '不支持webrtc, 无法进行语音对讲', + message: '不支持webrtc, 无法进行语音喊话', type: 'error' }); this.broadcastStatus = -1;