From ad240ba9a400f486986160814420ed7bb3bbc16e Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 13 Aug 2024 17:57:25 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=9B=BD=E6=A0=87=E7=BA=A7?= =?UTF-8?q?=E8=81=94=E7=82=B9=E6=92=AD=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/bean/CommonChannelPlayInfo.java | 20 + .../gb28181/controller/PlayController.java | 5 +- .../gb28181/service/IGbChannelService.java | 3 +- .../iot/vmp/gb28181/service/IPlayService.java | 2 +- .../service/impl/GbChannelServiceImpl.java | 34 +- .../request/impl/InviteRequestProcessor.java | 806 ++++++++++-------- .../service/impl/MediaServerServiceImpl.java | 5 +- 7 files changed, 490 insertions(+), 385 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonChannelPlayInfo.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonChannelPlayInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonChannelPlayInfo.java new file mode 100644 index 00000000..974ccf4f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonChannelPlayInfo.java @@ -0,0 +1,20 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import lombok.Data; + +@Data +public class CommonChannelPlayInfo { + + private StreamInfo streamInfo; + + private MediaServer mediaServer; + + public static CommonChannelPlayInfo build(MediaServer mediaServer, StreamInfo data) { + CommonChannelPlayInfo commonChannelPlayInfo = new CommonChannelPlayInfo(); + commonChannelPlayInfo.setMediaServer(mediaServer); + commonChannelPlayInfo.setStreamInfo(data); + return commonChannelPlayInfo; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java index e4a12334..aca17d9e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java @@ -119,14 +119,13 @@ public class PlayController { // 录像查询以channelId作为deviceId查询 resultHolder.put(key, uuid, result); - playService.play(newMediaServerItem, deviceId, channelId, null, (code, msg, data) -> { + playService.play(newMediaServerItem, deviceId, channelId, null, (code, msg, streamInfo) -> { WVPResult wvpResult = new WVPResult<>(); if (code == InviteErrorCode.SUCCESS.getCode()) { wvpResult.setCode(ErrorCode.SUCCESS.getCode()); wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); - if (data != null) { - StreamInfo streamInfo = (StreamInfo)data; + if (streamInfo != null) { if (userSetting.getUseSourceIpAsStreamIp()) { streamInfo=streamInfo.clone();//深拷贝 String host; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java index 1f821f93..68ca2b47 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.gb28181.service; -import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.github.pagehelper.PageInfo; @@ -80,5 +79,5 @@ public interface IGbChannelService { CommonGBChannel queryOneWithPlatform(Integer platformId, String channelDeviceId); - void start(CommonGBChannel channel, ErrorCallback callback); + void start(CommonGBChannel channel, ErrorCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java index b7be536e..17287f4e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java @@ -26,7 +26,7 @@ public interface IPlayService { void play(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channelId, ErrorCallback callback); - SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback); + SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback); StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, String deviceId, String channelId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index 69c0951c..0818d9ac 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -1,6 +1,6 @@ package com.genersoft.iot.vmp.gb28181.service.impl; -import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.dao.CommonGBChannelMapper; @@ -10,6 +10,8 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; +import com.genersoft.iot.vmp.gb28181.service.IPlayService; +import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -41,12 +43,18 @@ public class GbChannelServiceImpl implements IGbChannelService { @Autowired private IDeviceService deviceService; + @Autowired + private IPlayService playService; + @Autowired private RegionMapper regionMapper; @Autowired private GroupMapper groupMapper; + @Autowired + private UserSetting userSetting; + @Override public CommonGBChannel queryByDeviceId(String gbDeviceId) { return commonGBChannelMapper.queryByDeviceId(gbDeviceId); @@ -646,16 +654,34 @@ public class GbChannelServiceImpl implements IGbChannelService { } @Override - public void start(CommonGBChannel channel, ErrorCallback callback) { + public void start(CommonGBChannel channel, ErrorCallback callback) { + log.info("[点播通用通道] 通道: {}({})", channel.getGbName(), channel.getGbDeviceId()); if (channel.getGbDeviceDbId() > 0) { // 国标通道 Device device = deviceService.getDevice(channel.getGbDeviceDbId()); if (device == null) { log.warn("[点播] 未找到通道{}的设备信息", channel); - throw new PlayException(Response.SERVER_INTERNAL_ERROR, "serverInternalError"); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); + } + MediaServer mediaServer = playService.getNewMediaServerItem(device); + if (mediaServer == null) { + log.warn("[点播] 未找到可用媒体节点"); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); } - + playService.play(mediaServer, device.getDeviceId(), channel.getGbDeviceId(), null, (code, msg, data) -> { + if (callback != null) { + callback.run(code, msg, CommonChannelPlayInfo.build(mediaServer, data)); + } + }); + }else if (channel.getStreamProxyId() > 0){ + // 拉流代理 + }else if (channel.getStreamPushId() > 0) { + // 推流 + }else { + // 通道数据异常 + log.error("[点播通用通道] 通道数据异常,无法识别通道来源: {}({})", channel.getGbName(), channel.getGbDeviceId()); + throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 6accd18e..146380af 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteSessionType; -import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; @@ -29,19 +28,13 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.service.bean.ErrorCallback; -import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; -import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; -import com.genersoft.iot.vmp.utils.DateUtil; import gov.nist.javax.sdp.TimeDescriptionImpl; import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sdp.fields.URIField; @@ -63,7 +56,6 @@ import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; -import java.util.List; import java.util.Map; import java.util.Random; import java.util.Vector; @@ -168,6 +160,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (platform == null) { inviteFromDeviceHandle(request, inviteInfo.getRequesterId(), inviteInfo.getChannelId()); } else { + // 查询平台下是否有该通道 CommonGBChannel channel= channelService.queryOneWithPlatform(platform.getId(), inviteInfo.getChannelId()); if (channel == null) { @@ -180,402 +173,433 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } + log.info("[上级Invite] 平台:{}, 通道:{}({}), 收流地址:{}:{},收流方式:{}, 点播类型:{}, ssrc:{}", + platform.getName(), channel.getGbName(), channel.getGbDeviceDbId(), inviteInfo.getIp(), + inviteInfo.getPort(), inviteInfo.isTcp()?(inviteInfo.isTcpActive()?"TCP主动":"TCP被动"): "UDP", + inviteInfo.getSessionName(), inviteInfo.getSsrc()); // 通道存在,发100,TRYING try { responseAck(request, Response.TRYING); } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite TRYING: {}", e.getMessage()); + log.error("[命令发送失败] 上级Invite TRYING: {}", e.getMessage()); } - channelService.start(channel, ((code, msg, data) -> { + channelService.start(channel, ((code, msg, commonChannelPlayInfo) -> { if (code != Response.OK) { try { responseAck(request, code, msg); } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 点播失败: {}", e.getMessage()); + log.error("[命令发送失败] 上级Invite 点播失败: {}", e.getMessage()); } }else { + // 点播成功, TODO 可以在此处检测cancel命令是否存在,存在则不发送 + // 构建sendRTP内容 + SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(commonChannelPlayInfo.getMediaServer(), + inviteInfo.getIp(), inviteInfo.getPort(), inviteInfo.getSsrc(), platform.getServerGBId(), + commonChannelPlayInfo.getStreamInfo().getApp(), commonChannelPlayInfo.getStreamInfo().getStream(), + channel.getGbDeviceId(), inviteInfo.isTcp(), platform.isRtcp()); + if (inviteInfo.isTcp() && inviteInfo.isTcpActive()) { + sendRtpItem.setTcpActive(true); + } + sendRtpItem.setStatus(1); + sendRtpItem.setCallId(inviteInfo.getCallId()); + sendRtpItem.setPlayType("Play".equalsIgnoreCase(inviteInfo.getSessionName()) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); + + redisCatchStorage.updateSendRTPSever(sendRtpItem); + String sdpIp = commonChannelPlayInfo.getMediaServer().getSdpIp(); + if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { + sdpIp = platform.getSendStreamIp(); + } + String content = createSendSdp(sendRtpItem, inviteInfo, sdpIp); + // 超时未收到Ack应该回复bye,当前等待时间为10秒 + dynamicTask.startDelay(inviteInfo.getCallId(), () -> { + log.info("Ack 等待超时"); + mediaServerService.releaseSsrc(commonChannelPlayInfo.getMediaServer().getId(), sendRtpItem.getSsrc()); + // 回复bye + sendBye(platform, inviteInfo.getCallId()); + }, 60 * 1000); + try { + responseSdpAck(request, content, platform); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 上级Invite 发送 200(SDP): {}", e.getMessage()); + } + + // tcp主动模式,回复sdp后开启监听 + if (sendRtpItem.isTcpActive()) { + MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + try { + mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 5); + redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform); + }catch (ControllerException e) { + log.warn("[上级Invite] tcp主动模式 发流失败", e); + sendBye(platform, inviteInfo.getCallId()); + } + } } })); - - if (channel.getGbDeviceDbId() > 0) { - Device device = deviceService.getDevice(channel.getGbDeviceDbId()); - if (device == null) { - log.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); - try { - responseAck(request, Response.SERVER_INTERNAL_ERROR); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 未找到设备信息: {}", e.getMessage()); - } - return; - } - } - } } catch (SdpException e) { // 参数不全, 发400,请求错误 try { responseAck(request, Response.BAD_REQUEST); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); + } catch (SipException | InvalidArgumentException | ParseException sendException) { + log.error("[命令发送失败] invite BAD_REQUEST: {}", sendException.getMessage()); } - return; } catch (InviteDecodeException e) { try { responseAck(request, e.getCode(), e.getMsg()); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); + } catch (SipException | InvalidArgumentException | ParseException sendException) { + log.error("[命令发送失败] invite BAD_REQUEST: {}", sendException.getMessage()); } }catch (PlayException e) { try { responseAck(request, e.getCode(), e.getMsg()); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 点播失败: {}", e.getMessage()); + } catch (SipException | InvalidArgumentException | ParseException sendException) { + log.error("[命令发送失败] invite 点播失败: {}", sendException.getMessage()); } } - // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 - try { - - - - - - - - // 查询请求是否来自上级平台\设备 - Platform platform = storager.queryParentPlatByServerGBId(requesterId); - - if (platform == null) { - inviteFromDeviceHandle(request, requesterId, channelId); - - } else { - // 查询平台下是否有该通道 - CommonGBChannel channel= channelService.queryOneWithPlatform(platform.getId(), channelId); - MediaServer mediaServerItem = null; - StreamPush streamPushItem = null; - StreamProxy proxyByAppAndStream = null; - if (channel == null) { - log.info("[上级INVITE] 通道不存在,返回404: {}", channelId); - try { - // 通道不存在,发404,资源不存在 - responseAck(request, Response.NOT_FOUND); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 通道不存在: {}", e.getMessage()); - } - return; - } - // 通道存在,发100,TRYING - try { - responseAck(request, Response.TRYING); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite TRYING: {}", e.getMessage()); - } - - - - Device device = null; - // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 - if (channel != null) { - device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); - if (device == null) { - log.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); - try { - responseAck(request, Response.SERVER_INTERNAL_ERROR); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 未找到设备信息: {}", e.getMessage()); - } - return; - } - mediaServerItem = playService.getNewMediaServerItem(device); - if (mediaServerItem == null) { - log.warn("未找到可用的zlm"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite BUSY_HERE: {}", e.getMessage()); - } - return; - } - - String ssrc; - if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - }else { - ssrc = gb28181Sdp.getSsrc(); - } - String streamTypeStr = null; - if (mediaTransmissionTCP) { - if (tcpActive) { - streamTypeStr = "TCP-ACTIVE"; - } else { - streamTypeStr = "TCP-PASSIVE"; - } - } else { - streamTypeStr = "UDP"; - } - log.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", - sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); - SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); - - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - if (sendRtpItem == null) { - log.warn("服务器端口资源不足"); - try { - responseAck(request, Response.BUSY_HERE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); - } - return; - } - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setPlayType("Play".equalsIgnoreCase(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); - - Long finalStartTime = startTime; - Long finalStopTime = stopTime; - ErrorCallback hookEvent = (code, msg, data) -> { - StreamInfo streamInfo = (StreamInfo)data; - MediaServer mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId()); - log.info("[上级Invite]下级已经开始推流。 回复200OK(SDP), {}/{}", streamInfo.getApp(), streamInfo.getStream()); - // * 0 等待设备推流上来 - // * 1 下级已经推流,等待上级平台回复ack - // * 2 推流中 - sendRtpItem.setStatus(1); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - String sdpIp = mediaServerItemInUSe.getSdpIp(); - if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { - sdpIp = platform.getSendStreamIp(); - } - StringBuffer content = new StringBuffer(200); - content.append("v=0\r\n"); - content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n"); - content.append("s=" + sessionName + "\r\n"); - content.append("c=IN IP4 " + sdpIp + "\r\n"); - if ("Playback".equalsIgnoreCase(sessionName)) { - content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); - } else { - content.append("t=0 0\r\n"); - } - int localPort = sendRtpItem.getLocalPort(); - if (localPort == 0) { - // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口 - localPort = new Random().nextInt(65535) + 1; - } - if (sendRtpItem.isTcp()) { - content.append("m=video " + localPort + " TCP/RTP/AVP 96\r\n"); - if (!sendRtpItem.isTcpActive()) { - content.append("a=setup:active\r\n"); - } else { - content.append("a=setup:passive\r\n"); - } - }else { - content.append("m=video " + localPort + " RTP/AVP 96\r\n"); - } - content.append("a=sendonly\r\n"); - content.append("a=rtpmap:96 PS/90000\r\n"); - content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); - content.append("f=\r\n"); - - - try { - // 超时未收到Ack应该回复bye,当前等待时间为10秒 - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - log.info("Ack 等待超时"); - mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); - // 回复bye - try { - cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); - } - }, 60 * 1000); - responseSdpAck(request, content.toString(), platform); - // tcp主动模式,回复sdp后开启监听 - if (sendRtpItem.isTcpActive()) { - MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - try { - mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 5); - redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform); - }catch (ControllerException e) {} - } - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 国标级联 回复SdpAck", e); - } - }; - ErrorCallback errorEvent = ((statusCode, msg, data) -> { - log.info("[上级Invite] {}, 失败, 平台:{}, 通道:{}, code: {}, msg;{}", sessionName, username, channelId, statusCode, msg); - // 未知错误。直接转发设备点播的错误 - try { - Response response = getMessageFactory().createResponse(statusCode, evt.getRequest()); - sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response); - } catch (ParseException | SipException e) { - log.error("未处理的异常 ", e); - } - }); - sendRtpItem.setApp("rtp"); - if ("Playback".equalsIgnoreCase(sessionName)) { - sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); - String startTimeStr = DateUtil.urlFormatter.format(start); - String endTimeStr = DateUtil.urlFormatter.format(end); - String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr; - int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, - device.isSsrcCheck(), true, 0,false,!channel.isHasAudio(), false, tcpMode); - sendRtpItem.setStream(stream); - // 写入redis, 超时时回复 - redisCatchStorage.updateSendRTPSever(sendRtpItem); - playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), - DateUtil.formatter.format(end), - (code, msg, data) -> { - if (code == InviteErrorCode.SUCCESS.getCode()) { - hookEvent.run(code, msg, data); - } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { - log.info("[录像回放]超时, 用户:{}, 通道:{}", username, channelId); - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - errorEvent.run(code, msg, data); - } else { - errorEvent.run(code, msg, data); - } - }); - } else if ("Download".equalsIgnoreCase(sessionName)) { - // 获取指定的下载速度 - Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true); - MediaDescription mediaDescription = null; - String downloadSpeed = "1"; - if (sdpMediaDescriptions.size() > 0) { - mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0); - } - if (mediaDescription != null) { - downloadSpeed = mediaDescription.getAttribute("downloadspeed"); - } - - sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); - int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, - device.isSsrcCheck(), true, 0, false,!channel.isHasAudio(), false, tcpMode); - sendRtpItem.setStream(ssrcInfo.getStream()); - // 写入redis, 超时时回复 - redisCatchStorage.updateSendRTPSever(sendRtpItem); - playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), - DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed), - (code, msg, data) -> { - if (code == InviteErrorCode.SUCCESS.getCode()) { - hookEvent.run(code, msg, data); - } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { - log.info("[录像下载]超时, 用户:{}, 通道:{}", username, channelId); - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - errorEvent.run(code, msg, data); - } else { - errorEvent.run(code, msg, data); - } - }); - } else { - sendRtpItem.setPlayType(InviteStreamType.PLAY); - String streamId = String.format("%s_%s", device.getDeviceId(), channelId); - sendRtpItem.setStream(streamId); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> { - if (code == InviteErrorCode.SUCCESS.getCode()) { - hookEvent.run(code, msg, data); - } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { - log.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - errorEvent.run(code, msg, data); - } else { - errorEvent.run(code, msg, data); - } - })); - sendRtpItem.setSsrc(ssrcInfo.getSsrc()); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - - } - } else if (gbStream != null) { - SendRtpItem sendRtpItem = new SendRtpItem(); - if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) { - sendRtpItem.setSsrc(gb28181Sdp.getSsrc()); - } - - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setTcp(mediaTransmissionTCP); - sendRtpItem.setRtcp(platform.isRtcp()); - sendRtpItem.setPlatformName(platform.getName()); - sendRtpItem.setPlatformId(platform.getServerGBId()); - sendRtpItem.setMediaServerId(mediaServerItem.getId()); - sendRtpItem.setChannelId(channelId); - sendRtpItem.setIp(addressStr); - sendRtpItem.setPort(port); - sendRtpItem.setUsePs(true); - sendRtpItem.setApp(gbStream.getApp()); - sendRtpItem.setStream(gbStream.getStream()); - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setFromTag(request.getFromTag()); - sendRtpItem.setOnlyAudio(false); - sendRtpItem.setStatus(0); - sendRtpItem.setSessionName(sessionName); - // 清理可能存在的缓存避免用到旧的数据 - List sendRtpItemList = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, gbStream.getStream()); - if (!sendRtpItemList.isEmpty()) { - for (SendRtpItem rtpItem : sendRtpItemList) { - redisCatchStorage.deleteSendRTPServer(rtpItem); - } - } - if ("push".equals(gbStream.getStreamType())) { - sendRtpItem.setPlayType(InviteStreamType.PUSH); - if (streamPushItem != null) { - // 从redis查询是否正在接收这个推流 - MediaInfo mediaInfo = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); - if (mediaInfo != null) { - sendRtpItem.setServerId(mediaInfo.getServerId()); - sendRtpItem.setMediaServerId(mediaInfo.getMediaServer().getId()); - - redisCatchStorage.updateSendRTPSever(sendRtpItem); - // 开始推流 - sendPushStream(sendRtpItem, mediaServerItem, platform, request); - }else { - if (!platform.isStartOfflinePush()) { - // 平台设置中关闭了拉起离线的推流则直接回复 - try { - log.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream()); - responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); - } - return; - } - notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); - } - } - } else if ("proxy".equals(gbStream.getStreamType())) { - if (null != proxyByAppAndStream) { - sendRtpItem.setServerId(userSetting.getServerId()); - if (sendRtpItem.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - sendRtpItem.setSsrc(ssrc); - } - MediaInfo mediaInfo = redisCatchStorage.getProxyStream(gbStream.getApp(), gbStream.getStream()); - if (mediaInfo != null) { - sendProxyStream(sendRtpItem, mediaServerItem, platform, request); - } else { - //开启代理拉流 - notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request); - } - } - } - } - } - } catch (SdpParseException e) { - log.error("sdp解析错误", e); - } catch (SdpException e) { - log.error("未处理的异常 ", e); - } +// // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 +// try { +// +// +// +// +// +// +// +// // 查询请求是否来自上级平台\设备 +// Platform platform = storager.queryParentPlatByServerGBId(requesterId); +// +// if (platform == null) { +// inviteFromDeviceHandle(request, requesterId, channelId); +// +// } else { +// // 查询平台下是否有该通道 +// CommonGBChannel channel= channelService.queryOneWithPlatform(platform.getId(), channelId); +// MediaServer mediaServerItem = null; +// StreamPush streamPushItem = null; +// StreamProxy proxyByAppAndStream = null; +// if (channel == null) { +// log.info("[上级INVITE] 通道不存在,返回404: {}", channelId); +// try { +// // 通道不存在,发404,资源不存在 +// responseAck(request, Response.NOT_FOUND); +// } catch (SipException | InvalidArgumentException | ParseException e) { +// log.error("[命令发送失败] invite 通道不存在: {}", e.getMessage()); +// } +// return; +// } +// // 通道存在,发100,TRYING +// try { +// responseAck(request, Response.TRYING); +// } catch (SipException | InvalidArgumentException | ParseException e) { +// log.error("[命令发送失败] invite TRYING: {}", e.getMessage()); +// } +// +// +// +// Device device = null; +// // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 +// if (channel != null) { +// device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); +// if (device == null) { +// log.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); +// try { +// responseAck(request, Response.SERVER_INTERNAL_ERROR); +// } catch (SipException | InvalidArgumentException | ParseException e) { +// log.error("[命令发送失败] invite 未找到设备信息: {}", e.getMessage()); +// } +// return; +// } +// mediaServerItem = playService.getNewMediaServerItem(device); +// if (mediaServerItem == null) { +// log.warn("未找到可用的zlm"); +// try { +// responseAck(request, Response.BUSY_HERE); +// } catch (SipException | InvalidArgumentException | ParseException e) { +// log.error("[命令发送失败] invite BUSY_HERE: {}", e.getMessage()); +// } +// return; +// } +// +// String ssrc; +// if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { +// // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 +// ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); +// }else { +// ssrc = gb28181Sdp.getSsrc(); +// } +// String streamTypeStr = null; +// if (mediaTransmissionTCP) { +// if (tcpActive) { +// streamTypeStr = "TCP-ACTIVE"; +// } else { +// streamTypeStr = "TCP-PASSIVE"; +// } +// } else { +// streamTypeStr = "UDP"; +// } +// +// SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, +// device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); +// +// if (tcpActive != null) { +// sendRtpItem.setTcpActive(tcpActive); +// } +// if (sendRtpItem == null) { +// log.warn("服务器端口资源不足"); +// try { +// responseAck(request, Response.BUSY_HERE); +// } catch (SipException | InvalidArgumentException | ParseException e) { +// log.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); +// } +// return; +// } +// sendRtpItem.setCallId(callIdHeader.getCallId()); +// sendRtpItem.setPlayType("Play".equalsIgnoreCase(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); +// +// Long finalStartTime = startTime; +// Long finalStopTime = stopTime; +// ErrorCallback hookEvent = (code, msg, data) -> { +// StreamInfo streamInfo = (StreamInfo)data; +// MediaServer mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId()); +// log.info("[上级Invite]下级已经开始推流。 回复200OK(SDP), {}/{}", streamInfo.getApp(), streamInfo.getStream()); +// // * 0 等待设备推流上来 +// // * 1 下级已经推流,等待上级平台回复ack +// // * 2 推流中 +// sendRtpItem.setStatus(1); +// redisCatchStorage.updateSendRTPSever(sendRtpItem); +// String sdpIp = mediaServerItemInUSe.getSdpIp(); +// if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { +// sdpIp = platform.getSendStreamIp(); +// } +// StringBuffer content = new StringBuffer(200); +// content.append("v=0\r\n"); +// content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n"); +// content.append("s=" + sessionName + "\r\n"); +// content.append("c=IN IP4 " + sdpIp + "\r\n"); +// if ("Playback".equalsIgnoreCase(sessionName)) { +// content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); +// } else { +// content.append("t=0 0\r\n"); +// } +// int localPort = sendRtpItem.getLocalPort(); +// if (localPort == 0) { +// // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口 +// localPort = new Random().nextInt(65535) + 1; +// } +// if (sendRtpItem.isTcp()) { +// content.append("m=video " + localPort + " TCP/RTP/AVP 96\r\n"); +// if (!sendRtpItem.isTcpActive()) { +// content.append("a=setup:active\r\n"); +// } else { +// content.append("a=setup:passive\r\n"); +// } +// }else { +// content.append("m=video " + localPort + " RTP/AVP 96\r\n"); +// } +// content.append("a=sendonly\r\n"); +// content.append("a=rtpmap:96 PS/90000\r\n"); +// content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); +// content.append("f=\r\n"); +// +// +// try { +// // 超时未收到Ack应该回复bye,当前等待时间为10秒 +// dynamicTask.startDelay(callIdHeader.getCallId(), () -> { +// log.info("Ack 等待超时"); +// mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); +// // 回复bye +// try { +// cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); +// } catch (SipException | InvalidArgumentException | ParseException e) { +// log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); +// } +// }, 60 * 1000); +// responseSdpAck(request, content.toString(), platform); +// // tcp主动模式,回复sdp后开启监听 +// if (sendRtpItem.isTcpActive()) { +// MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); +// try { +// mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 5); +// redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform); +// }catch (ControllerException e) {} +// } +// } catch (SipException | InvalidArgumentException | ParseException e) { +// log.error("[命令发送失败] 国标级联 回复SdpAck", e); +// } +// }; +// ErrorCallback errorEvent = ((statusCode, msg, data) -> { +// log.info("[上级Invite] {}, 失败, 平台:{}, 通道:{}, code: {}, msg;{}", sessionName, username, channelId, statusCode, msg); +// // 未知错误。直接转发设备点播的错误 +// try { +// Response response = getMessageFactory().createResponse(statusCode, evt.getRequest()); +// sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response); +// } catch (ParseException | SipException e) { +// log.error("未处理的异常 ", e); +// } +// }); +// sendRtpItem.setApp("rtp"); +// if ("Playback".equalsIgnoreCase(sessionName)) { +// sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); +// String startTimeStr = DateUtil.urlFormatter.format(start); +// String endTimeStr = DateUtil.urlFormatter.format(end); +// String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr; +// int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); +// SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, +// device.isSsrcCheck(), true, 0,false,!channel.isHasAudio(), false, tcpMode); +// sendRtpItem.setStream(stream); +// // 写入redis, 超时时回复 +// redisCatchStorage.updateSendRTPSever(sendRtpItem); +// playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), +// DateUtil.formatter.format(end), +// (code, msg, data) -> { +// if (code == InviteErrorCode.SUCCESS.getCode()) { +// hookEvent.run(code, msg, data); +// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { +// log.info("[录像回放]超时, 用户:{}, 通道:{}", username, channelId); +// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); +// errorEvent.run(code, msg, data); +// } else { +// errorEvent.run(code, msg, data); +// } +// }); +// } else if ("Download".equalsIgnoreCase(sessionName)) { +// // 获取指定的下载速度 +// Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true); +// MediaDescription mediaDescription = null; +// String downloadSpeed = "1"; +// if (sdpMediaDescriptions.size() > 0) { +// mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0); +// } +// if (mediaDescription != null) { +// downloadSpeed = mediaDescription.getAttribute("downloadspeed"); +// } +// +// sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); +// int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); +// SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, +// device.isSsrcCheck(), true, 0, false,!channel.isHasAudio(), false, tcpMode); +// sendRtpItem.setStream(ssrcInfo.getStream()); +// // 写入redis, 超时时回复 +// redisCatchStorage.updateSendRTPSever(sendRtpItem); +// playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), +// DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed), +// (code, msg, data) -> { +// if (code == InviteErrorCode.SUCCESS.getCode()) { +// hookEvent.run(code, msg, data); +// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { +// log.info("[录像下载]超时, 用户:{}, 通道:{}", username, channelId); +// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); +// errorEvent.run(code, msg, data); +// } else { +// errorEvent.run(code, msg, data); +// } +// }); +// } else { +// sendRtpItem.setPlayType(InviteStreamType.PLAY); +// String streamId = String.format("%s_%s", device.getDeviceId(), channelId); +// sendRtpItem.setStream(streamId); +// redisCatchStorage.updateSendRTPSever(sendRtpItem); +// SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> { +// if (code == InviteErrorCode.SUCCESS.getCode()) { +// hookEvent.run(code, msg, data); +// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { +// log.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); +// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); +// errorEvent.run(code, msg, data); +// } else { +// errorEvent.run(code, msg, data); +// } +// })); +// sendRtpItem.setSsrc(ssrcInfo.getSsrc()); +// redisCatchStorage.updateSendRTPSever(sendRtpItem); +// +// } +// } else if (gbStream != null) { +// SendRtpItem sendRtpItem = new SendRtpItem(); +// if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) { +// sendRtpItem.setSsrc(gb28181Sdp.getSsrc()); +// } +// +// if (tcpActive != null) { +// sendRtpItem.setTcpActive(tcpActive); +// } +// sendRtpItem.setTcp(mediaTransmissionTCP); +// sendRtpItem.setRtcp(platform.isRtcp()); +// sendRtpItem.setPlatformName(platform.getName()); +// sendRtpItem.setPlatformId(platform.getServerGBId()); +// sendRtpItem.setMediaServerId(mediaServerItem.getId()); +// sendRtpItem.setChannelId(channelId); +// sendRtpItem.setIp(addressStr); +// sendRtpItem.setPort(port); +// sendRtpItem.setUsePs(true); +// sendRtpItem.setApp(gbStream.getApp()); +// sendRtpItem.setStream(gbStream.getStream()); +// sendRtpItem.setCallId(callIdHeader.getCallId()); +// sendRtpItem.setFromTag(request.getFromTag()); +// sendRtpItem.setOnlyAudio(false); +// sendRtpItem.setStatus(0); +// sendRtpItem.setSessionName(sessionName); +// // 清理可能存在的缓存避免用到旧的数据 +// List sendRtpItemList = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, gbStream.getStream()); +// if (!sendRtpItemList.isEmpty()) { +// for (SendRtpItem rtpItem : sendRtpItemList) { +// redisCatchStorage.deleteSendRTPServer(rtpItem); +// } +// } +// if ("push".equals(gbStream.getStreamType())) { +// sendRtpItem.setPlayType(InviteStreamType.PUSH); +// if (streamPushItem != null) { +// // 从redis查询是否正在接收这个推流 +// MediaInfo mediaInfo = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); +// if (mediaInfo != null) { +// sendRtpItem.setServerId(mediaInfo.getServerId()); +// sendRtpItem.setMediaServerId(mediaInfo.getMediaServer().getId()); +// +// redisCatchStorage.updateSendRTPSever(sendRtpItem); +// // 开始推流 +// sendPushStream(sendRtpItem, mediaServerItem, platform, request); +// }else { +// if (!platform.isStartOfflinePush()) { +// // 平台设置中关闭了拉起离线的推流则直接回复 +// try { +// log.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream()); +// responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); +// } catch (SipException | InvalidArgumentException | ParseException e) { +// log.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); +// } +// return; +// } +// notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request); +// } +// } +// } else if ("proxy".equals(gbStream.getStreamType())) { +// if (null != proxyByAppAndStream) { +// sendRtpItem.setServerId(userSetting.getServerId()); +// if (sendRtpItem.getSsrc() == null) { +// // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 +// String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); +// sendRtpItem.setSsrc(ssrc); +// } +// MediaInfo mediaInfo = redisCatchStorage.getProxyStream(gbStream.getApp(), gbStream.getStream()); +// if (mediaInfo != null) { +// sendProxyStream(sendRtpItem, mediaServerItem, platform, request); +// } else { +// //开启代理拉流 +// notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request); +// } +// } +// } +// } +// } +// } catch (SdpParseException e) { +// log.error("sdp解析错误", e); +// } catch (SdpException e) { +// log.error("未处理的异常 ", e); +// } } private InviteInfo decode(RequestEvent evt) throws SdpException { @@ -688,6 +712,42 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } + private String createSendSdp(SendRtpItem sendRtpItem, InviteInfo inviteInfo, String sdpIp) { + StringBuilder content = new StringBuilder(200); + content.append("v=0\r\n"); + content.append("o=" + inviteInfo.getChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n"); + content.append("s=" + inviteInfo.getSessionName() + "\r\n"); + content.append("c=IN IP4 " + sdpIp + "\r\n"); + if ("Playback".equalsIgnoreCase(inviteInfo.getSessionName())) { + content.append("t=" + inviteInfo.getStartTime() + " " + inviteInfo.getStopTime() + "\r\n"); + } else { + content.append("t=0 0\r\n"); + } + if (sendRtpItem.isTcp()) { + content.append("m=video " + sendRtpItem.getLocalPort() + " TCP/RTP/AVP 96\r\n"); + if (!sendRtpItem.isTcpActive()) { + content.append("a=setup:active\r\n"); + } else { + content.append("a=setup:passive\r\n"); + } + }else { + content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n"); + } + content.append("a=sendonly\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); + content.append("f=\r\n"); + return content.toString(); + } + + private void sendBye(Platform platform, String callId) { + try { + cmderFroPlatform.streamByeCmd(platform, callId); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 上级Invite 发送BYE: {}", e.getMessage()); + } + } + private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, Platform parentPlatform, JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { if (jsonObject == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 791c3420..39d65ad7 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.gb28181.bean.PlayException; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.bean.MediaInfo; @@ -878,8 +879,8 @@ public class MediaServerServiceImpl implements IMediaServerService { String app, String stream, String channelId, boolean tcp, boolean rtcp){ int localPort = sendRtpPortManager.getNextPort(serverItem); - if (localPort == 0) { - return null; + if (localPort <= 0) { + throw new PlayException(javax.sip.message.Response.SERVER_INTERNAL_ERROR, "server internal error"); } SendRtpItem sendRtpItem = new SendRtpItem(); sendRtpItem.setIp(ip);