diff --git a/sql/2.6.8补丁更新.sql b/sql/2.6.8补丁更新.sql new file mode 100644 index 00000000..8ce9d543 --- /dev/null +++ b/sql/2.6.8补丁更新.sql @@ -0,0 +1,2 @@ +alter table media_server + add sendRtpPortRange varchar(50) not null; \ No newline at end of file diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index e82140f9..274f7a8a 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -159,6 +159,7 @@ public class VideoManagerConstants { public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_"; public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_"; public static final String WVP_OTHER_SEND_RTP_INFO = "VMP_OTHER_SEND_RTP_INFO_"; + public static final String WVP_OTHER_RECEIVE_RTP_INFO = "VMP_OTHER_RECEIVE_RTP_INFO_"; /** * Redis Const diff --git a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java index fca6d63a..ead79970 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java @@ -233,5 +233,4 @@ public class MediaConfig{ } return false; } - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java index 5a8db178..90b5291e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.GbStringMsgParserFactory; import com.genersoft.iot.vmp.gb28181.conf.DefaultProperties; import com.genersoft.iot.vmp.gb28181.transmit.ISIPProcessorObserver; import gov.nist.javax.sip.SipProviderImpl; @@ -63,6 +64,7 @@ public class SipLayer implements CommandLineRunner { SipStackImpl sipStack; try { sipStack = (SipStackImpl)SipFactory.getInstance().createSipStack(DefaultProperties.getProperties(monitorIp, userSetting.getSipLog())); + sipStack.setMessageParserFactory(new GbStringMsgParserFactory()); } catch (PeerUnavailableException e) { logger.error("[SIP SERVER] SIP服务启动失败, 监听地址{}失败,请检查ip是否正确", monitorIp); return; @@ -75,7 +77,6 @@ public class SipLayer implements CommandLineRunner { tcpSipProvider.setDialogErrorsAutomaticallyHandled(); tcpSipProvider.addSipListener(sipProcessorObserver); tcpSipProviderMap.put(monitorIp, tcpSipProvider); - logger.info("[SIP SERVER] tcp://{}:{} 启动成功", monitorIp, port); } catch (TransportNotSupportedException | TooManyListenersException diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java index 60f5cf69..541c111f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java @@ -244,6 +244,9 @@ public class Device { } public Integer getStreamModeForParam() { + if (streamMode == null) { + return 0; + } if (streamMode.equalsIgnoreCase("UDP")) { return 0; }else if (streamMode.equalsIgnoreCase("TCP-PASSIVE")) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GBStringMsgParser.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GBStringMsgParser.java new file mode 100644 index 00000000..2abc3569 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GBStringMsgParser.java @@ -0,0 +1,457 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import gov.nist.core.CommonLogger; +import gov.nist.core.Host; +import gov.nist.core.HostNameParser; +import gov.nist.core.StackLogger; +import gov.nist.javax.sip.SIPConstants; +import gov.nist.javax.sip.address.AddressImpl; +import gov.nist.javax.sip.address.GenericURI; +import gov.nist.javax.sip.address.SipUri; +import gov.nist.javax.sip.address.TelephoneNumber; +import gov.nist.javax.sip.header.*; +import gov.nist.javax.sip.message.SIPMessage; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; +import gov.nist.javax.sip.parser.*; + +import java.io.UnsupportedEncodingException; +import java.text.ParseException; + +public class GBStringMsgParser implements MessageParser { + + protected static boolean computeContentLengthFromMessage = false; + + private static StackLogger logger = CommonLogger.getLogger(StringMsgParser.class); + + /** + * @since v0.9 + */ + public GBStringMsgParser() { + super(); + } + + /** + * Parse a buffer containing a single SIP Message where the body is an array + * of un-interpreted bytes. This is intended for parsing the message from a + * memory buffer when the buffer. Incorporates a bug fix for a bug that was + * noted by Will Sullin of Callcast + * + * @param msgBuffer + * a byte buffer containing the messages to be parsed. This can + * consist of multiple SIP Messages concatenated together. + * @return a SIPMessage[] structure (request or response) containing the + * parsed SIP message. + * @exception ParseException + * is thrown when an illegal message has been encountered + * (and the rest of the buffer is discarded). + * @see ParseExceptionListener + */ + public SIPMessage parseSIPMessage(byte[] msgBuffer, boolean readBody, boolean strict, ParseExceptionListener parseExceptionListener) throws ParseException { + if (msgBuffer == null || msgBuffer.length == 0) + return null; + + int i = 0; + + // Squeeze out any leading control character. + try { + while (msgBuffer[i] < 0x20) + i++; + } + catch (ArrayIndexOutOfBoundsException e) { + // Array contains only control char, return null. + if (logger.isLoggingEnabled(StackLogger.TRACE_DEBUG)) { + logger.logDebug("handled only control char so returning null"); + } + return null; + } + + // Iterate thru the request/status line and headers. + String currentLine = null; + String currentHeader = null; + boolean isFirstLine = true; + SIPMessage message = null; + do + { + int lineStart = i; + + // Find the length of the line. + try { + while (msgBuffer[i] != '\r' && msgBuffer[i] != '\n') + i++; + } + catch (ArrayIndexOutOfBoundsException e) { + // End of the message. + break; + } + int lineLength = i - lineStart; + + // Make it a String. + try { + currentLine = new String(msgBuffer, lineStart, lineLength, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new ParseException("Bad message encoding!", 0); + } + + currentLine = trimEndOfLine(currentLine); + + if (currentLine.length() == 0) { + // Last header line, process the previous buffered header. + if (currentHeader != null && message != null) { + processHeader(currentHeader, message, parseExceptionListener, msgBuffer); + } + + } + else { + if (isFirstLine) { + message = processFirstLine(currentLine, parseExceptionListener, msgBuffer); + } else { + char firstChar = currentLine.charAt(0); + if (firstChar == '\t' || firstChar == ' ') { + if (currentHeader == null) + throw new ParseException("Bad header continuation.", 0); + + // This is a continuation, append it to the previous line. + currentHeader += currentLine.substring(1); + } + else { + if (currentHeader != null && message != null) { + processHeader(currentHeader, message, parseExceptionListener, msgBuffer); + } + currentHeader = currentLine; + } + } + } + + if (msgBuffer[i] == '\r' && msgBuffer.length > i+1 && msgBuffer[i+1] == '\n') + i++; + + i++; + + isFirstLine = false; + } while (currentLine.length() > 0); // End do - while + + if (message == null) throw new ParseException("Bad message", 0); + message.setSize(i); + + // Check for content legth header + if (readBody && message.getContentLength() != null ) { + if ( message.getContentLength().getContentLength() != 0) { + int bodyLength = msgBuffer.length - i; + + byte[] body = new byte[bodyLength]; + System.arraycopy(msgBuffer, i, body, 0, bodyLength); + message.setMessageContent(body,!strict,computeContentLengthFromMessage,message.getContentLength().getContentLength()); + } else if (message.getCSeqHeader().getMethod().equalsIgnoreCase("MESSAGE")) { + int bodyLength = msgBuffer.length - i; + + byte[] body = new byte[bodyLength]; + System.arraycopy(msgBuffer, i, body, 0, bodyLength); + message.setMessageContent(body,!strict,computeContentLengthFromMessage,bodyLength); + }else if (!computeContentLengthFromMessage && strict) { + String last4Chars = new String(msgBuffer, msgBuffer.length - 4, 4); + if(!"\r\n\r\n".equals(last4Chars)) { + throw new ParseException("Extraneous characters at the end of the message ",i); + } + } + } + + return message; + } + + protected static String trimEndOfLine(String line) { + if (line == null) + return line; + + int i = line.length() - 1; + while (i >= 0 && line.charAt(i) <= 0x20) + i--; + + if (i == line.length() - 1) + return line; + + if (i == -1) + return ""; + + return line.substring(0, i+1); + } + + protected SIPMessage processFirstLine(String firstLine, ParseExceptionListener parseExceptionListener, byte[] msgBuffer) throws ParseException { + SIPMessage message; + if (!firstLine.startsWith(SIPConstants.SIP_VERSION_STRING)) { + message = new SIPRequest(); + try { + RequestLine requestLine = new RequestLineParser(firstLine + "\n") + .parse(); + ((SIPRequest) message).setRequestLine(requestLine); + } catch (ParseException ex) { + if (parseExceptionListener != null) + try { + parseExceptionListener.handleException(ex, message, + RequestLine.class, firstLine, new String(msgBuffer, "UTF-8")); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + else + throw ex; + + } + } else { + message = new SIPResponse(); + try { + StatusLine sl = new StatusLineParser(firstLine + "\n").parse(); + ((SIPResponse) message).setStatusLine(sl); + } catch (ParseException ex) { + if (parseExceptionListener != null) { + try { + parseExceptionListener.handleException(ex, message, + StatusLine.class, firstLine, new String(msgBuffer, "UTF-8")); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + } else + throw ex; + + } + } + return message; + } + + protected void processHeader(String header, SIPMessage message, ParseExceptionListener parseExceptionListener, byte[] rawMessage) throws ParseException { + if (header == null || header.length() == 0) + return; + + HeaderParser headerParser = null; + try { + headerParser = ParserFactory.createParser(header + "\n"); + } catch (ParseException ex) { + // https://java.net/jira/browse/JSIP-456 + if (parseExceptionListener != null) { + parseExceptionListener.handleException(ex, message, null, + header, null); + return; + } else { + throw ex; + } + } + + try { + SIPHeader sipHeader = headerParser.parse(); + message.attachHeader(sipHeader, false); + } catch (ParseException ex) { + if (parseExceptionListener != null) { + String headerName = Lexer.getHeaderName(header); + Class headerClass = NameMap.getClassFromName(headerName); + if (headerClass == null) { + headerClass = ExtensionHeaderImpl.class; + + } + try { + parseExceptionListener.handleException(ex, message, + headerClass, header, new String(rawMessage, "UTF-8")); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + + } + } + } + + /** + * Parse an address (nameaddr or address spec) and return and address + * structure. + * + * @param address + * is a String containing the address to be parsed. + * @return a parsed address structure. + * @since v1.0 + * @exception ParseException + * when the address is badly formatted. + */ + public AddressImpl parseAddress(String address) throws ParseException { + AddressParser addressParser = new AddressParser(address); + return addressParser.address(true); + } + + /** + * Parse a host:port and return a parsed structure. + * + * @param hostport + * is a String containing the host:port to be parsed + * @return a parsed address structure. + * @since v1.0 + * @exception throws + * a ParseException when the address is badly formatted. + * + public HostPort parseHostPort(String hostport) throws ParseException { + Lexer lexer = new Lexer("charLexer", hostport); + return new HostNameParser(lexer).hostPort(); + + } + */ + + /** + * Parse a host name and return a parsed structure. + * + * @param host + * is a String containing the host name to be parsed + * @return a parsed address structure. + * @since v1.0 + * @exception ParseException + * a ParseException when the hostname is badly formatted. + */ + public Host parseHost(String host) throws ParseException { + Lexer lexer = new Lexer("charLexer", host); + return new HostNameParser(lexer).host(); + + } + + /** + * Parse a telephone number return a parsed structure. + * + * @param telephone_number + * is a String containing the telephone # to be parsed + * @return a parsed address structure. + * @since v1.0 + * @exception ParseException + * a ParseException when the address is badly formatted. + */ + public TelephoneNumber parseTelephoneNumber(String telephone_number) + throws ParseException { + // Bug fix contributed by Will Scullin + return new URLParser(telephone_number).parseTelephoneNumber(true); + + } + + /** + * Parse a SIP url from a string and return a URI structure for it. + * + * @param url + * a String containing the URI structure to be parsed. + * @return A parsed URI structure + * @exception ParseException + * if there was an error parsing the message. + */ + + public SipUri parseSIPUrl(String url) throws ParseException { + try { + return new URLParser(url).sipURL(true); + } catch (ClassCastException ex) { + throw new ParseException(url + " Not a SIP URL ", 0); + } + } + + /** + * Parse a uri from a string and return a URI structure for it. + * + * @param url + * a String containing the URI structure to be parsed. + * @return A parsed URI structure + * @exception ParseException + * if there was an error parsing the message. + */ + + public GenericURI parseUrl(String url) throws ParseException { + return new URLParser(url).parse(); + } + + /** + * Parse an individual SIP message header from a string. + * + * @param header + * String containing the SIP header. + * @return a SIPHeader structure. + * @exception ParseException + * if there was an error parsing the message. + */ + public static SIPHeader parseSIPHeader(String header) throws ParseException { + int start = 0; + int end = header.length() - 1; + try { + // Squeeze out any leading control character. + while (header.charAt(start) <= 0x20) + start++; + + // Squeeze out any trailing control character. + while (header.charAt(end) <= 0x20) + end--; + } + catch (ArrayIndexOutOfBoundsException e) { + // Array contains only control char. + throw new ParseException("Empty header.", 0); + } + + StringBuilder buffer = new StringBuilder(end + 1); + int i = start; + int lineStart = start; + boolean endOfLine = false; + while (i <= end) { + char c = header.charAt(i); + if (c == '\r' || c == '\n') { + if (!endOfLine) { + buffer.append(header.substring(lineStart, i)); + endOfLine = true; + } + } + else { + if (endOfLine) { + endOfLine = false; + if (c == ' ' || c == '\t') { + buffer.append(' '); + lineStart = i + 1; + } + else { + lineStart = i; + } + } + } + + i++; + } + buffer.append(header.substring(lineStart, i)); + buffer.append('\n'); + + HeaderParser hp = ParserFactory.createParser(buffer.toString()); + if (hp == null) + throw new ParseException("could not create parser", 0); + return hp.parse(); + } + + /** + * Parse the SIP Request Line + * + * @param requestLine + * a String containing the request line to be parsed. + * @return a RequestLine structure that has the parsed RequestLine + * @exception ParseException + * if there was an error parsing the requestLine. + */ + + public RequestLine parseSIPRequestLine(String requestLine) + throws ParseException { + requestLine += "\n"; + return new RequestLineParser(requestLine).parse(); + } + + /** + * Parse the SIP Response message status line + * + * @param statusLine + * a String containing the Status line to be parsed. + * @return StatusLine class corresponding to message + * @exception ParseException + * if there was an error parsing + * @see StatusLine + */ + + public StatusLine parseSIPStatusLine(String statusLine) + throws ParseException { + statusLine += "\n"; + return new StatusLineParser(statusLine).parse(); + } + + public static void setComputeContentLengthFromMessage( + boolean computeContentLengthFromMessage) { + GBStringMsgParser.computeContentLengthFromMessage = computeContentLengthFromMessage; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/WvpSipDate.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbSipDate.java similarity index 97% rename from src/main/java/com/genersoft/iot/vmp/gb28181/bean/WvpSipDate.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbSipDate.java index f2a256b9..0631c5d0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/WvpSipDate.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbSipDate.java @@ -8,7 +8,7 @@ import java.util.*; /** * 重写jain sip的SIPDate解决与国标时间格式不一致的问题 */ -public class WvpSipDate extends SIPDate { +public class GbSipDate extends SIPDate { /** * @@ -17,7 +17,7 @@ public class WvpSipDate extends SIPDate { private Calendar javaCal; - public WvpSipDate(long timeMillis) { + public GbSipDate(long timeMillis) { this.javaCal = new GregorianCalendar(TimeZone.getDefault(), Locale.getDefault()); Date date = new Date(timeMillis); this.javaCal.setTime(date); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbStringMsgParserFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbStringMsgParserFactory.java new file mode 100644 index 00000000..3a9a1d19 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbStringMsgParserFactory.java @@ -0,0 +1,21 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import gov.nist.javax.sip.parser.MessageParser; +import gov.nist.javax.sip.parser.MessageParserFactory; +import gov.nist.javax.sip.stack.SIPTransactionStack; + +public class GbStringMsgParserFactory implements MessageParserFactory { + + /** + * msg parser is completely stateless, reuse isntance for the whole stack + * fixes https://github.com/RestComm/jain-sip/issues/92 + */ + private static GBStringMsgParser msgParser = new GBStringMsgParser(); + /* + * (non-Javadoc) + * @see gov.nist.javax.sip.parser.MessageParserFactory#createMessageParser(gov.nist.javax.sip.stack.SIPTransactionStack) + */ + public MessageParser createMessageParser(SIPTransactionStack stack) { + return msgParser; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/conf/DefaultProperties.java b/src/main/java/com/genersoft/iot/vmp/gb28181/conf/DefaultProperties.java index 59fa8f6d..7e355e54 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/conf/DefaultProperties.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/conf/DefaultProperties.java @@ -31,6 +31,8 @@ public class DefaultProperties { properties.setProperty("gov.nist.javax.sip.CANCEL_CLIENT_TRANSACTION_CHECKED", "true"); // 为_NULL _对话框传递_终止的_事件 properties.setProperty("gov.nist.javax.sip.DELIVER_TERMINATED_EVENT_FOR_NULL_DIALOG", "true"); + // 是否自动计算content length的实际长度,默认不计算 + properties.setProperty("gov.nist.javax.sip.COMPUTE_CONTENT_LENGTH_FROM_MESSAGE_BODY", "true"); // 会话清理策略 properties.setProperty("gov.nist.javax.sip.RELEASE_REFERENCES_STRATEGY", "Normal"); // 处理由该服务器处理的基于底层TCP的保持生存超时 @@ -42,6 +44,8 @@ public class DefaultProperties { // 定义应用程序打算多久审计一次 SIP 堆栈,了解其内部线程的健康状况(该属性指定连续审计之间的时间(以毫秒为单位)) properties.setProperty("gov.nist.javax.sip.THREAD_AUDIT_INTERVAL_IN_MILLISECS", "30000"); +// properties.setProperty("gov.nist.javax.sip.MESSAGE_PROCESSOR_FACTORY", "gov.nist.javax.sip.stack.NioMessageProcessorFactory"); + /** * sip_server_log.log 和 sip_debug_log.log ERROR, INFO, WARNING, OFF, DEBUG, TRACE */ diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/conf/ServerLoggerImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/conf/ServerLoggerImpl.java index 19e1906c..c7b1f6e6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/conf/ServerLoggerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/conf/ServerLoggerImpl.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.conf; +import gov.nist.core.CommonLogger; import gov.nist.core.ServerLogger; import gov.nist.core.StackLogger; import gov.nist.javax.sip.message.SIPMessage; @@ -84,7 +85,7 @@ public class ServerLoggerImpl implements ServerLogger { } if(sipStack instanceof SIPTransactionStack) { this.sipStack = (SIPTransactionStack)sipStack; - this.stackLogger = this.sipStack.getStackLogger(); + this.stackLogger = CommonLogger.getLogger(SIPTransactionStack.class); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 25576cd6..34b7f037 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -67,7 +67,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { private SipSubscribe sipSubscribe; @Autowired - private ZLMServerFactory zlmserverfactory; + private ZLMServerFactory zlmServerFactory; @Autowired private SipLayer sipLayer; @@ -753,7 +753,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { .append("\r\n") .append("RecordInfo\r\n") .append("" +recordInfo.getSn() + "\r\n") - .append("" + recordInfo.getDeviceId() + "\r\n") + .append("" + recordInfo.getChannelId() + "\r\n") .append("" + recordInfo.getSumNum() + "\r\n"); if (recordInfo.getRecordList() == null ) { recordXml.append("\r\n"); @@ -842,7 +842,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem != null) { mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); - zlmserverfactory.closeRtpServer(mediaServerItem, sendRtpItem.getStream()); + zlmServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStreamId()); } SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem); if (byeRequest == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java index 8e1b6d8b..bc43a3a0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java @@ -206,7 +206,24 @@ public abstract class SIPRequestProcessorParent { Byte[] bytes = new Byte[0]; byte[] bytesResult = ArrayUtils.toPrimitive(result.toArray(bytes)); - Document xml = reader.read(new ByteArrayInputStream(bytesResult)); + Document xml; + try { + xml = reader.read(new ByteArrayInputStream(bytesResult)); + }catch (DocumentException e) { + logger.warn("[xml解析异常]: 愿文如下: \r\n{}", new String(bytesResult)); + logger.warn("[xml解析异常]: 愿文如下: 尝试兼容性处理"); + String[] xmlLineArray = new String(bytesResult).split("\\r?\\n"); + + // 兼容海康的address字段带有<破换xml结构导致无法解析xml的问题 + StringBuilder stringBuilder = new StringBuilder(); + for (String s : xmlLineArray) { + if (s.startsWith(" { - if (code == InviteErrorCode.SUCCESS.getCode()) { + }else { + + SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> { + if (code == InviteErrorCode.SUCCESS.getCode()){ hookEvent.run(code, msg, data); } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); @@ -506,6 +498,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements errorEvent.run(code, msg, data); } })); + sendRtpItem.setPlayType(InviteStreamType.PLAY); + String streamId = null; + if (mediaServerItem.isRtpEnable()) { + streamId = String.format("%s_%s", device.getDeviceId(), channelId); + }else { + streamId = String.format("%08x", Integer.parseInt(ssrcInfo.getSsrc())).toUpperCase(); + } + sendRtpItem.setStream(streamId); + sendRtpItem.setSsrc(ssrcInfo.getSsrc()); + redisCatchStorage.updateSendRTPSever(sendRtpItem); } } else if (gbStream != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index 20b3fa16..70a9f770 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -6,7 +6,7 @@ import com.genersoft.iot.vmp.gb28181.auth.DigestServerAuthenticationHelper; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.RemoteAddressInfo; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; -import com.genersoft.iot.vmp.gb28181.bean.WvpSipDate; +import com.genersoft.iot.vmp.gb28181.bean.GbSipDate; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; @@ -148,8 +148,8 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen // 添加date头 SIPDateHeader dateHeader = new SIPDateHeader(); // 使用自己修改的 - WvpSipDate wvpSipDate = new WvpSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis()); - dateHeader.setDate(wvpSipDate); + GbSipDate gbSipDate = new GbSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis()); + dateHeader.setDate(gbSipDate); response.addHeader(dateHeader); if (request.getExpires() == null) { @@ -169,7 +169,18 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen device.setGeoCoordSys("WGS84"); device.setDeviceId(deviceId); device.setOnLine(false); + }else { + if (ObjectUtils.isEmpty(device.getStreamMode())) { + device.setStreamMode("UDP"); + } + if (ObjectUtils.isEmpty(device.getCharset())) { + device.setCharset("GB2312"); + } + if (ObjectUtils.isEmpty(device.getGeoCoordSys())) { + device.setGeoCoordSys("WGS84"); + } } + device.setIp(remoteAddressInfo.getIp()); device.setPort(remoteAddressInfo.getPort()); device.setHostAddress(remoteAddressInfo.getIp().concat(":").concat(String.valueOf(remoteAddressInfo.getPort()))); @@ -210,8 +221,8 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen // 添加date头 SIPDateHeader dateHeader = new SIPDateHeader(); // 使用自己修改的 - WvpSipDate wvpSipDate = new WvpSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis()); - dateHeader.setDate(wvpSipDate); + GbSipDate gbSipDate = new GbSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis()); + dateHeader.setDate(gbSipDate); response.addHeader(dateHeader); // 添加Contact头 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 5ec76dd8..729eec39 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -78,7 +78,9 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp device.setKeepaliveIntervalTime(60); }else { long lastTime = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(device.getKeepaliveTime()); - device.setKeepaliveIntervalTime(new Long(System.currentTimeMillis()/1000-lastTime).intValue()); + if (System.currentTimeMillis()/1000-lastTime > 10) { + device.setKeepaliveIntervalTime(new Long(System.currentTimeMillis()/1000-lastTime).intValue()); + } } device.setKeepaliveTime(DateUtil.getNow()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java index a83e7c9f..a08231c8 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java @@ -380,13 +380,18 @@ public class XmlUtil { } } // 父设备/区域/系统ID - String realParentId = parentID; - if (!ObjectUtils.isEmpty(parentID)) { + + if (!ObjectUtils.isEmpty(parentID) ) { if (parentID.contains("/")) { String[] parentIdArray = parentID.split("/"); - realParentId = parentIdArray[parentIdArray.length - 1]; + deviceChannel.setParentId(parentIdArray[parentIdArray.length - 1]); + }else { + if (parentID.length()%2 == 0) { + deviceChannel.setParentId(parentID); + }else { + logger.warn("[xml解析] 不规范的parentID:{}, 已舍弃", parentID); + } } - deviceChannel.setParentId(realParentId); }else { if (!ObjectUtils.isEmpty(businessGroupID)) { deviceChannel.setParentId(businessGroupID); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java index 8366a4a2..893e52a0 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java @@ -1,13 +1,20 @@ package com.genersoft.iot.vmp.media.zlm; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaSendRtpPortInfo; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + @Component public class SendRtpPortManager { @@ -29,27 +36,55 @@ public class SendRtpPortManager { } public int getNextPort(String mediaServerId) { - String key = KEY + userSetting.getServerId() + "_" + mediaServerId; - MediaSendRtpPortInfo mediaSendRtpPortInfo = (MediaSendRtpPortInfo)redisTemplate.opsForValue().get(key); + String sendIndexKey = KEY + userSetting.getServerId() + "_" + mediaServerId; + MediaSendRtpPortInfo mediaSendRtpPortInfo = (MediaSendRtpPortInfo)redisTemplate.opsForValue().get(sendIndexKey); if (mediaSendRtpPortInfo == null) { - logger.warn("[发送端口管理] 获取{}的发送端口时未找到端口信息", mediaSendRtpPortInfo); + logger.warn("[发送端口管理] 获取{}的发送端口时未找到端口信息", mediaServerId); return 0; } - int port; - if (mediaSendRtpPortInfo.getCurrent() %2 != 0) { - port = mediaSendRtpPortInfo.getCurrent() + 1; - }else { - port = mediaSendRtpPortInfo.getCurrent() + 2; - } - if (port > mediaSendRtpPortInfo.getEnd()) { - if (mediaSendRtpPortInfo.getStart() %2 != 0) { - port = mediaSendRtpPortInfo.getStart() + 1; - }else { - port = mediaSendRtpPortInfo.getStart(); + + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + + userSetting.getServerId() + "_*"; + List queryResult = RedisUtil.scan(redisTemplate, key); + Map sendRtpItemMap = new HashMap<>(); + + for (Object o : queryResult) { + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(o); + if (sendRtpItem != null) { + sendRtpItemMap.put(sendRtpItem.getLocalPort(), sendRtpItem); } } + + int port = getPort(mediaSendRtpPortInfo.getCurrent(), + mediaSendRtpPortInfo.getStart(), + mediaSendRtpPortInfo.getEnd(), checkPort -> sendRtpItemMap.get(checkPort) == null); + mediaSendRtpPortInfo.setCurrent(port); - redisTemplate.opsForValue().set(key, mediaSendRtpPortInfo); + redisTemplate.opsForValue().set(sendIndexKey, mediaSendRtpPortInfo); + return port; + } + + interface CheckPortCallback{ + boolean check(int port); + } + + private int getPort(int current, int start, int end, CheckPortCallback checkPortCallback) { + int port; + if (current %2 != 0) { + port = current + 1; + }else { + port = current + 2; + } + if (port > end) { + if (start %2 != 0) { + port = start + 1; + }else { + port = start; + } + } + if (!checkPortCallback.check(port)) { + return getPort(port, start, end, checkPortCallback); + } return port; } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 71179d11..a33665c7 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; @@ -27,11 +28,13 @@ import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; @@ -124,6 +127,9 @@ public class ZLMHttpHookListener { @Autowired private ThreadPoolTaskExecutor taskExecutor; + @Autowired + private RedisTemplate redisTemplate; + /** * 服务器定时上报时间,上报间隔可配置,默认10s上报一次 */ @@ -232,10 +238,7 @@ public class ZLMHttpHookListener { HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); - if (!"rtp".equals(param.getApp())) { - result.setEnable_audio(true); - } - + result.setEnable_audio(true); taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); if (subscribe != null) { @@ -264,7 +267,6 @@ public class ZLMHttpHookListener { // 如果是录像下载就设置视频间隔十秒 if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) { result.setMp4_max_second(10); - result.setEnable_audio(true); result.setEnable_mp4(true); } // 如果是talk对讲,则默认获取声音 @@ -291,6 +293,14 @@ public class ZLMHttpHookListener { } } } + if (param.getApp().equalsIgnoreCase("rtp")) { + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream(); + OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(receiveKey); + if (otherRtpSendInfo != null) { + result.setEnable_mp4(true); + } + } + logger.info("[ZLM HOOK]推流鉴权 响应:{}->{}->>>>{}", param.getMediaServerId(), param, result); return result; } @@ -522,8 +532,6 @@ public class ZLMHttpHookListener { if ("rtp".equals(param.getApp())) { ret.put("close", userSetting.getStreamOnDemand()); // 国标流, 点播/录像回放/录像下载 -// StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(param.getStream()); - InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); // 点播 if (inviteInfo != null) { @@ -588,7 +596,7 @@ public class ZLMHttpHookListener { // 拉流代理 StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (streamProxyItem != null) { - if (streamProxyItem.isEnableDisableNoneReader()) { + if (streamProxyItem.isEnableRemoveNoneReader()) { // 无人观看自动移除 ret.put("close", true); streamProxyService.del(param.getApp(), param.getStream()); @@ -670,7 +678,7 @@ public class ZLMHttpHookListener { resultHolder.put(key, uuid, result); if (!exist) { - playService.play(mediaInfo, deviceId, channelId, (code, message, data) -> { + playService.play(mediaInfo, deviceId, channelId, null, (code, message, data) -> { msg.setData(new HookResult(code, message)); resultHolder.invokeResult(msg); }); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index 41ef4d07..cbc5fde6 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -61,7 +61,7 @@ public class ZLMMediaListManager { private UserSetting userSetting; @Autowired - private ZLMServerFactory ZLMServerFactory; + private ZLMServerFactory zlmServerFactory; @Autowired private IMediaServerService mediaServerService; @@ -97,7 +97,7 @@ public class ZLMMediaListManager { public void sendStreamEvent(String app, String stream, String mediaServerId) { MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); // 查看推流状态 - Boolean streamReady = ZLMServerFactory.isStreamReady(mediaServerItem, app, stream); + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); if (streamReady != null && streamReady) { ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream); if (channelOnlineEventLister != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index c81e00da..e0ac71fc 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -86,10 +86,13 @@ public class ZLMServerFactory { }else { param.put("port", port); } - param.put("ssrc", ssrc); if (onlyAuto != null) { param.put("only_audio", onlyAuto?"1":"0"); } + if (ssrc != 0) { + param.put("ssrc", ssrc); + } + JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param); logger.info(JSONObject.toJSONString(openRtpServerResultJson)); if (openRtpServerResultJson != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java index a4557e9a..235cea7b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java @@ -21,5 +21,6 @@ public enum HookType { on_server_started, on_rtp_server_timeout, - on_server_keepalive + on_server_keepalive, + on_send_rtp_stopped } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java index 3a58d156..68d969f4 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java @@ -51,5 +51,13 @@ public class HookResultForOnPublish extends HookResult{ this.mp4_save_path = mp4_save_path; } - + @Override + public String toString() { + return "HookResultForOnPublish{" + + "enable_audio=" + enable_audio + + ", enable_mp4=" + enable_mp4 + + ", mp4_max_second=" + mp4_max_second + + ", mp4_save_path='" + mp4_save_path + '\'' + + '}'; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 7725e1bc..60d377da 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -27,7 +27,7 @@ public interface IPlayService { void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ErrorCallback callback); - SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback callback); + SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback); StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index e683da68..0fa3aa61 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -518,8 +518,12 @@ public class DeviceServiceImpl implements IDeviceService { if (!ObjectUtils.isEmpty(device.getMediaServerId())) { deviceInStore.setMediaServerId(device.getMediaServerId()); } - deviceInStore.setSdpIp(device.getSdpIp()); - deviceInStore.setCharset(device.getCharset()); + if (!ObjectUtils.isEmpty(device.getCharset())) { + deviceInStore.setCharset(device.getCharset()); + } + if (!ObjectUtils.isEmpty(device.getSdpIp())) { + deviceInStore.setSdpIp(device.getSdpIp()); + } // 目录订阅相关的信息 if (device.getSubscribeCycleForCatalog() > 0) { @@ -550,10 +554,18 @@ public class DeviceServiceImpl implements IDeviceService { removeMobilePositionSubscribe(deviceInStore); } } - // 坐标系变化,需要重新计算GCJ02坐标和WGS84坐标 - if (!deviceInStore.getGeoCoordSys().equals(device.getGeoCoordSys())) { - updateDeviceChannelGeoCoordSys(device); + if (deviceInStore.getGeoCoordSys() != null) { + // 坐标系变化,需要重新计算GCJ02坐标和WGS84坐标 + if (!deviceInStore.getGeoCoordSys().equals(device.getGeoCoordSys())) { + updateDeviceChannelGeoCoordSys(device); + } + }else { + device.setGeoCoordSys("WGS84"); } + if (device.getCharset() == null) { + device.setCharset("GB2312"); + } + // 更新redis redisCatchStorage.updateDevice(device); deviceMapper.updateCustom(device); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index dd1b3c38..ec5e2f91 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -183,7 +183,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } int rtpServerPort; if (mediaServerItem.isRtpEnable()) { - rtpServerPort = zlmServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0, port, onlyAuto, reUsePort, tcpMode); + rtpServerPort = zlmServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0, port, reUsePort, tcpMode); } else { rtpServerPort = mediaServerItem.getRtpProxyPort(); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index d39f46c4..e0946b5d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -100,7 +100,7 @@ public class PlayServiceImpl implements IPlayService { private ZLMRESTfulUtils zlmresTfulUtils; @Autowired - private ZLMServerFactory zlmserverfactory; + private ZLMServerFactory zlmServerFactory; @Autowired private AssistRESTfulUtils assistRESTfulUtils; @@ -148,7 +148,7 @@ public class PlayServiceImpl implements IPlayService { @Override - public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback callback) { + public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback) { if (mediaServerItem == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); } @@ -174,7 +174,7 @@ public class PlayServiceImpl implements IPlayService { String mediaServerId = streamInfo.getMediaServerId(); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); - Boolean ready = zlmserverfactory.isStreamReady(mediaInfo, "rtp", streamId); + Boolean ready = zlmServerFactory.isStreamReady(mediaInfo, "rtp", streamId); if (ready != null && ready) { callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, @@ -445,18 +445,7 @@ public class PlayServiceImpl implements IPlayService { streamInfo); logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), device.isSwitchPrimarySubStream() ? "辅码流" : "主码流"); - String streamUrl; - if (mediaServerItemInuse.getRtspPort() != 0) { - streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", ssrcInfo.getStream()); - }else { - streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", ssrcInfo.getStream()); - } - String path = "snap"; - String fileName = device.getDeviceId() + "_" + channelId + ".jpg"; - // 请求截图 - logger.info("[请求截图]: " + fileName); - zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName); - + snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channelId, ssrcInfo.getStream()); }, (event) -> { inviteInfo.setStatus(InviteSessionStatus.ok); @@ -539,6 +528,7 @@ public class PlayServiceImpl implements IPlayService { InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + snapOnPlay(mediaServerItemInUse, device.getDeviceId(), channelId, stream); }); return; } @@ -614,11 +604,33 @@ public class PlayServiceImpl implements IPlayService { } } - @Override - public StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) { - OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam; - StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId); + /** + * 点播成功时调用截图. + * + * @param mediaServerItemInuse media + * @param deviceId 设备 ID + * @param channelId 通道 ID + * @param stream ssrc + */ + private void snapOnPlay(MediaServerItem mediaServerItemInuse, String deviceId, String channelId, String stream) { + String streamUrl; + if (mediaServerItemInuse.getRtspPort() != 0) { + streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", stream); + } else { + streamUrl = String.format("http://127.0.0.1:%s/%s/%s.live.mp4", mediaServerItemInuse.getHttpPort(), "rtp", stream); + } + String path = "snap"; + String fileName = deviceId + "_" + channelId + ".jpg"; + // 请求截图 + logger.info("[请求截图]: " + fileName); + zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName); + } + + private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) { + StreamInfo streamInfo = null; Device device = redisCatchStorage.getDevice(deviceId); + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; + streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { @@ -1656,7 +1668,7 @@ public class PlayServiceImpl implements IPlayService { } MediaServerItem newMediaServerItem = getNewMediaServerItem(device); - play(newMediaServerItem, deviceId, channelId, (code, msg, data)->{ + play(newMediaServerItem, deviceId, channelId, null, (code, msg, data)->{ if (code == InviteErrorCode.SUCCESS.getCode()) { InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index c0bf588b..fb844c9d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -4,14 +4,13 @@ import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; @@ -41,6 +40,7 @@ import org.springframework.util.ObjectUtils; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; /** * 视频代理业务 @@ -86,6 +86,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired private ZlmHttpHookSubscribe hookSubscribe; + @Autowired + private DynamicTask dynamicTask; + @Autowired DataSourceTransactionManager dataSourceTransactionManager; @@ -124,9 +127,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { port = mediaInfo.getRtspPort(); schemaForUri = schema; }else if (schema.equalsIgnoreCase("flv")) { - port = mediaInfo.getHttpPort(); - schemaForUri = "http"; - }else if (schema.equalsIgnoreCase("rtmp")) { port = mediaInfo.getRtmpPort(); schemaForUri = schema; }else { @@ -155,17 +155,28 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return; } - HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); - hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( - mediaInfo, param.getApp(), param.getStream(), null, null); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - }); + String talkKey = UUID.randomUUID().toString(); + dynamicTask.startCron(talkKey, ()->{ + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); + if (streamInfo != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + } + }, 1000); + String delayTalkKey = UUID.randomUUID().toString(); + dynamicTask.startDelay(delayTalkKey, ()->{ + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); + if (streamInfo != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + }else { + dynamicTask.stop(talkKey); + callback.run(ErrorCode.ERROR100.getCode(), "超时", null); + } + }, 5000); if (param.isEnable()) { JSONObject jsonObject = addStreamProxyToZlm(param); if (jsonObject != null && jsonObject.getInteger("code") == 0) { - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + dynamicTask.stop(talkKey); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); @@ -292,10 +303,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return null; } if ("default".equals(param.getType())){ - result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(), + result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(), param.isEnableAudio(), param.isEnableMp4(), param.getRtpType()); }else if ("ffmpeg".equals(param.getType())) { - result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl(), param.getDstUrl(), + result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(), param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(), param.getFfmpegCmdKey()); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index 9b8e3129..2b0f366b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -72,7 +72,7 @@ public class RedisGbPlayMsgListener implements MessageListener { private RedisTemplate redisTemplate; @Autowired - private ZLMServerFactory ZLMServerFactory; + private ZLMServerFactory zlmServerFactory; @Autowired private IMediaServerService mediaServerService; @@ -230,7 +230,7 @@ public class RedisGbPlayMsgListener implements MessageListener { param.put("pt", requestPushStreamMsg.getPt()); param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = ZLMServerFactory.startSendRtpStream(mediaInfo, param); + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaInfo, param); // 回复消息 responsePushStream(jsonObject, fromId, serial); } @@ -267,7 +267,7 @@ public class RedisGbPlayMsgListener implements MessageListener { return; } // 确定流是否在线 - Boolean streamReady = ZLMServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); if (streamReady != null && streamReady) { logger.info("[回复推流信息] {}/{}", content.getApp(), content.getStream()); responseSendItem(mediaServerItem, content, toId, serial); @@ -311,7 +311,7 @@ public class RedisGbPlayMsgListener implements MessageListener { * 将获取到的sendItem发送出去 */ private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) { - SendRtpItem sendRtpItem = ZLMServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), + SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), content.getPort(), content.getSsrc(), content.getPlatformId(), content.getApp(), content.getStream(), content.getChannelId(), content.getTcp(), content.getRtcp()); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java index 225e40c4..75c05d3b 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java @@ -5,12 +5,17 @@ public class OtherRtpSendInfo { /** * 发流IP */ - private String ip; + private String sendLocalIp; /** - * 发流端口 + * 音频发流端口 */ - private int port; + private int sendLocalPortForAudio; + + /** + * 视频发流端口 + */ + private int sendLocalPortForVideo; /** * 收流IP @@ -18,9 +23,14 @@ public class OtherRtpSendInfo { private String receiveIp; /** - * 收流端口 + * 音频收流端口 */ - private int receivePort; + private int receivePortForAudio; + + /** + * 视频收流端口 + */ + private int receivePortForVideo; /** * 会话ID @@ -48,23 +58,6 @@ public class OtherRtpSendInfo { private String pushSSRC; - - public String getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - public String getReceiveIp() { return receiveIp; } @@ -73,12 +66,20 @@ public class OtherRtpSendInfo { this.receiveIp = receiveIp; } - public int getReceivePort() { - return receivePort; + public int getReceivePortForAudio() { + return receivePortForAudio; } - public void setReceivePort(int receivePort) { - this.receivePort = receivePort; + public void setReceivePortForAudio(int receivePortForAudio) { + this.receivePortForAudio = receivePortForAudio; + } + + public int getReceivePortForVideo() { + return receivePortForVideo; + } + + public void setReceivePortForVideo(int receivePortForVideo) { + this.receivePortForVideo = receivePortForVideo; } public String getCallId() { @@ -121,15 +122,45 @@ public class OtherRtpSendInfo { this.pushSSRC = pushSSRC; } + + public String getSendLocalIp() { + return sendLocalIp; + } + + public void setSendLocalIp(String sendLocalIp) { + this.sendLocalIp = sendLocalIp; + } + + public int getSendLocalPortForAudio() { + return sendLocalPortForAudio; + } + + public void setSendLocalPortForAudio(int sendLocalPortForAudio) { + this.sendLocalPortForAudio = sendLocalPortForAudio; + } + + public int getSendLocalPortForVideo() { + return sendLocalPortForVideo; + } + + public void setSendLocalPortForVideo(int sendLocalPortForVideo) { + this.sendLocalPortForVideo = sendLocalPortForVideo; + } + @Override public String toString() { return "OtherRtpSendInfo{" + - "ip='" + ip + '\'' + - ", port=" + port + + "sendLocalIp='" + sendLocalIp + '\'' + + ", sendLocalPortForAudio=" + sendLocalPortForAudio + + ", sendLocalPortForVideo=" + sendLocalPortForVideo + ", receiveIp='" + receiveIp + '\'' + - ", receivePort=" + receivePort + + ", receivePortForAudio=" + receivePortForAudio + + ", receivePortForVideo=" + receivePortForVideo + ", callId='" + callId + '\'' + ", stream='" + stream + '\'' + + ", pushApp='" + pushApp + '\'' + + ", pushStream='" + pushStream + '\'' + + ", pushSSRC='" + pushSSRC + '\'' + '}'; } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index 06dfb006..b182b265 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -470,7 +470,6 @@ public class DeviceQuery { public void getSnap(HttpServletResponse resp, @PathVariable String deviceId, @PathVariable String channelId, @RequestParam(required = false) String mark) { try { - final InputStream in = Files.newInputStream(new File("snap" + File.separator + deviceId + "_" + channelId + (mark == null? ".jpg": ("_" + mark + ".jpg"))).toPath()); resp.setContentType(MediaType.IMAGE_PNG_VALUE); IOUtils.copy(in, resp.getOutputStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index f298d5c4..e67e373d 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -121,7 +121,7 @@ public class PlayController { // 录像查询以channelId作为deviceId查询 resultHolder.put(key, uuid, result); - playService.play(newMediaServerItem, deviceId, channelId, (code, msg, data) -> { + playService.play(newMediaServerItem, deviceId, channelId, null, (code, msg, data) -> { WVPResult wvpResult = new WVPResult<>(); if (code == InviteErrorCode.SUCCESS.getCode()) { wvpResult.setCode(ErrorCode.SUCCESS.getCode()); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java index 84ccc0b6..d3225a21 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java @@ -11,11 +11,9 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; @@ -53,15 +51,9 @@ public class PlaybackController { @Autowired private SIPCommander cmder; - @Autowired - private ZLMServerFactory ZLMServerFactory; - @Autowired private IVideoManagerStorage storager; - @Autowired - private IRedisCatchStorage redisCatchStorage; - @Autowired private IInviteStreamService inviteStreamService; diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index 197e85e7..513e940f 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -3,21 +3,18 @@ package com.genersoft.iot.vmp.vmanager.rtp; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.conf.VersionInfo; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam; -import com.genersoft.iot.vmp.service.IDeviceChannelService; -import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import io.swagger.v3.oas.annotations.Operation; @@ -28,16 +25,15 @@ import okhttp3.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.ResponseBody; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.util.ObjectUtils; +import org.springframework.web.bind.annotation.*; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; @SuppressWarnings("rawtypes") @@ -61,37 +57,16 @@ public class RtpController { @Autowired private IMediaServerService mediaServerService; - @Autowired - private VersionInfo versionInfo; - - @Autowired - private SipConfig sipConfig; - @Autowired private UserSetting userSetting; - @Autowired - private IDeviceService deviceService; - - @Autowired - private IDeviceChannelService channelService; - @Autowired private DynamicTask dynamicTask; - @Autowired private RedisTemplate redisTemplate; - @Value("${server.port}") - private int serverPort; - - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @GetMapping(value = "/receive/open") @ResponseBody @Operation(summary = "开启收流和获取发流信息") @@ -101,7 +76,7 @@ public class RtpController { @Parameter(name = "stream", description = "形成的流的ID", required = true) @Parameter(name = "tcpMode", description = "收流模式, 0为UDP, 1为TCP被动", required = true) @Parameter(name = "callBack", description = "回调地址,如果收流超时会通道回调通知,回调为get请求,参数为callId", required = true) - public OtherRtpSendInfo openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) { + public OtherRtpSendInfo openRtpServer(Boolean isSend, @RequestParam(required = false)String ssrc, String callId, String stream, Integer tcpMode, String callBack) { logger.info("[第三方服务对接->开启收流和获取发流信息] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}", isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP被动", callBack); @@ -123,12 +98,16 @@ public class RtpController { }catch (NumberFormatException e) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc格式错误"); } - } - int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, false, tcpMode); + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; + int localPortForVideo = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); + int localPortForAudio = zlmServerFactory.createRTPServer(mediaServerItem, stream + "_a" , ssrcInt, null, false, tcpMode); + if (localPortForVideo == 0 || localPortForAudio == 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败"); + } // 注册回调如果rtp收流超时则通过回调发送通知 if (callBack != null) { - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, mediaServerItem.getId()); + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId()); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, (mediaServerItemInUse, hookParam)->{ @@ -142,22 +121,33 @@ public class RtpController { try { client.newCall(request).execute(); } catch (IOException e) { - logger.error("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e); + logger.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e); } + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); } }); } - String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo(); otherRtpSendInfo.setReceiveIp(mediaServerItem.getSdpIp()); - otherRtpSendInfo.setReceivePort(localPort); + otherRtpSendInfo.setReceivePortForVideo(localPortForVideo); + otherRtpSendInfo.setReceivePortForAudio(localPortForAudio); otherRtpSendInfo.setCallId(callId); otherRtpSendInfo.setStream(stream); + + // 将信息写入redis中,以备后用 + redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo); if (isSend != null && isSend) { - int port = sendRtpPortManager.getNextPort(mediaServerItem.getId()); - otherRtpSendInfo.setIp(mediaServerItem.getSdpIp()); - otherRtpSendInfo.setPort(port); - logger.info("[开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo); + // 预创建发流信息 + int portForVideo = sendRtpPortManager.getNextPort(mediaServerItem.getId()); + int portForAudio = sendRtpPortManager.getNextPort(mediaServerItem.getId()); + + otherRtpSendInfo.setSendLocalIp(mediaServerItem.getSdpIp()); + otherRtpSendInfo.setSendLocalPortForVideo(portForVideo); + otherRtpSendInfo.setSendLocalPortForAudio(portForAudio); + // 将信息写入redis中,以备后用 + redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS); + logger.info("[第三方服务对接->开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo); } // 将信息写入redis中,以备后用 redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS); @@ -172,25 +162,69 @@ public class RtpController { logger.info("[第三方服务对接->关闭收流] stream->{}", stream); MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); zlmServerFactory.closeRtpServer(mediaServerItem,stream); + zlmServerFactory.closeRtpServer(mediaServerItem,stream + "_a"); + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream; + List scan = RedisUtil.scan(redisTemplate, receiveKey); + if (scan.size() > 0) { + for (Object key : scan) { + // 将信息写入redis中,以备后用 + redisTemplate.delete(key); + } + } } @GetMapping(value = "/send/start") @ResponseBody @Operation(summary = "发送流") @Parameter(name = "ssrc", description = "发送流的SSRC", required = true) - @Parameter(name = "ip", description = "目标IP", required = true) - @Parameter(name = "port", description = "目标端口", required = true) + @Parameter(name = "dstIpForAudio", description = "目标音频收流IP", required = false) + @Parameter(name = "dstIpForVideo", description = "目标视频收流IP", required = false) + @Parameter(name = "dstPortForAudio", description = "目标音频收流端口", required = false) + @Parameter(name = "dstPortForVideo", description = "目标视频收流端口", required = false) @Parameter(name = "app", description = "待发送应用名", required = true) @Parameter(name = "stream", description = "待发送流Id", required = true) @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true) - @Parameter(name = "onlyAudio", description = "是否只有音频", required = true) @Parameter(name = "isUdp", description = "是否为UDP", required = true) - @Parameter(name = "streamType", description = "流类型,1为es流,2为ps流, 默认es流", required = false) - public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, Integer streamType) { - logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}", - ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS"); + @Parameter(name = "ptForAudio", description = "rtp的音频pt", required = false) + @Parameter(name = "ptForVideo", description = "rtp的视频pt", required = false) + public void sendRTP(String ssrc, + @RequestParam(required = false)String dstIpForAudio, + @RequestParam(required = false)String dstIpForVideo, + @RequestParam(required = false)Integer dstPortForAudio, + @RequestParam(required = false)Integer dstPortForVideo, + String app, + String stream, + String callId, + Boolean isUdp, + @RequestParam(required = false)Integer ptForAudio, + @RequestParam(required = false)Integer ptForVideo + ) { + logger.info("[第三方服务对接->发送流] " + + "ssrc->{}, \r\n" + + "dstIpForAudio->{}, \n" + + "dstIpForAudio->{}, \n" + + "dstPortForAudio->{}, \n" + + "dstPortForVideo->{}, \n" + + "app->{}, \n" + + "stream->{}, \n" + + "callId->{}, \n" + + "ptForAudio->{}, \n" + + "ptForVideo->{}", + ssrc, + dstIpForAudio, + dstIpForVideo, + dstPortForAudio, + dstPortForVideo, + app, + stream, + callId, + ptForAudio, + ptForVideo); + if (!((dstPortForAudio > 0 && !ObjectUtils.isEmpty(dstPortForAudio) || (dstPortForVideo > 0 && !ObjectUtils.isEmpty(dstIpForVideo))))) { + throw new ControllerException(ErrorCode.ERROR400.getCode(), "至少应该存在一组音频或视频发送参数"); + } MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); - String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); if (sendInfo == null) { sendInfo = new OtherRtpSendInfo(); @@ -199,40 +233,131 @@ public class RtpController { sendInfo.setPushStream(stream); sendInfo.setPushSSRC(ssrc); - Map param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",app); - param.put("stream",stream); - param.put("ssrc", ssrc); + Map paramForAudio; + Map paramForVideo; + if (!ObjectUtils.isEmpty(dstIpForAudio) && dstPortForAudio > 0) { + paramForAudio = new HashMap<>(); + paramForAudio.put("vhost","__defaultVhost__"); + paramForAudio.put("app",app); + paramForAudio.put("stream",stream); + paramForAudio.put("ssrc", ssrc); - param.put("dst_url",ip); - param.put("dst_port", port); - String is_Udp = isUdp ? "1" : "0"; - param.put("is_udp", is_Udp); - param.put("src_port", sendInfo.getPort()); - param.put("use_ps", streamType==2 ? "1" : "0"); - param.put("only_audio", onlyAudio ? "1" : "0"); + paramForAudio.put("dst_url", dstIpForAudio); + paramForAudio.put("dst_port", dstPortForAudio); + String is_Udp = isUdp ? "1" : "0"; + paramForAudio.put("is_udp", is_Udp); + paramForAudio.put("src_port", sendInfo.getSendLocalPortForAudio()); + paramForAudio.put("use_ps", "0"); + paramForAudio.put("only_audio", "1"); + if (ptForAudio != null) { + paramForAudio.put("pt", ptForAudio); + } - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); - if (jsonObject.getInteger("code") == 0) { - logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId); - redisTemplate.opsForValue().set(key, sendInfo); + } else { + paramForAudio = null; + } + if (!ObjectUtils.isEmpty(dstIpForVideo) && dstPortForVideo > 0) { + paramForVideo = new HashMap<>(); + paramForVideo.put("vhost","__defaultVhost__"); + paramForVideo.put("app",app); + paramForVideo.put("stream",stream); + paramForVideo.put("ssrc", ssrc); + + paramForVideo.put("dst_url", dstIpForVideo); + paramForVideo.put("dst_port", dstPortForVideo); + String is_Udp = isUdp ? "1" : "0"; + paramForVideo.put("is_udp", is_Udp); + paramForVideo.put("src_port", sendInfo.getSendLocalPortForVideo()); + paramForVideo.put("use_ps", "0"); + paramForVideo.put("only_audio", "0"); + if (ptForVideo != null) { + paramForVideo.put("pt", ptForVideo); + } + + } else { + paramForVideo = null; + } + + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); + if (streamReady) { + if (paramForVideo != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, paramForVideo); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 视频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg")); + } + } + if(paramForAudio != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, paramForAudio); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 音频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[音频流发流失败] " + jsonObject.getString("msg")); + } + } }else { - redisTemplate.delete(key); - logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg")); + logger.info("[第三方服务对接->发送流] 流不存在,等待流上线,callId->{}", callId); + String uuid = UUID.randomUUID().toString(); + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId()); + dynamicTask.startDelay(uuid, ()->{ + logger.info("[第三方服务对接->发送流] 等待流上线超时 callId->{}", callId); + redisTemplate.delete(key); + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + }, 10000); + + // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 + OtherRtpSendInfo finalSendInfo = sendInfo; + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, + (mediaServerItemInUse, response)->{ + dynamicTask.stop(uuid); + logger.info("[第三方服务对接->发送流] 流上线,开始发流 callId->{}", callId); + try { + Thread.sleep(400); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (paramForVideo != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, paramForVideo); + redisTemplate.opsForValue().set(key, finalSendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 视频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg")); + } + } + if(paramForAudio != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, paramForAudio); + redisTemplate.opsForValue().set(key, finalSendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 音频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[音频流发流失败] " + jsonObject.getString("msg")); + } + } + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + }); } } - - @GetMapping(value = "/send/stop") @ResponseBody @Operation(summary = "关闭发送流") @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true) public void closeSendRTP(String callId) { logger.info("[第三方服务对接->关闭发送流] callId->{}", callId); - String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); if (sendInfo == null){ throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流"); @@ -250,6 +375,7 @@ public class RtpController { }else { logger.info("[第三方服务对接->关闭发送流] 成功 callId->{}", callId); } + redisTemplate.delete(key); } } diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java index f8911643..7b59f7f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java @@ -114,17 +114,8 @@ public class ApiDeviceController { @RequestParam(required = false)String q, @RequestParam(required = false)Boolean online ){ -// if (logger.isDebugEnabled()) { -// logger.debug("查询所有视频设备API调用"); -// } + JSONObject result = new JSONObject(); - // 查询设备是否存在 -// Device device = storager.queryVideoDevice(serial); -// if (device == null) { -// result.put("ChannelCount", 0); -// result.put("ChannelList", "[]"); -// return result; -// } List deviceChannels; List channelIds = null; if (!ObjectUtils.isEmpty(code)) { diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java index 556adf62..18fdfa93 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java @@ -122,7 +122,7 @@ public class ApiStreamController { MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); - playService.play(newMediaServerItem, serial, code, (errorCode, msg, data) -> { + playService.play(newMediaServerItem, serial, code, null, (errorCode, msg, data) -> { if (errorCode == InviteErrorCode.SUCCESS.getCode()) { InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, serial, code); if (inviteInfo != null && inviteInfo.getStreamInfo() != null) { diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index bdedfc78..df148bbd 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -145,6 +145,8 @@ media: enable: true # [可选] 在此范围内选择端口用于媒体流传输, 必须提前在zlm上配置该属性,不然自动配置此属性可能不成功 port-range: 30000,30500 # 端口范围 + # [可选] 国标级联在此范围内选择端口发送媒体流 + send-port-range: 30000,30500 # 端口范围 # 录像辅助服务, 部署此服务可以实现zlm录像的管理与下载, 0 表示不使用 record-assist-port: 0 diff --git a/src/main/resources/wvpssl.jks b/src/main/resources/wvpssl.jks deleted file mode 100644 index 92b98b77..00000000 Binary files a/src/main/resources/wvpssl.jks and /dev/null differ