From 724963324aaa63feca2c6aed13785dc8485bc02c Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 26 Jun 2023 19:06:14 +0800 Subject: [PATCH 01/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=B8=8A=E7=BA=A7?= =?UTF-8?q?=E7=82=B9=E6=92=AD=E6=97=B6=E5=A6=82=E6=9E=9C=E6=8E=A8=E6=B5=81?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=E4=B8=ADmediaServerID=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E7=9A=84=E6=83=85=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/genersoft/iot/vmp/gb28181/SipLayer.java | 10 +++++----- .../cmd/impl/SIPCommanderFroPlatform.java | 2 ++ .../request/impl/InviteRequestProcessor.java | 15 +++++---------- .../iot/vmp/media/zlm/ZLMHttpHookListener.java | 2 +- .../iot/vmp/service/impl/PlatformServiceImpl.java | 1 + 5 files changed, 14 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java index 78238462..5a8db178 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java @@ -64,7 +64,7 @@ public class SipLayer implements CommandLineRunner { try { sipStack = (SipStackImpl)SipFactory.getInstance().createSipStack(DefaultProperties.getProperties(monitorIp, userSetting.getSipLog())); } catch (PeerUnavailableException e) { - logger.error("[Sip Server] SIP服务启动失败, 监听地址{}失败,请检查ip是否正确", monitorIp); + logger.error("[SIP SERVER] SIP服务启动失败, 监听地址{}失败,请检查ip是否正确", monitorIp); return; } @@ -76,12 +76,12 @@ public class SipLayer implements CommandLineRunner { tcpSipProvider.addSipListener(sipProcessorObserver); tcpSipProviderMap.put(monitorIp, tcpSipProvider); - logger.info("[Sip Server] tcp://{}:{} 启动成功", monitorIp, port); + logger.info("[SIP SERVER] tcp://{}:{} 启动成功", monitorIp, port); } catch (TransportNotSupportedException | TooManyListenersException | ObjectInUseException | InvalidArgumentException e) { - logger.error("[Sip Server] tcp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确" + logger.error("[SIP SERVER] tcp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确" , monitorIp, port); } @@ -93,12 +93,12 @@ public class SipLayer implements CommandLineRunner { udpSipProviderMap.put(monitorIp, udpSipProvider); - logger.info("[Sip Server] udp://{}:{} 启动成功", monitorIp, port); + logger.info("[SIP SERVER] udp://{}:{} 启动成功", monitorIp, port); } catch (TransportNotSupportedException | TooManyListenersException | ObjectInUseException | InvalidArgumentException e) { - logger.error("[Sip Server] udp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确" + logger.error("[SIP SERVER] udp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确" , monitorIp, port); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index eb5f18ed..d223b8f3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -214,6 +214,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { continue; }else { if (channel.getChannelId().length() != 20) { + logger.warn("[编号长度异常] {} 长度错误,请使用20位长度的国标编号,当前长度:{}", channel.getChannelId(), channel.getChannelId().length()); + catalogXml.append("\r\n"); continue; } switch (Integer.parseInt(channel.getChannelId().substring(10, 13))){ diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 9a4be86d..3d83d226 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -183,16 +183,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements return; } else { streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); - if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) { - logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); - try { - responseAck(request, Response.GONE); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] invite GONE: {}", e.getMessage()); - } - return; - }else { - // TODO 可能漏回复消息 + if (streamPushItem != null) { + mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); + } + if (mediaServerItem == null) { + mediaServerItem = mediaServerService.getDefaultMediaServer(); } } } else { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 4e268b9f..9ffb5240 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -533,7 +533,7 @@ public class ZLMHttpHookListener { } return ret; } - // 推流具有主动性,暂时不做处理 + // TODO 推流具有主动性,暂时不做处理 // StreamPushItem streamPushItem = streamPushService.getPush(app, streamId); // if (streamPushItem != null) { // // TODO 发送停止 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index 81943095..f1dcda1c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -101,6 +101,7 @@ public class PlatformServiceImpl implements IPlatformService { // 行政区划默认去编号的前6位 parentPlatform.setAdministrativeDivision(parentPlatform.getServerGBId().substring(0,6)); } + parentPlatform.setTreeType("CivilCode"); parentPlatform.setCatalogId(parentPlatform.getDeviceGBId()); int result = platformMapper.addParentPlatform(parentPlatform); // 添加缓存 From c7d15150237c946fbb4f5547dab0018a5f573d88 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 27 Jun 2023 15:34:38 +0800 Subject: [PATCH 02/10] =?UTF-8?q?=E5=A2=9E=E5=8A=A0redis=E9=80=9A=E9=81=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 15 ++++ .../service/bean/MessageForPushChannel.java | 17 ++++- .../vmanager/gb28181/ptz/PtzController.java | 3 + .../iot/vmp/vmanager/rtp/RtpController.java | 76 +++++++++++++++++++ 4 files changed, 108 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index ccfe77ec..8105c0be 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -101,6 +101,21 @@ public class VideoManagerConstants { */ public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED"; + /** + * redis 消息通知上级平台开始观看流 + */ + public static final String VM_MSG_STREAM_START_PLAY_NOTIFY = "VM_MSG_STREAM_START_PLAY_NOTIFY"; + + /** + * redis 消息通知上级平台停止观看流 + */ + public static final String VM_MSG_STREAM_STOP_PLAY_NOTIFY = "VM_MSG_STREAM_STOP_PLAY_NOTIFY"; + + /** + * redis 消息接收关闭一个推流 + */ + public static final String VM_MSG_STREAM_PUSH_CLOSE_REQUESTED = "VM_MSG_STREAM_PUSH_CLOSE_REQUESTED"; + /** * redis 消息通知平台通知设备推流结果 diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java index 8491fc5e..886f8c29 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java @@ -1,7 +1,5 @@ package com.genersoft.iot.vmp.service.bean; -import java.util.stream.Stream; - /** * 当上级平台 * @author lin @@ -29,10 +27,15 @@ public class MessageForPushChannel { private String gbId; /** - * 请求的平台ID + * 请求的平台国标编号 */ private String platFormId; + /** + * 请求的平台自增ID + */ + private String platFormIndex; + /** * 请求平台名称 */ @@ -128,4 +131,12 @@ public class MessageForPushChannel { public void setMediaServerId(String mediaServerId) { this.mediaServerId = mediaServerId; } + + public String getPlatFormIndex() { + return platFormIndex; + } + + public void setPlatFormIndex(String platFormIndex) { + this.platFormIndex = platFormIndex; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java index e9ea457d..283cfe33 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java @@ -97,6 +97,9 @@ public class PtzController { cmdCode = 32; break; case "stop": + horizonSpeed = 0; + verticalSpeed = 0; + zoomSpeed = 0; break; default: break; diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java new file mode 100644 index 00000000..304ccf95 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -0,0 +1,76 @@ +package com.genersoft.iot.vmp.vmanager.rtp; + +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.VersionInfo; +import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.*; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +@SuppressWarnings("rawtypes") +@Tag(name = "第三方服务对接") + +@RestController +@RequestMapping("/api/rtp") +public class RtpController { + + @Autowired + private ZlmHttpHookSubscribe zlmHttpHookSubscribe; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private VersionInfo versionInfo; + + @Autowired + private SipConfig sipConfig; + + @Autowired + private UserSetting userSetting; + + @Autowired + private IDeviceService deviceService; + + @Autowired + private IDeviceChannelService channelService; + + @Autowired + private IStreamPushService pushService; + + + @Autowired + private IStreamProxyService proxyService; + + + @Value("${server.port}") + private int serverPort; + + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + + @GetMapping(value = "/send/ready") + @ResponseBody + @Operation(summary = "为发送视频流获取信息") + public List getMediaServerList(Boolean onlySender, ) { + MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); + if (mediaServerItem == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer"); + } + mediaServerService.openRTPServer() + return mediaServerService.getAll(); + } + +} From 6fcff0567e3a324e82ff35e276b0667ecb3e0907 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 28 Jun 2023 09:15:07 +0800 Subject: [PATCH 03/10] =?UTF-8?q?doc=E5=8E=BB=E9=99=A4cdn=E4=BE=9D?= =?UTF-8?q?=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/_sidebar.md | 1 + doc/index.html | 12 +++--- doc/lib/css/vue.css | 1 + doc/lib/js/docsify-copy-code.min.js | 9 +++++ doc/lib/js/docsify-plantuml.min.js | 1 + doc/lib/js/docsify@4.js | 1 + doc/lib/js/search.min.js | 1 + doc/lib/js/zoom-image.min.js | 1 + .../iot/vmp/vmanager/rtp/RtpController.java | 37 +++++++++++++++---- 9 files changed, 50 insertions(+), 14 deletions(-) create mode 100644 doc/lib/css/vue.css create mode 100644 doc/lib/js/docsify-copy-code.min.js create mode 100644 doc/lib/js/docsify-plantuml.min.js create mode 100644 doc/lib/js/docsify@4.js create mode 100644 doc/lib/js/search.min.js create mode 100644 doc/lib/js/zoom-image.min.js diff --git a/doc/_sidebar.md b/doc/_sidebar.md index 3b10bae8..e49ff915 100644 --- a/doc/_sidebar.md +++ b/doc/_sidebar.md @@ -1,6 +1,7 @@ * **编译与部署** + * [测试](_content/introduction/test.md) * [编译](_content/introduction/compile.md) * [配置](_content/introduction/config.md) * [部署](_content/introduction/deployment.md) diff --git a/doc/index.html b/doc/index.html index 1048d929..580603da 100644 --- a/doc/index.html +++ b/doc/index.html @@ -7,7 +7,7 @@ - + ").firstElementChild),a=e.themeColor,window.CSS&&window.CSS.supports&&window.CSS.supports("(--v:red)")||(e=k("style:not(.inserted),link"),[].forEach.call(e,function(e){"STYLE"===e.nodeName?Q(e,a):"LINK"===e.nodeName&&(e=e.getAttribute("href"),/\.css$/.test(e)&&X(e).then(function(e){e=w("style",e);_.appendChild(e),Q(e,a)}))}))),this._updateRender(),S(h,"ready")},n}(function(e){function n(){e.apply(this,arguments)}return e&&(n.__proto__=e),((n.prototype=Object.create(e&&e.prototype)).constructor=n).prototype.routes=function(){return this.config.routes||{}},n.prototype.matchVirtualRoute=function(t){var a=this.routes(),r=Object.keys(a),c=function(){return null};function u(){var e=r.shift();if(!e)return c(null);var n=A(o=(i="^",0===(o=e).indexOf(i)?o:"^"+o),"$")?o:o+"$",i=t.match(n);if(!i)return u();var o=a[e];if("string"==typeof o)return c(o);if("function"!=typeof o)return u();n=o,e=Xn(),o=e[0];return(0,e[1])(function(e){return"string"==typeof e?c(e):!1===e?c(null):u()}),n.length<=2?o(n(t,i)):n(t,i,o)}return{then:function(e){c=e,u()}}},n}(function(i){function e(){for(var e=[],n=arguments.length;n--;)e[n]=arguments[n];i.apply(this,e),this.route={}}return i&&(e.__proto__=i),((e.prototype=Object.create(i&&i.prototype)).constructor=e).prototype.updateRender=function(){this.router.normalize(),this.route=this.router.parse(),h.setAttribute("data-page",this.route.file)},e.prototype.initRouter=function(){var n=this,e=this.config,e=new("history"===(e.routerMode||"hash")&&t?D:H)(e);this.router=e,this.updateRender(),U=this.route,e.onchange(function(e){n.updateRender(),n._updateRender(),U.path!==n.route.path?(n.$fetch(d,n.$resetEvents.bind(n,e.source)),U=n.route):n.$resetEvents(e.source)})},e}(function(e){function n(){e.apply(this,arguments)}return e&&(n.__proto__=e),((n.prototype=Object.create(e&&e.prototype)).constructor=n).prototype.initLifecycle=function(){var i=this;this._hooks={},this._lifecycle={},["init","mounted","beforeEach","afterEach","doneEach","ready"].forEach(function(e){var n=i._hooks[e]=[];i._lifecycle[e]=function(e){return n.push(e)}})},n.prototype.callHook=function(e,t,a){void 0===a&&(a=d);var r=this._hooks[e],c=this.config.catchPluginErrors,u=function(n){var e=r[n];if(n>=r.length)a(t);else if("function"==typeof e){var i="Docsify plugin error";if(2===e.length)try{e(t,function(e){t=e,u(n+1)})}catch(e){if(!c)throw e;console.error(i,e),u(n+1)}else try{var o=e(t);t=void 0===o?t:o,u(n+1)}catch(e){if(!c)throw e;console.error(i,e),u(n+1)}}else u(n+1)};u(0)},n}(we))))))));function Kn(e,n,i){return Qn&&Qn.abort&&Qn.abort(),Qn=X(e,!0,i)}window.Docsify={util:Me,dom:n,get:X,slugify:Cn,version:"4.13.0"},window.DocsifyCompiler=In,window.marked=Sn,window.Prism=Pn,e(function(e){return new Jn})}(); diff --git a/doc/lib/js/search.min.js b/doc/lib/js/search.min.js new file mode 100644 index 00000000..be7a9f55 --- /dev/null +++ b/doc/lib/js/search.min.js @@ -0,0 +1 @@ +!function(){function u(e){return e.replace(//,"").replace(/{docsify-ignore}/,"").replace(//,"").replace(/{docsify-ignore-all}/,"").trim()}var f={},m={EXPIRE_KEY:"docsify.search.expires",INDEX_KEY:"docsify.search.index"};function g(e){var n={"&":"&","<":"<",">":">",'"':""","'":"'"};return String(e).replace(/[&<>"']/g,function(e){return n[e]})}function y(e){return e.text||"table"!==e.type||(e.cells.unshift(e.header),e.text=e.cells.map(function(e){return e.join(" | ")}).join(" |\n ")),e.text}function v(e){return e.text||"list"!==e.type||(e.text=e.raw),e.text}function b(o,e,s,c){void 0===e&&(e="");var d,e=window.marked.lexer(e),l=window.Docsify.slugify,p={},h="";return e.forEach(function(e,n){var t,a,i,r;"heading"===e.type&&e.depth<=c?(t=(a=(i=e.text,r={},{str:i=(i=void 0===i?"":i)&&i.replace(/^('|")/,"").replace(/('|")$/,"").replace(/(?:^|\s):([\w-]+:?)=?([\w-%]+)?/g,function(e,n,t){return-1===n.indexOf(":")?(r[n]=t&&t.replace(/"/g,"")||!0,""):e}).trim(),config:r})).str,i=a.config,a=u(e.text),d=i.id?s.toURL(o,{id:l(i.id)}):s.toURL(o,{id:l(g(a))}),t&&(h=u(t)),p[d]={slug:d,title:h,body:""}):(0===n&&(d=s.toURL(o),p[d]={slug:d,title:"/"!==o?o.slice(1):"Home Page",body:e.text||""}),d&&(p[d]?p[d].body?(e.text=y(e),e.text=v(e),p[d].body+="\n"+(e.text||"")):(e.text=y(e),e.text=v(e),p[d].body=p[d].body?p[d].body+e.text:e.text):p[d]={slug:d,title:"",body:""}))}),l.clear(),p}function p(e){return e&&e.normalize?e.normalize("NFD").replace(/[\u0300-\u036f]/g,""):e}function o(e){var n=[],t=[];Object.keys(f).forEach(function(n){t=t.concat(Object.keys(f[n]).map(function(e){return f[n][e]}))});var a=(e=e.trim()).split(/[\s\-,\\/]+/);1!==a.length&&(a=[].concat(e,a));for(var i=0;il.length&&(t=l.length),a=c&&"..."+c.substring(n,t).replace(a,function(e){return''+e+""})+"...",o+=a)}),0\n\n

'+e.title+"

\n

"+e.content+"

\n
\n"}),t.classList.add("show"),a.classList.add("show"),t.innerHTML=r||'

'+c+"

",s.hideOtherSidebarContent&&(i&&i.classList.add("hide"),n&&n.classList.add("hide"))}function l(e){s=e}function h(e,n){var t,a,i=n.router.parse().query.s;l(e),Docsify.dom.style("\n.sidebar {\n padding-top: 0;\n}\n\n.search {\n margin-bottom: 20px;\n padding: 6px;\n border-bottom: 1px solid #eee;\n}\n\n.search .input-wrap {\n display: flex;\n align-items: center;\n}\n\n.search .results-panel {\n display: none;\n}\n\n.search .results-panel.show {\n display: block;\n}\n\n.search input {\n outline: none;\n border: none;\n width: 100%;\n padding: 0.6em 7px;\n font-size: inherit;\n border: 1px solid transparent;\n}\n\n.search input:focus {\n box-shadow: 0 0 5px var(--theme-color, #42b983);\n border: 1px solid var(--theme-color, #42b983);\n}\n\n.search input::-webkit-search-decoration,\n.search input::-webkit-search-cancel-button,\n.search input {\n -webkit-appearance: none;\n -moz-appearance: none;\n appearance: none;\n}\n\n.search input::-ms-clear {\n display: none;\n height: 0;\n width: 0;\n}\n\n.search .clear-button {\n cursor: pointer;\n width: 36px;\n text-align: right;\n display: none;\n}\n\n.search .clear-button.show {\n display: block;\n}\n\n.search .clear-button svg {\n transform: scale(.5);\n}\n\n.search h2 {\n font-size: 17px;\n margin: 10px 0;\n}\n\n.search a {\n text-decoration: none;\n color: inherit;\n}\n\n.search .matching-post {\n border-bottom: 1px solid #eee;\n}\n\n.search .matching-post:last-child {\n border-bottom: 0;\n}\n\n.search p {\n font-size: 14px;\n overflow: hidden;\n text-overflow: ellipsis;\n display: -webkit-box;\n -webkit-line-clamp: 2;\n -webkit-box-orient: vertical;\n}\n\n.search p.empty {\n text-align: center;\n}\n\n.app-name.hide, .sidebar-nav.hide {\n display: none;\n}"),function(e){void 0===e&&(e="");var n=Docsify.dom.create("div",'
\n \n
\n \n \n \n \n \n
\n
\n
\n '),e=Docsify.dom.find("aside");Docsify.dom.toggleClass(n,"search"),Docsify.dom.before(e,n)}(i),n=Docsify.dom.find("div.search"),a=Docsify.dom.find(n,"input"),e=Docsify.dom.find(n,".input-wrap"),Docsify.dom.on(n,"click",function(e){return-1===["A","H2","P","EM"].indexOf(e.target.tagName)&&e.stopPropagation()}),Docsify.dom.on(a,"input",function(n){clearTimeout(t),t=setTimeout(function(e){return d(n.target.value.trim())},100)}),Docsify.dom.on(e,"click",function(e){"INPUT"!==e.target.tagName&&(a.value="",d())}),i&&setTimeout(function(e){return d(i)},500)}function x(e,n){var t,a,i,r,o;l(e),t=e.placeholder,a=n.route.path,(r=Docsify.dom.getNode('.search input[type="search"]'))&&("string"==typeof t?r.placeholder=t:(i=Object.keys(t).filter(function(e){return-1u.scrollOffset&&setTimeout(a,150))}),window.addEventListener("resize",a);var f={open:i,close:a,toggle:o,update:function(){var e=0 getMediaServerList(Boolean onlySender, ) { + @Operation(summary = "开启收流和获取发流信息") + @Parameter(name = "isSend", description = "是否发送,false时同时只开启收流", required = true) + @Parameter(name = "callId", description = "整个过程的唯一标识", required = true) + @Parameter(name = "ssrc", description = "来源流的SSRC", required = false) + @Parameter(name = "hasAudio", description = "是否", required = false) + @Parameter(name = "stream", description = "形成的流的ID", required = true) + @Parameter(name = "tcpMode", description = "收流模式, 0为UDP, 1为TCP被动", required = true) + public void openRtpServer(Boolean isSend, String ssrc, String callId, Boolean hasAudio, String stream, Integer tcpMode) { MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); if (mediaServerItem == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer"); } - mediaServerService.openRTPServer() - return mediaServerService.getAll(); + } + + @GetMapping(value = "/sendRTP") + @ResponseBody + @Operation(summary = "发送流") + + @Parameter(name = "ssrc", description = "发送流的SSRC", required = true) + @Parameter(name = "ip", description = "目标IP", required = true) + @Parameter(name = "port", description = "目标端口", required = true) + @Parameter(name = "app", description = "待发送应用名", required = true) + @Parameter(name = "stream", description = "待发送流Id", required = true) + @Parameter(name = "callId", description = "整个过程的唯一标识", required = true) + @Parameter(name = "onlyAudio", description = "是否只有音频", required = true) + public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio) { + } } From ed035b74d95f4d18d110626c8335f4b189125db6 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 28 Jun 2023 10:47:33 +0800 Subject: [PATCH 04/10] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=85=B3=E9=97=AD?= =?UTF-8?q?=E6=94=B6=E6=B5=81=E5=8F=91=E6=B5=81=E7=9A=84=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/vmanager/rtp/RtpController.java | 40 ++++++++++++++----- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index 36ef8aca..c8c1625c 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.VersionInfo; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.*; @@ -63,34 +64,53 @@ public class RtpController { private IRedisCatchStorage redisCatchStorage; - @GetMapping(value = "/openRtpServer") + @GetMapping(value = "/receive/open") @ResponseBody @Operation(summary = "开启收流和获取发流信息") - @Parameter(name = "isSend", description = "是否发送,false时同时只开启收流", required = true) - @Parameter(name = "callId", description = "整个过程的唯一标识", required = true) - @Parameter(name = "ssrc", description = "来源流的SSRC", required = false) - @Parameter(name = "hasAudio", description = "是否", required = false) + @Parameter(name = "isSend", description = "是否发送,false时只开启收流, true同时返回推流信息", required = true) + @Parameter(name = "callId", description = "整个过程的唯一标识,为了与后续接口关联", required = true) + @Parameter(name = "ssrc", description = "来源流的SSRC,不传则不校验来源ssrc", required = false) @Parameter(name = "stream", description = "形成的流的ID", required = true) @Parameter(name = "tcpMode", description = "收流模式, 0为UDP, 1为TCP被动", required = true) - public void openRtpServer(Boolean isSend, String ssrc, String callId, Boolean hasAudio, String stream, Integer tcpMode) { + @Parameter(name = "callBack", description = "回调地址,如果收流超时会通道回调通知,回调为get请求,参数为callId", required = true) + public SendRtpItem openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) { MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); if (mediaServerItem == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer"); } + return null; } - @GetMapping(value = "/sendRTP") + @GetMapping(value = "/receive/close") + @ResponseBody + @Operation(summary = "关闭收流") + @Parameter(name = "stream", description = "流的ID", required = true) + public void closeRtpServer(String stream) { + + } + + @GetMapping(value = "/send/start") @ResponseBody @Operation(summary = "发送流") - @Parameter(name = "ssrc", description = "发送流的SSRC", required = true) @Parameter(name = "ip", description = "目标IP", required = true) @Parameter(name = "port", description = "目标端口", required = true) @Parameter(name = "app", description = "待发送应用名", required = true) @Parameter(name = "stream", description = "待发送流Id", required = true) - @Parameter(name = "callId", description = "整个过程的唯一标识", required = true) + @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true) @Parameter(name = "onlyAudio", description = "是否只有音频", required = true) - public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio) { + @Parameter(name = "streamType", description = "流类型,1为es流,2为ps流, 默认es流", required = false) + public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Integer streamType) { + + } + + + + @GetMapping(value = "/send/stop") + @ResponseBody + @Operation(summary = "关闭发送流") + @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true) + public void closeSendRTP(String callId) { } From a77628e8759901abc3219412fc2c4aced940db28 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 28 Jun 2023 14:50:46 +0800 Subject: [PATCH 05/10] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=AB=AF=E5=8F=A3?= =?UTF-8?q?=E9=A2=84=E5=8D=A0=E7=94=A8=EF=BC=8C=E9=98=B2=E6=AD=A2=E5=8D=A0?= =?UTF-8?q?=E7=94=A8=E6=97=A0=E6=B3=95=E9=87=8A=E6=94=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/Gb28181Sdp.java | 46 ++++++++++ .../request/impl/InviteRequestProcessor.java | 76 ++++++++++------- .../iot/vmp/gb28181/utils/SipUtils.java | 84 ++++++++++++++++++- .../vmp/media/zlm/ZLMRTPServerFactory.java | 34 ++++++-- .../redisMsg/RedisGbPlayMsgListener.java | 33 +++++++- 5 files changed, 230 insertions(+), 43 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java new file mode 100644 index 00000000..3f2094bb --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java @@ -0,0 +1,46 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import javax.sdp.SessionDescription; + +/** + * 28181 的SDP解析器 + */ +public class Gb28181Sdp { + private SessionDescription baseSdb; + private String ssrc; + + private String mediaDescription; + + public static Gb28181Sdp getInstance(SessionDescription baseSdb, String ssrc, String mediaDescription) { + Gb28181Sdp gb28181Sdp = new Gb28181Sdp(); + gb28181Sdp.setBaseSdb(baseSdb); + gb28181Sdp.setSsrc(ssrc); + gb28181Sdp.setMediaDescription(mediaDescription); + return gb28181Sdp; + } + + + public SessionDescription getBaseSdb() { + return baseSdb; + } + + public void setBaseSdb(SessionDescription baseSdb) { + this.baseSdb = baseSdb; + } + + public String getSsrc() { + return ssrc; + } + + public void setSsrc(String ssrc) { + this.ssrc = ssrc; + } + + public String getMediaDescription() { + return mediaDescription; + } + + public void setMediaDescription(String mediaDescription) { + this.mediaDescription = mediaDescription; + } +} \ No newline at end of file diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 3d83d226..b7f600f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -241,21 +241,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 解析sdp消息, 使用jainsip 自带的sdp解析方式 String contentString = new String(request.getRawContent()); - // jainSip不支持y=字段, 移除以解析。 - int ssrcIndex = contentString.indexOf("y="); - // 检查是否有y字段 - String ssrcDefault = "0000000000"; - String ssrc; - SessionDescription sdp; - if (ssrcIndex >= 0) { - //ssrc规定长度为10个字节,不取余下长度以避免后续还有“f=”字段 - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - String substring = contentString.substring(0, contentString.indexOf("y=")); - sdp = SdpFactory.getInstance().createSessionDescription(substring); - } else { - ssrc = ssrcDefault; - sdp = SdpFactory.getInstance().createSessionDescription(contentString); - } + Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); + SessionDescription sdp = gb28181Sdp.getBaseSdb(); String sessionName = sdp.getSessionName().getValue(); Long startTime = null; @@ -317,7 +304,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getConnection().getAddress(); - logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc); + Device device = null; // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 if (channel != null) { @@ -341,8 +328,30 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } + + String ssrc; + if (gb28181Sdp.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + logger.warn("[上级Invite] {} 平台:{}, 通道:{}, 缺少 ssrc,补充为: {}", sessionName, username, channelId, ssrc); + }else { + ssrc = gb28181Sdp.getSsrc(); + } + String streamTypeStr = null; + if (mediaTransmissionTCP) { + if (tcpActive) { + streamTypeStr = "TCP-ACTIVE"; + }else { + streamTypeStr = "TCP-PASSIVE"; + } + }else { + streamTypeStr = "UDP"; + } + logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); + device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { + return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; + }); if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); @@ -469,7 +478,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam()); logger.info(JSONObject.toJSONString(ssrcInfo)); sendRtpItem.setStreamId(ssrcInfo.getStream()); - sendRtpItem.setSsrc(ssrc.equals(ssrcDefault) ? ssrcInfo.getSsrc() : ssrc); + sendRtpItem.setSsrc(ssrc); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -480,12 +489,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements }); } else { // 当前系统作为下级平台使用,当上级平台点播时不携带ssrc时,并且设备在当前系统中已经点播了。这个时候需要重新给生成一个ssrc,不使用默认的"0000000000"。 - if (ssrc.equals(ssrcDefault)) { - ssrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId()); - ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc); - sendRtpItem.setSsrc(ssrc); - } - + sendRtpItem.setSsrc(ssrc); sendRtpItem.setStreamId(playTransaction.getStream()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -496,11 +500,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } } else if (gbStream != null) { - if(ssrc.equals(ssrcDefault)) - { - ssrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId()); - ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc); + + String ssrc; + if (gb28181Sdp.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + }else { + ssrc = gb28181Sdp.getSsrc(); } + if("push".equals(gbStream.getStreamType())) { if (streamPushItem != null && streamPushItem.isPushIng()) { // 推流状态 @@ -545,7 +553,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (streamReady) { // 自平台内容 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); + gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ + return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; + }); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); @@ -584,7 +594,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (streamReady) { // 自平台内容 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); + gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ + return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; + }); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); @@ -701,7 +713,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements dynamicTask.stop(callIdHeader.getCallId()); if (serverId.equals(userSetting.getServerId())) { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, - app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); + app, stream, channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { + return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; + }); if (sendRtpItem == null) { logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java index d6037a11..58fc6012 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java @@ -1,14 +1,22 @@ package com.genersoft.iot.vmp.gb28181.utils; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.Gb28181Sdp; import com.genersoft.iot.vmp.gb28181.bean.RemoteAddressInfo; +import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.GitUtil; import gov.nist.javax.sip.address.AddressImpl; import gov.nist.javax.sip.address.SipUri; import gov.nist.javax.sip.header.Subject; import gov.nist.javax.sip.message.SIPRequest; +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.util.ObjectUtils; +import javax.sdp.SdpFactory; +import javax.sdp.SdpParseException; +import javax.sdp.SessionDescription; import javax.sip.PeerUnavailableException; import javax.sip.SipFactory; import javax.sip.header.FromHeader; @@ -16,6 +24,8 @@ import javax.sip.header.Header; import javax.sip.header.UserAgentHeader; import javax.sip.message.Request; import java.text.ParseException; +import java.time.LocalDateTime; +import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -28,6 +38,8 @@ import java.util.UUID; */ public class SipUtils { + private final static Logger logger = LoggerFactory.getLogger(SipUtils.class); + public static String getUserIdFromFromHeader(Request request) { FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); return getUserIdFromFromHeader(fromHeader); @@ -51,7 +63,7 @@ public class SipUtils { } public static String getNewViaTag() { - return "z9hG4bK" + System.currentTimeMillis(); + return "z9hG4bK" + RandomStringUtils.randomNumeric(10); } public static UserAgentHeader createUserAgentHeader(GitUtil gitUtil) throws PeerUnavailableException, ParseException { @@ -113,6 +125,12 @@ public class SipUtils { strTmp = String.format("%02X", moveSpeed); builder.append(strTmp, 0, 2); builder.append(strTmp, 0, 2); + + //优化zoom低倍速下的变倍速率 + if ((zoomSpeed > 0) && (zoomSpeed <16)) + { + zoomSpeed = 16; + } strTmp = String.format("%X", zoomSpeed); builder.append(strTmp, 0, 1).append("0"); //计算校验码 @@ -183,4 +201,66 @@ public class SipUtils { } return deviceChannel; } -} + + public static Gb28181Sdp parseSDP(String sdpStr) throws SdpParseException { + + // jainSip不支持y= f=字段, 移除以解析。 + int ssrcIndex = sdpStr.indexOf("y="); + int mediaDescriptionIndex = sdpStr.indexOf("f="); + // 检查是否有y字段 + SessionDescription sdp; + String ssrc = null; + String mediaDescription = null; + if (mediaDescriptionIndex == 0 && ssrcIndex == 0) { + sdp = SdpFactory.getInstance().createSessionDescription(sdpStr); + }else { + String lines[] = sdpStr.split("\\r?\\n"); + StringBuilder sdpBuffer = new StringBuilder(); + for (String line : lines) { + if (line.trim().startsWith("y=")) { + ssrc = line.substring(2); + }else if (line.trim().startsWith("f=")) { + mediaDescription = line.substring(2); + }else { + sdpBuffer.append(line.trim()).append("\r\n"); + } + } + sdp = SdpFactory.getInstance().createSessionDescription(sdpBuffer.toString()); + } + return Gb28181Sdp.getInstance(sdp, ssrc, mediaDescription); + } + + public static String getSsrcFromSdp(String sdpStr) { + + // jainSip不支持y= f=字段, 移除以解析。 + int ssrcIndex = sdpStr.indexOf("y="); + if (ssrcIndex == 0) { + return null; + } + String lines[] = sdpStr.split("\\r?\\n"); + for (String line : lines) { + if (line.trim().startsWith("y=")) { + return line.substring(2); + } + } + return null; + } + + public static String parseTime(String timeStr) { + if (ObjectUtils.isEmpty(timeStr)){ + return null; + } + LocalDateTime localDateTime; + try { + localDateTime = LocalDateTime.parse(timeStr); + }catch (DateTimeParseException e) { + try { + localDateTime = LocalDateTime.parse(timeStr, DateUtil.formatterISO8601); + }catch (DateTimeParseException e2) { + logger.error("[格式化时间] 无法格式化时间: {}", timeStr); + return null; + } + } + return localDateTime.format(DateUtil.formatterISO8601); + } +} \ No newline at end of file diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 68f1bee4..9c5a4728 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -219,13 +219,14 @@ public class ZLMRTPServerFactory { * @param tcp 是否为tcp * @return SendRtpItem */ - public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp, boolean rtcp){ + public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, + String deviceId, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){ // 默认为随机端口 int localPort = 0; if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) { - localPort = keepPort(serverItem, ssrc); + localPort = keepPort(serverItem, ssrc, localPort, callback); if (localPort == 0) { return null; } @@ -257,11 +258,12 @@ public class ZLMRTPServerFactory { * @param tcp 是否为tcp * @return SendRtpItem */ - public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp, boolean rtcp){ + public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, + String app, String stream, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){ // 默认为随机端口 int localPort = 0; if (userSetting.getGbSendStreamStrict()) { - localPort = keepPort(serverItem, ssrc); + localPort = keepPort(serverItem, ssrc, localPort, callback); if (localPort == 0) { return null; } @@ -282,13 +284,16 @@ public class ZLMRTPServerFactory { return sendRtpItem; } + public interface KeepPortCallback{ + Boolean keep(String ssrc); + } + /** * 保持端口,直到需要需要发流时再释放 */ - public int keepPort(MediaServerItem serverItem, String ssrc) { - int localPort = 0; + public int keepPort(MediaServerItem serverItem, String ssrc, int localPort, KeepPortCallback keepPortCallback) { Map param = new HashMap<>(3); - param.put("port", 0); + param.put("port", localPort); param.put("enable_tcp", 1); param.put("stream_id", ssrc); JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param); @@ -296,10 +301,21 @@ public class ZLMRTPServerFactory { localPort = jsonObject.getInteger("port"); HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 + Integer finalLocalPort = localPort; hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, (MediaServerItem mediaServerItem, JSONObject response)->{ - logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc); - keepPort(serverItem, ssrc); + System.out.println("监听端口到期继续保持监听"); + System.out.println(response); + if (ssrc.equals(response.getString("stream_id"))) { + if (keepPortCallback.keep(ssrc)) { + logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc); + keepPort(serverItem, ssrc, finalLocalPort, keepPortCallback); + }else { + logger.info("[上级点播] {}->发送取消,无需继续监听", ssrc); + releasePort(serverItem, ssrc); + } + } + }); logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort); }else { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index 868e861d..3e2385e5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.*; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +27,7 @@ import org.springframework.stereotype.Component; import java.text.ParseException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -314,7 +316,9 @@ public class RedisGbPlayMsgListener implements MessageListener { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), content.getPort(), content.getSsrc(), content.getPlatformId(), content.getApp(), content.getStream(), content.getChannelId(), - content.getTcp(), content.getRtcp()); + content.getTcp(), content.getRtcp(), ssrcFromCallback -> { + return querySendRTPServer(content.getPlatformId(), content.getChannelId(), content.getStream(), null) != null; + }); WVPResult result = new WVPResult<>(); result.setCode(0); @@ -391,4 +395,31 @@ public class RedisGbPlayMsgListener implements MessageListener { }); redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } + + private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { + if (platformGbId == null) { + platformGbId = "*"; + } + if (channelId == null) { + channelId = "*"; + } + if (streamId == null) { + streamId = "*"; + } + if (callId == null) { + callId = "*"; + } + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + + userSetting.getServerId() + "_*_" + + platformGbId + "_" + + channelId + "_" + + streamId + "_" + + callId; + List scan = RedisUtil.scan(redisTemplate, key); + if (scan.size() > 0) { + return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0)); + }else { + return null; + } + } } From 639e0bab11332b6877b93c0e1a6bc824df2db158 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 28 Jun 2023 15:02:59 +0800 Subject: [PATCH 06/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=A0=91=E5=BD=A2?= =?UTF-8?q?=E5=88=97=E8=A1=A8=E9=80=9A=E9=81=93=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/service/impl/DeviceServiceImpl.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index fe98d067..c288ff46 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.service.impl; -import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; @@ -390,8 +389,8 @@ public class DeviceServiceImpl implements IDeviceService { if (device == null) { return null; } - if (ObjectUtils.isEmpty(parentId) || parentId.equals(deviceId)) { - parentId = null; + if (ObjectUtils.isEmpty(parentId) ) { + parentId = deviceId; } List rootNodes = deviceChannelMapper.getSubChannelsByDeviceId(deviceId, parentId, onlyCatalog); return transportChannelsToTree(rootNodes, ""); From def56793ba7c636fbbcabad38e3ac113a3764087 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 28 Jun 2023 16:52:27 +0800 Subject: [PATCH 07/10] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=B8=8A=E7=BA=A7?= =?UTF-8?q?=E6=8E=A8=E6=B5=81=E5=92=8C=E5=81=9C=E6=AD=A2=E6=8E=A8=E6=B5=81?= =?UTF-8?q?=E7=9A=84=E9=80=9A=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/conf/redis/RedisMsgListenConfig.java | 1 + .../request/impl/AckRequestProcessor.java | 13 ++++ .../request/impl/ByeRequestProcessor.java | 42 +++++++--- .../vmp/media/zlm/ZLMHttpHookListener.java | 8 ++ .../service/bean/MessageForPushChannel.java | 6 +- .../redisMsg/RedisGbPlayMsgListener.java | 1 + .../RedisPushStreamCloseResponseListener.java | 76 +++++++++++++++++++ .../iot/vmp/storager/IRedisCatchStorage.java | 4 + .../storager/impl/RedisCatchStorageImpl.java | 14 ++++ 9 files changed, 151 insertions(+), 14 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index 9f484266..a65491bd 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -63,6 +63,7 @@ public class RedisMsgListenConfig { container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); +// container.addMessageListener(, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 7a26e783..f40e0c87 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -3,6 +3,8 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; @@ -14,6 +16,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -57,6 +60,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private UserSetting userSetting; + @Autowired private IVideoManagerStorage storager; @@ -155,6 +161,13 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In } else if (jsonObject.getInteger("code") == 0) { logger.info("调用ZLM推流接口, 结果: {}", jsonObject); logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + if (sendRtpItem.getPlayType() == InviteStreamType.PUSH) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStreamId(), + sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(), + sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); + redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel); + } } else { logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); if (sendRtpItem.isOnlyAudio()) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index addd6336..78e29566 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -1,11 +1,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; @@ -15,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IPlatformService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -25,7 +24,9 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.RequestEvent; +import javax.sip.SipException; import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; @@ -51,6 +52,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IPlatformService platformService; + @Autowired private IDeviceService deviceService; @@ -69,6 +73,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private VideoStreamSessionManager streamSession; + @Autowired + private UserSetting userSetting; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -103,6 +110,19 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); + if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { + ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); + if (platform != null) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(platform.getId()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + }else { + logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId()); + } + } + int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); if (totalReaderCount <= 0) { logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); @@ -119,12 +139,12 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); } } - if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId()); - redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); - } +// if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { +// MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, +// sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), +// sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId()); +// redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); +// } } } // 可能是设备主动停止 diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 9ffb5240..e8432cc2 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx; @@ -463,6 +464,13 @@ public class ZLMHttpHookListener { } redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java index 886f8c29..1a9e3e5c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java @@ -34,7 +34,7 @@ public class MessageForPushChannel { /** * 请求的平台自增ID */ - private String platFormIndex; + private int platFormIndex; /** * 请求平台名称 @@ -132,11 +132,11 @@ public class MessageForPushChannel { this.mediaServerId = mediaServerId; } - public String getPlatFormIndex() { + public int getPlatFormIndex() { return platFormIndex; } - public void setPlatFormIndex(String platFormIndex) { + public void setPlatFormIndex(int platFormIndex) { this.platFormIndex = platFormIndex; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index 3e2385e5..8231fb33 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -129,6 +129,7 @@ public class RedisGbPlayMsgListener implements MessageListener { case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); + break; default: break; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java new file mode 100644 index 00000000..4e73578a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java @@ -0,0 +1,76 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * 接收redis发送的结束推流请求 + * @author lin + */ +@Component +public class RedisPushStreamCloseResponseListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class); + + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + + private Map responseEvents = new ConcurrentHashMap<>(); + + public interface PushStreamResponseEvent{ + void run(MessageForPushChannelResponse response); + } + + @Override + public void onMessage(Message message, byte[] bytes) { + logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); + if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ + logger.info("[REDIS消息-请求推流结果]:参数不全"); + continue; + } + // 查看正在等待的invite消息 + if (responseEvents.get(response.getApp() + response.getStream()) != null) { + responseEvents.get(response.getApp() + response.getStream()).run(response); + } + }catch (Exception e) { + logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); + logger.error("[REDIS消息-请求推流结果] 异常内容: ", e); + } + } + }); + } + } + + public void addEvent(String app, String stream, PushStreamResponseEvent callback) { + responseEvents.put(app + stream, callback); + } + + public void removeEvent(String app, String stream) { + responseEvents.remove(app + stream); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 42708f7f..9722fa16 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -263,4 +263,8 @@ public interface IRedisCatchStorage { void removeAllDevice(); void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online); + + void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel); + + void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index d4abcb45..e54324fc 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -925,4 +925,18 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { // 使用 RedisTemplate 发送字符串消息会导致发送的消息多带了双引号 stringRedisTemplate.convertAndSend(key, msg.toString()); } + + @Override + public void sendPlatformStartPlayMsg(MessageForPushChannel msg) { + String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY; + logger.info("[redis发送通知] 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); + redisTemplate.convertAndSend(key, JSON.toJSON(msg)); + } + + @Override + public void sendPlatformStopPlayMsg(MessageForPushChannel msg) { + String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY; + logger.info("[redis发送通知] 上级平台停止观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); + redisTemplate.convertAndSend(key, JSON.toJSON(msg)); + } } From bfae9780f75db7495f53511f3116bb6c0470a0b0 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 28 Jun 2023 20:32:48 +0800 Subject: [PATCH 08/10] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B5=81=E5=85=B3?= =?UTF-8?q?=E9=97=AD=E6=97=B6=E7=9A=84=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/conf/redis/RedisMsgListenConfig.java | 5 +- .../cmd/SIPRequestHeaderPlarformProvider.java | 8 +- .../transmit/cmd/impl/SIPCommander.java | 7 +- .../request/impl/ByeRequestProcessor.java | 166 +++++++++--------- .../iot/vmp/service/impl/PlayServiceImpl.java | 2 +- .../RedisPushStreamCloseResponseListener.java | 98 ++++++++--- 6 files changed, 163 insertions(+), 123 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index a65491bd..762d8e19 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -43,6 +43,9 @@ public class RedisMsgListenConfig { @Autowired private RedisPushStreamResponseListener redisPushStreamResponseListener; + @Autowired + private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener; + /** * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 @@ -63,7 +66,7 @@ public class RedisMsgListenConfig { container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); -// container.addMessageListener(, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); + container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index 831897a7..e19886b8 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -283,7 +283,7 @@ public class SIPRequestHeaderPlarformProvider { viaHeader.setRPort(); viaHeaders.add(viaHeader); // from - SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(platform.getDeviceGBId(), + SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sendRtpItem.getChannelId(), platform.getDeviceIp() + ":" + platform.getDevicePort()); Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI); FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, sendRtpItem.getToTag()); @@ -296,13 +296,10 @@ public class SIPRequestHeaderPlarformProvider { MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70); // ceq CSeqHeader cSeqHeader = SipFactory.getInstance().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.BYE); - MessageFactoryImpl messageFactory = (MessageFactoryImpl) SipFactory.getInstance().createMessageFactory(); - // 设置编码, 防止中文乱码 - messageFactory.setDefaultContentEncodingCharset("gb2312"); CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(sendRtpItem.getCallId()); - request = (SIPRequest) messageFactory.createRequest(requestURI, Request.BYE, callIdHeader, cSeqHeader, fromHeader, + request = (SIPRequest) SipFactory.getInstance().createMessageFactory().createRequest(requestURI, Request.BYE, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); request.addHeader(SipUtils.createUserAgentHeader(gitUtil)); @@ -310,6 +307,7 @@ public class SIPRequestHeaderPlarformProvider { String sipAddress = platform.getDeviceIp() + ":" + platform.getDevicePort(); Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(SipFactory.getInstance().createAddressFactory() .createSipURI(platform.getDeviceGBId(), sipAddress)); + request.addHeader(SipFactory.getInstance().createHeaderFactory().createContactHeader(concatAddress)); return request; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index beec9e90..df48e8e7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -347,10 +347,9 @@ public class SIPCommander implements ISIPCommander { mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); errorEvent.response(e); }), e -> { - // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 ResponseEvent responseEvent = (ResponseEvent) e.event; SIPResponse response = (SIPResponse) responseEvent.getResponse(); - streamSession.put(device.getDeviceId(), channelId, "play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play); + streamSession.put(device.getDeviceId(), channelId, e.callId, stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.play); okEvent.response(e); }); } @@ -452,7 +451,7 @@ public class SIPCommander implements ISIPCommander { sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { ResponseEvent responseEvent = (ResponseEvent) event.event; SIPResponse response = (SIPResponse) responseEvent.getResponse(); - streamSession.put(device.getDeviceId(), channelId,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.playback); + streamSession.put(device.getDeviceId(), channelId, event.callId, ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.playback); okEvent.response(event); }); if (inviteStreamCallback != null) { @@ -580,7 +579,7 @@ public class SIPCommander implements ISIPCommander { if (ssrcIndex >= 0) { ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); } - streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download); + streamSession.put(device.getDeviceId(), channelId, newCallIdHeader.getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download); okEvent.response(event); }); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 78e29566..8c93ae32 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlatformService; @@ -27,11 +28,7 @@ import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; -import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; -import javax.sip.header.FromHeader; -import javax.sip.header.HeaderAddress; -import javax.sip.header.ToHeader; import javax.sip.message.Response; import java.text.ParseException; import java.util.HashMap; @@ -58,6 +55,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IDeviceService deviceService; + @Autowired + private IDeviceChannelService channelService; + @Autowired private IVideoManagerStorage storager; @@ -88,95 +88,91 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In */ @Override public void process(RequestEvent evt) { - + SIPRequest request = (SIPRequest) evt.getRequest(); try { - responseAck((SIPRequest) evt.getRequest(), Response.OK); + responseAck(request, Response.OK); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[回复BYE信息失败],{}", e.getMessage()); } CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); - String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); - String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); - logger.info("[收到bye] {}/{}", platformGbId, channelId); - if (sendRtpItem != null){ - String streamId = sendRtpItem.getStreamId(); - Map param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",streamId); - param.put("ssrc",sendRtpItem.getSsrc()); - logger.info("[收到bye] 停止向上级推流:{}", streamId); - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null); - zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); - if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { - ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); - if (platform != null) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(platform.getId()); - redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); - }else { - logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId()); - } - } - int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); - if (totalReaderCount <= 0) { - logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); - if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { - Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); - if (device == null) { - logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); - } - try { - logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), channelId); - cmder.streamByeCmd(device, channelId, streamId, null); - } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { - logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); - } - } -// if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { -// MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, -// sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), -// sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId()); -// redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); -// } - } - } - // 可能是设备主动停止 - Device device = storager.queryVideoDeviceByChannelId(platformGbId); - if (device != null) { - storager.stopPlay(device.getDeviceId(), channelId); - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); - if (streamInfo != null) { - redisCatchStorage.stopPlay(streamInfo); - mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream()); - } - SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); - if (ssrcTransactionForPlay != null){ - if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){ - // 释放ssrc - MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId()); - if (mediaServerItem != null) { - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc()); - } - streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream()); - } - } - SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null); - if (ssrcTransactionForPlayBack != null) { - // 释放ssrc - MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId()); - if (mediaServerItem != null) { - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc()); - } - streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream()); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); + + if (sendRtpItem != null){ + logger.info("[收到bye] 来自平台{}, 停止通道:{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId()); + String streamId = sendRtpItem.getStreamId(); + Map param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",streamId); + param.put("ssrc",sendRtpItem.getSsrc()); + logger.info("[收到bye] 停止向上级推流:{}", streamId); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), + callIdHeader.getCallId(), null); + zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); + if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { + ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); + if (platform != null) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(platform.getId()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + }else { + logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId()); } } + int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); + if (totalReaderCount <= 0) { + logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); + if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { + Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); + if (device == null) { + logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); + } + try { + logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); + } catch (InvalidArgumentException | ParseException | SipException | + SsrcTransactionNotFoundException e) { + logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); + } + } + } + }else { + // 可能是设备发送的停止 + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, callIdHeader.getCallId(), null); + if (ssrcTransaction == null) { + logger.info("[收到bye] 但是无法获取推流信息和发流信息,忽略此请求"); + logger.info(request.toString()); + return; + } + logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId()); + + Device device = deviceService.getDevice(ssrcTransaction.getDeviceId()); + if (device == null) { + logger.info("[收到bye] 未找到设备:{} ", ssrcTransaction.getDeviceId()); + return; + } + DeviceChannel channel = channelService.getOne(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId()); + if (channel == null) { + logger.info("[收到bye] 未找到通道,设备:{}, 通道:{}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId()); + return; + } + storager.stopPlay(device.getDeviceId(), channel.getChannelId()); + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channel.getChannelId()); + if (streamInfo != null) { + redisCatchStorage.stopPlay(streamInfo); + mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream()); + } + // 释放ssrc + MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); + if (mediaServerItem != null) { + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc()); + } + streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcTransaction.getStream()); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 2fbe6bbc..03e00c25 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -143,7 +143,7 @@ public class PlayServiceImpl implements IPlayService { if (rtpInfo.getBoolean("exist")) { int localPort = rtpInfo.getInteger("local_port"); if (localPort == 0) { - logger.warn("[点播],点播时发现rtpServerC存在,但是尚未开始推流"); + logger.warn("[点播],点播时发现rtpServer存在,但是尚未开始推流"); // 此时说明rtpServer已经创建但是流还没有推上来 WVPResult wvpResult = new WVPResult(); wvpResult.setCode(ErrorCode.ERROR100.getCode()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java index 4e73578a..f78b6929 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java @@ -1,20 +1,34 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; /** * 接收redis发送的结束推流请求 @@ -25,11 +39,26 @@ public class RedisPushStreamCloseResponseListener implements MessageListener { private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class); - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") @Autowired - private ThreadPoolTaskExecutor taskExecutor; + private IStreamPushService streamPushService; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IVideoManagerStorage storager; + + @Autowired + private ISIPCommanderForPlatform commanderFroPlatform; + + @Autowired + private UserSetting userSetting; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; private Map responseEvents = new ConcurrentHashMap<>(); @@ -40,30 +69,45 @@ public class RedisPushStreamCloseResponseListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { - logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); - if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ - logger.info("[REDIS消息-请求推流结果]:参数不全"); - continue; + logger.info("[REDIS消息-推流结束]: {}", new String(message.getBody())); + MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class); + StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream()); + if (push != null) { + if (redisCatchStorage.isChannelSendingRTP(push.getGbId())) { + List sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( + push.getGbId()); + if (sendRtpItems.size() > 0) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + // 停止向上级推流 + String streamId = sendRtpItem.getStreamId(); + Map param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",streamId); + param.put("ssrc",sendRtpItem.getSsrc()); + logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); + + try { + commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } - // 查看正在等待的invite消息 - if (responseEvents.get(response.getApp() + response.getStream()) != null) { - responseEvents.get(response.getApp() + response.getStream()).run(response); + if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); } - }catch (Exception e) { - logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); - logger.error("[REDIS消息-请求推流结果] 异常内容: ", e); } } - }); + } } + } public void addEvent(String app, String stream, PushStreamResponseEvent callback) { From 1294081a9cdf9bb4b4523ffc872a4accb5d11144 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 29 Jun 2023 09:36:13 +0800 Subject: [PATCH 09/10] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=A0=91=E5=BD=A2?= =?UTF-8?q?=E5=88=97=E8=A1=A8=E9=80=9A=E9=81=93=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 421cdad2..bdc45bf2 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -459,8 +459,8 @@ public interface DeviceChannelMapper { "select * " + "from wvp_device_channel " + "where device_id=#{deviceId}" + - " and parent_id = #{parentId} " + - " and parent_id is null " + + " and parent_id = #{parentId} " + + " and parent_id is null or parent_id = #{deviceId}" + " and parental = 1 " + " "}) List getSubChannelsByDeviceId(String deviceId, String parentId, boolean onlyCatalog); From e8b766172e4f1af382a27185172dccf1e1ada556 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 29 Jun 2023 10:37:20 +0800 Subject: [PATCH 10/10] =?UTF-8?q?=E5=90=88=E5=B9=B6268=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=86=85=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/impl/ByeRequestProcessor.java | 28 ++++++------------- .../vmp/service/impl/PlatformServiceImpl.java | 1 - .../iot/vmp/service/impl/PlayServiceImpl.java | 4 +-- 3 files changed, 11 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 08fade74..bc3c582b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -2,15 +2,10 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; -import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; -import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; @@ -18,11 +13,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IDeviceChannelService; -import com.genersoft.iot.vmp.service.IDeviceService; -import com.genersoft.iot.vmp.service.IInviteStreamService; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IPlatformService; +import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -33,10 +24,6 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.sip.InvalidArgumentException; -import javax.sip.RequestEvent; -import javax.sip.SipException; -import javax.sip.address.SipURI; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; @@ -160,6 +147,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In } } }else { + // 可能是设备发送的停止 SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, callIdHeader.getCallId(), null); if (ssrcTransaction == null) { @@ -180,10 +168,12 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In return; } storager.stopPlay(device.getDeviceId(), channel.getChannelId()); - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channel.getChannelId()); - if (streamInfo != null) { - redisCatchStorage.stopPlay(streamInfo); - mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream()); + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId()); + if (inviteInfo != null) { + inviteStreamService.removeInviteInfo(inviteInfo); + if (inviteInfo.getStreamInfo() != null) { + mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStreamInfo().getStream()); + } } // 释放ssrc MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index dd4fb292..e2e83c35 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -105,7 +105,6 @@ public class PlatformServiceImpl implements IPlatformService { // 行政区划默认去编号的前6位 parentPlatform.setAdministrativeDivision(parentPlatform.getServerGBId().substring(0,6)); } - parentPlatform.setTreeType("CivilCode"); parentPlatform.setCatalogId(parentPlatform.getDeviceGBId()); int result = platformMapper.addParentPlatform(parentPlatform); // 添加缓存 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index bdbae621..2da1c53a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -185,7 +185,7 @@ public class PlayServiceImpl implements IPlayService { null); return; } - logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", + logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); //端口获取失败的ssrcInfo 没有必要发送点播指令 @@ -267,7 +267,7 @@ public class PlayServiceImpl implements IPlayService { InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - logger.info("[点播成功] deviceId: {}, channelId: {},码流类型:{}", device.getDeviceId(), + logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), device.isSwitchPrimarySubStream() ? "辅码流" : "主码流"); String streamUrl; if (mediaServerItemInuse.getRtspPort() != 0) {