diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteDecodeException.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteDecodeException.java new file mode 100644 index 00000000..e9943f15 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteDecodeException.java @@ -0,0 +1,14 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import lombok.Data; + +@Data +public class InviteDecodeException extends RuntimeException{ + private int code; + private String msg; + + public InviteDecodeException(int code, String msg) { + this.code = code; + this.msg = msg; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteInfo.java new file mode 100644 index 00000000..2a04e293 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteInfo.java @@ -0,0 +1,19 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import lombok.Data; + +// 从INVITE消息中解析需要的信息 +@Data +public class InviteInfo { + private String requesterId; + private String channelId; + private String sessionName; + private String ssrc; + private boolean tcp; + private boolean tcpActive; + private String callId; + private Long startTime; + private Long stopTime; + private String downloadSpeed; + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java index a8e8c668..1f821f93 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java @@ -1,6 +1,8 @@ package com.genersoft.iot.vmp.gb28181.service; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.github.pagehelper.PageInfo; import java.util.Collection; @@ -77,4 +79,6 @@ public interface IGbChannelService { void batchUpdate(List commonGBChannels); CommonGBChannel queryOneWithPlatform(Integer platformId, String channelDeviceId); + + void start(CommonGBChannel channel, ErrorCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java index 88a14007..b7991648 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java @@ -7,7 +7,6 @@ import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformMapper; -import com.genersoft.iot.vmp.gb28181.dao.PlatformCatalogMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce; import lombok.extern.slf4j.Slf4j; @@ -47,9 +46,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { @Autowired private DeviceChannelMapper deviceChannelMapper; - @Autowired - private PlatformCatalogMapper catalogManager; - @Autowired private PlatformMapper platformMapper; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 53518ad6..938047dc 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -154,41 +154,38 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements */ @Override public void process(RequestEvent evt) { + + SIPRequest request = (SIPRequest)evt.getRequest(); + try { + InviteInfo inviteInfo = decode(evt); + } catch (SdpParseException e) { + // 参数不全, 发400,请求错误 + try { + responseAck(request, Response.BAD_REQUEST); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); + } + return; + } catch (InviteDecodeException e) { + try { + responseAck(request, e.getCode(), e.getMsg()); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); + } + } + // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 try { - SIPRequest request = (SIPRequest)evt.getRequest(); - String channelIdFromSub = SipUtils.getChannelIdFromRequest(request); - // 解析sdp消息, 使用jainsip 自带的sdp解析方式 - String contentString = new String(request.getRawContent()); - Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); - SessionDescription sdp = gb28181Sdp.getBaseSdb(); - String sessionName = sdp.getSessionName().getValue(); - String channelIdFromSdp = null; - if(StringUtils.equalsIgnoreCase("Playback", sessionName)){ - URIField uriField = (URIField)sdp.getURI(); - channelIdFromSdp = uriField.getURI().split(":")[0]; - } - final String channelId = StringUtils.isNotBlank(channelIdFromSdp) ? channelIdFromSdp : channelIdFromSub; - String requesterId = SipUtils.getUserIdFromFromHeader(request); - CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); - if (requesterId == null || channelId == null) { - log.info("无法从请求中获取到平台id,返回400"); - // 参数不全, 发400,请求错误 - try { - responseAck(request, Response.BAD_REQUEST); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); - } - return; - } - log.info("[INVITE] requesterId: {}, callId: {}, 来自:{}:{}", - requesterId, callIdHeader.getCallId(), request.getRemoteAddress(), request.getRemotePort()); + + + // 查询请求是否来自上级平台\设备 Platform platform = storager.queryParentPlatByServerGBId(requesterId); + if (platform == null) { inviteFromDeviceHandle(request, requesterId, channelId); @@ -198,77 +195,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements MediaServer mediaServerItem = null; StreamPush streamPushItem = null; StreamProxy proxyByAppAndStream = null; - // 不是通道可能是直播流 - if (channel != null ) { - // 通道存在,发100,TRYING - try { - responseAck(request, Response.TRYING); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite TRYING: {}", e.getMessage()); - } - } else if (channel == null && gbStream != null) { - - String mediaServerId = gbStream.getMediaServerId(); - mediaServerItem = mediaServerService.getOne(mediaServerId); - if (mediaServerItem == null) { - if ("proxy".equals(gbStream.getStreamType())) { - log.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - try { - responseAck(request, Response.GONE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite GONE: {}", e.getMessage()); - } - return; - } else { - streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); - if (streamPushItem != null) { - mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); - } - if (mediaServerItem == null) { - mediaServerItem = mediaServerService.getDefaultMediaServer(); - } - } - } else { - if ("push".equals(gbStream.getStreamType())) { - streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); - if (streamPushItem == null) { - log.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - try { - responseAck(request, Response.GONE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite GONE: {}", e.getMessage()); - } - return; - } - } else if ("proxy".equals(gbStream.getStreamType())) { - proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream()); - if (proxyByAppAndStream == null) { - log.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - try { - responseAck(request, Response.GONE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite GONE: {}", e.getMessage()); - } - return; - } - } - } - try { - responseAck(request, Response.CALL_IS_BEING_FORWARDED); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite CALL_IS_BEING_FORWARDED: {}", e.getMessage()); - } -// } -// else if (catalog != null) { -// try { -// // 目录不支持点播 -// responseAck(request, Response.BAD_REQUEST, "catalog channel can not play"); -// } catch (SipException | InvalidArgumentException | ParseException e) { -// log.error("[命令发送失败] invite 目录不支持点播: {}", e.getMessage()); -// } -// return; - } else { - log.info("通道不存在,返回404: {}", channelId); + if (channel == null) { + log.info("[上级INVITE] 通道不存在,返回404: {}", channelId); try { // 通道不存在,发404,资源不存在 responseAck(request, Response.NOT_FOUND); @@ -277,70 +205,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } - - Long startTime = null; - Long stopTime = null; - Instant start = null; - Instant end = null; - if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { - TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) (sdp.getTimeDescriptions(false).get(0)); - TimeField startTimeFiled = (TimeField) timeDescription.getTime(); - startTime = startTimeFiled.getStartTime(); - stopTime = startTimeFiled.getStopTime(); - - start = Instant.ofEpochSecond(startTime); - end = Instant.ofEpochSecond(stopTime); + // 通道存在,发100,TRYING + try { + responseAck(request, Response.TRYING); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] invite TRYING: {}", e.getMessage()); } - // 获取支持的格式 - Vector mediaDescriptions = sdp.getMediaDescriptions(true); - // 查看是否支持PS 负载96 - //String ip = null; - int port = -1; - boolean mediaTransmissionTCP = false; - Boolean tcpActive = null; - for (Object description : mediaDescriptions) { - MediaDescription mediaDescription = (MediaDescription) description; - Media media = mediaDescription.getMedia(); - Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("96")) { - port = media.getMediaPort(); - //String mediaType = media.getMediaType(); - String protocol = media.getProtocol(); - // 区分TCP发流还是udp, 当前默认udp - if ("TCP/RTP/AVP".equalsIgnoreCase(protocol)) { - String setup = mediaDescription.getAttribute("setup"); - if (setup != null) { - mediaTransmissionTCP = true; - if ("active".equalsIgnoreCase(setup)) { - tcpActive = true; - } else if ("passive".equalsIgnoreCase(setup)) { - tcpActive = false; - } - } - } - break; - } - } - if (port == -1) { - log.info("不支持的媒体格式,返回415"); - // 回复不支持的格式 - try { - // 不支持的格式,发415 - responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] invite 不支持的格式: {}", e.getMessage()); - } - return; - } - String username = sdp.getOrigin().getUsername(); - String addressStr; - if(StringUtils.isEmpty(platform.getSendStreamIp())){ - addressStr = sdp.getConnection().getAddress(); - }else { - addressStr = platform.getSendStreamIp(); - } Device device = null; // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 @@ -645,6 +517,111 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } + private InviteInfo decode(RequestEvent evt) throws SdpException { + + InviteInfo inviteInfo = new InviteInfo(); + SIPRequest request = (SIPRequest)evt.getRequest(); + String channelIdFromSub = SipUtils.getChannelIdFromRequest(request); + + // 解析sdp消息, 使用jainsip 自带的sdp解析方式 + String contentString = new String(request.getRawContent()); + Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); + SessionDescription sdp = gb28181Sdp.getBaseSdb(); + String sessionName = sdp.getSessionName().getValue(); + String channelIdFromSdp = null; + if(StringUtils.equalsIgnoreCase("Playback", sessionName)){ + URIField uriField = (URIField)sdp.getURI(); + channelIdFromSdp = uriField.getURI().split(":")[0]; + } + final String channelId = StringUtils.isNotBlank(channelIdFromSdp) ? channelIdFromSdp : channelIdFromSub; + String requesterId = SipUtils.getUserIdFromFromHeader(request); + CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); + if (requesterId == null || channelId == null) { + log.warn("[解析INVITE消息] 无法从请求中获取到来源id,返回400错误"); + throw new InviteDecodeException(Response.BAD_REQUEST, "request decode fail"); + } + log.info("[INVITE] 来源ID: {}, callId: {}, 来自:{}:{}", + requesterId, callIdHeader.getCallId(), request.getRemoteAddress(), request.getRemotePort()); + inviteInfo.setRequesterId(requesterId); + inviteInfo.setChannelId(channelId); + inviteInfo.setSessionName(sessionName); + inviteInfo.setSsrc(gb28181Sdp.getSsrc()); + inviteInfo.setCallId(request.getCallIdHeader().getCallId()); + + // 如果是录像回放,则会存在录像的开始时间与结束时间 + Long startTime = null; + Long stopTime = null; + Instant start = null; + Instant end = null; + if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { + TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) (sdp.getTimeDescriptions(false).get(0)); + TimeField startTimeFiled = (TimeField) timeDescription.getTime(); + startTime = startTimeFiled.getStartTime(); + stopTime = startTimeFiled.getStopTime(); + + start = Instant.ofEpochSecond(startTime); + end = Instant.ofEpochSecond(stopTime); + } + // 获取支持的格式 + Vector mediaDescriptions = sdp.getMediaDescriptions(true); + // 查看是否支持PS 负载96 + //String ip = null; + int port = -1; + boolean mediaTransmissionTCP = false; + Boolean tcpActive = null; + for (Object description : mediaDescriptions) { + MediaDescription mediaDescription = (MediaDescription) description; + Media media = mediaDescription.getMedia(); + + Vector mediaFormats = media.getMediaFormats(false); + if (mediaFormats.contains("96")) { + port = media.getMediaPort(); + //String mediaType = media.getMediaType(); + String protocol = media.getProtocol(); + + // 区分TCP发流还是udp, 当前默认udp + if ("TCP/RTP/AVP".equalsIgnoreCase(protocol)) { + String setup = mediaDescription.getAttribute("setup"); + if (setup != null) { + mediaTransmissionTCP = true; + if ("active".equalsIgnoreCase(setup)) { + tcpActive = true; + } else if ("passive".equalsIgnoreCase(setup)) { + tcpActive = false; + } + } + } + break; + } + } + if (port == -1) { + log.info("[解析INVITE消息] 不支持的媒体格式,返回415"); + throw new InviteDecodeException(Response.UNSUPPORTED_MEDIA_TYPE, "unsupported media type"); + } + inviteInfo.setTcp(mediaTransmissionTCP); + inviteInfo.setTcpActive(tcpActive != null? tcpActive: false); + inviteInfo.setStartTime(startTime); + inviteInfo.setStopTime(stopTime); + String username = sdp.getOrigin().getUsername(); + String addressStr; + if(StringUtils.isEmpty(platform.getSendStreamIp())){ + addressStr = sdp.getConnection().getAddress(); + }else { + addressStr = platform.getSendStreamIp(); + } + + Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true); + MediaDescription mediaDescription = null; + String downloadSpeed = "1"; + if (!sdpMediaDescriptions.isEmpty()) { + mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0); + } + if (mediaDescription != null) { + downloadSpeed = mediaDescription.getAttribute("downloadspeed"); + } + + } + private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, Platform parentPlatform, JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { if (jsonObject == null) {