From 354a39961ad26949f597e4c434b0cd470b7f78ee Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 14 Mar 2022 18:24:30 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=82=B9=E6=92=AD,=20?= =?UTF-8?q?=E7=BA=A7=E8=81=94=E7=82=B9=E6=92=AD=E7=BA=A7=E8=81=94=E5=BD=95?= =?UTF-8?q?=E5=83=8F=E3=80=82=E7=BA=A7=E8=81=94=E5=88=97=E8=A1=A8=E6=98=BE?= =?UTF-8?q?=E7=A4=BA=E8=AE=A2=E9=98=85=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/conf/SipPlatformRunner.java | 7 +- .../gb28181/bean/InviteStreamCallback.java | 5 + .../vmp/gb28181/bean/InviteStreamInfo.java | 61 ++++ .../iot/vmp/gb28181/bean/ParentPlatform.java | 39 +++ .../iot/vmp/gb28181/bean/SubscribeInfo.java | 2 + .../iot/vmp/gb28181/event/SipSubscribe.java | 8 +- .../subscribe/catalog/CatalogEventLister.java | 2 - .../transmit/SIPProcessorObserver.java | 4 +- .../gb28181/transmit/cmd/ISIPCommander.java | 9 +- .../transmit/cmd/impl/SIPCommander.java | 84 +++-- .../cmd/impl/SIPCommanderFroPlatform.java | 52 +-- .../request/impl/AckRequestProcessor.java | 97 ++++-- .../request/impl/ByeRequestProcessor.java | 8 +- .../request/impl/InviteRequestProcessor.java | 90 +++-- .../cmd/MediaStatusNotifyMessageHandler.java | 12 +- .../impl/RegisterResponseProcessor.java | 26 +- .../vmp/media/zlm/ZLMRTPServerFactory.java | 8 +- .../iot/vmp/service/IPlayService.java | 12 +- .../service/bean/InviteTimeOutCallback.java | 6 + .../iot/vmp/service/bean/PlayBackResult.java | 6 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 311 +++++++++--------- .../iot/vmp/storager/IRedisCatchStorage.java | 12 +- .../storager/dao/PlatformChannelMapper.java | 4 +- .../storager/dao/PlatformGbStreamMapper.java | 2 +- .../storager/impl/RedisCatchStorageImpl.java | 104 +++--- .../impl/VideoManagerStoragerImpl.java | 29 +- .../gb28181/platform/PlatformController.java | 16 +- .../vmanager/gb28181/play/PlayController.java | 4 +- .../gb28181/playback/DownloadController.java | 14 +- .../gb28181/playback/PlaybackController.java | 12 +- .../vmp/web/gb28181/ApiStreamController.java | 2 +- web_src/src/components/ParentPlatformList.vue | 26 +- .../components/dialog/chooseChannelForGb.vue | 4 +- .../dialog/chooseChannelForStream.vue | 4 +- web_src/static/css/iconfont.css | 22 +- web_src/static/css/iconfont.woff2 | Bin 52052 -> 52504 bytes 36 files changed, 694 insertions(+), 410 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamCallback.java create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamInfo.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/bean/InviteTimeOutCallback.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java index 4ebaf0bf..7f9f8476 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java @@ -60,12 +60,9 @@ public class SipPlatformRunner implements CommandLineRunner { // 取消订阅 sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{ - ParentPlatform platform = storager.queryParentPlatByServerGBId(parentPlatform.getServerGBId()); - sipCommanderForPlatform.register(platform, null, null); + // 发送平台未注册消息 + publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId()); }); - - // 发送平台未注册消息 - publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId()); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamCallback.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamCallback.java new file mode 100644 index 00000000..42a05198 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamCallback.java @@ -0,0 +1,5 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +public interface InviteStreamCallback { + void call(InviteStreamInfo inviteStreamInfo); +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamInfo.java new file mode 100644 index 00000000..3f3c5835 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamInfo.java @@ -0,0 +1,61 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; + +public class InviteStreamInfo { + + public InviteStreamInfo(MediaServerItem mediaServerItem, JSONObject response, String callId, String app, String stream) { + this.mediaServerItem = mediaServerItem; + this.response = response; + this.callId = callId; + this.app = app; + this.stream = stream; + } + + private MediaServerItem mediaServerItem; + private JSONObject response; + private String callId; + private String app; + private String stream; + + public MediaServerItem getMediaServerItem() { + return mediaServerItem; + } + + public void setMediaServerItem(MediaServerItem mediaServerItem) { + this.mediaServerItem = mediaServerItem; + } + + public JSONObject getResponse() { + return response; + } + + public void setResponse(JSONObject response) { + this.response = response; + } + + public String getCallId() { + return callId; + } + + public void setCallId(String callId) { + this.callId = callId; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java index 0c061450..8df79394 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java @@ -114,6 +114,21 @@ public class ParentPlatform { */ private String catalogId; + /** + * 已被订阅目录信息 + */ + private boolean catalogSubscribe; + + /** + * 已被订阅报警信息 + */ + private boolean alarmSubscribe; + + /** + * 已被订阅GPS信息 + */ + private boolean gpsSubscribe; + public Integer getId() { return id; } @@ -290,4 +305,28 @@ public class ParentPlatform { public void setCatalogId(String catalogId) { this.catalogId = catalogId; } + + public boolean isCatalogSubscribe() { + return catalogSubscribe; + } + + public void setCatalogSubscribe(boolean catalogSubscribe) { + this.catalogSubscribe = catalogSubscribe; + } + + public boolean isAlarmSubscribe() { + return alarmSubscribe; + } + + public void setAlarmSubscribe(boolean alarmSubscribe) { + this.alarmSubscribe = alarmSubscribe; + } + + public boolean isGpsSubscribe() { + return gpsSubscribe; + } + + public void setGpsSubscribe(boolean gpsSubscribe) { + this.gpsSubscribe = gpsSubscribe; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java index 956e93eb..434a639a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java @@ -21,6 +21,8 @@ public class SubscribeInfo { this.eventType = eventHeader.getEventType(); this.transaction = evt.getServerTransaction(); this.dialog = evt.getDialog(); + CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); + this.callId = callIdHeader.getCallId(); } private String id; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index b347bbaa..bc775e46 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -49,10 +49,10 @@ public class SipSubscribe { errorTimeSubscribes.remove(key); } } - logger.info("okTimeSubscribes.size:{}",okTimeSubscribes.size()); - logger.info("okSubscribes.size:{}",okSubscribes.size()); - logger.info("errorTimeSubscribes.size:{}",errorTimeSubscribes.size()); - logger.info("errorSubscribes.size:{}",errorSubscribes.size()); + logger.debug("okTimeSubscribes.size:{}",okTimeSubscribes.size()); + logger.debug("okSubscribes.size:{}",okSubscribes.size()); + logger.debug("errorTimeSubscribes.size:{}",errorTimeSubscribes.size()); + logger.debug("errorSubscribes.size:{}",errorSubscribes.size()); } public interface Event { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 6af6c661..d511f421 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -117,8 +117,6 @@ public class CatalogEventLister implements ApplicationListener { List parentPlatforms = parentPlatformMap.get(gbId); if (parentPlatforms != null && parentPlatforms.size() > 0) { for (ParentPlatform platform : parentPlatforms) { - String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId(); -// SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key); SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId()); if (subscribeInfo == null) continue; logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index 30efa204..6439e8cc 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -95,14 +95,14 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { logger.debug("\n收到响应:\n{}", responseEvent.getResponse()); int status = response.getStatusCode(); - if (((status >= 200) && (status < 300)) || status == 401) { // Success! + if (((status >= 200) && (status < 300)) || status == Response.UNAUTHORIZED) { // Success! CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); String method = cseqHeader.getMethod(); ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); if (sipRequestProcessor != null) { sipRequestProcessor.process(responseEvent); } - if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { + if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); if (callIdHeader != null) { SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 67be2471..409eedbf 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -103,7 +104,7 @@ public interface ISIPCommander { * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss */ - void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); + void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime,InviteStreamCallback inviteStreamCallback, InviteStreamCallback event, SipSubscribe.Event errorEvent); /** * 请求历史媒体下载 @@ -114,13 +115,13 @@ public interface ISIPCommander { * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss * @param downloadSpeed 下载倍速参数 */ - void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); + void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, InviteStreamCallback event, SipSubscribe.Event errorEvent); /** * 视频流停止 */ - void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent); - void streamByeCmd(String deviceId, String channelId, String stream); + void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent); + void streamByeCmd(String deviceId, String channelId, String stream, String callId); /** * 回放暂停 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index b38a8c11..5df6314e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -6,6 +6,8 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; @@ -445,27 +447,13 @@ public class SIPCommander implements ISIPCommander { * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss */ @Override - public void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event - , SipSubscribe.Event errorEvent) { + public void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + String startTime, String endTime, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent, + SipSubscribe.Event errorEvent) { try { logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); - // 添加订阅 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", ssrcInfo.getStream()); - subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtmp"); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ - if (event != null) { - event.response(mediaServerItemInUse, json); - } - }); - StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); @@ -530,6 +518,21 @@ public class SIPCommander implements ISIPCommander { CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); + // 添加订阅 + JSONObject subscribeKey = new JSONObject(); + subscribeKey.put("app", "rtp"); + subscribeKey.put("stream", ssrcInfo.getStream()); + subscribeKey.put("regist", true); + subscribeKey.put("schema", "rtmp"); + subscribeKey.put("mediaServerId", mediaServerItem.getId()); + logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey); + subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, + (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + if (hookEvent != null) { + InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()); + hookEvent.call(inviteStreamInfo); + } + }); Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); transmitRequest(device, request, errorEvent, okEvent -> { @@ -537,6 +540,9 @@ public class SIPCommander implements ISIPCommander { streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), responseEvent.getClientTransaction()); streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), okEvent.dialog); }); + if (inviteStreamCallback != null) { + inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream())); + } } catch ( SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); } @@ -552,24 +558,11 @@ public class SIPCommander implements ISIPCommander { * @param downloadSpeed 下载倍速参数 */ @Override - public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, ZLMHttpHookSubscribe.Event event + public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, InviteStreamCallback event , SipSubscribe.Event errorEvent) { try { logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); - // 添加订阅 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", ssrcInfo.getStream()); - subscribeKey.put("regist", true); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ - event.response(mediaServerItemInUse, json); - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); - }); - StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); @@ -637,6 +630,19 @@ public class SIPCommander implements ISIPCommander { CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); + // 添加订阅 + JSONObject subscribeKey = new JSONObject(); + subscribeKey.put("app", "rtp"); + subscribeKey.put("stream", ssrcInfo.getStream()); + subscribeKey.put("regist", true); + subscribeKey.put("mediaServerId", mediaServerItem.getId()); + logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); + subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, + (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + event.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream())); + subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); + }); + Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); ClientTransaction transaction = transmitRequest(device, request, errorEvent); @@ -652,15 +658,15 @@ public class SIPCommander implements ISIPCommander { * 视频流停止, 不使用回调 */ @Override - public void streamByeCmd(String deviceId, String channelId, String stream) { - streamByeCmd(deviceId, channelId, stream, null); + public void streamByeCmd(String deviceId, String channelId, String stream, String callId) { + streamByeCmd(deviceId, channelId, stream, callId, null); } /** * 视频流停止 */ @Override - public void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent) { + public void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent) { try { SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream); ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream); @@ -672,7 +678,15 @@ public class SIPCommander implements ISIPCommander { } return; } - SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, stream); + SIPDialog dialog; + if (callId != null) { + dialog = streamSession.getDialogByCallId(deviceId, channelId, callId); + }else { + if (stream == null) return; + dialog = streamSession.getDialogByStream(deviceId, channelId, stream); + } + + if (dialog == null) { logger.warn("[ {} -> {}]停止视频流的时候发现对话已丢失", deviceId, channelId); return; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 329a5aa9..6f1d031b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.SerializeUtils; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipStackImpl; +import gov.nist.javax.sip.message.MessageFactoryImpl; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.stack.SIPDialog; import org.slf4j.Logger; @@ -77,11 +78,11 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @Override public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); + parentPlatform.setExpires("0"); if (parentPlatformCatch != null) { parentPlatformCatch.setParentPlatform(parentPlatform); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); } - parentPlatform.setExpires("0"); return register(parentPlatform, null, null, errorEvent, okEvent, false); } @@ -416,11 +417,13 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { private void sendNotify(ParentPlatform parentPlatform, String catalogXmlContent, SubscribeInfo subscribeInfo, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent ) throws NoSuchFieldException, IllegalAccessException, SipException, ParseException { + MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory(); + // 设置编码, 防止中文乱码 + messageFactory.setDefaultContentEncodingCharset("gb2312"); Dialog dialog = subscribeInfo.getDialog(); - Request notifyRequest = dialog.createRequest(Request.NOTIFY); - + if (dialog == null) return; + SIPRequest notifyRequest = (SIPRequest)dialog.createRequest(Request.NOTIFY); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); - notifyRequest.setContent(catalogXmlContent, contentTypeHeader); SubscriptionStateHeader subscriptionState = sipFactory.createHeaderFactory() @@ -511,7 +514,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } @Override - public boolean sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List deviceChannels, SubscribeInfo subscribeInfo, Integer index) { + public boolean sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List deviceChannels, + SubscribeInfo subscribeInfo, Integer index) { if (parentPlatform == null || deviceChannels == null || deviceChannels.size() == 0 @@ -579,24 +583,30 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { recordXml.append("" +recordInfo.getSn() + "\r\n"); recordXml.append("" + recordInfo.getDeviceId() + "\r\n"); recordXml.append("" + recordInfo.getSumNum() + "\r\n"); - recordXml.append("\r\n"); - for (RecordItem recordItem : recordInfo.getRecordList()) { - recordXml.append("\r\n"); - if (deviceChannel != null) { - recordXml.append("" + recordItem.getDeviceId() + "\r\n"); - recordXml.append("" + recordItem.getName() + "\r\n"); - recordXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getStartTime()) + "\r\n"); - recordXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getEndTime()) + "\r\n"); - recordXml.append("" + recordItem.getSecrecy() + "\r\n"); - recordXml.append("" + recordItem.getType() + "\r\n"); - if (!StringUtils.isEmpty(recordItem.getFileSize())) { - recordXml.append("" + recordItem.getFileSize() + "\r\n"); - } - if (!StringUtils.isEmpty(recordItem.getFilePath())) { - recordXml.append("" + recordItem.getFilePath() + "\r\n"); + if (recordInfo.getRecordList() == null ) { + recordXml.append("\r\n"); + }else { + recordXml.append("\r\n"); + if (recordInfo.getRecordList().size() > 0) { + for (RecordItem recordItem : recordInfo.getRecordList()) { + recordXml.append("\r\n"); + if (deviceChannel != null) { + recordXml.append("" + recordItem.getDeviceId() + "\r\n"); + recordXml.append("" + recordItem.getName() + "\r\n"); + recordXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getStartTime()) + "\r\n"); + recordXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getEndTime()) + "\r\n"); + recordXml.append("" + recordItem.getSecrecy() + "\r\n"); + recordXml.append("" + recordItem.getType() + "\r\n"); + if (!StringUtils.isEmpty(recordItem.getFileSize())) { + recordXml.append("" + recordItem.getFileSize() + "\r\n"); + } + if (!StringUtils.isEmpty(recordItem.getFilePath())) { + recordXml.append("" + recordItem.getFilePath() + "\r\n"); + } + } + recordXml.append("\r\n"); } } - recordXml.append("\r\n"); } recordXml.append("\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 8556730c..ec83fa85 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -27,10 +27,7 @@ import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; -import java.util.HashMap; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; /** * SIP命令类型: ACK请求 @@ -84,44 +81,72 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; - String deviceId = sendRtpItem.getDeviceId(); - StreamInfo streamInfo = null; - if (sendRtpItem.isPlay()) { - streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); - }else { - streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId); - } - if (streamInfo == null) { - streamInfo = new StreamInfo(); - streamInfo.setApp(sendRtpItem.getApp()); - streamInfo.setStream(sendRtpItem.getStreamId()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + logger.info("收到ACK,开始向上级推流 rtp/{}", sendRtpItem.getStreamId()); Map param = new HashMap<>(); param.put("vhost","__defaultVhost__"); - param.put("app",streamInfo.getApp()); - param.put("stream",streamInfo.getStream()); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); param.put("ssrc", sendRtpItem.getSsrc()); param.put("dst_url",sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); param.put("is_udp", is_Udp); - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - if (jsonObject.getInteger("code") != 0) { - logger.info("监听流以等待流上线{}/{}", streamInfo.getApp(), streamInfo.getStream()); - // 监听流上线 - // 添加订阅 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", streamInfo.getStream()); - subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtmp"); - subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ - zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - }); - } + param.put("src_port", sendRtpItem.getLocalPort()); + zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + + + +// if (streamInfo == null) { // 流还没上来,对方就回复ack +// logger.info("监听流以等待流上线1 rtp/{}", sendRtpItem.getStreamId()); +// // 监听流上线 +// // 添加订阅 +// JSONObject subscribeKey = new JSONObject(); +// subscribeKey.put("app", "rtp"); +// subscribeKey.put("stream", sendRtpItem.getStreamId()); +// subscribeKey.put("regist", true); +// subscribeKey.put("schema", "rtmp"); +// subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); +// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, +// (MediaServerItem mediaServerItemInUse, JSONObject json)->{ +// Map param = new HashMap<>(); +// param.put("vhost","__defaultVhost__"); +// param.put("app",json.getString("app")); +// param.put("stream",json.getString("stream")); +// param.put("ssrc", sendRtpItem.getSsrc()); +// param.put("dst_url",sendRtpItem.getIp()); +// param.put("dst_port", sendRtpItem.getPort()); +// param.put("is_udp", is_Udp); +// param.put("src_port", sendRtpItem.getLocalPort()); +// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// }); +// }else { +// Map param = new HashMap<>(); +// param.put("vhost","__defaultVhost__"); +// param.put("app",streamInfo.getApp()); +// param.put("stream",streamInfo.getStream()); +// param.put("ssrc", sendRtpItem.getSsrc()); +// param.put("dst_url",sendRtpItem.getIp()); +// param.put("dst_port", sendRtpItem.getPort()); +// param.put("is_udp", is_Udp); +// param.put("src_port", sendRtpItem.getLocalPort()); +// +// JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// if (jsonObject.getInteger("code") != 0) { +// logger.info("监听流以等待流上线2 {}/{}", streamInfo.getApp(), streamInfo.getStream()); +// // 监听流上线 +// // 添加订阅 +// JSONObject subscribeKey = new JSONObject(); +// subscribeKey.put("app", "rtp"); +// subscribeKey.put("stream", streamInfo.getStream()); +// subscribeKey.put("regist", true); +// subscribeKey.put("schema", "rtmp"); +// subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); +// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, +// (MediaServerItem mediaServerItemInUse, JSONObject json)->{ +// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// }); +// } +// } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 2811c4f5..e487447a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -93,14 +93,16 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In param.put("app",sendRtpItem.getApp()); param.put("stream",streamId); param.put("ssrc",sendRtpItem.getSsrc()); - logger.info("停止向上级推流:" + streamId); + logger.info("收到bye:停止向上级推流:" + streamId); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null); int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); if (totalReaderCount <= 0) { - logger.info(streamId + "无其它观看者,通知设备停止推流"); - cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId); + logger.info("收到bye: {}无其它观看者,通知设备停止推流", streamId); + if (sendRtpItem.isPlay()) { + cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null); + } } } // 可能是设备主动停止 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index f82f7810..fef3412c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; @@ -91,6 +92,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private SIPProcessorObserver sipProcessorObserver; + @Autowired + private VideoStreamSessionManager sessionManager; + @Override public void afterPropertiesSet() throws Exception { @@ -233,6 +237,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getOrigin().getAddress(); + logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, port, ssrc); Device device = null; // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 @@ -266,13 +271,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setDialog(dialogByteArray); byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); sendRtpItem.setTransaction(transactionByteArray); - // 写入redis, 超时时回复 - redisCatchStorage.updateSendRTPSever(sendRtpItem); + Long finalStartTime = startTime; Long finalStopTime = stopTime; ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ - logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId()); + String app = responseJSON.getString("app"); + String stream = responseJSON.getString("stream"); + logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream); // * 0 等待设备推流上来 // * 1 下级已经推流,等待上级平台回复ack // * 2 推流中 @@ -325,46 +331,66 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements e.printStackTrace(); } }); + sendRtpItem.setApp("rtp"); if ("Playback".equals(sessionName)) { sendRtpItem.setPlay(false); - sendRtpItem.setStreamId(ssrc); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true); + sendRtpItem.setStreamId(ssrcInfo.getStream()); + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - playService.playBack(device.getDeviceId(), channelId, format.format(start), format.format(end),result -> { - if (result.getCode() != 0){ - logger.warn("录像回放失败"); - if (result.getEvent() != null) { - errorEvent.response(result.getEvent()); + playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, format.format(start), + format.format(end), null, result -> { + if (result.getCode() != 0){ + logger.warn("录像回放失败"); + if (result.getEvent() != null) { + errorEvent.response(result.getEvent()); + } + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); + try { + responseAck(evt, Response.REQUEST_TIMEOUT); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + }else { + if (result.getMediaServerItem() != null) { + hookEvent.response(result.getMediaServerItem(), result.getResponse()); + } } - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - try { - responseAck(evt, Response.REQUEST_TIMEOUT); - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } - }else { - if (result.getMediaServerItem() != null) { - hookEvent.response(result.getMediaServerItem(), result.getResponse()); - } - } - }); + }); }else { sendRtpItem.setPlay(true); - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); - if (streamInfo == null) { + SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); + if (playTransaction != null) { + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream()); + if (!streamReady) { + playTransaction = null; + } + } + if (playTransaction == null) { + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true); if (mediaServerItem.isRtpEnable()) { sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId)); + }else { + sendRtpItem.setStreamId(ssrcInfo.getStream()); } - sendRtpItem.setPlay(false); - playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent, errorEvent, ()->{ + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{ redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - }); + }, null); }else { - sendRtpItem.setStreamId(streamInfo.getStream()); - hookEvent.response(mediaServerItem, null); + sendRtpItem.setStreamId(playTransaction.getStream()); + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("app", sendRtpItem.getApp()); + jsonObject.put("stream", sendRtpItem.getStreamId()); + hookEvent.response(mediaServerItem, jsonObject); } } }else if (gbStream != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java index 3c83ec29..8235ade1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java @@ -18,6 +18,7 @@ import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; +import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; @@ -56,14 +57,15 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i } catch (ParseException e) { e.printStackTrace(); } + CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); String NotifyType =getText(rootElement, "NotifyType"); if (NotifyType.equals("121")){ logger.info("媒体播放完毕,通知关流"); - StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(device.getDeviceId(), "*"); - if (streamInfo != null) { - redisCatchStorage.stopPlayback(streamInfo); - cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream()); - } + String channelId =getText(rootElement, "DeviceID"); + redisCatchStorage.stopPlayback(device.getDeviceId(), channelId, null, callIdHeader.getCallId()); + cmder.streamByeCmd(device.getDeviceId(), channelId, null, callIdHeader.getCallId()); + // TODO 如果级联播放,需要给上级发送此通知 + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java index ffac1d00..1cb11287 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; @@ -40,6 +41,9 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { @Autowired private SIPProcessorObserver sipProcessorObserver; + @Autowired + private SubscribeHolder subscribeHolder; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -83,19 +87,19 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { // 注册/注销成功 logger.info(String.format("%s %s成功", platformGBId, action)); redisCatchStorage.delPlatformRegisterInfo(callId); - parentPlatform.setStatus("注册".equals(action)); + redisCatchStorage.delPlatformCatchInfo(platformGBId); // 取回Expires设置,避免注销过程中被置为0 - if (!parentPlatformCatch.getParentPlatform().getExpires().equals("0")) { - ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId); - String expires = parentPlatformTmp.getExpires(); - parentPlatform.setExpires(expires); - parentPlatform.setId(parentPlatformTmp.getId()); - redisCatchStorage.updatePlatformRegister(parentPlatform); - redisCatchStorage.updatePlatformKeepalive(parentPlatform); - parentPlatformCatch.setParentPlatform(parentPlatform); - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); - } + ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId); + parentPlatformTmp.setStatus("注册".equals(action)); + redisCatchStorage.updatePlatformRegister(parentPlatformTmp); + redisCatchStorage.updatePlatformKeepalive(parentPlatformTmp); + parentPlatformCatch.setParentPlatform(parentPlatformTmp); + redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); storager.updateParentPlatformStatus(platformGBId, "注册".equals(action)); + if ("注销".equals(action)) { + subscribeHolder.removeCatalogSubscribe(platformGBId); + subscribeHolder.removeMobilePositionSubscribe(platformGBId); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index a0b7e75b..5d1e8aff 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -45,12 +45,8 @@ public class ZLMRTPServerFactory { Map param = new HashMap<>(); int result = -1; - /** - * 不设置推流端口端则使用随机端口 - */ - if (StringUtils.isEmpty(mediaServerItem.getSendRtpPortRange())){ - param.put("port", 0); - }else { + // 不设置推流端口端则使用随机端口 + if (!StringUtils.isEmpty(mediaServerItem.getSendRtpPortRange())){ int newPort = getPortFromportRange(mediaServerItem); param.put("port", newPort); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 80ededa6..4cff4a68 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -2,10 +2,14 @@ package com.genersoft.iot.vmp.service; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; import com.genersoft.iot.vmp.service.bean.PlayBackCallback; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import org.springframework.http.ResponseEntity; import org.springframework.web.context.request.async.DeferredResult; @@ -17,13 +21,17 @@ public interface IPlayService { void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid); + void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + InviteTimeOutCallback timeoutCallback, String uuid); PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback); MediaServerItem getNewMediaServerItem(Device device); - void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String toString); + void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String toString); - DeferredResult> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback errorCallBack); + DeferredResult> playBack(String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack); + DeferredResult> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack); void zlmServerOffline(String mediaServerId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/InviteTimeOutCallback.java b/src/main/java/com/genersoft/iot/vmp/service/bean/InviteTimeOutCallback.java new file mode 100644 index 00000000..e30db5d9 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/InviteTimeOutCallback.java @@ -0,0 +1,6 @@ +package com.genersoft.iot.vmp.service.bean; + +public interface InviteTimeOutCallback { + + void run(int code, String msg); // code: 0 sip超时, 1 收流超时 +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java index 10a2759f..8029b5af 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java @@ -7,9 +7,9 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import javax.sip.RequestEvent; public class PlayBackResult { - private int code; - private T data; - private MediaServerItem mediaServerItem; + private int code; + private T data; + private MediaServerItem mediaServerItem; private JSONObject response; private SipSubscribe.EventResult event; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 2df78b76..9ee58673 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.PlayBackResult; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -27,6 +28,7 @@ import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; import gov.nist.javax.sip.stack.SIPDialog; +import jdk.nashorn.internal.ir.RuntimeNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -36,6 +38,9 @@ import org.springframework.stereotype.Service; import org.springframework.util.ResourceUtils; import org.springframework.web.context.request.async.DeferredResult; +import javax.sip.header.CallIdHeader; +import javax.sip.header.Header; +import javax.sip.message.Request; import java.io.FileNotFoundException; import java.util.*; @@ -79,6 +84,8 @@ public class PlayServiceImpl implements IPlayService { private UserSetup userSetup; + + @Override public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, @@ -141,67 +148,7 @@ public class PlayServiceImpl implements IPlayService { e.printStackTrace(); } }); - if (streamInfo == null) { - String streamId = null; - if (mediaServerItem.isRtpEnable()) { - streamId = String.format("%s_%s", device.getDeviceId(), channelId); - } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId); - // 超时处理 - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); - if (timeoutCallback != null) { - timeoutCallback.run(); - } - WVPResult wvpResult = new WVPResult(); - wvpResult.setCode(-1); - SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream()); - if (dialog != null) { - wvpResult.setMsg("收流超时,请稍候重试"); - // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 - cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); - }else { - wvpResult.setMsg("点播超时,请稍候重试"); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); - streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); - } - - msg.setData(wvpResult); - - // 回复之前所有的点播请求 - resultHolder.invokeAllResult(msg); - } - }, userSetup.getPlayTimeout()); - // 发送点播消息 - cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { - logger.info("收到订阅消息: " + response.toJSONString()); - timer.cancel(); - onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid); - if (hookEvent != null) { - hookEvent.response(mediaServerItem, response); - } - }, (event) -> { - timer.cancel(); - WVPResult wvpResult = new WVPResult(); - wvpResult.setCode(-1); - // 点播返回sip错误 - mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); - - wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); - msg.setData(wvpResult); - resultHolder.invokeAllResult(msg); - if (errorEvent != null) { - errorEvent.response(event); - } - }); - } else { + if (streamInfo != null) { String streamId = streamInfo.getStream(); if (streamId == null) { WVPResult wvpResult = new WVPResult(); @@ -227,67 +174,109 @@ public class PlayServiceImpl implements IPlayService { if (hookEvent != null) { hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo))); } - } else { - // TODO 点播前是否重置状态 + }else { redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); - String streamId2 = null; - if (mediaServerItem.isRtpEnable()) { - streamId2 = String.format("%s_%s", device.getDeviceId(), channelId); - } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2); - // 超时处理 - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); - if (timeoutCallback != null) { - timeoutCallback.run(); - } - WVPResult wvpResult = new WVPResult(); - wvpResult.setCode(-1); - SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream()); - if (dialog != null) { - wvpResult.setMsg("收流超时,请稍候重试"); - // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 - cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); - }else { - wvpResult.setMsg("点播超时,请稍候重试"); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); - streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); - } - - msg.setData(wvpResult); - // 回复之前所有的点播请求 - resultHolder.invokeAllResult(msg); - } - }, userSetup.getPlayTimeout()); - cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { - logger.info("收到订阅消息: " + response.toJSONString()); - onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid); - }, (event) -> { - mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); - WVPResult wvpResult = new WVPResult(); - wvpResult.setCode(-1); - wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); - msg.setData(wvpResult); - resultHolder.invokeAllResult(msg); - }); + streamInfo = null; } + + } + if (streamInfo == null) { + String streamId = null; + if (mediaServerItem.isRtpEnable()) { + streamId = String.format("%s_%s", device.getDeviceId(), channelId); + } + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId); + play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{ + if (hookEvent != null) { + hookEvent.response(mediaServerItem, response); + } + }, event -> { + // sip error错误 + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(-1); + wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); + msg.setData(wvpResult); + resultHolder.invokeAllResult(msg); + if (errorEvent != null) { + errorEvent.response(event); + } + }, (code, msgStr)->{ + // invite点播超时 + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(-1); + if (code == 0) { + wvpResult.setMsg("点播超时,请稍候重试"); + }else if (code == 1) { + wvpResult.setMsg("收流超时,请稍候重试"); + } + msg.setData(wvpResult); + // 回复之前所有的点播请求 + resultHolder.invokeAllResult(msg); + }, uuid); + } + return playResult; + } + + + + @Override + public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + InviteTimeOutCallback timeoutCallback, String uuid) { + + String streamId = null; + if (mediaServerItem.isRtpEnable()) { + streamId = String.format("%s_%s", device.getDeviceId(), channelId); + } + if (ssrcInfo == null) { + ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId); } - return playResult; + // 超时处理 + Timer timer = new Timer(); + SSRCInfo finalSsrcInfo = ssrcInfo; + timer.schedule(new TimerTask() { + @Override + public void run() { + logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", device.getDeviceId(), channelId)); + + SIPDialog dialog = streamSession.getDialogByStream(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + if (dialog != null) { + timeoutCallback.run(1, "收流超时"); + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 + cmder.streamByeCmd(device.getDeviceId(), channelId, finalSsrcInfo.getStream(), null); + }else { + timeoutCallback.run(0, "点播超时"); + mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + } + } + }, userSetup.getPlayTimeout()); + + cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { + logger.info("收到订阅消息: " + response.toJSONString()); + timer.cancel(); + // hook响应 + onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid); + hookEvent.response(mediaServerItemInuse, response); + }, (event) -> { + timer.cancel(); + mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + errorEvent.response(event); + }); } @Override public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { RequestMessage msg = new RequestMessage(); - msg.setId(uuid); + if (uuid != null) { + msg.setId(uuid); + } msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); if (streamInfo != null) { @@ -297,7 +286,6 @@ public class PlayServiceImpl implements IPlayService { storager.startPlay(deviceId, channelId, streamInfo.getStream()); } redisCatchStorage.startPlay(streamInfo); - msg.setData(JSON.toJSONString(streamInfo)); WVPResult wvpResult = new WVPResult(); wvpResult.setCode(0); @@ -329,9 +317,24 @@ public class PlayServiceImpl implements IPlayService { return mediaServerItem; } + @Override + public DeferredResult> playBack(String deviceId, String channelId, String startTime, + String endTime,InviteStreamCallback inviteStreamCallback, + PlayBackCallback callback) { + Device device = storager.queryVideoDevice(deviceId); + if (device == null) return null; + MediaServerItem newMediaServerItem = getNewMediaServerItem(device); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true); + + return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback); + } @Override - public DeferredResult> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback callback) { + public DeferredResult> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, + String deviceId, String channelId, String startTime, + String endTime, InviteStreamCallback infoCallBack, + PlayBackCallback playBackCallback) { + if (mediaServerItem == null || ssrcInfo == null) return null; String uuid = UUID.randomUUID().toString(); String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId; DeferredResult> result = new DeferredResult<>(30000L); @@ -341,8 +344,6 @@ public class PlayServiceImpl implements IPlayService { return result; } - MediaServerItem newMediaServerItem = getNewMediaServerItem(device); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true); resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId, uuid, result); RequestMessage msg = new RequestMessage(); msg.setId(uuid); @@ -356,63 +357,62 @@ public class PlayServiceImpl implements IPlayService { logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId)); playBackResult.setCode(-1); playBackResult.setData(msg); - callback.call(playBackResult); + playBackCallback.call(playBackResult); SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream()); // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 if (dialog != null) { // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 - cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null); }else { - mediaServerService.releaseSsrc(newMediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); } - cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null); // 回复之前所有的点播请求 - callback.call(playBackResult); + playBackCallback.call(playBackResult); } }, userSetup.getPlayTimeout()); - cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> { - logger.info("收到订阅消息: " + response.toJSONString()); - timer.cancel(); - StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); - if (streamInfo == null) { - logger.warn("设备回放API调用失败!"); - msg.setData("设备回放API调用失败!"); - playBackResult.setCode(-1); - playBackResult.setData(msg); - callback.call(playBackResult); - return; - } - redisCatchStorage.startPlayback(streamInfo); - msg.setData(JSON.toJSONString(streamInfo)); - playBackResult.setCode(0); - playBackResult.setData(msg); - playBackResult.setMediaServerItem(mediaServerItem); - playBackResult.setResponse(response); - callback.call(playBackResult); - }, event -> { - timer.cancel(); - msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)); - playBackResult.setCode(-1); - playBackResult.setData(msg); - playBackResult.setEvent(event); - callback.call(playBackResult); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - }); + cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack, + (InviteStreamInfo inviteStreamInfo) -> { + logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString()); + timer.cancel(); + StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId); + if (streamInfo == null) { + logger.warn("设备回放API调用失败!"); + msg.setData("设备回放API调用失败!"); + playBackResult.setCode(-1); + playBackResult.setData(msg); + playBackCallback.call(playBackResult); + return; + } + redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId()); + msg.setData(JSON.toJSONString(streamInfo)); + playBackResult.setCode(0); + playBackResult.setData(msg); + playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem()); + playBackResult.setResponse(inviteStreamInfo.getResponse()); + playBackCallback.call(playBackResult); + }, event -> { + timer.cancel(); + msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)); + playBackResult.setCode(-1); + playBackResult.setData(msg); + playBackResult.setEvent(event); + playBackCallback.call(playBackResult); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + }); return result; } - - @Override - public void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { + public void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) { RequestMessage msg = new RequestMessage(); msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId); msg.setId(uuid); - StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId); if (streamInfo != null) { - redisCatchStorage.startDownload(streamInfo); + redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId()); msg.setData(JSON.toJSONString(streamInfo)); resultHolder.invokeResult(msg); } else { @@ -449,7 +449,8 @@ public class PlayServiceImpl implements IPlayService { if (allSsrc.size() > 0) { for (SsrcTransaction ssrcTransaction : allSsrc) { if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) { - cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); + cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), + ssrcTransaction.getStream(), null); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 5829292c..50948533 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -47,17 +47,15 @@ public interface IRedisCatchStorage { StreamInfo queryPlayByStreamId(String steamId); - StreamInfo queryPlaybackByStreamId(String steamId); - StreamInfo queryPlayByDevice(String deviceId, String channelId); Map queryPlayByDeviceId(String deviceId); - boolean startPlayback(StreamInfo stream); + boolean startPlayback(StreamInfo stream, String callId); - boolean stopPlayback(StreamInfo streamInfo); + boolean stopPlayback(String deviceId, String channelId, String stream, String callId); - StreamInfo queryPlaybackByDevice(String deviceId, String code); + StreamInfo queryPlayback(String deviceId, String channelID, String stream, String callId); void updatePlatformCatchInfo(ParentPlatformCatch parentPlatformCatch); @@ -167,9 +165,9 @@ public interface IRedisCatchStorage { * 开始下载录像时存入 * @param streamInfo */ - boolean startDownload(StreamInfo streamInfo); + boolean startDownload(StreamInfo streamInfo, String callId); - StreamInfo queryDownloadByStreamId(String streamId); + StreamInfo queryDownload(String deviceId, String channelId, String stream, String callId); /** * 查找第三方系统留下的国标预设值 diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java index f1d23f18..e048f31e 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java @@ -55,7 +55,7 @@ public interface PlatformChannelMapper { int cleanChannelForGB(String platformId); @Select("SELECT dc.* FROM platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId WHERE dc.channelId='${channelId}' and pgc.platformId='${platformId}'") - DeviceChannel queryChannelInParentPlatform(String platformId, String channelId); + List queryChannelInParentPlatform(String platformId, String channelId); @Select(" select dc.channelId as id, dc.name as name, pgc.platformId as platformId, pgc.catalogId as parentId, 0 as childrenCount, 1 as type " + " from device_channel dc left join platform_gb_channel pgc on dc.id = pgc.deviceChannelId " + @@ -67,7 +67,7 @@ public interface PlatformChannelMapper { " left join device_channel dc on dc.id = pgc.deviceChannelId\n" + " left join device d on dc.deviceId = d.deviceId\n" + "where dc.channelId = #{channelId} and pgc.platformId=#{platformId}") - Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId); + List queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId); @Delete(" + diff --git a/web_src/src/components/dialog/chooseChannelForGb.vue b/web_src/src/components/dialog/chooseChannelForGb.vue index 50a495a5..78bfddf5 100644 --- a/web_src/src/components/dialog/chooseChannelForGb.vue +++ b/web_src/src/components/dialog/chooseChannelForGb.vue @@ -1,8 +1,8 @@