From 663130df4556c35b8b390a74df571af8185d974d Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Sun, 19 Feb 2023 12:46:53 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=94=AF=E6=8C=81=E8=AF=AD?= =?UTF-8?q?=E9=9F=B3=E5=AF=B9=E8=AE=B2talk?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/gb28181/bean/AudioBroadcastCatch.java | 14 +- .../iot/vmp/gb28181/bean/SendRtpItem.java | 23 +- .../iot/vmp/gb28181/event/SipSubscribe.java | 1 - .../session/VideoStreamSessionManager.java | 3 +- .../iot/vmp/gb28181/task/SipRunner.java | 4 +- .../gb28181/transmit/cmd/ISIPCommander.java | 7 +- .../transmit/cmd/impl/SIPCommander.java | 22 +- .../cmd/impl/SIPCommanderFroPlatform.java | 2 +- .../request/impl/AckRequestProcessor.java | 6 +- .../request/impl/ByeRequestProcessor.java | 11 +- .../request/impl/InviteRequestProcessor.java | 14 +- .../impl/info/InfoRequestProcessor.java | 2 +- .../cmd/MediaStatusNotifyMessageHandler.java | 2 +- .../iot/vmp/gb28181/utils/SipUtils.java | 2 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 121 ++--- .../iot/vmp/media/zlm/ZLMRESTfulUtils.java | 4 + .../vmp/media/zlm/ZLMRTPServerFactory.java | 8 +- .../iot/vmp/service/IPlayService.java | 14 +- .../vmp/service/impl/DeviceServiceImpl.java | 2 +- .../vmp/service/impl/PlatformServiceImpl.java | 2 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 459 ++++++++++-------- .../storager/impl/RedisCatchStorageImpl.java | 2 +- .../iot/vmp/vmanager/bean/StreamContent.java | 53 ++ .../vmanager/gb28181/play/PlayController.java | 16 +- ...dioBroadcastEvent.java => AudioEvent.java} | 2 +- .../vmp/web/gb28181/ApiStreamController.java | 2 +- 26 files changed, 457 insertions(+), 341 deletions(-) rename src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/{AudioBroadcastEvent.java => AudioEvent.java} (74%) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java index 88db8071..ef5829e6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.bean; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import gov.nist.javax.sip.message.SIPResponse; /** @@ -10,10 +11,11 @@ import gov.nist.javax.sip.message.SIPResponse; public class AudioBroadcastCatch { - public AudioBroadcastCatch(String deviceId, String channelId, AudioBroadcastCatchStatus status) { + public AudioBroadcastCatch(String deviceId, String channelId, AudioBroadcastCatchStatus status, MediaServerItem mediaServerItem) { this.deviceId = deviceId; this.channelId = channelId; this.status = status; + this.mediaServerItem = mediaServerItem; } public AudioBroadcastCatch() { @@ -39,6 +41,8 @@ public class AudioBroadcastCatch { */ private SipTransactionInfo sipTransactionInfo; + private MediaServerItem mediaServerItem; + public String getDeviceId() { return deviceId; @@ -75,4 +79,12 @@ public class AudioBroadcastCatch { public void setSipTransactionInfoByRequset(SIPResponse response) { this.sipTransactionInfo = new SipTransactionInfo(response, false); } + + public MediaServerItem getMediaServerItem() { + return mediaServerItem; + } + + public void setMediaServerItem(MediaServerItem mediaServerItem) { + this.mediaServerItem = mediaServerItem; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index c1fe2c1f..361bdc6d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -49,7 +49,7 @@ public class SendRtpItem { /** * 设备推流的streamId */ - private String streamId; + private String stream; /** * 是否为tcp @@ -117,6 +117,11 @@ public class SendRtpItem { */ private InviteStreamType playType; + /** + * 发流的同时收流 + */ + private String receiveStream; + public String getIp() { return ip; } @@ -181,12 +186,12 @@ public class SendRtpItem { this.app = app; } - public String getStreamId() { - return streamId; + public String getStream() { + return stream; } - public void setStreamId(String streamId) { - this.streamId = streamId; + public void setStream(String stream) { + this.stream = stream; } public boolean isTcp() { @@ -292,4 +297,12 @@ public class SendRtpItem { public void setRtcp(boolean rtcp) { this.rtcp = rtcp; } + + public String getReceiveStream() { + return receiveStream; + } + + public void setReceiveStream(String receiveStream) { + this.receiveStream = receiveStream; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index e7409bf4..75643905 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.gb28181.event; -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent; import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java index 1bd9850f..fad84a6b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java @@ -29,7 +29,8 @@ public class VideoStreamSessionManager { play, playback, download, - broadcast + broadcast, + talk } /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java index 451c589c..5b2cfb7a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java @@ -74,12 +74,12 @@ public class SipRunner implements CommandLineRunner { if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStreamId()); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStream()); if (mediaServerItem != null) { Map param = new HashMap<>(); param.put("vhost","__defaultVhost__"); param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStreamId()); + param.put("stream",sendRtpItem.getStream()); param.put("ssrc",sendRtpItem.getSsrc()); JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaServerItem, param); if (jsonObject != null && jsonObject.getInteger("code") == 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index b6e5d61d..ddf063d5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -2,10 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback; -import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -131,7 +128,7 @@ public interface ISIPCommander { */ void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; - void talkStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + void talkStreamCmd(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 80c032ca..79d47898 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -32,7 +32,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; @@ -584,9 +583,9 @@ public class SIPCommander implements ISIPCommander { } @Override - public void talkStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + public void talkStreamCmd(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, Device device, String channelId, String callId, ZlmHttpHookSubscribe.Event event, ZlmHttpHookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { - String stream = ssrcInfo.getStream(); + String stream = sendRtpItem.getStream(); if (device == null) { return; @@ -597,7 +596,7 @@ public class SIPCommander implements ISIPCommander { return; } - logger.info("[语音对讲] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); + logger.info("[语音对讲] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort()); HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); subscribe.addSubscribe(hookSubscribeForStreamChange, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { if (event != null) { @@ -622,24 +621,27 @@ public class SIPCommander implements ISIPCommander { content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); content.append("t=0 0\r\n"); - content.append("m=audio " + ssrcInfo.getPort() + " RTP/AVP 8\r\n"); + content.append("m=audio " + sendRtpItem.getPort() + " TCP/RTP/AVP 8\r\n"); + content.append("a=setup:passive\r\n"); + content.append("a=connection:new\r\n"); content.append("a=sendrecv\r\n"); content.append("a=rtpmap:8 PCMA/8000\r\n"); - content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc + content.append("y=" + sendRtpItem.getSsrc() + "\r\n");//ssrc // f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率 content.append("f=v/////a/1/8/1" + "\r\n"); - Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(), callIdHeader); + Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), + SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sendRtpItem.getSsrc(), callIdHeader); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> { - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); errorEvent.response(e); }), e -> { // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 ResponseEvent responseEvent = (ResponseEvent) e.event; SIPResponse response = (SIPResponse) responseEvent.getResponse(); - streamSession.put(device.getDeviceId(), channelId, "talk", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play); + streamSession.put(device.getDeviceId(), channelId, "talk", stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play); okEvent.response(e); }); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 3f731c44..914f6292 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -675,7 +675,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem != null) { mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); - zlmrtpServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStreamId()); + zlmrtpServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStream()); } SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem); if (byeRequest == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 68bc38b0..b00c83bd 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -102,12 +102,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In } String isUdp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), + logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); Map param = new HashMap<>(12); param.put("vhost","__defaultVhost__"); param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStreamId()); + param.put("stream",sendRtpItem.getStream()); param.put("ssrc", sendRtpItem.getSsrc()); param.put("src_port", sendRtpItem.getLocalPort()); param.put("pt", sendRtpItem.getPt()); @@ -121,7 +121,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In if (mediaInfo == null) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 3ad1fdee..5758d23e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -97,7 +97,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In if (sendRtpItem != null){ logger.info("[收到bye] {}/{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId()); - String streamId = sendRtpItem.getStreamId(); + String streamId = sendRtpItem.getStream(); MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaServerItem == null) { return; @@ -105,7 +105,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), streamId); if (!ready) { - logger.info("[收到bye] 发现流{}/{}已经结束,不需处理", sendRtpItem.getApp(), sendRtpItem.getStreamId()); + logger.info("[收到bye] 发现流{}/{}已经结束,不需处理", sendRtpItem.getApp(), sendRtpItem.getStream()); return; } Map param = new HashMap<>(); @@ -113,7 +113,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In param.put("app",sendRtpItem.getApp()); param.put("stream",streamId); param.put("ssrc",sendRtpItem.getSsrc()); - logger.info("[收到bye] 停止向上级推流:{}", streamId); + logger.info("[收到bye] 停止推流:{}", streamId); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); @@ -129,15 +129,14 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In try { logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); - } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); } } if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId()); redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 56cadb34..2a6fe500 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -478,7 +478,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if ("Playback".equalsIgnoreCase(sessionName)) { sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, device.isSsrcCheck(), true); - sendRtpItem.setStreamId(ssrcInfo.getStream()); + sendRtpItem.setStream(ssrcInfo.getStream()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), @@ -523,7 +523,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false); logger.info(JSONObject.toJSONString(ssrcInfo)); - sendRtpItem.setStreamId(ssrcInfo.getStream()); + sendRtpItem.setStream(ssrcInfo.getStream()); sendRtpItem.setSsrc(ssrc.equals(ssrcDefault) ? ssrcInfo.getSsrc() : ssrc); // 写入redis, 超时时回复 @@ -533,12 +533,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), finalChannelId, callIdHeader.getCallId(), null); }); } else { - sendRtpItem.setStreamId(playTransaction.getStream()); + sendRtpItem.setStream(playTransaction.getStream()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); JSONObject jsonObject = new JSONObject(); jsonObject.put("app", sendRtpItem.getApp()); - jsonObject.put("stream", sendRtpItem.getStreamId()); + jsonObject.put("stream", sendRtpItem.getStream()); hookEvent.response(mediaServerItem, jsonObject); } } @@ -986,9 +986,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}, {}", requesterId, addressStr, port, ssrc, mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP"); - MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device); + MediaServerItem mediaServerItem = audioBroadcastCatch.getMediaServerItem(); if (mediaServerItem == null) { - logger.warn("未找到可用的zlm"); + logger.warn("未找到语音喊话使用的zlm"); try { responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -1022,7 +1022,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setPlatformId(requesterId); sendRtpItem.setStatus(1); sendRtpItem.setApp(app); - sendRtpItem.setStreamId(stream); + sendRtpItem.setStream(stream); sendRtpItem.setPt(8); sendRtpItem.setUsePs(false); sendRtpItem.setRtcp(false); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java index a4d49d5a..f889b9e7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java @@ -102,7 +102,7 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I String contentSubType = header.getContentSubType(); if ("Application".equalsIgnoreCase(contentType) && "MANSRTSP".equalsIgnoreCase(contentSubType)) { SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); - String streamId = sendRtpItem.getStreamId(); + String streamId = sendRtpItem.getStream(); StreamInfo streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null); if (null == streamInfo) { responseAck(request, Response.NOT_FOUND, "stream " + streamId + " not found"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java index b15003cc..d5d80e6f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java @@ -90,7 +90,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i try { cmder.streamByeCmd(device, ssrcTransaction.getChannelId(), null, callIdHeader.getCallId()); - } catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | SipException e) { + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { logger.error("[录像流]推送完毕,收到关流通知, 发送BYE失败 {}", e.getMessage()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java index 45c5a90a..07a3044f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java @@ -122,7 +122,7 @@ public class SipUtils { } public static String getNewCallId() { - return (int) Math.floor(Math.random() * 10000) + ""; + return (int) Math.floor(Math.random() * 1000000000) + ""; } public static int getTypeCodeFromGbCode(String deviceId) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index dae42c75..a59106f2 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -9,9 +9,9 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -274,12 +274,12 @@ public class ZLMHttpHookListener { logger.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); } - + MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); JSONObject json = (JSONObject) JSON.toJSON(param); taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); if (subscribe != null) { - MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId()); + if (mediaInfo != null) { subscribe.response(mediaInfo, json); } @@ -343,7 +343,7 @@ public class ZLMHttpHookListener { } // 开启语音对讲通道 try { - playService.audioBroadcastCmd(device, channelId, 60, (msg)->{ + playService.audioBroadcastCmd(device, channelId, mediaInfo, 60, (msg)->{ logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); }); } catch (InvalidArgumentException | ParseException | SipException e) { @@ -360,62 +360,30 @@ public class ZLMHttpHookListener { } }else if ("talk".equals(param.getApp())){ // 语音对讲推流 stream需要满足格式deviceId_channelId - if (param.isRegist() && param.getStream().indexOf("_") > 0) { - String[] streamArray = param.getStream().split("_"); - if (streamArray.length == 2) { - String deviceId = streamArray[0]; - String channelId = streamArray[1]; - Device device = deviceService.getDevice(deviceId); - if (device != null) { - DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); - if (deviceChannel != null) { - if (audioBroadcastManager.exit(deviceId, channelId)) { - // 直接推流 - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, param.getStream(), null); - if (sendRtpItem == null) { - // TODO 可能数据错误,重新开启语音通道 - }else { - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); - Map sendParam = new HashMap<>(12); - sendParam.put("vhost","__defaultVhost__"); - sendParam.put("app",sendRtpItem.getApp()); - sendParam.put("stream",sendRtpItem.getStreamId()); - sendParam.put("ssrc", sendRtpItem.getSsrc()); - sendParam.put("src_port", sendRtpItem.getLocalPort()); - sendParam.put("pt", sendRtpItem.getPt()); - sendParam.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - sendParam.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - - JSONObject jsonObject; - if (sendRtpItem.isTcpActive()) { - jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, sendParam); - } else { - sendParam.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); - sendParam.put("dst_url", sendRtpItem.getIp()); - sendParam.put("dst_port", sendRtpItem.getPort()); - jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, sendParam); - } - if (jsonObject != null && jsonObject.getInteger("code") == 0) { - logger.info("[语音对讲] 自动推流成功, device: {}, channel: {}", deviceId, channelId); - } - } - }else { - // 开启语音对讲通道 - MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); - playService.talk(mediaServerItem, device, channelId, (mediaServer, jsonObject)->{ - System.out.println("开始推流"); - }, eventResult -> { - System.out.println(eventResult.msg); - }, ()->{ - System.out.println("超时"); - }); - } - - } - } - } - } + if (param.getStream().indexOf("_") > 0) { + String[] streamArray = param.getStream().split("_"); + if (streamArray.length == 2) { + String deviceId = streamArray[0]; + String channelId = streamArray[1]; + Device device = deviceService.getDevice(deviceId); + if (device != null) { + if (param.isRegist()) { + if (audioBroadcastManager.exit(deviceId, channelId)) { + playService.stopAudioBroadcast(deviceId, channelId); + } + // 开启语音对讲通道 + playService.talkCmd(device, channelId, mediaInfo, param.getStream(), (msg)->{ + logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); + }); + }else { + // 流注销 + playService.stopTalk(device, channelId, param.isRegist()); + } + } else{ + logger.info("[语音对讲] 未找到设备:{}", deviceId); + } + } + } }else{ if (!"rtp".equals(param.getApp())){ @@ -475,16 +443,21 @@ public class ZLMHttpHookListener { ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); Device device = deviceService.getDevice(platformId); - try { + if (platform != null) { - commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + try { + commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } } else { - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); + try { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException | + SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 发送BYE: {}", e.getMessage()); + } } - } catch (SipException | InvalidArgumentException | ParseException | - SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); - } } } } @@ -527,7 +500,7 @@ public class ZLMHttpHookListener { logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + sendRtpItem.getCallId(), sendRtpItem.getStream()); } } } @@ -556,8 +529,7 @@ public class ZLMHttpHookListener { try { cmder.streamByeCmd(device, streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { logger.error("[无人观看]回放, 发送BYE失败 {}", e.getMessage()); } } @@ -573,6 +545,13 @@ public class ZLMHttpHookListener { ret.put("close", false); return ret; } + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, param.getStream(), null); + if ("talk".equals(sendRtpItem.getApp())){ + ret.put("close", false); + return ret; + } + }else if ("talk".equals(param.getApp()) || "broadcast".equals(param.getApp())){ + ret.put("close", false); } else { // 非国标流 推流/拉流代理 // 拉流代理 @@ -734,7 +713,7 @@ public class ZLMHttpHookListener { logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + sendRtpItem.getCallId(), sendRtpItem.getStream()); } } }); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index b6753c43..9e1020ab 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -291,6 +291,10 @@ public class ZLMRESTfulUtils { return sendPost(mediaServerItem, "startSendRtpPassive",param, null); } + public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Map param, RequestCallback callback) { + return sendPost(mediaServerItem, "startSendRtpPassive",param, callback); + } + public JSONObject stopSendRtp(MediaServerItem mediaServerItem, Map param) { return sendPost(mediaServerItem, "stopSendRtp",param, null); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 4a6a94a7..b382a3d2 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -229,7 +229,7 @@ public class ZLMRTPServerFactory { sendRtpItem.setPort(port); sendRtpItem.setSsrc(ssrc); sendRtpItem.setApp(app); - sendRtpItem.setStreamId(stream); + sendRtpItem.setStream(stream); sendRtpItem.setPlatformId(platformId); sendRtpItem.setChannelId(channelId); sendRtpItem.setTcp(tcp); @@ -290,6 +290,10 @@ public class ZLMRTPServerFactory { return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param); } + public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Mapparam, ZLMRESTfulUtils.RequestCallback callback) { + return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param, callback); + } + /** * 查询待转推的流是否就绪 */ @@ -343,7 +347,7 @@ public class ZLMRTPServerFactory { result= true; logger.info("[停止RTP推流] 成功"); } else { - logger.error("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"), JSON.toJSON(param), jsonObject); + logger.warn("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"), JSON.toJSON(param), jsonObject); } return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index ffdaef3e..23c29c7c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -11,10 +11,8 @@ import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; -import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioEvent; import gov.nist.javax.sip.message.SIPResponse; -import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.SipException; @@ -29,10 +27,6 @@ public interface IPlayService { void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId); - void talk(MediaServerItem mediaServerItem, Device device, String channelId, - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, - Runnable timeoutCallback); - void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback); @@ -62,7 +56,7 @@ public interface IPlayService { AudioBroadcastResult audioBroadcast(Device device, String channelId); void stopAudioBroadcast(String deviceId, String channelId); - void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException; + void audioBroadcastCmd(Device device, String channelId, MediaServerItem mediaServerItem, int timeout, AudioEvent event) throws InvalidArgumentException, ParseException, SipException; void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; @@ -72,4 +66,8 @@ public interface IPlayService { void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform, JSONObject jsonObject, Map param, CallIdHeader callIdHeader); + + void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioEvent event); + + void stopTalk(Device device, String channelId, Boolean streamIsReady); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 1c57d0b2..07da5bec 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -202,7 +202,7 @@ public class DeviceServiceImpl implements IDeviceService { Map param = new HashMap<>(); param.put("vhost", "__defaultVhost__"); param.put("app", sendRtpItem.getApp()); - param.put("stream", sendRtpItem.getStreamId()); + param.put("stream", sendRtpItem.getStream()); zlmresTfulUtils.stopSendRtp(mediaInfo, param); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index 136689ca..1d48d54d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -253,7 +253,7 @@ public class PlatformServiceImpl implements IPlatformService { Map param = new HashMap<>(3); param.put("vhost", "__defaultVhost__"); param.put("app", sendRtpItem.getApp()); - param.put("stream", sendRtpItem.getStreamId()); + param.put("stream", sendRtpItem.getStream()); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 56efa72a..1708f6ed 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -41,7 +41,7 @@ import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; -import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; +import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioEvent; import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,8 +134,8 @@ public class PlayServiceImpl implements IPlayService { @Override public void play(MediaServerItem mediaServerItem, String deviceId, String channelId, - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, - Runnable timeoutCallback) { + ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + Runnable timeoutCallback) { if (mediaServerItem == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); } @@ -243,194 +243,148 @@ public class PlayServiceImpl implements IPlayService { } } - @Override - public void talk(MediaServerItem mediaServerItem, Device device, String channelId, - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, - Runnable timeoutCallback) { - String streamId = null; - if (mediaServerItem.isRtpEnable()) { - streamId = String.format("%s_%s", device.getDeviceId(), channelId); + private void talk(MediaServerItem mediaServerItem, Device device, String channelId, String stream, + ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + Runnable timeoutCallback, AudioEvent audioEvent) { + + String playSsrc = mediaServerItem.getSsrcConfig().getPlaySsrc(); + if (playSsrc == null) { + audioEvent.call("ssrc已经用尽"); + return; } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); - logger.info("[对讲开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); + SendRtpItem sendRtpItem = new SendRtpItem(); + sendRtpItem.setApp("talk"); + sendRtpItem.setStream(stream); + sendRtpItem.setSsrc(playSsrc); + sendRtpItem.setDeviceId(device.getDeviceId()); + sendRtpItem.setPlatformId(device.getDeviceId()); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setRtcp(false); + sendRtpItem.setMediaServerId(mediaServerItem.getId()); + sendRtpItem.setOnlyAudio(true); + sendRtpItem.setPlayType(InviteStreamType.TALK); + sendRtpItem.setPt(8); + sendRtpItem.setStatus(1); + sendRtpItem.setTcpActive(false); + sendRtpItem.setTcp(true); + sendRtpItem.setUsePs(false); + sendRtpItem.setReceiveStream(stream); + + + int port = zlmrtpServerFactory.keepPort(mediaServerItem, playSsrc); + //端口获取失败的ssrcInfo 没有必要发送点播指令 + if (port <= 0) { + logger.info("[语音对讲] 端口分配异常,deviceId={},channelId={}", device.getDeviceId(), channelId); + audioEvent.call("端口分配异常"); + return; + } + sendRtpItem.setLocalPort(port); + sendRtpItem.setPort(port); + logger.info("[语音对讲]开始 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sendRtpItem.getLocalPort(), device.getStreamMode(), sendRtpItem.getSsrc(), false); // 超时处理 String timeOutTaskKey = UUID.randomUUID().toString(); - SSRCInfo finalSsrcInfo = ssrcInfo; - System.out.println("设置超时任务: " + timeOutTaskKey); dynamicTask.startDelay(timeOutTaskKey, () -> { - logger.info("[对讲超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc()); + logger.info("[语音对讲] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, sendRtpItem.getPort(), sendRtpItem.getSsrc()); timeoutCallback.run(); // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 try { - cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage()); - } catch (SsrcTransactionNotFoundException e) { + cmder.streamByeCmd(device, channelId, sendRtpItem.getStream(), null); + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + logger.error("[语音对讲]超时, 发送BYE失败 {}", e.getMessage()); + } finally { timeoutCallback.run(); - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream()); } }, userSetting.getPlayTimeout()); - final String ssrc = ssrcInfo.getSsrc(); - final String stream = ssrcInfo.getStream(); - //端口获取失败的ssrcInfo 没有必要发送点播指令 - if (ssrcInfo.getPort() <= 0) { - logger.info("[对讲] 端口分配异常,deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); - return; - } String callId = SipUtils.getNewCallId(); - boolean pushing = false; + + zlmrtpServerFactory.releasePort(mediaServerItem, playSsrc); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); + param.put("recv_stream_id", sendRtpItem.getReceiveStream()); + param.put("close_delay_ms", userSetting.getPlayTimeout() * 1000); + + zlmrtpServerFactory.startSendRtpPassive(mediaServerItem, param, jsonObject -> { + if (jsonObject == null || jsonObject.getInteger("code") != 0 ) { + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + logger.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channelId); + audioEvent.call("失败, " + jsonObject.getString("msg")); + // 查看是否已经建立了通道,存在则发送bye + stopTalk(device, channelId); + } + }); + + // 查看设备是否已经在推流 -// MediaItem mediaItem = zlmrtpServerFactory.getMediaInfo(mediaServerItem, "rtp",ssrcInfo.getStream()); -// if (mediaItem != null) { -// SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, -// mediaItem.getOriginSock().getPeer_ip(), mediaItem.getOriginSock().getPeer_port(), ssrcInfo.getSsrc(), device.getDeviceId(), -// device.getDeviceId(), channelId, -// false); -// -// sendRtpItem.setTcpActive(false); -// sendRtpItem.setCallId(callId); -// sendRtpItem.setPlayType(InviteStreamType.TALK); -// sendRtpItem.setStatus(1); -// sendRtpItem.setIp(mediaItem.getOriginSock().getPeer_ip()); -// sendRtpItem.setPort(mediaItem.getOriginSock().getPeer_port()); -// sendRtpItem.setTcpActive(false); -// sendRtpItem.setStreamId(ssrcInfo.getStream()); -// sendRtpItem.setApp("1000"); -// sendRtpItem.setStreamId("1000"); -// sendRtpItem.setSsrc(ssrc); -// sendRtpItem.setOnlyAudio(true); -// redisCatchStorage.updateSendRTPSever(sendRtpItem); -// -// Map param = new HashMap<>(12); -// param.put("vhost","__defaultVhost__"); -// param.put("app",sendRtpItem.getApp()); -// param.put("stream",sendRtpItem.getStreamId()); -// param.put("ssrc", sendRtpItem.getSsrc()); -// param.put("dst_url", sendRtpItem.getIp()); -// param.put("dst_port", sendRtpItem.getPort()); -// param.put("src_port", sendRtpItem.getLocalPort()); -// param.put("pt", sendRtpItem.getPt()); -// param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); -// param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); -// param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); -// JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItem, param); -// System.out.println(2222); -// System.out.println(jsonObject); -// }else { - try { - cmder.talkStreamCmd(mediaServerItem, ssrcInfo, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { - logger.info("[对讲] 流已生成, 开始推流: " + response.toJSONString()); - dynamicTask.stop(timeOutTaskKey); - // TODO 暂不做处理 - }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> { - logger.info("[对讲] 设备开始推流: " + json.toJSONString()); - dynamicTask.stop(timeOutTaskKey); - // 获取远程IP端口 作为回复语音流的地址 - String ip = json.getString("ip"); - Integer port = json.getInteger("port"); - logger.info("[设备开始推流]{}/{}, 来自ip:{}, 端口:{}", device.getDeviceId(), channelId, ip, port); - // 查看平台推流是否就绪 -// Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemInuse, "talk", stream); -// if (!ready) { -// try { -// cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); -// } catch (InvalidArgumentException | ParseException | SipException e) { -// logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage()); -// } catch (SsrcTransactionNotFoundException e) { -// timeoutCallback.run(); -// mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); -// mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); -// streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); -// } -// }else { -// try { -// Thread.sleep(1000); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, ip, port, ssrcInfo.getSsrc(), device.getDeviceId(), - device.getDeviceId(), channelId, - false, false); + try { + cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { + logger.info("[语音对讲] 流已生成, 开始推流: " + response.toJSONString()); + dynamicTask.stop(timeOutTaskKey); + // TODO 暂不做处理 + }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> { + logger.info("[语音对讲] 设备开始推流: " + json.toJSONString()); + dynamicTask.stop(timeOutTaskKey); + }, (event) -> { + dynamicTask.stop(timeOutTaskKey); -// if (sendRtpItem.getLocalPort() == 0) { -// logger.warn("服务器端口资源不足"); -// try { -// cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); -// } catch (InvalidArgumentException | ParseException | SipException e) { -// logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage()); -// } catch (SsrcTransactionNotFoundException e) { -// timeoutCallback.run(); -// mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); -// mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); -// streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); -// } -// return; -// } - sendRtpItem.setTcpActive(false); - sendRtpItem.setCallId(callId); - sendRtpItem.setPlayType(InviteStreamType.TALK); - sendRtpItem.setStatus(1); - sendRtpItem.setIp(ip); - sendRtpItem.setPort(port); - sendRtpItem.setTcpActive(false); - sendRtpItem.setApp("1000"); - sendRtpItem.setStreamId("1000"); - sendRtpItem.setSsrc(ssrc); - sendRtpItem.setOnlyAudio(true); - sendRtpItem.setRtcp(false); + if (event.event instanceof ResponseEvent) { + ResponseEvent responseEvent = (ResponseEvent) event.event; + if (responseEvent.getResponse() instanceof SIPResponse) { + SIPResponse response = (SIPResponse) responseEvent.getResponse(); + sendRtpItem.setFromTag(response.getFromTag()); + sendRtpItem.setToTag(response.getToTag()); + sendRtpItem.setCallId(response.getCallIdHeader().getCallId()); redisCatchStorage.updateSendRTPSever(sendRtpItem); - Map param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStreamId()); - param.put("ssrc", sendRtpItem.getSsrc()); - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - param.put("src_port", sendRtpItem.getLocalPort()); - param.put("pt", sendRtpItem.getPt()); - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItemInuse, param); - System.out.println(11111); - System.out.println(sendRtpItem.getIp() + ":" + sendRtpItem.getPort()); -// System.out.println(jsonObject); -// } + streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), + sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), + response, VideoStreamSessionManager.SessionType.talk); + } else { + logger.error("[语音对讲]收到的消息错误,response不是SIPResponse"); + } + } else { + logger.error("[语音对讲]收到的消息错误,event不是ResponseEvent"); + } - }, (event) -> { - - }, (event) -> { - dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); - - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); - errorEvent.response(event); - }); - } catch (InvalidArgumentException | SipException | ParseException e) { - - logger.error("[命令发送失败] 对讲消息: {}", e.getMessage()); + }, (event) -> { dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream()); // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream()); + errorEvent.response(event); + }); + } catch (InvalidArgumentException | SipException | ParseException e) { - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null)); - eventResult.msg = "命令发送失败"; - errorEvent.response(eventResult); - } + logger.error("[命令发送失败] 对讲消息: {}", e.getMessage()); + dynamicTask.stop(timeOutTaskKey); + mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream()); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + + streamSession.remove(device.getDeviceId(), channelId, sendRtpItem.getStream()); + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null)); + eventResult.msg = "命令发送失败"; + errorEvent.response(eventResult); + } // } } + + @Override public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, @@ -446,7 +400,8 @@ public class PlayServiceImpl implements IPlayService { // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 try { cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + } catch (InvalidArgumentException | ParseException | SipException | + SsrcTransactionNotFoundException e) { logger.error("[点播超时], 发送BYE失败 {}", e.getMessage()); } finally { timeoutCallback.run(1, "收流超时"); @@ -483,7 +438,7 @@ public class PlayServiceImpl implements IPlayService { onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId); hookEvent.response(mediaServerItemInuse, response); logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId); - String streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.flv", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream()); + String streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.flv", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream()); String path = "snap"; String fileName = device.getDeviceId() + "_" + channelId + ".jpg"; // 请求截图 @@ -652,8 +607,8 @@ public class PlayServiceImpl implements IPlayService { @Override public void playBack(String deviceId, String channelId, String startTime, - String endTime, InviteStreamCallback inviteStreamCallback, - PlayBackCallback callback) { + String endTime, InviteStreamCallback inviteStreamCallback, + PlayBackCallback callback) { Device device = storager.queryVideoDevice(deviceId); if (device == null) { return; @@ -666,9 +621,9 @@ public class PlayServiceImpl implements IPlayService { @Override public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, - String deviceId, String channelId, String startTime, - String endTime, InviteStreamCallback infoCallBack, - PlayBackCallback playBackCallback) { + String deviceId, String channelId, String startTime, + String endTime, InviteStreamCallback infoCallBack, + PlayBackCallback playBackCallback) { if (mediaServerItem == null || ssrcInfo == null) { return; } @@ -792,7 +747,6 @@ public class PlayServiceImpl implements IPlayService { } - @Override public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback playBackCallback) { Device device = storager.queryVideoDevice(deviceId); @@ -977,7 +931,7 @@ public class PlayServiceImpl implements IPlayService { cmder.streamByeCmd(device, ssrcTransaction.getChannelId(), ssrcTransaction.getStream(), null); } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { + SsrcTransactionNotFoundException e) { logger.error("[zlm离线]为正在使用此zlm的设备, 发送BYE失败 {}", e.getMessage()); } } @@ -987,6 +941,7 @@ public class PlayServiceImpl implements IPlayService { @Override public AudioBroadcastResult audioBroadcast(Device device, String channelId) { + // TODO 必须多端口模式才支持语音喊话鹤语音对讲 if (device == null || channelId == null) { return null; } @@ -1005,13 +960,13 @@ public class PlayServiceImpl implements IPlayService { AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult(); audioBroadcastResult.setApp(app); audioBroadcastResult.setStream(stream); - audioBroadcastResult.setStreamInfo(new StreamContent(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null,false))); + audioBroadcastResult.setStreamInfo(new StreamContent(mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, stream, null, null, null, false))); audioBroadcastResult.setCodec("G.711"); return audioBroadcastResult; } @Override - public void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException { + public void audioBroadcastCmd(Device device, String channelId, MediaServerItem mediaServerItem, int timeout, AudioEvent event) throws InvalidArgumentException, ParseException, SipException { if (device == null || channelId == null) { return; } @@ -1027,8 +982,8 @@ public class PlayServiceImpl implements IPlayService { SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { // 查询流是否存在,不存在则认为是异常状态 - MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId()); + MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream()); if (streamReady) { logger.warn("语音广播已经开启: {}", channelId); event.call("语音广播已经开启"); @@ -1038,11 +993,23 @@ public class PlayServiceImpl implements IPlayService { } } } + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + if (sendRtpItem != null) { + MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream()); + if (streamReady) { + logger.warn("[语音对讲] 进行中: {}", channelId); + event.call("语音对讲进行中"); + return; + } else { + stopTalk(device, channelId); + } + } // 发送通知 cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { // 发送成功 - AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready); + AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready, mediaServerItem); audioBroadcastManager.update(audioBroadcastCatch); }, eventResultForError -> { // 发送失败 @@ -1053,19 +1020,18 @@ public class PlayServiceImpl implements IPlayService { } - @Override public void stopAudioBroadcast(String deviceId, String channelId) { List audioBroadcastCatchList = new ArrayList<>(); if (channelId == null) { audioBroadcastCatchList.addAll(audioBroadcastManager.get(deviceId)); - }else { + } else { audioBroadcastCatchList.add(audioBroadcastManager.get(deviceId, channelId)); } if (audioBroadcastCatchList.size() > 0) { for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatchList) { Device device = deviceService.getDevice(deviceId); - if (device == null || audioBroadcastCatch == null ) { + if (device == null || audioBroadcastCatch == null) { return; } SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); @@ -1075,7 +1041,7 @@ public class PlayServiceImpl implements IPlayService { Map param = new HashMap<>(); param.put("vhost", "__defaultVhost__"); param.put("app", sendRtpItem.getApp()); - param.put("stream", sendRtpItem.getStreamId()); + param.put("stream", sendRtpItem.getStream()); zlmresTfulUtils.stopSendRtp(mediaInfo, param); try { cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); @@ -1199,12 +1165,12 @@ public class PlayServiceImpl implements IPlayService { String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStreamId(), + logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); Map param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStreamId()); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStream()); param.put("ssrc", sendRtpItem.getSsrc()); param.put("src_port", sendRtpItem.getLocalPort()); param.put("pt", sendRtpItem.getPt()); @@ -1213,12 +1179,12 @@ public class PlayServiceImpl implements IPlayService { param.put("is_udp", is_Udp); if (!sendRtpItem.isTcp()) { // udp模式下开启rtcp保活 - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0"); } if (mediaInfo == null) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( - sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> { @@ -1233,16 +1199,16 @@ public class PlayServiceImpl implements IPlayService { if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { if (sendRtpItem.isTcpActive()) { startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); - }else { + } else { param.put("dst_url", sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); } } - }else { + } else { if (sendRtpItem.isTcpActive()) { startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); - }else { + } else { param.put("dst_url", sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); @@ -1260,10 +1226,10 @@ public class PlayServiceImpl implements IPlayService { if (jsonObject == null) { logger.error("RTP推流失败: 请检查ZLM服务"); } else if (jsonObject.getInteger("code") == 0) { - logger.info("调用ZLM推流接口, 结果: {}", jsonObject); - logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + logger.info("调用ZLM推流接口, 结果: {}", jsonObject); + logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, ", param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); } else { - logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); + logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param)); if (sendRtpItem.isOnlyAudio()) { Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); @@ -1275,7 +1241,7 @@ public class PlayServiceImpl implements IPlayService { logger.error("[命令发送失败] 停止语音对讲: {}", e.getMessage()); } } - }else { + } else { // 向上级平台 try { commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); @@ -1285,4 +1251,105 @@ public class PlayServiceImpl implements IPlayService { } } } + + @Override + public void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioEvent event) { + if (device == null || channelId == null) { + return; + } + // TODO 必须多端口模式才支持语音喊话鹤语音对讲 + logger.info("[语音对讲] device: {}, channel: {}", device.getDeviceId(), channelId); + DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId); + if (deviceChannel == null) { + logger.warn("开启语音对讲的时候未找到通道: {}", channelId); + event.call("开启语音对讲的时候未找到通道"); + return; + } + // 查询通道使用状态 + if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { + // 查询流是否存在,不存在则认为是异常状态 + MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream()); + if (streamReady) { + logger.warn("[语音对讲] 正在语音广播,无法开启语音通话: {}", channelId); + event.call("正在语音广播"); + return; + } else { + stopAudioBroadcast(device.getDeviceId(), channelId); + } + } + } + + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null); + if (sendRtpItem != null) { + MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream()); + if (streamReady) { + logger.warn("[语音对讲] 进行中: {}", channelId); + event.call("语音对讲进行中"); + return; + } else { + stopTalk(device, channelId); + } + } + + talk(mediaServerItem, device, channelId, stream, (MediaServerItem mediaServerItem1, JSONObject response) -> { + logger.info("[语音对讲] 收到设备发来的流"); + }, eventResult -> { + logger.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channelId, eventResult.statusCode, eventResult.msg); + event.call("失败,错误码 " + eventResult.statusCode + ", " + eventResult.msg); + }, () -> { + logger.warn("[语音对讲] 失败,{}/{} 超时", device.getDeviceId(), channelId); + event.call("失败,超时 "); + stopTalk(device, channelId); + }, errorMsg -> { + logger.warn("[语音对讲] 失败,{}/{} {}", device.getDeviceId(), channelId, errorMsg); + event.call(errorMsg); + stopTalk(device, channelId); + }); + } + + private void stopTalk(Device device, String channelId) { + stopTalk(device, channelId, null); + } + + @Override + public void stopTalk(Device device, String channelId, Boolean streamIsReady) { + logger.info("[语音对讲] 停止, {}/{}", device.getDeviceId(), channelId); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + if (sendRtpItem == null) { + logger.info("[语音对讲] 停止失败, 未找到发送信息,可能已经停止"); + return; + } + // 停止向设备推流 + String mediaServerId = sendRtpItem.getMediaServerId(); + if (mediaServerId == null) { + return; + } + + MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId); + + if (streamIsReady == null || streamIsReady) { + Map param = new HashMap<>(); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + zlmrtpServerFactory.stopSendRtpStream(mediaServer, param); + } + + mediaServer.getSsrcConfig().releaseSsrc(sendRtpItem.getSsrc()); + + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, null, sendRtpItem.getStream()); + if (ssrcTransaction != null) { + try { + cmder.streamByeCmd(device, channelId, sendRtpItem.getStream(), null); + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + logger.info("[语音对讲] 停止消息发送失败,可能已经停止"); + } + } + redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId,null, null); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index e997e4d5..92f28a1c 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -378,7 +378,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { + sendRtpItem.getMediaServerId() + "_" + sendRtpItem.getPlatformId() + "_" + sendRtpItem.getChannelId() + "_" - + sendRtpItem.getStreamId() + "_" + + sendRtpItem.getStream() + "_" + sendRtpItem.getCallId(); RedisUtil.set(key, sendRtpItem); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamContent.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamContent.java index e7c24aae..e577baa2 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamContent.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/StreamContent.java @@ -1,43 +1,96 @@ package com.genersoft.iot.vmp.vmanager.bean; import com.genersoft.iot.vmp.common.StreamInfo; +import io.swagger.v3.oas.annotations.media.Schema; +@Schema(description = "流信息") public class StreamContent { + @Schema(description = "应用名") private String app; + + @Schema(description = "流ID") private String stream; + @Schema(description = "IP") private String ip; + @Schema(description = "HTTP-FLV流地址") private String flv; + @Schema(description = "HTTPS-FLV流地址") private String https_flv; + + @Schema(description = "Websocket-FLV流地址") private String ws_flv; + + @Schema(description = "Websockets-FLV流地址") private String wss_flv; + + @Schema(description = "HTTP-FMP4流地址") private String fmp4; + + @Schema(description = "HTTPS-FMP4流地址") private String https_fmp4; + + @Schema(description = "Websocket-FMP4流地址") private String ws_fmp4; + + @Schema(description = "Websockets-FMP4流地址") private String wss_fmp4; + + @Schema(description = "HLS流地址") private String hls; + + @Schema(description = "HTTPS-HLS流地址") private String https_hls; + + @Schema(description = "Websocket-HLS流地址") private String ws_hls; + + @Schema(description = "Websockets-HLS流地址") private String wss_hls; + + @Schema(description = "HTTP-TS流地址") private String ts; + + @Schema(description = "HTTPS-TS流地址") private String https_ts; + + @Schema(description = "Websocket-TS流地址") private String ws_ts; + + @Schema(description = "Websockets-TS流地址") private String wss_ts; + + @Schema(description = "RTMP流地址") private String rtmp; + + @Schema(description = "RTMPS流地址") private String rtmps; + + @Schema(description = "RTSP流地址") private String rtsp; + + @Schema(description = "RTSPS流地址") private String rtsps; + + @Schema(description = "RTC流地址") private String rtc; + @Schema(description = "RTCS流地址") private String rtcs; + + @Schema(description = "流媒体ID") private String mediaServerId; + + @Schema(description = "流编码信息") private Object tracks; + @Schema(description = "开始时间") private String startTime; + @Schema(description = "结束时间") private String endTime; private double progress; diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index 0805586c..c0f64cf4 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -19,11 +19,7 @@ import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx; -import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; -import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.StreamContent; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import com.genersoft.iot.vmp.vmanager.bean.*; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; @@ -269,14 +265,6 @@ public class PlayController { } - @GetMapping("/1111") - public void broadcastApi1() { - MediaServerItem defaultMediaServer = mediaServerService.getMediaServerForMinimumLoad(null); - Device device = storager.queryVideoDevice("34020000001320090001"); - playService.talk(defaultMediaServer, device, "34020000001370000001", null, null, null); - - } - @Operation(summary = "停止语音广播") @Parameter(name = "deviceId", description = "设备Id", required = true) @@ -289,7 +277,7 @@ public class PlayController { } // try { // playService.stopAudioBroadcast(deviceId, channelId); -// } catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | SipException e) { +// } catch (InvalidArgumentException | ParseException | SipException e) { // logger.error("[命令发送失败] 停止语音: {}", e.getMessage()); // throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); // } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/AudioBroadcastEvent.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/AudioEvent.java similarity index 74% rename from src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/AudioBroadcastEvent.java rename to src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/AudioEvent.java index 55b710f2..b4954bfd 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/AudioBroadcastEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/bean/AudioEvent.java @@ -4,6 +4,6 @@ package com.genersoft.iot.vmp.vmanager.gb28181.play.bean; /** * @author lin */ -public interface AudioBroadcastEvent { +public interface AudioEvent { void call(String msg); } diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java index 5381a3ae..44266ded 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java @@ -185,7 +185,7 @@ public class ApiStreamController { } try { cmder.streamByeCmd(device, code, streamInfo.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { JSONObject result = new JSONObject(); result.put("error","发送BYE失败:" + e.getMessage()); return result;