From f5f768a247b876696a6ece256baf5b3a43524973 Mon Sep 17 00:00:00 2001 From: chenzhangyue Date: Sun, 10 Sep 2023 17:47:54 +0800 Subject: [PATCH] code review --- .../request/impl/InviteRequestProcessor.java | 901 +++++++++--------- 1 file changed, 456 insertions(+), 445 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index a4f3b5e7e..bb7d50669 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,5 +1,27 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import java.text.ParseException; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.Vector; + +import javax.sdp.*; +import javax.sip.InvalidArgumentException; +import javax.sip.RequestEvent; +import javax.sip.SipException; +import javax.sip.header.CallIdHeader; +import javax.sip.message.Response; + +import org.apache.commons.collections4.CollectionUtils; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; @@ -28,28 +50,11 @@ import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; + import gov.nist.javax.sdp.TimeDescriptionImpl; import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.sdp.*; -import javax.sip.InvalidArgumentException; -import javax.sip.RequestEvent; -import javax.sip.SipException; -import javax.sip.header.CallIdHeader; -import javax.sip.message.Response; -import java.text.ParseException; -import java.time.Instant; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.Vector; /** * SIP命令类型: INVITE请求 @@ -130,7 +135,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements */ @Override public void process(RequestEvent evt) { - // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 + // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 try { SIPRequest request = (SIPRequest)evt.getRequest(); String channelId = SipUtils.getChannelIdFromRequest(request); @@ -139,432 +144,262 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (requesterId == null || channelId == null) { logger.info("无法从FromHeader的Address中获取到平台id,返回400"); // 参数不全, 发400,请求错误 - try { - responseAck(request, Response.BAD_REQUEST); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); - } + responseAckType(request, Response.BAD_REQUEST, "[命令发送失败] invite BAD_REQUEST: {}"); return; } - // 查询请求是否来自上级平台\设备 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); if (platform == null) { inviteFromDeviceHandle(request, requesterId); + return; + } - } else { - // 查询平台下是否有该通道 - DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); - GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); - PlatformCatalog catalog = storager.getCatalog(requesterId, channelId); + // 查询平台下是否有该通道 + DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); + GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); + PlatformCatalog catalog = storager.getCatalog(requesterId, channelId); - MediaServerItem mediaServerItem = null; - StreamPushItem streamPushItem = null; - StreamProxyItem proxyByAppAndStream =null; + MediaServerItem mediaServerItem = null; + StreamPushItem streamPushItem = null; + StreamProxyItem proxyByAppAndStream = null; + if (channel != null && gbStream == null) { // 不是通道可能是直播流 - if (channel != null && gbStream == null) { - // 通道存在,发100,TRYING - try { - responseAck(request, Response.TRYING); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite TRYING: {}", e.getMessage()); - } - } else if (channel == null && gbStream != null) { - - String mediaServerId = gbStream.getMediaServerId(); - mediaServerItem = mediaServerService.getOne(mediaServerId); - if (mediaServerItem == null) { - if ("proxy".equals(gbStream.getStreamType())) { - logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - try { - responseAck(request, Response.GONE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite GONE: {}", e.getMessage()); - } - return; - } else { - streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); - if (streamPushItem != null) { - mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); - } - if (mediaServerItem == null) { - mediaServerItem = mediaServerService.getDefaultMediaServer(); - } - } + // 通道存在,发100,TRYING + responseAckType(request, Response.TRYING, "[命令发送失败] invite TRYING: {}"); + } else if (channel == null && gbStream != null) { + // 通道不存在,但是国标存在,标志直播流 + String mediaServerId = gbStream.getMediaServerId(); + mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem == null) { + if ("proxy".equals(gbStream.getStreamType())) { + // 代理流,但是没有找到zlm + logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); + responseAckType(request, Response.GONE, "[命令发送失败] invite GONE: {}"); + return; } else { - if ("push".equals(gbStream.getStreamType())) { - streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); - if (streamPushItem == null) { - logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - try { - responseAck(request, Response.GONE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite GONE: {}", e.getMessage()); - } - return; - } - }else if("proxy".equals(gbStream.getStreamType())){ - proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream()); - if (proxyByAppAndStream == null) { - logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - try { - responseAck(request, Response.GONE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite GONE: {}", e.getMessage()); - } - return; - } + // 非代理拉流 获取推流的流媒体 + streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); + if (streamPushItem != null) { + mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); + } + if (mediaServerItem == null) { + mediaServerItem = mediaServerService.getDefaultMediaServer(); } } - try { - responseAck(request, Response.CALL_IS_BEING_FORWARDED); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite CALL_IS_BEING_FORWARDED: {}", e.getMessage()); - } - } else if (catalog != null) { - try { - // 目录不支持点播 - responseAck(request, Response.BAD_REQUEST, "catalog channel can not play"); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 目录不支持点播: {}", e.getMessage()); - } - return; } else { - logger.info("通道不存在,返回404: {}", channelId); - try { - // 通道不存在,发404,资源不存在 - responseAck(request, Response.NOT_FOUND); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 通道不存在: {}", e.getMessage()); - } - return; - } - // 解析sdp消息, 使用jainsip 自带的sdp解析方式 - String contentString = new String(request.getRawContent()); - - Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); - SessionDescription sdp = gb28181Sdp.getBaseSdb(); - String sessionName = sdp.getSessionName().getValue(); - - Long startTime = null; - Long stopTime = null; - Instant start = null; - Instant end = null; - if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { - TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) (sdp.getTimeDescriptions(false).get(0)); - TimeField startTimeFiled = (TimeField) timeDescription.getTime(); - startTime = startTimeFiled.getStartTime(); - stopTime = startTimeFiled.getStopTime(); - - start = Instant.ofEpochSecond(startTime); - end = Instant.ofEpochSecond(stopTime); - } - // 获取支持的格式 - Vector mediaDescriptions = sdp.getMediaDescriptions(true); - // 查看是否支持PS 负载96 - //String ip = null; - int port = -1; - boolean mediaTransmissionTCP = false; - Boolean tcpActive = null; - for (Object description : mediaDescriptions) { - MediaDescription mediaDescription = (MediaDescription) description; - Media media = mediaDescription.getMedia(); - - Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("96")) { - port = media.getMediaPort(); - //String mediaType = media.getMediaType(); - String protocol = media.getProtocol(); - - // 区分TCP发流还是udp, 当前默认udp - if ("TCP/RTP/AVP".equalsIgnoreCase(protocol)) { - String setup = mediaDescription.getAttribute("setup"); - if (setup != null) { - mediaTransmissionTCP = true; - if ("active".equalsIgnoreCase(setup)) { - tcpActive = true; - } else if ("passive".equalsIgnoreCase(setup)) { - tcpActive = false; - } - } + if ("push".equals(gbStream.getStreamType())) { + streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); + if (streamPushItem == null) { + logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); + responseAckType(request, Response.GONE, "[命令发送失败] invite GONE: {}"); + return; + } + } else if ("proxy".equals(gbStream.getStreamType())) { + proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream()); + if (proxyByAppAndStream == null) { + logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); + responseAckType(request, Response.GONE, "[命令发送失败] invite GONE: {}"); + return; } - break; } } - if (port == -1) { - logger.info("不支持的媒体格式,返回415"); - // 回复不支持的格式 - try { - // 不支持的格式,发415 - responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 不支持的格式: {}", e.getMessage()); - } - return; + responseAckType(request, Response.CALL_IS_BEING_FORWARDED, "[命令发送失败] invite CALL_IS_BEING_FORWARDED: {}"); + } else if (catalog != null) { + try { + // 目录不支持点播 + responseAck(request, Response.BAD_REQUEST, "catalog channel can not play"); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] invite 目录不支持点播: {}", e.getMessage()); } - String username = sdp.getOrigin().getUsername(); - String addressStr = sdp.getConnection().getAddress(); + return; + } else { + logger.info("通道不存在,返回404: {}", channelId); + responseAckType(request, Response.NOT_FOUND, "[命令发送失败] invite 通道不存在: {}"); + return; + } + // 解析sdp消息, 使用jainsip 自带的sdp解析方式 + String contentString = new String(request.getRawContent()); + Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); + SessionDescription sdp = gb28181Sdp.getBaseSdb(); + String sessionName = sdp.getSessionName().getValue(); - Device device = null; - // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 - if (channel != null) { - device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); - if (device == null) { - logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); - try { - responseAck(request, Response.SERVER_INTERNAL_ERROR); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 未找到设备信息: {}", e.getMessage()); - } - return; - } - mediaServerItem = playService.getNewMediaServerItem(device); - if (mediaServerItem == null) { - logger.warn("未找到可用的zlm"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite BUSY_HERE: {}", e.getMessage()); - } - return; - } + Long startTime = null; + Long stopTime = null; + Instant start = null; + Instant end = null; + if (CollectionUtils.isNotEmpty(sdp.getTimeDescriptions(false))) { + TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) (sdp.getTimeDescriptions(false).get(0)); + TimeField startTimeFiled = (TimeField) timeDescription.getTime(); + startTime = startTimeFiled.getStartTime(); + stopTime = startTimeFiled.getStopTime(); - String ssrc; - if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - }else { - ssrc = gb28181Sdp.getSsrc(); - } - String streamTypeStr = null; - if (mediaTransmissionTCP) { - if (tcpActive) { - streamTypeStr = "TCP-ACTIVE"; - }else { - streamTypeStr = "TCP-PASSIVE"; - } - }else { - streamTypeStr = "UDP"; - } - logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); - SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); + start = Instant.ofEpochSecond(startTime); + end = Instant.ofEpochSecond(stopTime); + } + // 获取支持的格式 + Vector mediaDescriptions = sdp.getMediaDescriptions(true); + // 查看是否支持PS 负载96 + // String ip = null; + int port = -1; + boolean mediaTransmissionTCP = false; + Boolean tcpActive = null; + for (Object description : mediaDescriptions) { + MediaDescription mediaDescription = (MediaDescription) description; + Media media = mediaDescription.getMedia(); - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - if (sendRtpItem == null) { - logger.warn("服务器端口资源不足"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); - } - return; - } - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setPlayType("Play".equalsIgnoreCase(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); + Vector mediaFormats = media.getMediaFormats(false); + if (mediaFormats.contains("96")) { + port = media.getMediaPort(); + // String mediaType = media.getMediaType(); + String protocol = media.getProtocol(); - Long finalStartTime = startTime; - Long finalStopTime = stopTime; - ErrorCallback hookEvent = (code, msg, data) -> { - StreamInfo streamInfo = (StreamInfo)data; - MediaServerItem mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId()); - logger.info("[上级Invite]下级已经开始推流。 回复200OK(SDP), {}/{}", streamInfo.getApp(), streamInfo.getStream()); - // * 0 等待设备推流上来 - // * 1 下级已经推流,等待上级平台回复ack - // * 2 推流中 - sendRtpItem.setStatus(1); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - - StringBuffer content = new StringBuffer(200); - content.append("v=0\r\n"); - content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); - content.append("s=" + sessionName + "\r\n"); - content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); - if ("Playback".equalsIgnoreCase(sessionName)) { - content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); - } else { - content.append("t=0 0\r\n"); - } - int localPort = sendRtpItem.getLocalPort(); - if (localPort == 0) { - // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口 - localPort = new Random().nextInt(65535) + 1; - } - content.append("m=video " + localPort + " RTP/AVP 96\r\n"); - content.append("a=sendonly\r\n"); - content.append("a=rtpmap:96 PS/90000\r\n"); - content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); - content.append("f=\r\n"); - - - try { - // 超时未收到Ack应该回复bye,当前等待时间为10秒 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("Ack 等待超时"); - mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); - // 回复bye - try { - cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); - } - }, 60 * 1000); - responseSdpAck(request, content.toString(), platform); - // tcp主动模式,回复sdp后开启监听 - if (sendRtpItem.isTcpActive()) { - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Map param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStreamId()); - param.put("ssrc", sendRtpItem.getSsrc()); - if (!sendRtpItem.isTcpActive()) { - param.put("dst_url",sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - } - String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; - param.put("is_udp", is_Udp); - param.put("src_port", localPort); - param.put("pt", sendRtpItem.getPt()); - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - if (!sendRtpItem.isTcp()) { - // 开启rtcp保活 - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); - } - JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param); - if (startSendRtpStreamResult != null) { - startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader); - } - } - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 回复SdpAck", e); - } - }; - ErrorCallback errorEvent = ((statusCode, msg, data) -> { - // 未知错误。直接转发设备点播的错误 - try { - if (statusCode > 0) { - Response response = getMessageFactory().createResponse(statusCode, evt.getRequest()); - sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response); - } - } catch (ParseException | SipException e) { - logger.error("未处理的异常 ", e); - } - }); - sendRtpItem.setApp("rtp"); - if ("Playback".equalsIgnoreCase(sessionName)) { - sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); - String startTimeStr = DateUtil.urlFormatter.format(start); - String endTimeStr = DateUtil.urlFormatter.format(end); - String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr; - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); - sendRtpItem.setStreamId(ssrcInfo.getStream()); - // 写入redis, 超时时回复 - redisCatchStorage.updateSendRTPSever(sendRtpItem); - playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), - DateUtil.formatter.format(end), - (code, msg, data) -> { - if (code == InviteErrorCode.SUCCESS.getCode()){ - hookEvent.run(code, msg, data); - }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){ - logger.info("[录像回放]超时, 用户:{}, 通道:{}", username, channelId); - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - errorEvent.run(code, msg, data); - }else { - errorEvent.run(code, msg, data); - } - }); - } else if ("Download".equalsIgnoreCase(sessionName)) { - // 获取指定的下载速度 - Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true); - MediaDescription mediaDescription = null; - String downloadSpeed = "1"; - if (sdpMediaDescriptions.size() > 0) { - mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0); - } - if (mediaDescription != null) { - downloadSpeed = mediaDescription.getAttribute("downloadspeed"); - } - - sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); - sendRtpItem.setStreamId(ssrcInfo.getStream()); - // 写入redis, 超时时回复 - redisCatchStorage.updateSendRTPSever(sendRtpItem); - playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), - DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed), - (code, msg, data) -> { - if (code == InviteErrorCode.SUCCESS.getCode()) { - hookEvent.run(code, msg, data); - } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { - logger.info("[录像下载]超时, 用户:{}, 通道:{}", username, channelId); - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - errorEvent.run(code, msg, data); - } else { - errorEvent.run(code, msg, data); - } - }); - } else { - - SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> { - if (code == InviteErrorCode.SUCCESS.getCode()) { - hookEvent.run(code, msg, data); - } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { - logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - errorEvent.run(code, msg, data); - } else { - errorEvent.run(code, msg, data); - } - })); - sendRtpItem.setPlayType(InviteStreamType.PLAY); - String streamId = String.format("%s_%s", device.getDeviceId(), channelId); - sendRtpItem.setStreamId(streamId); - sendRtpItem.setSsrc(ssrcInfo.getSsrc()); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - - } - } else if (gbStream != null) { - - String ssrc; - if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - }else { - ssrc = gb28181Sdp.getSsrc(); - } - - if("push".equals(gbStream.getStreamType())) { - if (streamPushItem != null && streamPushItem.isPushIng()) { - // 推流状态 - pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } else { - // 未推流 拉起 - notifyStreamOnline(evt, request,gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } - }else if ("proxy".equals(gbStream.getStreamType())){ - if (null != proxyByAppAndStream) { - if(proxyByAppAndStream.isStatus()){ - pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - }else{ - //开启代理拉流 - notifyStreamOnline(evt, request,gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + // 区分TCP发流还是udp, 当前默认udp + if ("TCP/RTP/AVP".equalsIgnoreCase(protocol)) { + String setup = mediaDescription.getAttribute("setup"); + if (setup != null) { + mediaTransmissionTCP = true; + if ("active".equalsIgnoreCase(setup)) { + tcpActive = true; + } else if ("passive".equalsIgnoreCase(setup)) { + tcpActive = false; } } - - } + break; } } + if (port == -1) { + logger.info("不支持的媒体格式,返回415"); + // 回复不支持的格式 + responseAckType(request, Response.UNSUPPORTED_MEDIA_TYPE, "[命令发送失败] invite 不支持的格式: {}"); + return; + } + String username = sdp.getOrigin().getUsername(); + String addressStr = sdp.getConnection().getAddress(); + + // 通过 channel 和 gbStream 是否为null 值判断来源是直播流还是国标 + if (channel != null) { + Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); + if (device == null) { + logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); + responseAckType(request, Response.SERVER_INTERNAL_ERROR, "[命令发送失败] invite 未找到设备信息: {}"); + return; + } + mediaServerItem = playService.getNewMediaServerItem(device); + if (mediaServerItem == null) { + logger.warn("未找到可用的zlm"); + responseAckType(request, Response.BUSY_HERE, "[命令发送失败] invite BUSY_HERE: {}"); + return; + } + + String ssrc; + if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) + : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + } else { + ssrc = gb28181Sdp.getSsrc(); + } + String streamTypeStr = null; + if (mediaTransmissionTCP) { + if (tcpActive) { + streamTypeStr = "TCP-ACTIVE"; + } else { + streamTypeStr = "TCP-PASSIVE"; + } + } else { + streamTypeStr = "UDP"; + } + logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, + streamTypeStr, ssrc); + SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); + + if (sendRtpItem == null) { + logger.warn("服务器端口资源不足"); + responseAckType(request, Response.BUSY_HERE, "[命令发送失败] invite 服务器端口资源不足: {}"); + return; + } + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + sendRtpItem.setCallId(callIdHeader.getCallId()); + sendRtpItem.setPlayType("Play".equalsIgnoreCase(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); + + // 获取ACK返回参数,点播推流参数 + String content = getContent(channelId, mediaServerItem.getSdpIp(), sessionName, startTime, stopTime, sendRtpItem); + + ErrorCallback hookEvent = getHookSuccessCallback(sendRtpItem, content, callIdHeader.getCallId(), platform, request); + ErrorCallback errorEvent = ((statusCode, msg, data) -> { + // 未知错误。直接转发设备点播的错误 + try { + if (statusCode > 0) { + Response response = getMessageFactory().createResponse(statusCode, evt.getRequest()); + sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response); + } + } catch (ParseException | SipException e) { + logger.error("未处理的异常 ", e); + } + }); + + // 创建回调 + ErrorCallback callback = callback(channelId, hookEvent, username, platform, callIdHeader, errorEvent); + + if ("Playback".equalsIgnoreCase(sessionName)) { + if (start == null || end == null) { + logger.info("[录像回放]未指定开始结束时间, 用户:{}, 通道:{}", username, channelId); + responseAckType(request, Response.BAD_REQUEST, "[命令发送失败] invite BAD_REQUEST: {}"); + return; + } + + // 回放 + playback(sendRtpItem, start, end, device, channelId, mediaServerItem, callback); + } else if ("Download".equalsIgnoreCase(sessionName)) { + // 获取指定的下载速度下载 + download(sdp, sendRtpItem, mediaServerItem, device, channelId, start, end, callback); + } else { + // 点播 + play(mediaServerItem, device, channelId, ssrc, callback, sendRtpItem); + } + } + + if (gbStream != null) { + String ssrc; + if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) + : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + } else { + ssrc = gb28181Sdp.getSsrc(); + } + + if ("push".equals(gbStream.getStreamType())) { + if (streamPushItem != null && streamPushItem.isPushIng()) { + // 推流状态 + pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } else { + // 未推流 拉起 + notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + } else if ("proxy".equals(gbStream.getStreamType())) { + if (null != proxyByAppAndStream) { + if (proxyByAppAndStream.isStatus()) { + pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } else { + // 开启代理拉流 + notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + } + + } + } + } catch (SdpParseException e) { logger.error("sdp解析错误", e); } catch (SdpException e) { @@ -572,8 +407,203 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } - private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, - JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { + /** + * 国标点播 + * + * @param mediaServerItem 流媒体服务器 + * @param device 设备 + * @param channelId 通道id + * @param ssrc ssrc + * @param sendRtpItem 发送rtp参数 + */ + private void play(MediaServerItem mediaServerItem, Device device, String channelId, String ssrc, ErrorCallback callback, SendRtpItem sendRtpItem) { + SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, callback); + sendRtpItem.setPlayType(InviteStreamType.PLAY); + String streamId = String.format("%s_%s", device.getDeviceId(), channelId); + sendRtpItem.setStreamId(streamId); + sendRtpItem.setSsrc(ssrcInfo.getSsrc()); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } + + /** + * 下载请求 + * + * @param sdp 下载速度 + * @param sendRtpItem 发送rtp参数 + * @param mediaServerItem 流媒体服务器 + * @param device 设备 + * @param channelId 通道id + * @param start 开始时间 + * @param end 结束时间 + * @throws SdpException sdp解析异常 + */ + private void download(SessionDescription sdp, SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, Device device, String channelId, Instant start, Instant end, ErrorCallback callback) throws SdpException { + Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true); + MediaDescription mediaDescription = null; + String downloadSpeed = "1"; + if (CollectionUtils.isNotEmpty(sdpMediaDescriptions)) { + mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0); + } + if (mediaDescription != null) { + downloadSpeed = mediaDescription.getAttribute("downloadspeed"); + } + + sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, + device.getStreamModeForParam()); + sendRtpItem.setStreamId(ssrcInfo.getStream()); + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), + DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed), + callback); + } + + /** + * 创建回调 + * @param channelId 通道id + * @param hookEvent 回调事件 + * @param username 用户名 + * @param platform 平台 + * @param callIdHeader callId + * @param errorEvent 错误回调 + * @return + */ + @NotNull + private ErrorCallback callback(String channelId, ErrorCallback hookEvent, String username, ParentPlatform platform, CallIdHeader callIdHeader, ErrorCallback errorEvent) { + return (code, msg, data) -> { + if (code == InviteErrorCode.SUCCESS.getCode()) { + hookEvent.run(code, msg, data); + } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() + || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { + logger.info("[录像下载]超时, 用户:{}, 通道:{}", username, channelId); + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); + errorEvent.run(code, msg, data); + } else { + errorEvent.run(code, msg, data); + } + }; + } + + private void playback(SendRtpItem sendRtpItem, Instant start, Instant end, Device device, String channelId, MediaServerItem mediaServerItem, ErrorCallback callback) { + sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); + String startTimeStr = DateUtil.urlFormatter.format(start); + String endTimeStr = DateUtil.urlFormatter.format(end); + String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr; + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, + device.getStreamModeForParam()); + sendRtpItem.setStreamId(ssrcInfo.getStream()); + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), + DateUtil.formatter.format(end), callback); + } + + + @NotNull + private ErrorCallback getHookSuccessCallback(SendRtpItem sendRtpItem, String content, String callId, ParentPlatform platform, SIPRequest request) { + ErrorCallback hookEvent = (code, msg, data) -> { + StreamInfo streamInfo = (StreamInfo) data; + MediaServerItem originMediaServer = mediaServerService.getOne(streamInfo.getMediaServerId()); + logger.info("[上级Invite]下级已经开始推流。 回复200OK(SDP), {}/{}", streamInfo.getApp(), streamInfo.getStream()); + // * 0 等待设备推流上来 + // * 1 下级已经推流,等待上级平台回复ack + // * 2 推流中 + sendRtpItem.setStatus(1); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + + int localPort = sendRtpItem.getLocalPort(); + + try { + // 超时未收到Ack应该回复bye,当前等待时间为10秒 + dynamicTask.startDelay(callId, () -> { + logger.info("Ack 等待超时"); + mediaServerService.releaseSsrc(originMediaServer.getId(), sendRtpItem.getSsrc()); + // 回复bye + try { + cmderFroPlatform.streamByeCmd(platform, callId); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + }, 60 * 1000); + responseSdpAck(request, content, platform); + // tcp主动模式,回复sdp后开启监听 + if (sendRtpItem.isTcpActive()) { + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + // 构造参数 + Map param = getParam(sendRtpItem, localPort); + + JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param); + if (startSendRtpStreamResult != null) { + startSendRtpStreamHand(sendRtpItem, null, startSendRtpStreamResult, param); + } + } + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 回复SdpAck", e); + } + }; + return hookEvent; + } + + public Map getParam(SendRtpItem sendRtpItem, int localPort) { + Map param = new HashMap<>(12); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + if (!sendRtpItem.isTcpActive()) { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + } + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + param.put("is_udp", is_Udp); + param.put("src_port", localPort); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + if (!sendRtpItem.isTcp()) { + // 开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0"); + } + return param; + } + + public String getContent(String channelId, String sdpIp, String sessionName, Long finalStartTime, Long finalStopTime, SendRtpItem sendRtpItem) { + StringBuffer content = new StringBuffer(200); + + content.append("v=0\r\n"); + content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n"); + content.append("s=" + sessionName + "\r\n"); + content.append("c=IN IP4 " + sdpIp + "\r\n"); + if ("Playback".equalsIgnoreCase(sessionName)) { + content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); + } else { + content.append("t=0 0\r\n"); + } + int localPort = sendRtpItem.getLocalPort(); + if (localPort == 0) { + // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口 + localPort = new Random().nextInt(65535) + 1; + } + content.append("m=video " + localPort + " RTP/AVP 96\r\n"); + content.append("a=sendonly\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); + content.append("f=\r\n"); + + return content.toString(); + } + + private void responseAckType(SIPRequest request, int trying, String format) { + try { + responseAck(request, trying); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error(format, e.getMessage()); + } + } + + private void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map param) { if (jsonObject == null) { logger.error("下级TCP被动启动监听失败: 请检查ZLM服务"); } else if (jsonObject.getInteger("code") == 0) { @@ -599,11 +629,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); - } + responseAckType(request, Response.BUSY_HERE, "[命令发送失败] invite 服务器端口资源不足: {}"); return; } if (tcpActive != null) { @@ -638,11 +664,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); - } + responseAckType(request, Response.BUSY_HERE, "[命令发送失败] invite 服务器端口资源不足: {}"); return; } if (tcpActive != null) { @@ -673,6 +695,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } } + + /** * 通知流上线 */ @@ -914,11 +938,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements Device device = redisCatchStorage.getDevice(requesterId); if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); - try { - responseAck(request, Response.TRYING); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); - } + responseAckType(request, Response.TRYING, "[命令发送失败] invite BAD_REQUEST: {}"); String contentString = new String(request.getRawContent()); // jainSip不支持y=字段, 移除移除以解析。 String ssrc = "0000000404"; @@ -959,11 +979,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (port == -1) { logger.info("不支持的媒体格式,返回415"); // 回复不支持的格式 - try { - responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 不支持的媒体格式,返回415, {}", e.getMessage()); - } + responseAckType(request, Response.UNSUPPORTED_MEDIA_TYPE, "[命令发送失败] invite 不支持的媒体格式,返回415, {}"); return; } String username = sdp.getOrigin().getUsername(); @@ -974,14 +990,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } - } else { logger.warn("来自无效设备/平台的请求"); - try { - responseAck(request, Response.BAD_REQUEST);; // 不支持的格式,发415 - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite 来自无效设备/平台的请求, {}", e.getMessage()); - } + responseAckType(request, Response.BAD_REQUEST, "[命令发送失败] invite 来自无效设备/平台的请求, {}"); } } }