From e0783609285ed65c0243b9d4b1eb855c982cc1ae Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 11 Sep 2024 16:59:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=AF=AD=E9=9F=B3=E5=AF=B9?= =?UTF-8?q?=E8=AE=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/InviteInfo.java | 3 +- .../service/impl/DeviceServiceImpl.java | 1 + .../service/impl/InviteStreamServiceImpl.java | 6 +- .../gb28181/service/impl/PlayServiceImpl.java | 2 +- .../session/AudioBroadcastManager.java | 13 +- .../cmd/SIPRequestHeaderProvider.java | 5 +- .../request/impl/InviteRequestProcessor.java | 317 +++++++++--------- .../iot/vmp/gb28181/utils/SipUtils.java | 17 +- .../service/impl/MediaServerServiceImpl.java | 2 + .../media/zlm/ZLMMediaNodeServerService.java | 3 +- .../service/impl/StreamPushServiceImpl.java | 5 +- 11 files changed, 194 insertions(+), 180 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteInfo.java index e1736f95..57e83bd1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteInfo.java @@ -6,7 +6,8 @@ import lombok.Data; @Data public class InviteInfo { private String requesterId; - private String channelId; + private String targetChannelId; + private String sourceChannelId; private String sessionName; private String ssrc; private boolean tcp; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 1bdfbf86..9d110a84 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -459,6 +459,7 @@ public class DeviceServiceImpl implements IDeviceService { if (!ObjectUtils.isEmpty(device.getStreamMode())) { deviceInStore.setStreamMode(device.getStreamMode()); } + deviceInStore.setBroadcastPushAfterAck(device.isBroadcastPushAfterAck()); // 目录订阅相关的信息 if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) { if (device.getSubscribeCycleForCatalog() > 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java index 116414bd..3ca08706 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java @@ -293,11 +293,13 @@ public class InviteStreamServiceImpl implements IInviteStreamService { public void call(InviteSessionType type, Integer channelId, String stream, int code, String msg, StreamInfo data) { String key = buildSubStreamKey(type, channelId, stream); List> callbacks = inviteErrorCallbackMap.get(key); - if (callbacks == null) { + if (callbacks == null || callbacks.isEmpty()) { return; } for (ErrorCallback callback : callbacks) { - callback.run(code, msg, data); + if (callback != null) { + callback.run(code, msg, data); + } } inviteErrorCallbackMap.remove(key); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index e44a90f7..e620aa9b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -127,7 +127,7 @@ public class PlayServiceImpl implements IPlayService { @Async("taskExecutor") @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { - if ("broadcast".equals(event.getApp())) { + if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) { if (event.getStream().indexOf("_") > 0) { String[] streamArray = event.getStream().split("_"); if (streamArray.length == 2) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java index 7dc9ca24..57067f17 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java @@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.gb28181.session; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; -import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -52,15 +51,9 @@ public class AudioBroadcastManager { public List getByDeviceId(String deviceId) { List audioBroadcastCatchList= new ArrayList<>(); - if (SipUtils.isFrontEnd(deviceId)) { - if (data.get(deviceId) != null) { - audioBroadcastCatchList.add(data.get(deviceId)); - } - }else { - for (AudioBroadcastCatch broadcastCatch : data.values()) { - if (broadcastCatch.getDeviceId().equals(deviceId)) { - audioBroadcastCatchList.add(broadcastCatch); - } + for (AudioBroadcastCatch broadcastCatch : data.values()) { + if (broadcastCatch.getDeviceId().equals(deviceId)) { + audioBroadcastCatchList.add(broadcastCatch); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java index adc617da..7bfd91bf 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java @@ -164,11 +164,12 @@ public class SIPRequestHeaderProvider { // SipURI requestLine = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress()); // via ArrayList viaHeaders = new ArrayList(); - ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(device.getLocalIp()), sipConfig.getPort(), device.getTransport(), SipUtils.getNewViaTag()); + ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(device.getLocalIp()), sipConfig.getPort(), device.getTransport(), transactionInfo.getViaBranch()); // viaHeader.setRPort(); viaHeaders.add(viaHeader); //from - SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(),sipConfig.getDomain()); +// SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(),sipConfig.getDomain()); + SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(), sipLayer.getLocalIp(device.getLocalIp()) + ":" + sipConfig.getPort()); Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI); FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, transactionInfo.getFromTag()); //to diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index f4d01f5b..a63f7fb0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -46,6 +46,7 @@ import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; +import java.util.List; import java.util.Vector; /** @@ -143,12 +144,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 查询请求是否来自上级平台\设备 Platform platform = platformService.queryPlatformByServerGBId(inviteInfo.getRequesterId()); if (platform == null) { - inviteFromDeviceHandle(request, inviteInfo.getRequesterId(), inviteInfo.getChannelId()); + inviteFromDeviceHandle(request, inviteInfo); } else { // 查询平台下是否有该通道 - CommonGBChannel channel= channelService.queryOneWithPlatform(platform.getId(), inviteInfo.getChannelId()); + CommonGBChannel channel= channelService.queryOneWithPlatform(platform.getId(), inviteInfo.getTargetChannelId()); if (channel == null) { - log.info("[上级INVITE] 通道不存在,返回404: {}", inviteInfo.getChannelId()); + log.info("[上级INVITE] 通道不存在,返回404: {}", inviteInfo.getTargetChannelId()); try { // 通道不存在,发404,资源不存在 responseAck(request, Response.NOT_FOUND); @@ -593,7 +594,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements InviteInfo inviteInfo = new InviteInfo(); SIPRequest request = (SIPRequest)evt.getRequest(); - String channelIdFromSub = SipUtils.getChannelIdFromRequest(request); + String[] channelIdArrayFromSub = SipUtils.getChannelIdFromRequest(request); // 解析sdp消息, 使用jainsip 自带的sdp解析方式 String contentString = new String(request.getRawContent()); @@ -605,7 +606,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements URIField uriField = (URIField)sdp.getURI(); channelIdFromSdp = uriField.getURI().split(":")[0]; } - final String channelId = StringUtils.isNotBlank(channelIdFromSdp) ? channelIdFromSdp : channelIdFromSub; + final String channelId = StringUtils.isNotBlank(channelIdFromSdp) ? channelIdFromSdp : + (channelIdArrayFromSub != null? channelIdArrayFromSub[0]: null); String requesterId = SipUtils.getUserIdFromFromHeader(request); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); if (requesterId == null || channelId == null) { @@ -615,7 +617,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements log.info("[INVITE] 来源ID: {}, callId: {}, 来自:{}:{}", requesterId, callIdHeader.getCallId(), request.getRemoteAddress(), request.getRemotePort()); inviteInfo.setRequesterId(requesterId); - inviteInfo.setChannelId(channelId); + inviteInfo.setTargetChannelId(channelId); + if (channelIdArrayFromSub != null && channelIdArrayFromSub.length == 2) { + inviteInfo.setSourceChannelId(channelIdArrayFromSub[1]); + } inviteInfo.setSessionName(sessionName); inviteInfo.setSsrc(gb28181Sdp.getSsrc()); inviteInfo.setCallId(callIdHeader.getCallId()); @@ -702,7 +707,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private String createSendSdp(SendRtpInfo sendRtpItem, InviteInfo inviteInfo, String sdpIp) { StringBuilder content = new StringBuilder(200); content.append("v=0\r\n"); - content.append("o=" + inviteInfo.getChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n"); + content.append("o=" + inviteInfo.getTargetChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n"); content.append("s=" + inviteInfo.getSessionName() + "\r\n"); content.append("c=IN IP4 " + sdpIp + "\r\n"); if ("Playback".equalsIgnoreCase(inviteInfo.getSessionName())) { @@ -743,30 +748,31 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } - public void inviteFromDeviceHandle(SIPRequest request, String requesterId, String channelId) { - - String realChannelId = null; + public void inviteFromDeviceHandle(SIPRequest request, InviteInfo inviteInfo) { + if (inviteInfo.getSourceChannelId() == null) { + log.warn("来自设备的Invite请求,无法从请求信息中确定请求来自的通道,已忽略,requesterId: {}", inviteInfo.getRequesterId()); + try { + responseAck(request, Response.FORBIDDEN); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 来自设备的Invite请求,无法从请求信息中确定所属设备 FORBIDDEN: {}", e.getMessage()); + } + return; + } // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) - Device device = redisCatchStorage.getDevice(requesterId); + Device device = redisCatchStorage.getDevice(inviteInfo.getRequesterId()); // 判断requesterId是设备还是通道 if (device == null) { - device = storager.queryVideoDeviceByChannelId(requesterId); - realChannelId = requesterId; - }else { - realChannelId = channelId; + device = storager.queryVideoDeviceByChannelId(inviteInfo.getRequesterId()); } if (device == null) { // 检查channelID是否可用 - device = redisCatchStorage.getDevice(channelId); - if (device == null) { - device = storager.queryVideoDeviceByChannelId(channelId); - realChannelId = channelId; - } + device = storager.queryVideoDeviceByChannelId(inviteInfo.getSourceChannelId()); } if (device == null) { - log.warn("来自设备的Invite请求,无法从请求信息中确定所属设备,已忽略,requesterId: {}/{}", requesterId, channelId); + log.warn("来自设备的Invite请求,无法从请求信息中确定所属设备,已忽略,requesterId: {}/{}", inviteInfo.getRequesterId(), + inviteInfo.getSourceChannelId()); try { responseAck(request, Response.FORBIDDEN); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -774,19 +780,24 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } - DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), realChannelId); + DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), inviteInfo.getSourceChannelId()); if (deviceChannel == null) { - log.warn("来自设备的Invite请求,无法从请求信息中确定所属通道,已忽略,requesterId: {}/{}", requesterId, realChannelId); - try { - responseAck(request, Response.FORBIDDEN); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 来自设备的Invite请求,无法从请求信息中确定所属设备 FORBIDDEN: {}", e.getMessage()); + List audioBroadcastCatchList = audioBroadcastManager.getByDeviceId(device.getDeviceId()); + if (audioBroadcastCatchList.isEmpty()) { + log.warn("来自设备的Invite请求,无法从请求信息中确定所属通道,已忽略,requesterId: {}/{}", inviteInfo.getRequesterId(), inviteInfo.getSourceChannelId()); + try { + responseAck(request, Response.FORBIDDEN); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 来自设备的Invite请求,无法从请求信息中确定所属设备 FORBIDDEN: {}", e.getMessage()); + } + return; + }else { + deviceChannel = deviceChannelService.getOneById(audioBroadcastCatchList.get(0).getChannelId()); } - return; } AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(deviceChannel.getId()); if (broadcastCatch == null) { - log.warn("来自设备的Invite请求非语音广播,已忽略,requesterId: {}/{}", requesterId, channelId); + log.warn("来自设备的Invite请求非语音广播,已忽略,requesterId: {}/{}", inviteInfo.getRequesterId(), inviteInfo.getSourceChannelId()); try { responseAck(request, Response.FORBIDDEN); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -794,143 +805,133 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } - if (device != null) { - log.info("收到设备" + requesterId + "的语音广播Invite请求"); - String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId(); - if (!SipUtils.isFrontEnd(device.getDeviceId())) { - key += broadcastCatch.getChannelId(); + log.info("收到设备" + inviteInfo.getRequesterId() + "的语音广播Invite请求"); + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId(); + if (!SipUtils.isFrontEnd(device.getDeviceId())) { + key += broadcastCatch.getChannelId(); + } + dynamicTask.stop(key); + try { + responseAck(request, Response.TRYING); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); + playService.stopAudioBroadcast(device, deviceChannel); + return; + } + String contentString = new String(request.getRawContent()); + + try { + Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); + SessionDescription sdp = gb28181Sdp.getBaseSdb(); + // 获取支持的格式 + Vector mediaDescriptions = sdp.getMediaDescriptions(true); + + // 查看是否支持PS 负载96 + int port = -1; + boolean mediaTransmissionTCP = false; + Boolean tcpActive = null; + for (int i = 0; i < mediaDescriptions.size(); i++) { + MediaDescription mediaDescription = (MediaDescription) mediaDescriptions.get(i); + Media media = mediaDescription.getMedia(); + + Vector mediaFormats = media.getMediaFormats(false); +// if (mediaFormats.contains("8")) { + port = media.getMediaPort(); + String protocol = media.getProtocol(); + // 区分TCP发流还是udp, 当前默认udp + if ("TCP/RTP/AVP".equals(protocol)) { + String setup = mediaDescription.getAttribute("setup"); + if (setup != null) { + mediaTransmissionTCP = true; + if ("active".equals(setup)) { + tcpActive = true; + } else if ("passive".equals(setup)) { + tcpActive = false; + } + } + } + break; +// } } - dynamicTask.stop(key); - try { - responseAck(request, Response.TRYING); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); - playService.stopAudioBroadcast(device, deviceChannel); + if (port == -1) { + log.info("不支持的媒体格式,返回415"); + // 回复不支持的格式 + try { + responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] invite 不支持的媒体格式: {}", e.getMessage()); + playService.stopAudioBroadcast(device, deviceChannel); + return; + } return; } - String contentString = new String(request.getRawContent()); + String addressStr = sdp.getOrigin().getAddress(); + log.info("设备{}请求语音流,地址:{}:{},ssrc:{}, {}", inviteInfo.getRequesterId(), addressStr, port, gb28181Sdp.getSsrc(), + mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP"); - try { - Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); - SessionDescription sdp = gb28181Sdp.getBaseSdb(); - // 获取支持的格式 - Vector mediaDescriptions = sdp.getMediaDescriptions(true); - - // 查看是否支持PS 负载96 - int port = -1; - boolean mediaTransmissionTCP = false; - Boolean tcpActive = null; - for (int i = 0; i < mediaDescriptions.size(); i++) { - MediaDescription mediaDescription = (MediaDescription) mediaDescriptions.get(i); - Media media = mediaDescription.getMedia(); - - Vector mediaFormats = media.getMediaFormats(false); -// if (mediaFormats.contains("8")) { - port = media.getMediaPort(); - String protocol = media.getProtocol(); - // 区分TCP发流还是udp, 当前默认udp - if ("TCP/RTP/AVP".equals(protocol)) { - String setup = mediaDescription.getAttribute("setup"); - if (setup != null) { - mediaTransmissionTCP = true; - if ("active".equals(setup)) { - tcpActive = true; - } else if ("passive".equals(setup)) { - tcpActive = false; - } - } - } - break; -// } - } - if (port == -1) { - log.info("不支持的媒体格式,返回415"); - // 回复不支持的格式 - try { - responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 不支持的媒体格式: {}", e.getMessage()); - playService.stopAudioBroadcast(device, deviceChannel); - return; - } - return; - } - String addressStr = sdp.getOrigin().getAddress(); - log.info("设备{}请求语音流,地址:{}:{},ssrc:{}, {}", requesterId, addressStr, port, gb28181Sdp.getSsrc(), - mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP"); - - MediaServer mediaServerItem = broadcastCatch.getMediaServerItem(); - if (mediaServerItem == null) { - log.warn("未找到语音喊话使用的zlm"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 未找到可用的zlm: {}", e.getMessage()); - playService.stopAudioBroadcast(device, deviceChannel); - } - return; - } - log.info("设备{}请求语音流, 收流地址:{}:{},ssrc:{}, {}, 对讲方式:{}", requesterId, addressStr, port, gb28181Sdp.getSsrc(), - mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue()); - CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); - - SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId, - device.getDeviceId(), deviceChannel.getId(), - mediaTransmissionTCP, false); - - if (sendRtpItem == null) { - log.warn("服务器端口资源不足"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); - playService.stopAudioBroadcast(device, deviceChannel); - return; - } - return; - } - - sendRtpItem.setPlayType(InviteStreamType.BROADCAST); - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setStatus(1); - sendRtpItem.setApp(broadcastCatch.getApp()); - sendRtpItem.setStream(broadcastCatch.getStream()); - sendRtpItem.setPt(8); - sendRtpItem.setUsePs(false); - sendRtpItem.setRtcp(false); - sendRtpItem.setOnlyAudio(true); - sendRtpItem.setTcp(mediaTransmissionTCP); - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - - sendRtpServerService.update(sendRtpItem); - - Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, broadcastCatch.getApp(), broadcastCatch.getStream()); - if (streamReady) { - sendOk(device, deviceChannel, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, gb28181Sdp.getSsrc()); - } else { - log.warn("[语音通话], 未发现待推送的流,app={},stream={}", broadcastCatch.getApp(), broadcastCatch.getStream()); - try { - responseAck(request, Response.GONE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 语音通话 回复410失败, {}", e.getMessage()); - return; - } + MediaServer mediaServerItem = broadcastCatch.getMediaServerItem(); + if (mediaServerItem == null) { + log.warn("未找到语音喊话使用的zlm"); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] invite 未找到可用的zlm: {}", e.getMessage()); playService.stopAudioBroadcast(device, deviceChannel); } - } catch (SdpException e) { - log.error("[SDP解析异常]", e); + return; + } + log.info("设备{}请求语音流, 收流地址:{}:{},ssrc:{}, {}, 对讲方式:{}", inviteInfo.getRequesterId(), addressStr, port, gb28181Sdp.getSsrc(), + mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue()); + CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); + + SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), inviteInfo.getRequesterId(), + device.getDeviceId(), deviceChannel.getId(), + mediaTransmissionTCP, false); + + if (sendRtpItem == null) { + log.warn("服务器端口资源不足"); + try { + responseAck(request, Response.BUSY_HERE); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); + playService.stopAudioBroadcast(device, deviceChannel); + return; + } + return; + } + + sendRtpItem.setPlayType(InviteStreamType.BROADCAST); + sendRtpItem.setCallId(callIdHeader.getCallId()); + sendRtpItem.setStatus(1); + sendRtpItem.setApp(broadcastCatch.getApp()); + sendRtpItem.setStream(broadcastCatch.getStream()); + sendRtpItem.setPt(8); + sendRtpItem.setUsePs(false); + sendRtpItem.setRtcp(false); + sendRtpItem.setOnlyAudio(true); + sendRtpItem.setTcp(mediaTransmissionTCP); + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + + sendRtpServerService.update(sendRtpItem); + + Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, broadcastCatch.getApp(), broadcastCatch.getStream()); + if (streamReady) { + sendOk(device, deviceChannel, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, gb28181Sdp.getSsrc()); + } else { + log.warn("[语音通话], 未发现待推送的流,app={},stream={}", broadcastCatch.getApp(), broadcastCatch.getStream()); + try { + responseAck(request, Response.GONE); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 语音通话 回复410失败, {}", e.getMessage()); + return; + } playService.stopAudioBroadcast(device, deviceChannel); } - } else { - log.warn("来自无效设备/平台的请求"); - try { - responseAck(request, Response.BAD_REQUEST); - ; // 不支持的格式,发415 - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 来自无效设备/平台的请求, {}", e.getMessage()); - } + } catch (SdpException e) { + log.error("[SDP解析异常]", e); + playService.stopAudioBroadcast(device, deviceChannel); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java index 70c8fa72..0bce5f87 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java @@ -18,7 +18,7 @@ import javax.sdp.SessionDescription; import javax.sip.PeerUnavailableException; import javax.sip.SipFactory; import javax.sip.header.FromHeader; -import javax.sip.header.Header; +import javax.sip.header.SubjectHeader; import javax.sip.header.UserAgentHeader; import javax.sip.message.Request; import java.text.ParseException; @@ -44,13 +44,22 @@ public class SipUtils { /** * 从subject读取channelId * */ - public static String getChannelIdFromRequest(Request request) { - Header subject = request.getHeader("subject"); + public static String[] getChannelIdFromRequest(Request request) { + SubjectHeader subject = (Subject)request.getHeader("subject"); if (subject == null) { // 如果缺失subject return null; } - return ((Subject) subject).getSubject().split(":")[0]; + String[] result = new String[2]; + String subjectStr = subject.getSubject(); + if (subjectStr.indexOf(",") > 0) { + String[] subjectSplit = subjectStr.split(","); + result[0] = subjectSplit[0].split(":")[0]; + result[1] = subjectSplit[1].split(":")[0]; + }else { + result[0] = subjectStr.split(":")[0]; + } + return result; } public static String getUserIdFromFromHeader(FromHeader fromHeader) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 97ec3b59..2b306496 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -855,6 +855,8 @@ public class MediaServerServiceImpl implements IMediaServerService { log.info("[startSendRtpStream] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类"); } + sendRtpItem.setRtcp(true); + log.info("[开始推流] {}/{}, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index 9c568269..1100bf66 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -376,7 +376,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); if (!sendRtpItem.isTcp()) { // udp模式下开启rtcp保活 - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0"); + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "500" : "0"); } param.put("dst_url", sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); @@ -384,6 +384,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { if (jsonObject == null || jsonObject.getInteger("code") != 0 ) { throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg")); } + log.info("[推流结果]:{} ,参数: {}",jsonObject, JSONObject.toJSONString(param)); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java index 6e31bd23..b05dc382 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java @@ -457,7 +457,10 @@ public class StreamPushServiceImpl implements IStreamPushService { } List commonGBChannelList = new ArrayList<>(); for (StreamPush streamPush : streamPushList) { - commonGBChannelList.add(streamPush.buildCommonGBChannel()); + CommonGBChannel commonGBChannel = streamPush.buildCommonGBChannel(); + if (commonGBChannel != null) { + commonGBChannelList.add(streamPush.buildCommonGBChannel()); + } } gbChannelService.offline(commonGBChannelList); }