diff --git a/doc/README.md b/doc/README.md index 0fb5b86d..2bbc1844 100644 --- a/doc/README.md +++ b/doc/README.md @@ -52,7 +52,7 @@ - [X] 报警订阅 - [X] 目录订阅 - [ ] 语音广播 -- [ ] 语音对讲 +- [ ] 语音喊话 **作为下级平台** - [X] 注册 @@ -91,7 +91,7 @@ - [ ] 报警订阅 - [X] 目录订阅 - [ ] 语音广播 -- [ ] 语音对讲 +- [ ] 语音喊话 diff --git a/doc/_content/theory/broadcast_cascade.md b/doc/_content/theory/broadcast_cascade.md new file mode 100644 index 00000000..e59b8c25 --- /dev/null +++ b/doc/_content/theory/broadcast_cascade.md @@ -0,0 +1,46 @@ + + +# 点播流程 +> 以下为WVP-PRO级联语音喊话流程。 + +```plantuml +@startuml +"上级平台" -> "下级平台": 1. 发起语音喊话请求 +"上级平台" <-- "下级平台": 2. 200OK +"上级平台" <- "下级平台": 3. 回复Result OK +"上级平台" --> "下级平台": 4. 200OK + +"下级平台" -> "设备": 5. 发起语音喊话请求 +"下级平台" <-- "设备": 6. 200OK +"下级平台" <- "设备": 7. 回复Result OK +"下级平台" --> "设备": 8. 200OK + +"下级平台" <- "设备": 9. invite(broadcast) +"下级平台" --> "设备": 10. 100 trying +"下级平台" --> "设备": 11. 200OK SDP +"下级平台" <-- "设备": 12. ack + +"上级平台" <- "下级平台": 13. invite(broadcast) +"上级平台" --> "下级平台": 14. 100 trying +"上级平台" --> "下级平台": 15. 200OK SDP +"上级平台" <-- "下级平台": 16. ack + +"上级平台" -> "下级平台": 17. 推送RTP +"下级平台" -> "设备": 18. 推送RTP + +@enduml +``` + + +## 注册流程描述如下: +1. 用户从网页或调用接口发起点播请求; +2. WVP-PRO向摄像机发送Invite消息,消息头域中携带 Subject字段,表明点播的视频源ID、发送方媒体流序列号、ZLMediaKit接收流使用的IP、端口号、 + 接收端媒体流序列号等参数,SDP消息体中 s字段为“Play”代表实时点播,y字段描述SSRC值,f字段描述媒体参数。 +3. 摄像机向WVP-PRO回复200OK,消息体中描述了媒体流发送者发送媒体流的IP、端口、媒体格式、SSRC字段等内容。 +4. WVP-PRO向设备回复Ack, 会话建立成功。 +5. 设备向ZLMediaKit发送实时流。 +6. ZLMediaKit向WVP-PRO发送流改变事件。 +7. WVP-PRO向WEB用户回复播放地址。 +8. ZLMediaKit向WVP发送流无人观看事件。 +9. WVP-PRO向设备回复Bye, 结束会话。 +10. 设备回复200OK,会话结束成功。 diff --git a/doc/_sidebar.md b/doc/_sidebar.md index 3b10bae8..05101c19 100644 --- a/doc/_sidebar.md +++ b/doc/_sidebar.md @@ -19,6 +19,7 @@ * [树形结构](_content/theory/channel_tree.md) * [注册流程](_content/theory/register.md) * [点播流程](_content/theory/play.md) + * [级联语音喊话流程](_content/theory/broadcast_cascade.md) * **必备技巧** * [抓包](_content/skill/tcpdump.md) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index 251edf13..811f7e96 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -55,6 +55,8 @@ public class UserSetting { private String thirdPartyGBIdReg = "[\\s\\S]*"; + private String broadcastForPlatform = "UDP"; + private List interfaceAuthenticationExcludes = new ArrayList<>(); public Boolean getSavePositionHistory() { @@ -205,6 +207,14 @@ public class UserSetting { this.syncChannelOnDeviceOnline = syncChannelOnDeviceOnline; } + public String getBroadcastForPlatform() { + return broadcastForPlatform; + } + + public void setBroadcastForPlatform(String broadcastForPlatform) { + this.broadcastForPlatform = broadcastForPlatform; + } + public Boolean getPushStreamAfterAck() { return pushStreamAfterAck; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java index a57f7a3a..4fff3520 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.bean; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import gov.nist.javax.sip.message.SIPResponse; /** @@ -11,18 +12,24 @@ import gov.nist.javax.sip.message.SIPResponse; public class AudioBroadcastCatch { - public AudioBroadcastCatch(String deviceId, - String channelId, - AudioBroadcastCatchStatus status, - MediaServerItem mediaServerItem, - String app, - String stream) { + public AudioBroadcastCatch( + String deviceId, + String channelId, + MediaServerItem mediaServerItem, + String app, + String stream, + AudioBroadcastEvent event, + AudioBroadcastCatchStatus status, + boolean isFromPlatform + ) { this.deviceId = deviceId; this.channelId = channelId; this.status = status; - this.mediaServerItem = mediaServerItem; + this.event = event; + this.isFromPlatform = isFromPlatform; this.app = app; this.stream = stream; + this.mediaServerItem = mediaServerItem; } public AudioBroadcastCatch() { @@ -39,20 +46,25 @@ public class AudioBroadcastCatch { private String channelId; /** - * 使用的流媒体 + * 流媒体信息 */ private MediaServerItem mediaServerItem; /** - * 待推送给设备的流应用名 + * 关联的流APP */ private String app; /** - * 待推送给设备的流ID + * 关联的流STREAM */ private String stream; + /** + * 是否是级联语音喊话 + */ + private boolean isFromPlatform; + /** * 语音广播状态 */ @@ -63,6 +75,11 @@ public class AudioBroadcastCatch { */ private SipTransactionInfo sipTransactionInfo; + /** + * 请求结果回调 + */ + private AudioBroadcastEvent event; + public String getDeviceId() { return deviceId; @@ -123,4 +140,44 @@ public class AudioBroadcastCatch { public void setMediaServerItem(MediaServerItem mediaServerItem) { this.mediaServerItem = mediaServerItem; } + + public AudioBroadcastEvent getEvent() { + return event; + } + + public void setEvent(AudioBroadcastEvent event) { + this.event = event; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public boolean isFromPlatform() { + return isFromPlatform; + } + + public void setFromPlatform(boolean fromPlatform) { + isFromPlatform = fromPlatform; + } + + public MediaServerItem getMediaServerItem() { + return mediaServerItem; + } + + public void setMediaServerItem(MediaServerItem mediaServerItem) { + this.mediaServerItem = mediaServerItem; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java index a3098fb7..dde7639e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamType.java @@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.gb28181.bean; public enum InviteStreamType { - PLAY,PLAYBACK,PUSH,PROXY,CLOUD_RECORD_PUSH,CLOUD_RECORD_PROXY,TALK + PLAY,PLAYBACK,PUSH,PROXY,CLOUD_RECORD_PUSH,CLOUD_RECORD_PROXY,BROADCAST,TALK } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java index b056cc71..262ac559 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java @@ -66,7 +66,7 @@ public class ParentPlatform { * 设备端口 */ @Schema(description = "设备端口") - private String devicePort; + private int devicePort; /** * SIP认证用户名(默认使用设备国标编号) @@ -261,11 +261,11 @@ public class ParentPlatform { this.deviceIp = deviceIp; } - public String getDevicePort() { + public int getDevicePort() { return devicePort; } - public void setDevicePort(String devicePort) { + public void setDevicePort(int devicePort) { this.devicePort = devicePort; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index 75643905..5cc9cb92 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -86,6 +86,11 @@ public class SipSubscribe { public String callId; public EventObject event; + public EventResult(int statusCode, String msg) { + this.statusCode = statusCode; + this.msg = msg; + } + public EventResult(EventObject event) { this.event = event; if (event instanceof ResponseEvent) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index 0425356a..9d5425b2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -1,8 +1,12 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import javax.sip.InvalidArgumentException; import javax.sip.SipException; @@ -14,42 +18,56 @@ public interface ISIPCommanderForPlatform { /** * 向上级平台注册 + * * @param parentPlatform * @return */ - void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException; - void register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException; + void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) + throws InvalidArgumentException, ParseException, SipException; + + void register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, + SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) + throws SipException, InvalidArgumentException, ParseException; /** * 向上级平台注销 + * * @param parentPlatform * @return */ - void unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException; + void unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) + throws InvalidArgumentException, ParseException, SipException; /** * 向上级平发送心跳信息 + * * @param parentPlatform * @return callId(作为接受回复的判定) */ - String keepalive(ParentPlatform parentPlatform,SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws SipException, InvalidArgumentException, ParseException; + String keepalive(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) + throws SipException, InvalidArgumentException, ParseException; /** * 向上级回复通道信息 - * @param channel 通道信息 + * + * @param channel 通道信息 * @param parentPlatform 平台信息 * @param sn * @param fromTag * @param size * @return */ - void catalogQuery(DeviceChannel channel, ParentPlatform parentPlatform, String sn, String fromTag, int size) throws SipException, InvalidArgumentException, ParseException; - void catalogQuery(List channels, ParentPlatform parentPlatform, String sn, String fromTag) throws InvalidArgumentException, ParseException, SipException; + void catalogQuery(DeviceChannel channel, ParentPlatform parentPlatform, String sn, String fromTag, int size) + throws SipException, InvalidArgumentException, ParseException; + + void catalogQuery(List channels, ParentPlatform parentPlatform, String sn, String fromTag) + throws InvalidArgumentException, ParseException, SipException; /** * 向上级回复DeviceInfo查询信息 + * * @param parentPlatform 平台信息 * @param sn SN * @param fromTag FROM头的tag信息 @@ -59,32 +77,37 @@ public interface ISIPCommanderForPlatform { /** * 向上级回复DeviceStatus查询信息 + * * @param parentPlatform 平台信息 * @param sn * @param fromTag * @return */ - void deviceStatusResponse(ParentPlatform parentPlatform,String channelId, String sn, String fromTag,int status) throws SipException, InvalidArgumentException, ParseException; + void deviceStatusResponse(ParentPlatform parentPlatform,String channelId, String sn, String fromTag,int status) throws SipException, InvalidArgumentException, ParseException ; /** * 向上级回复移动位置订阅消息 + * * @param parentPlatform 平台信息 - * @param gpsMsgInfo GPS信息 - * @param subscribeInfo 订阅相关的信息 + * @param gpsMsgInfo GPS信息 + * @param subscribeInfo 订阅相关的信息 * @return */ - void sendNotifyMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) throws InvalidArgumentException, ParseException, NoSuchFieldException, SipException, IllegalAccessException; + void sendNotifyMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) + throws InvalidArgumentException, ParseException, NoSuchFieldException, SipException, IllegalAccessException; /** * 向上级回复报警消息 + * * @param parentPlatform 平台信息 - * @param deviceAlarm 报警信息信息 + * @param deviceAlarm 报警信息信息 * @return */ void sendAlarmMessage(ParentPlatform parentPlatform, DeviceAlarm deviceAlarm) throws SipException, InvalidArgumentException, ParseException; /** * 回复catalog事件-增加/更新 + * * @param parentPlatform * @param deviceChannels */ @@ -92,22 +115,28 @@ public interface ISIPCommanderForPlatform { /** * 回复catalog事件-删除 + * * @param parentPlatform * @param deviceChannels */ - void sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List deviceChannels, SubscribeInfo subscribeInfo, Integer index) throws InvalidArgumentException, ParseException, NoSuchFieldException, SipException, IllegalAccessException; + void sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List deviceChannels, + SubscribeInfo subscribeInfo, Integer index) throws InvalidArgumentException, + ParseException, NoSuchFieldException, SipException, IllegalAccessException; /** * 回复recordInfo - * @param deviceChannel 通道信息 + * + * @param deviceChannel 通道信息 * @param parentPlatform 平台信息 - * @param fromTag fromTag - * @param recordInfo 录像信息 + * @param fromTag fromTag + * @param recordInfo 录像信息 */ - void recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo) throws SipException, InvalidArgumentException, ParseException; + void recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo) + throws SipException, InvalidArgumentException, ParseException; /** * 录像播放推送完成时发送MediaStatus消息 + * * @param platform * @param sendRtpItem * @return @@ -116,9 +145,19 @@ public interface ISIPCommanderForPlatform { /** * 向发起点播的上级回复bye + * * @param platform 平台信息 - * @param callId callId + * @param callId callId */ void streamByeCmd(ParentPlatform platform, String callId) throws SipException, InvalidArgumentException, ParseException; + void streamByeCmd(ParentPlatform platform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException; + + void streamByeCmd(ParentPlatform platform, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; + + void broadcastInviteCmd(ParentPlatform platform, String channelId, MediaServerItem mediaServerItem, + SSRCInfo ssrcInfo, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, + SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException; + + void broadcastResultCmd(ParentPlatform platform, DeviceChannel deviceChannel, String sn, boolean result, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index 0fe11c01..b77a0a48 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -14,7 +15,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.DigestUtils; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.PeerUnavailableException; import javax.sip.address.Address; import javax.sip.address.SipURI; import javax.sip.header.*; @@ -22,7 +24,6 @@ import javax.sip.message.Request; import javax.validation.constraints.NotNull; import java.text.ParseException; import java.util.ArrayList; -import java.util.List; import java.util.UUID; /** @@ -175,7 +176,7 @@ public class SIPRequestHeaderPlarformProvider { SipURI requestURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), serverAddress); // via ArrayList viaHeaders = new ArrayList(); - ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()), + ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), parentPlatform.getDevicePort(), parentPlatform.getTransport(), viaTag); viaHeader.setRPort(); viaHeaders.add(viaHeader); @@ -212,7 +213,7 @@ public class SIPRequestHeaderPlarformProvider { SipURI requestURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort()); // via ArrayList viaHeaders = new ArrayList<>(); - ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()), + ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), parentPlatform.getDevicePort(), parentPlatform.getTransport(), SipUtils.getNewViaTag()); viaHeader.setRPort(); viaHeaders.add(viaHeader); @@ -272,7 +273,7 @@ public class SIPRequestHeaderPlarformProvider { SipURI requestURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(platform.getServerGBId(), platform.getServerIP()+ ":" + platform.getServerPort()); // via ArrayList viaHeaders = new ArrayList<>(); - ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(platform.getDeviceIp(), Integer.parseInt(platform.getDevicePort()), + ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(platform.getDeviceIp(), platform.getDevicePort(), platform.getTransport(), SipUtils.getNewViaTag()); viaHeader.setRPort(); viaHeaders.add(viaHeader); @@ -306,6 +307,82 @@ public class SIPRequestHeaderPlarformProvider { .createSipURI(platform.getDeviceGBId(), sipAddress)); request.addHeader(sipLayer.getSipFactory().createHeaderFactory().createContactHeader(concatAddress)); + return request; + } + + public Request createInviteRequest(ParentPlatform platform, String channelId, String content, String viaTag, String fromTag, String ssrc, CallIdHeader callIdHeader) throws PeerUnavailableException, ParseException, InvalidArgumentException { + Request request = null; + //请求行 + String platformHostAddress = platform.getServerIP() + ":" + platform.getServerPort(); + String localHostAddress = sipLayer.getLocalIp(platform.getDeviceIp())+":"+ platform.getDevicePort(); + SipURI requestLine = sipLayer.getSipFactory().createAddressFactory().createSipURI(channelId, platformHostAddress); + //via + ArrayList viaHeaders = new ArrayList(); + ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(platform.getDeviceIp()), platform.getDevicePort(), platform.getTransport(), viaTag); + viaHeader.setRPort(); + viaHeaders.add(viaHeader); + + //from + SipURI fromSipURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(platform.getDeviceGBId(), sipConfig.getDomain()); + Address fromAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(fromSipURI); + FromHeader fromHeader = sipLayer.getSipFactory().createHeaderFactory().createFromHeader(fromAddress, fromTag); //必须要有标记,否则无法创建会话,无法回应ack + //to + SipURI toSipURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(channelId, platformHostAddress); + Address toAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(toSipURI); + ToHeader toHeader = sipLayer.getSipFactory().createHeaderFactory().createToHeader(toAddress,null); + + //Forwards + MaxForwardsHeader maxForwards = sipLayer.getSipFactory().createHeaderFactory().createMaxForwardsHeader(70); + + //ceq + CSeqHeader cSeqHeader = sipLayer.getSipFactory().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE); + request = sipLayer.getSipFactory().createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards); + + request.addHeader(SipUtils.createUserAgentHeader(sipLayer.getSipFactory(), gitUtil)); + + Address concatAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(sipLayer.getSipFactory().createAddressFactory().createSipURI(sipConfig.getId(),localHostAddress)); + request.addHeader(sipLayer.getSipFactory().createHeaderFactory().createContactHeader(concatAddress)); + // Subject + SubjectHeader subjectHeader = sipLayer.getSipFactory().createHeaderFactory().createSubjectHeader(String.format("%s:%s,%s:%s", channelId, ssrc, sipConfig.getId(), 0)); + request.addHeader(subjectHeader); + ContentTypeHeader contentTypeHeader = sipLayer.getSipFactory().createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); + request.setContent(content, contentTypeHeader); + return request; + } + + public Request createByteRequest(ParentPlatform platform, String channelId, SipTransactionInfo transactionInfo) throws PeerUnavailableException, ParseException, InvalidArgumentException { + String deviceHostAddress = platform.getDeviceIp() + ":" + platform.getDevicePort(); + Request request = null; + SipURI requestLine = sipLayer.getSipFactory().createAddressFactory().createSipURI(channelId, deviceHostAddress); + + // via + ArrayList viaHeaders = new ArrayList(); + ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(platform.getDeviceIp()), platform.getDevicePort(), platform.getTransport(), SipUtils.getNewViaTag()); + viaHeaders.add(viaHeader); + //from + SipURI fromSipURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(sipConfig.getId(),sipConfig.getDomain()); + Address fromAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(fromSipURI); + FromHeader fromHeader = sipLayer.getSipFactory().createHeaderFactory().createFromHeader(fromAddress, transactionInfo.isAsSender()?transactionInfo.getFromTag():transactionInfo.getToTag()); + //to + SipURI toSipURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(channelId, deviceHostAddress); + Address toAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(toSipURI); + ToHeader toHeader = sipLayer.getSipFactory().createHeaderFactory().createToHeader(toAddress,transactionInfo.isAsSender()?transactionInfo.getToTag():transactionInfo.getFromTag()); + + //Forwards + MaxForwardsHeader maxForwards = sipLayer.getSipFactory().createHeaderFactory().createMaxForwardsHeader(70); + + //ceq + CSeqHeader cSeqHeader = sipLayer.getSipFactory().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.BYE); + CallIdHeader callIdHeader = sipLayer.getSipFactory().createHeaderFactory().createCallIdHeader(transactionInfo.getCallId()); + request = sipLayer.getSipFactory().createMessageFactory().createRequest(requestLine, Request.BYE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards); + + request.addHeader(SipUtils.createUserAgentHeader(sipLayer.getSipFactory(), gitUtil)); + + Address concatAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(sipLayer.getSipFactory().createAddressFactory().createSipURI(sipConfig.getId(), sipLayer.getLocalIp(platform.getDeviceIp())+":"+ platform.getDevicePort())); + request.addHeader(sipLayer.getSipFactory().createHeaderFactory().createContactHeader(concatAddress)); + + request.addHeader(SipUtils.createUserAgentHeader(sipLayer.getSipFactory(), gitUtil)); + return request; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 3b6565a0..48551149 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -591,12 +591,12 @@ public class SIPCommander implements ISIPCommander { return; } if (!mediaServerItem.isRtpEnable()) { - // 单端口暂不支持语音对讲 - logger.info("[语音对讲] 单端口暂不支持此操作"); + // 单端口暂不支持语音喊话 + logger.info("[语音喊话] 单端口暂不支持此操作"); return; } - logger.info("[语音对讲] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort()); + logger.info("[语音喊话] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort()); HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); subscribe.addSubscribe(hookSubscribeForStreamChange, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { if (event != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 4fef72d7..9809a9f9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -1,23 +1,32 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; import com.genersoft.iot.vmp.utils.DateUtil; import gov.nist.javax.sip.message.MessageFactoryImpl; import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -27,6 +36,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; import javax.sip.SipException; import javax.sip.header.CallIdHeader; import javax.sip.header.WWWAuthenticateHeader; @@ -62,6 +72,16 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @Autowired private SIPSender sipSender; + @Autowired + private ZlmHttpHookSubscribe subscribe; + + @Autowired + private UserSetting userSetting; + + + @Autowired + private VideoStreamSessionManager streamSession; + @Autowired private DynamicTask dynamicTask; @@ -336,6 +356,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { sipSender.transmitRequest(parentPlatform.getDeviceIp(), request); } + /** * 向上级回复DeviceStatus查询信息 * @param parentPlatform 平台信息 @@ -702,4 +723,107 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } sipSender.transmitRequest(platform.getDeviceIp(),byeRequest); } + + @Override + public void streamByeCmd(ParentPlatform platform, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException { + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(platform.getServerGBId(), channelId, callId, stream); + if (ssrcTransaction == null) { + throw new SsrcTransactionNotFoundException(platform.getServerGBId(), channelId, callId, stream); + } + + mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); + mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream()); + streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); + + Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channelId, ssrcTransaction.getSipTransactionInfo()); + sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), byteRequest, null, okEvent); + } + + @Override + public void broadcastResultCmd(ParentPlatform platform, DeviceChannel deviceChannel, String sn, boolean result, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException { + if (platform == null || deviceChannel == null) { + return; + } + String characterSet = platform.getCharacterSet(); + StringBuffer mediaStatusXml = new StringBuffer(200); + mediaStatusXml.append("\r\n"); + mediaStatusXml.append("\r\n"); + mediaStatusXml.append("Broadcast\r\n"); + mediaStatusXml.append("" + sn + "\r\n"); + mediaStatusXml.append("" + deviceChannel.getChannelId() + "\r\n"); + mediaStatusXml.append("" + (result?"OK":"ERROR") + "\r\n"); + mediaStatusXml.append("\r\n"); + + CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(platform.getDeviceIp(), platform.getTransport()); + + SIPRequest messageRequest = (SIPRequest)headerProviderPlatformProvider.createMessageRequest(platform, mediaStatusXml.toString(), + SipUtils.getNewFromTag(), SipUtils.getNewViaTag(), callIdHeader); + + sipSender.transmitRequest(platform.getDeviceIp(),messageRequest, errorEvent, okEvent); + } + + @Override + public void broadcastInviteCmd(ParentPlatform platform, String channelId, MediaServerItem mediaServerItem, + SSRCInfo ssrcInfo, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, + SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException { + String stream = ssrcInfo.getStream(); + + if (platform == null) { + return; + } + + logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { + if (event != null) { + event.response(mediaServerItemInUse, json); + subscribe.removeSubscribe(hookSubscribe); + } + }); + String sdpIp = mediaServerItem.getSdpIp(); + + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n"); + content.append("s=Play\r\n"); + content.append("c=IN IP4 " + sdpIp + "\r\n"); + content.append("t=0 0\r\n"); + + if ("TCP-PASSIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("m=video " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n"); + } else if ("TCP-ACTIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("m=video " + ssrcInfo.getPort() + " TCP/RTP/AVP 8 96\r\n"); + } else if ("UDP".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("m=video " + ssrcInfo.getPort() + " RTP/AVP 8 96\r\n"); + } + + content.append("a=recvonly\r\n"); + content.append("a=rtpmap:8 PCMA/8000\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + if ("TCP-PASSIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("a=setup:passive\r\n"); + content.append("a=connection:new\r\n"); + }else if ("TCP-ACTIVE".equalsIgnoreCase(userSetting.getBroadcastForPlatform())) { + content.append("a=setup:active\r\n"); + content.append("a=connection:new\r\n"); + } + + content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc + CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(platform.getDeviceIp()), platform.getTransport()); + + Request request = headerProviderPlatformProvider.createInviteRequest(platform, channelId, + content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), ssrcInfo.getSsrc(), + callIdHeader); + sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> { + streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + errorEvent.response(e); + }), e -> { + // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 + ResponseEvent responseEvent = (ResponseEvent) e.event; + SIPResponse response = (SIPResponse) responseEvent.getResponse(); + streamSession.put(platform.getServerGBId(), channelId, callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play); + okEvent.response(e); + }); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index b00c83bd..12bddb14 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -156,7 +156,6 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In } } } - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index c620354d..c89279a1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -202,7 +202,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements MediaServerItem mediaServerItem = null; StreamPushItem streamPushItem = null; - StreamProxyItem proxyByAppAndStream =null; + StreamProxyItem proxyByAppAndStream = null; // 不是通道可能是直播流 if (channel != null && gbStream == null) { // 通道存在,发100,TRYING @@ -1018,7 +1018,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); - sendRtpItem.setPlayType(InviteStreamType.TALK); + sendRtpItem.setPlayType(InviteStreamType.BROADCAST); sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setPlatformId(requesterId); sendRtpItem.setStatus(1); @@ -1113,7 +1113,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } catch (SipException | InvalidArgumentException | ParseException | SdpParseException e) { - logger.error("[命令发送失败] 语音对讲 回复200OK(SDP): {}", e.getMessage()); + logger.error("[命令发送失败] 语音喊话 回复200OK(SDP): {}", e.getMessage()); } return sipResponse; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java new file mode 100644 index 00000000..240f24f4 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java @@ -0,0 +1,200 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd; + +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IDeviceService; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IPlatformService; +import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import gov.nist.javax.sip.message.SIPRequest; +import org.dom4j.Element; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sip.InvalidArgumentException; +import javax.sip.RequestEvent; +import javax.sip.SipException; +import javax.sip.message.Response; +import java.text.ParseException; + +/** + * 状态信息(心跳)报送 + */ +@Component +public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { + + private Logger logger = LoggerFactory.getLogger(BroadcastNotifyMessageHandler.class); + private final static String cmdType = "Broadcast"; + + @Autowired + private NotifyMessageHandler notifyMessageHandler; + + @Autowired + private IVideoManagerStorage storage; + + @Autowired + private ISIPCommanderForPlatform commanderForPlatform; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private IPlayService playService; + + @Autowired + private IDeviceService deviceService; + + @Autowired + private IPlatformService platformService; + + @Autowired + private AudioBroadcastManager audioBroadcastManager; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Override + public void afterPropertiesSet() throws Exception { + notifyMessageHandler.addHandler(cmdType, this); + } + + @Override + public void handForDevice(RequestEvent evt, Device device, Element element) { + + } + + @Override + public void handForPlatform(RequestEvent evt, ParentPlatform platform, Element rootElement) { + // 来自上级平台的语音喊话请求 + SIPRequest request = (SIPRequest) evt.getRequest(); + try { + Element snElement = rootElement.element("SN"); + if (snElement == null) { + responseAck(request, Response.BAD_REQUEST, "sn must not null"); + return; + } + String sn = snElement.getText(); + Element targetIDElement = rootElement.element("TargetID"); + if (targetIDElement == null) { + responseAck(request, Response.BAD_REQUEST, "TargetID must not null"); + return; + } + String targetId = targetIDElement.getText(); + + + logger.info("[国标级联 语音喊话] platform: {}, channel: {}", platform.getServerGBId(), targetId); + + DeviceChannel deviceChannel = storage.queryChannelInParentPlatform(platform.getServerGBId(), targetId); + if (deviceChannel == null) { + responseAck(request, Response.NOT_FOUND, "TargetID not found"); + return; + } + // 向下级发送语音的喊话请求 + Device device = deviceService.getDevice(deviceChannel.getDeviceId()); + if (device == null) { + responseAck(request, Response.NOT_FOUND, "device not found"); + return; + } + responseAck(request, Response.OK); + + // 查看语音通道是否已经建立并且已经在使用 + if (playService.audioBroadcastInUse(device, targetId)) { + commanderForPlatform.broadcastResultCmd(platform, deviceChannel, sn, false,null, null); + return; + } + + MediaServerItem mediaServerForMinimumLoad = mediaServerService.getMediaServerForMinimumLoad(null); + commanderForPlatform.broadcastResultCmd(platform, deviceChannel, sn, true, eventResult->{ + logger.info("[国标级联] 语音喊话 回复失败 platform: {}, 错误:{}/{}", platform.getServerGBId(), eventResult.statusCode, eventResult.msg); + }, eventResult->{ + // 消息发送成功, 向上级发送invite,获取推流 + try { + platformService.broadcastInvite(platform, deviceChannel.getChannelId(), mediaServerForMinimumLoad, (mediaServerItem, response)->{ + // 上级平台推流成功 + String app = response.getString("app"); + String stream = response.getString("stream"); + AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId); + if (broadcastCatch != null ) { + if (playService.audioBroadcastInUse(device, targetId)) { + logger.info("[国标级联] 语音喊话 设备正正在使用中 platform: {}, channel: {}", + platform.getServerGBId(), deviceChannel.getChannelId()); + // 查看语音通道已经建立且已经占用 回复BYE + try { + platformService.stopBroadcast(platform, deviceChannel.getChannelId(), stream); + } catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | + SipException e) { + logger.info("[消息发送失败] 国标级联 语音喊话 platform: {}, channel: {}", platform.getServerGBId(), deviceChannel.getChannelId()); + } + }else { + // 查看语音通道已经建立但是未占用 + broadcastCatch.setApp(app); + broadcastCatch.setStream(stream); + broadcastCatch.setMediaServerItem(mediaServerItem); + audioBroadcastManager.update(broadcastCatch); + // 推流到设备 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, targetId, stream, null); + if (sendRtpItem == null) { + logger.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, stream); + logger.info("[国标级联] 语音喊话 重新开始,channelId: {}, stream: {}", targetId, stream); + try { + playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> { + logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId); + }); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.info("[消息发送失败] 国标级联 语音喊话 platform: {}", platform.getServerGBId()); + } + }else { + // 发流 + JSONObject jsonObject = zlmrtpServerFactory.startSendRtp(mediaServerItem, sendRtpItem); + if (jsonObject != null && jsonObject.getInteger("code") == 0 ) { + logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId); + }else { + logger.info("[语音喊话] 推流失败, 结果: {}", jsonObject); + } + } + } + }else { + try { + playService.audioBroadcastCmd(device, targetId, mediaServerItem, app, stream, 60, true, msg -> { + logger.info("[语音喊话] 通道建立成功, device: {}, channel: {}", device.getDeviceId(), targetId); + }); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.info("[消息发送失败] 国标级联 语音喊话 platform: {}", platform.getServerGBId()); + } + } + + }, eventResultForBroadcastInvite -> { + // 收到错误 + logger.info("[国标级联-语音喊话] 与下级通道建立失败 device: {}, channel: {}, 错误:{}/{}", device.getDeviceId(), + targetId, eventResultForBroadcastInvite.statusCode, eventResultForBroadcastInvite.msg); + }, (code, msg)->{ + // 超时 + logger.info("[国标级联-语音喊话] 与下级通道建立超时 device: {}, channel: {}, 错误:{}/{}", device.getDeviceId(), + targetId, code, msg); + }); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.info("[消息发送失败] 国标级联 语音喊话 invite消息 platform: {}", platform.getServerGBId()); + } + }); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.info("[消息发送失败] 国标级联 语音喊话 platform: {}", platform.getServerGBId()); + } + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java index cf5c34b5..4b6bbbc2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatchStatus; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; @@ -53,9 +54,9 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { - String channelId = getText(rootElement, "DeviceID"); SIPRequest request = (SIPRequest) evt.getRequest(); try { + String channelId = getText(rootElement, "DeviceID"); if (!audioBroadcastManager.exit(device.getDeviceId(), channelId)) { // 回复410 responseAck((SIPRequest) evt.getRequest(), Response.GONE); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java index b5a9ee7b..7feac173 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java @@ -1,39 +1,24 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.SipLayer; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; -import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; -import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.service.IDeviceService; -import com.genersoft.iot.vmp.utils.GitUtil; import gov.nist.javax.sip.ResponseEventExt; -import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.message.SIPResponse; -import gov.nist.javax.sip.stack.SIPClientTransaction; -import gov.nist.javax.sip.stack.SIPDialog; -import gov.nist.javax.sip.stack.SIPTransaction; -import gov.nist.javax.sip.stack.SIPTransactionImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import javax.sdp.SdpFactory; import javax.sdp.SdpParseException; import javax.sdp.SessionDescription; -import javax.sip.*; -import javax.sip.address.Address; +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; import javax.sip.address.SipURI; -import javax.sip.header.CSeqHeader; -import javax.sip.header.UserAgentHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; @@ -104,6 +89,7 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract { } else { sdp = SdpFactory.getInstance().createSessionDescription(contentString); } + // 查看是否是来自设备的,此是回复 SipURI requestUri = sipLayer.getSipFactory().createAddressFactory().createSipURI(sdp.getOrigin().getUsername(), event.getRemoteIpAddress() + ":" + event.getRemotePort()); Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index eebb4617..984a377e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -280,6 +280,9 @@ public class ZLMHttpHookListener { logger.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); } + JSONObject ret = new JSONObject(); + ret.put("code", 0); + ret.put("msg", "success"); MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); JSONObject json = (JSONObject) JSON.toJSON(param); taskExecutor.execute(() -> { @@ -335,34 +338,34 @@ public class ZLMHttpHookListener { } }else if ("broadcast".equals(param.getApp())){ // 语音对讲推流 stream需要满足格式deviceId_channelId - if (param.getStream().indexOf("_") > 0) { - String[] streamArray = param.getStream().split("_"); - if (streamArray.length == 2) { - String deviceId = streamArray[0]; - String channelId = streamArray[1]; - Device device = deviceService.getDevice(deviceId); - if (device != null) { - if (param.isRegist()) { - if (audioBroadcastManager.exit(deviceId, channelId)) { - playService.stopAudioBroadcast(deviceId, channelId); - } - // 开启语音对讲通道 - try { - playService.audioBroadcastCmd(device, channelId, 60, mediaInfo, param.getApp(), param.getStream(), (msg)->{ - logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); - }); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[命令发送失败] 语音对讲: {}", e.getMessage()); - } - }else { - // 流注销 - playService.stopAudioBroadcast(deviceId, channelId); - } - } else{ - logger.info("[语音对讲] 未找到设备:{}", deviceId); - } - } - } + if (param.getStream().indexOf("_") > 0) { + String[] streamArray = param.getStream().split("_"); + if (streamArray.length == 2) { + String deviceId = streamArray[0]; + String channelId = streamArray[1]; + Device device = deviceService.getDevice(deviceId); + if (device != null) { + if (param.isRegist()) { + if (audioBroadcastManager.exit(deviceId, channelId)) { + playService.stopAudioBroadcast(deviceId, channelId); + } + // 开启语音对讲通道 + try { + playService.audioBroadcastCmd(device, channelId, mediaInfo, param.getApp(), param.getStream(), 60, false, (msg)->{ + logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); + }); + } catch (InvalidArgumentException | ParseException | SipException e) { + logger.error("[命令发送失败] 语音对讲: {}", e.getMessage()); + } + }else { + // 流注销 + playService.stopAudioBroadcast(deviceId, channelId); + } + } else{ + logger.info("[语音对讲] 未找到设备:{}", deviceId); + } + } + } }else if ("talk".equals(param.getApp())){ // 语音对讲推流 stream需要满足格式deviceId_channelId if (param.getStream().indexOf("_") > 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index b382a3d2..9a9dbe16 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -92,7 +92,7 @@ public class ZLMRTPServerFactory { return result; } - public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc, Integer port) { + public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc, Integer port, Boolean onlyAuto) { int result = -1; // 查询此rtp server 是否已经存在 JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId); @@ -108,7 +108,7 @@ public class ZLMRTPServerFactory { JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(mediaServerItem, param); if (jsonObject != null ) { if (jsonObject.getInteger("code") == 0) { - return createRTPServer(mediaServerItem, streamId, ssrc, port); + return createRTPServer(mediaServerItem, streamId, ssrc, port, onlyAuto); }else { logger.warn("[开启rtpServer], 重启RtpServer错误"); } @@ -131,6 +131,9 @@ public class ZLMRTPServerFactory { param.put("port", port); } param.put("ssrc", ssrc); + if (onlyAuto != null) { + param.put("only_audio", onlyAuto?"1":"0"); + } JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param); logger.info(JSONObject.toJSONString(openRtpServerResultJson)); if (openRtpServerResultJson != null) { @@ -352,4 +355,52 @@ public class ZLMRTPServerFactory { return result; } + public JSONObject startSendRtp(MediaServerItem mediaInfo, SendRtpItem sendRtpItem) { + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + logger.info("rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + if (!sendRtpItem.isTcp()) { + // udp模式下开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + + if (mediaInfo == null) { + return null; + } + // 如果是非严格模式,需要关闭端口占用 + JSONObject startSendRtpStreamResult = null; + if (sendRtpItem.getLocalPort() != 0) { + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); + if (releasePort(mediaInfo, sendRtpItem.getSsrc())) { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = startSendRtpPassive(mediaInfo, param); + System.out.println(JSON.toJSON(param)); + }else { + param.put("is_udp", is_Udp); + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = startSendRtpStream(mediaInfo, param); + } + } + }else { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = startSendRtpPassive(mediaInfo, param); + }else { + param.put("is_udp", is_Udp); + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = startSendRtpStream(mediaInfo, param); + } + } + return startSendRtpStreamResult; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index 1233455f..bac3d50a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -47,7 +47,7 @@ public interface IMediaServerService { SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback); - SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port); + SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port, Boolean onlyAuto); void closeRTPServer(MediaServerItem mediaServerItem, String streamId); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java index 17f8b37f..be8a8f5f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java @@ -1,8 +1,17 @@ package com.genersoft.iot.vmp.service; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; import com.github.pagehelper.PageInfo; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; + /** * 国标平台的业务类 * @author lin @@ -48,4 +57,23 @@ public interface IPlatformService { * @param platformId 平台 */ void sendNotifyMobilePosition(String platformId); + + /** + * 向上级发送语音喊话的消息 + * @param platform 平台 + * @param channelId 通道 + * @param hookEvent hook事件 + * @param errorEvent 信令错误事件 + * @param timeoutCallback 超时事件 + */ + void broadcastInvite(ParentPlatform platform, String channelId, MediaServerItem mediaServerItem, ZlmHttpHookSubscribe.Event hookEvent, + SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException; + + /** + * 语音喊话回复BYE + * @param platform 平台 + * @param channelId 通道 + * @param stream 流信息 + */ + void stopBroadcast(ParentPlatform platform, String channelId, String stream )throws InvalidArgumentException, ParseException, SsrcTransactionNotFoundException, SipException; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index b87176fc..3d6352b4 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -12,7 +12,9 @@ import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import gov.nist.javax.sip.message.SIPResponse; +import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.SipException; @@ -54,6 +56,11 @@ public interface IPlayService { void zlmServerOnline(String mediaServerId); AudioBroadcastResult audioBroadcast(Device device, String channelId, Boolean broadcastMode); + + boolean audioBroadcastCmd(Device device, String channelId, MediaServerItem mediaServerItem, String app, String stream, int timeout, boolean isFromPlatform, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException; + + boolean audioBroadcastInUse(Device device, String channelId); + void stopAudioBroadcast(String deviceId, String channelId); void audioBroadcastCmd(Device device, String channelId, int timeout, MediaServerItem mediaServerItem, String sourceApp, String sourceStream, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 522879a8..226fa6fd 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -1,16 +1,5 @@ package com.genersoft.iot.vmp.service.impl; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import com.genersoft.iot.vmp.media.zlm.ZLMRunner; -import com.genersoft.iot.vmp.service.IStreamProxyService; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; @@ -39,8 +28,6 @@ import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; -import java.time.LocalDateTime; -import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -134,7 +121,8 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck, boolean isPlayback, Integer port) { + public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck, + boolean isPlayback, Integer port, Boolean onlyAuto) { if (mediaServerItem == null || mediaServerItem.getId() == null) { logger.info("[openRTPServer] 失败, mediaServerItem == null || mediaServerItem.getId() == null"); return null; @@ -163,7 +151,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } int rtpServerPort; if (mediaServerItem.isRtpEnable()) { - rtpServerPort = zlmrtpServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0, port); + rtpServerPort = zlmrtpServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0, port, onlyAuto); } else { rtpServerPort = mediaServerItem.getRtpProxyPort(); } @@ -174,7 +162,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback) { - return openRTPServer(mediaServerItem, streamId, ssrc, ssrcCheck, isPlayback, null); + return openRTPServer(mediaServerItem, streamId, ssrc, ssrcCheck, isPlayback, null, null); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index 1d48d54d..1a788d27 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -1,15 +1,25 @@ package com.genersoft.iot.vmp.service.impl; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlatformService; +import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; @@ -21,11 +31,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; import javax.sip.SipException; import java.text.ParseException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; /** * @author lin @@ -65,6 +77,16 @@ public class PlatformServiceImpl implements IPlatformService { @Autowired private UserSetting userSetting; + @Autowired + private ZlmHttpHookSubscribe subscribe; + + @Autowired + private VideoStreamSessionManager streamSession; + + + @Autowired + private IPlayService playService; + @Override @@ -310,4 +332,138 @@ public class PlatformServiceImpl implements IPlatformService { } } } + + @Override + public void broadcastInvite(ParentPlatform platform, String channelId, MediaServerItem mediaServerItem, ZlmHttpHookSubscribe.Event hookEvent, + SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException { + + if (mediaServerItem == null) { + logger.info("[国标级联] 语音喊话未找到可用的zlm. platform: {}", platform.getServerGBId()); + return; + } + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId); + if (streamInfo != null) { + // 如果zlm不存在这个流,则删除数据即可 + MediaServerItem mediaServerItemForStreamInfo = mediaServerService.getOne(streamInfo.getMediaServerId()); + if (mediaServerItemForStreamInfo != null) { + Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemForStreamInfo, streamInfo.getApp(), streamInfo.getStream()); + if (!ready) { + // 错误存在于redis中的数据 + redisCatchStorage.stopPlay(streamInfo); + }else { + // 流确实尚在推流,直接回调结果 + JSONObject json = new JSONObject(); + json.put("app", streamInfo.getApp()); + json.put("stream", streamInfo.getStream()); + hookEvent.response(mediaServerItemForStreamInfo, json); + return; + } + } + } + + String streamId = null; + if (mediaServerItem.isRtpEnable()) { + streamId = String.format("%s_%s", platform.getServerGBId(), channelId); + } + // 默认不进行SSRC校验, TODO 后续可改为配置 + boolean ssrcCheck = false; + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true); + if (ssrcInfo == null || ssrcInfo.getPort() < 0) { + logger.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel: {}", platform.getServerGBId(), channelId); + errorEvent.response(new SipSubscribe.EventResult(-1, "端口监听失败")); + return; + } + logger.info("[国标级联] 发起语音喊话 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", + platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck); + + String timeOutTaskKey = UUID.randomUUID().toString(); + dynamicTask.startDelay(timeOutTaskKey, () -> { + // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 + if (redisCatchStorage.queryPlayByDevice(platform.getServerGBId(), channelId) == null) { + logger.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc()); + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 + try { + commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null); + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + logger.error("[点播超时], 发送BYE失败 {}", e.getMessage()); + } finally { + timeoutCallback.run(1, "收流超时"); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + } + } + }, userSetting.getPlayTimeout()); + commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (mediaServerItemForInvite, response)->{ + dynamicTask.stop(timeOutTaskKey); + // hook响应 + playService.onPublishHandlerForPlay(mediaServerItemForInvite, response, platform.getServerGBId(), channelId); + // 收到流 + if (hookEvent != null) { + hookEvent.response(mediaServerItem, response); + } + }, event -> { + // 收到200OK 检测ssrc是否有变化,防止上级自定义了ssrc + ResponseEvent responseEvent = (ResponseEvent) event.event; + String contentString = new String(responseEvent.getResponse().getRawContent()); + // 获取ssrc + int ssrcIndex = contentString.indexOf("y="); + // 检查是否有y字段 + if (ssrcIndex >= 0) { + //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 + String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 + if (ssrcInfo.getSsrc().equals(ssrcInResponse) || ssrcCheck) { + return; + } + logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); + if (!mediaServerItem.isRtpEnable()) { + logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); + + if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) { + // ssrc 不可用 + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); + event.msg = "下级自定义了ssrc,但是此ssrc不可用"; + event.statusCode = 400; + errorEvent.response(event); + return; + } + + // 单端口模式streamId也有变化,需要重新设置监听 + if (!mediaServerItem.isRtpEnable()) { + // 添加订阅 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); + subscribe.removeSubscribe(hookSubscribe); + hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { + logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); + dynamicTask.stop(timeOutTaskKey); + // hook响应 + playService.onPublishHandlerForPlay(mediaServerItemInUse, response, platform.getServerGBId(), channelId); + hookEvent.response(mediaServerItemInUse, response); + }); + } + // 关闭rtp server + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + // 重新开启ssrc server + mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort(), true); + + + } + } + }, eventResult -> { + // 收到错误回复 + if (errorEvent != null) { + errorEvent.response(eventResult); + } + }); + } + + @Override + public void stopBroadcast(ParentPlatform platform, String channelId, String stream) throws InvalidArgumentException, ParseException, SsrcTransactionNotFoundException, SipException { + commanderForPlatform.streamByeCmd(platform, channelId, stream, null, null); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index b89f173f..7e85f0c7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -490,7 +490,7 @@ public class PlayServiceImpl implements IPlayService { // 关闭rtp server mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 重新开启ssrc server - mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort()); + mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort(), false); } } @@ -731,7 +731,7 @@ public class PlayServiceImpl implements IPlayService { // 关闭rtp server mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 重新开启ssrc server - mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort()); + mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort(), false); } } } @@ -966,16 +966,16 @@ public class PlayServiceImpl implements IPlayService { } @Override - public void audioBroadcastCmd(Device device, String channelId, int timeout, MediaServerItem mediaServerItem, String sourceApp, String sourceStream, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException { + public boolean audioBroadcastCmd(Device device, String channelId, MediaServerItem mediaServerItem, String app, String stream, int timeout, boolean isFromPlatform, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException { if (device == null || channelId == null) { - return; + return false; } logger.info("[语音喊话] device: {}, channel: {}", device.getDeviceId(), channelId); DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId); if (deviceChannel == null) { logger.warn("开启语音广播的时候未找到通道: {}", channelId); event.call("开启语音广播的时候未找到通道"); - return; + return false; } // 查询通道使用状态 if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { @@ -986,7 +986,7 @@ public class PlayServiceImpl implements IPlayService { if (streamReady) { logger.warn("语音广播已经开启: {}", channelId); event.call("语音广播已经开启"); - return; + return false; } else { stopAudioBroadcast(device.getDeviceId(), channelId); } @@ -1008,8 +1008,7 @@ public class PlayServiceImpl implements IPlayService { // 发送通知 cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { // 发送成功 - AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, - AudioBroadcastCatchStatus.Ready, mediaServerItem, sourceApp, sourceStream); + AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, mediaServerItem, app, stream, event, AudioBroadcastCatchStatus.Ready, isFromPlatform); audioBroadcastManager.update(audioBroadcastCatch); }, eventResultForError -> { // 发送失败 @@ -1017,6 +1016,24 @@ public class PlayServiceImpl implements IPlayService { event.call("语音广播发送失败"); stopAudioBroadcast(device.getDeviceId(), channelId); }); + return true; + } + + @Override + public boolean audioBroadcastInUse(Device device, String channelId) { + if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { + // 查询流是否存在,不存在则认为是异常状态 + MediaServerItem mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStreamId()); + if (streamReady) { + logger.warn("语音广播通道使用中: {}", channelId); + return true; + } + } + } + return false; } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index 3eba74ec..96031f81 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -250,10 +250,10 @@ public class PlayController { @GetMapping("/broadcast/{deviceId}/{channelId}") @PostMapping("/broadcast/{deviceId}/{channelId}") public AudioBroadcastResult broadcastApi(@PathVariable String deviceId, @PathVariable String channelId, Integer timeout, Boolean broadcastMode) { - if (logger.isDebugEnabled()) { - logger.debug("语音广播API调用"); - } - Device device = storager.queryVideoDevice(deviceId); + if (logger.isDebugEnabled()) { + logger.debug("语音广播API调用"); + } + Device device = storager.queryVideoDevice(deviceId); if (device == null) { throw new ControllerException(ErrorCode.ERROR400.getCode(), "未找到设备: " + deviceId); } @@ -265,7 +265,6 @@ public class PlayController { } - @Operation(summary = "停止语音广播") @Parameter(name = "deviceId", description = "设备Id", required = true) @Parameter(name = "channelId", description = "通道Id", required = true) diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index c48c26ef..4f7a8f68 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -195,6 +195,8 @@ user-settings: gb-send-stream-strict: false # 设备上线时是否自动同步通道 sync-channel-on-device-online: false + # 国标级联语音喊话发流模式 * UDP:udp传输 TCP-ACTIVE:tcp主动模式 TCP-PASSIVE:tcp被动模式 + broadcast-for-platform: UDP # 是否使用设备来源Ip作为回复IP, 不设置则为 false sip-use-source-ip-as-remote-address: false # 是否开启sip日志 diff --git a/web_src/src/components/dialog/devicePlayer.vue b/web_src/src/components/dialog/devicePlayer.vue index 0bc3335a..61cf0667 100644 --- a/web_src/src/components/dialog/devicePlayer.vue +++ b/web_src/src/components/dialog/devicePlayer.vue @@ -663,46 +663,46 @@ export default { this.startBroadcast(streamInfo.rtc) } - }else { - this.$message({ - showClose: true, - message: res.data.msg, - type: "error", - }); - } - }); - }else if (this.broadcastStatus === 1) { - this.broadcastStatus = -1; - this.broadcastRtc.close() - } - }, - startBroadcast(url) { - // 获取推流鉴权Key - this.$axios({ - method: 'post', - url: '/api/user/userInfo', - }).then((res) => { - if (res.data.code !== 0) { - this.$message({ - showClose: true, - message: "获取推流鉴权Key失败", - type: "error", - }); - this.broadcastStatus = -1; - } else { - let pushKey = res.data.data.pushKey; - // 获取推流鉴权KEY - url += "&sign=" + crypto.createHash('md5').update(pushKey, "utf8").digest('hex') - console.log("开始语音对讲: " + url) - this.broadcastRtc = new ZLMRTCClient.Endpoint({ - debug: true, // 是否打印日志 - zlmsdpUrl: url, //流地址 - simulecast: false, - useCamera: false, - audioEnable: true, - videoEnable: false, - recvOnly: false, - }) + }else { + this.$message({ + showClose: true, + message: res.data.msg, + type: "error", + }); + } + }); + }else if (this.broadcastStatus === 1) { + this.broadcastStatus = -1; + this.broadcastRtc.close() + } + }, + startBroadcast(url){ + // 获取推流鉴权Key + this.$axios({ + method: 'post', + url: '/api/user/userInfo', + }).then( (res)=> { + if (res.data.code !== 0) { + this.$message({ + showClose: true, + message: "获取推流鉴权Key失败", + type: "error", + }); + this.broadcastStatus = -1; + }else { + let pushKey = res.data.data.pushKey; + // 获取推流鉴权KEY + url += "&sign=" + crypto.createHash('md5').update(pushKey, "utf8").digest('hex') + console.log("开始语音喊话: " + url) + this.broadcastRtc = new ZLMRTCClient.Endpoint({ + debug: true, // 是否打印日志 + zlmsdpUrl: url, //流地址 + simulecast: false, + useCamera: false, + audioEnable: true, + videoEnable: false, + recvOnly: false, + }) // webrtcPlayer.on(ZLMRTCClient.Events.WEBRTC_ON_REMOTE_STREAMS,(e)=>{//获取到了远端流,可以播放 // console.error('播放成功',e.streams) @@ -715,15 +715,15 @@ export default { // // this.eventcallbacK("LOCAL STREAM", "获取到了本地流") // }); - this.broadcastRtc.on(ZLMRTCClient.Events.WEBRTC_NOT_SUPPORT, (e) => {// 获取到了本地流 - console.error('不支持webrtc', e) - this.$message({ - showClose: true, - message: '不支持webrtc, 无法进行语音对讲', - type: 'error' - }); - this.broadcastStatus = -1; - }); + this.broadcastRtc.on(ZLMRTCClient.Events.WEBRTC_NOT_SUPPORT,(e)=>{// 获取到了本地流 + console.error('不支持webrtc',e) + this.$message({ + showClose: true, + message: '不支持webrtc, 无法进行语音喊话', + type: 'error' + }); + this.broadcastStatus = -1; + }); this.broadcastRtc.on(ZLMRTCClient.Events.WEBRTC_ICE_CANDIDATE_ERROR, (e) => {// ICE 协商出错 console.error('ICE 协商出错')