From bef7fb2440093bbf92a2d192ce1f1c4db219b07c Mon Sep 17 00:00:00 2001 From: dengming Date: Thu, 10 Mar 2022 15:08:41 +0800 Subject: [PATCH 01/13] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=85=B3=E9=97=AD?= =?UTF-8?q?=E6=8C=89=E9=9C=80=E6=8B=89=E6=B5=81=E6=97=A0=E6=95=88=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 2 +- .../genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 62723ac5..aa0fa876 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -459,7 +459,7 @@ public class ZLMHttpHookListener { } } MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); - if (mediaServerItem != null && "-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())) { + if (mediaServerItem != null && mediaServerItem.getStreamNoneReaderDelayMS() == -1) { ret.put("close", false); } return new ResponseEntity(ret.toString(),HttpStatus.OK); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index e5781657..600dad80 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -512,7 +512,7 @@ public class MediaServerServiceImpl implements IMediaServerService { param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex)); param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex)); param.put("hook.timeoutSec","20"); - param.put("general.streamNoneReaderDelayMS","-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() ); + param.put("general.streamNoneReaderDelayMS",mediaServerItem.getStreamNoneReaderDelayMS()==-1?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() ); JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param); From d474bf929e3787b6ac88cfb5b2ee242844a0339b Mon Sep 17 00:00:00 2001 From: dengming Date: Thu, 10 Mar 2022 15:24:50 +0800 Subject: [PATCH 02/13] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=A4=9A=E4=BD=99?= =?UTF-8?q?=E4=BE=9D=E8=B5=96=E5=8C=85com.google.guava:30.0-jre=EF=BC=8C?= =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E5=BC=95=E7=94=A8=E4=BA=86=E7=89=88=E6=9C=AC?= =?UTF-8?q?31?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pom.xml b/pom.xml index 2dde4da9..2d7f746f 100644 --- a/pom.xml +++ b/pom.xml @@ -169,13 +169,6 @@ 1.2.73 - - - com.google.guava - guava - 30.0-jre - - com.squareup.okhttp3 From f88c70d38ce1c9c0d04460d0c225373c84dc2eca Mon Sep 17 00:00:00 2001 From: dengming Date: Thu, 10 Mar 2022 17:27:07 +0800 Subject: [PATCH 03/13] =?UTF-8?q?=E5=88=A0=E9=99=A4=E9=87=8D=E5=A4=8D?= =?UTF-8?q?=E5=AD=97=E6=AE=B5=EF=BC=8CchannelId=E5=AD=97=E6=AE=B5=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E4=BA=86=E4=B8=A4=E6=AC=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 2431699a..5e6ffed8 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -231,7 +231,6 @@ public interface DeviceChannelMapper { " name as title,\n" + " channelId as \"value\",\n" + " channelId as \"key\",\n" + - " channelId,\n" + " longitude,\n" + " latitude\n" + " from device_channel\n" + From a948b38dd606055739271334f3521908e6bb1781 Mon Sep 17 00:00:00 2001 From: dengming Date: Thu, 10 Mar 2022 17:40:45 +0800 Subject: [PATCH 04/13] =?UTF-8?q?=E4=BC=98=E5=8C=96pom=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=EF=BC=8C=E8=A7=A3=E5=86=B3=E7=BC=96=E8=AF=91Failed=20to=20perf?= =?UTF-8?q?orm=20fetch=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pom.xml b/pom.xml index 2d7f746f..7ad53db9 100644 --- a/pom.xml +++ b/pom.xml @@ -272,6 +272,9 @@ pl.project13.maven git-commit-id-plugin + + true + From 4a22611320f7016d98a37dda57ef1ca4ea9489ec Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 11 Mar 2022 09:54:35 +0800 Subject: [PATCH 05/13] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=9B=AE=E5=BD=95?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=97=B6=E7=88=B6=E8=8A=82=E7=82=B9=E7=9A=84?= =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/gb28181/event/SipSubscribe.java | 2 +- .../event/request/impl/NotifyRequestProcessor.java | 1 - .../request/impl/message/MessageRequestProcessor.java | 3 --- .../iot/vmp/storager/dao/DeviceChannelMapper.java | 9 +++++++++ .../iot/vmp/storager/impl/VideoManagerStoragerImpl.java | 3 ++- 5 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index 49c52d5e..b347bbaa 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -30,7 +30,7 @@ public class SipSubscribe { // @Scheduled(fixedRate= 100 * 60 * 60 ) @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次 public void execute(){ - logger.info("[定时任务] 清理过期的订阅信息"); + logger.info("[定时任务] 清理过期的SIP订阅信息"); Calendar calendar = Calendar.getInstance(); calendar.setTime(new Date()); calendar.set(Calendar.MINUTE, calendar.get(Calendar.MINUTE) - 5); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 6ae0f3e7..3961eeab 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -233,7 +233,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements */ private void processNotifyCatalogList(RequestEvent evt) { try { - System.out.println(343434); FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java index 5813998b..e126fba4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java @@ -67,9 +67,6 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement // 查询设备是否存在 CSeqHeader cseqHeader = (CSeqHeader) evt.getRequest().getHeader(CSeqHeader.NAME); String method = cseqHeader.getMethod(); - if (method.equals("MESSAGE")) { - System.out.println(); - } Device device = redisCatchStorage.getDevice(deviceId); // 查询上级平台是否存在 ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 5e6ffed8..e4fc1ebe 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -247,4 +247,13 @@ public interface DeviceChannelMapper { " #{item.channelId}" + " "}) int cleanChannelsNotInList(String deviceId, List channels); + + @Update(" update device_channel" + + " set subCount = (select *" + + " from (select count(0)" + + " from device_channel" + + " where deviceId = #{deviceId} and parentId = #{channelId}) as temp)" + + " where deviceId = #{deviceId} " + + " and channelId = #{channelId}") + int updateChannelSubCount(String deviceId, String channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index ce450884..fe71e42d 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -42,7 +42,7 @@ import java.util.*; @Component public class VideoManagerStoragerImpl implements IVideoManagerStorager { - private Logger logger = LoggerFactory.getLogger(VideoManagerStoragerImpl.class); + private final Logger logger = LoggerFactory.getLogger(VideoManagerStoragerImpl.class); @Autowired EventPublisher eventPublisher; @@ -171,6 +171,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { }else { deviceChannelMapper.update(channel); } + deviceChannelMapper.updateChannelSubCount(deviceId,channel.getParentId()); } @Override From 5cfaad2d6d6d5451ba93cce68707f1ae23711585 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 11 Mar 2022 09:58:36 +0800 Subject: [PATCH 06/13] #376 --- .../gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index a379f391..329a5aa9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -101,7 +101,9 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { callIdHeader = udpSipProvider.getNewCallId(); } - request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, null, callIdHeader); + request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, + redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, + "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), callIdHeader); // 将 callid 写入缓存, 等注册成功可以更新状态 String callIdFromHeader = callIdHeader.getCallId(); redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, parentPlatform.getServerGBId()); From 1c95f1b4aa45f5fbe16b3ff2e00c560dd5d84550 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 11 Mar 2022 11:22:47 +0800 Subject: [PATCH 07/13] =?UTF-8?q?=E7=A7=BB=E9=99=A4=E5=A4=9A=E4=BD=99?= =?UTF-8?q?=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DOCKERFILE | 2 +- .../iot/vmp/gb28181/bean/SubscribeHolder.java | 12 ++++++ .../iot/vmp/gb28181/bean/SubscribeInfo.java | 33 --------------- .../subscribe/catalog/CatalogEventLister.java | 2 +- .../impl/SubscribeRequestProcessor.java | 17 +------- .../iot/vmp/storager/IRedisCatchStorage.java | 10 ----- .../storager/impl/RedisCatchStorageImpl.java | 41 ------------------- 7 files changed, 16 insertions(+), 101 deletions(-) diff --git a/DOCKERFILE b/DOCKERFILE index 96bc29bf..6c4beb55 100644 --- a/DOCKERFILE +++ b/DOCKERFILE @@ -85,7 +85,7 @@ RUN echo '#!/bin/bash' > run.sh && \ echo 'nohup /opt/media/MediaServer -d -m 3 &' >> run.sh && \ echo 'cd /opt/wvp' >> run.sh && \ echo 'if [${WVP_CONFIG}]; then' >> run.sh && \ - echo ' java -jar *.jar --spring.confi g.location=/opt/wvp/config/application.yml --media.record-assist-port=18081 ${WVP_CONFIG}' >> run.sh && \ + echo ' java -jar *.jar --spring.config.location=/opt/wvp/config/application.yml --media.record-assist-port=18081 ${WVP_CONFIG}' >> run.sh && \ echo 'else' >> run.sh && \ echo ' java -jar *.jar --spring.config.location=/opt/wvp/config/application.yml --media.record-assist-port=18081 --media.ip=127.0.0.1 --media.sdp-ip=${WVP_IP} --sip.ip=${WVP_IP} --media.stream-ip=${WVP_IP}' >> run.sh && \ echo 'fi' >> run.sh diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index a027486c..287c2a0a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -2,6 +2,8 @@ package com.genersoft.iot.vmp.gb28181.bean; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; @Component @@ -34,4 +36,14 @@ public class SubscribeHolder { public void removeMobilePositionSubscribe(String platformId) { mobilePositionMap.remove(platformId); } + + public List getAllCatalogSubscribePlatform() { + List platforms = new ArrayList<>(); + if(catalogMap.size() > 0) { + for (String key : catalogMap.keySet()) { + platforms.add(catalogMap.get(key).getId()); + } + } + return platforms; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java index 373533a6..956e93eb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java @@ -14,17 +14,11 @@ public class SubscribeInfo { public SubscribeInfo(RequestEvent evt, String id) { this.id = id; Request request = evt.getRequest(); - CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); - this.callId = callIdHeader.getCallId(); - FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); - this.fromTag = fromHeader.getTag(); ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME); this.expires = expiresHeader.getExpires(); EventHeader eventHeader = (EventHeader)request.getHeader(EventHeader.NAME); this.eventId = eventHeader.getEventId(); this.eventType = eventHeader.getEventType(); - ViaHeader viaHeader = (ViaHeader)request.getHeader(ViaHeader.NAME); - this.branch = viaHeader.getBranch(); this.transaction = evt.getServerTransaction(); this.dialog = evt.getDialog(); } @@ -34,9 +28,6 @@ public class SubscribeInfo { private String callId; private String eventId; private String eventType; - private String fromTag; - private String toTag; - private String branch; private ServerTransaction transaction; private Dialog dialog; @@ -52,18 +43,6 @@ public class SubscribeInfo { return callId; } - public String getFromTag() { - return fromTag; - } - - public void setToTag(String toTag) { - this.toTag = toTag; - } - - public String getToTag() { - return toTag; - } - public void setId(String id) { this.id = id; } @@ -76,10 +55,6 @@ public class SubscribeInfo { this.callId = callId; } - public void setFromTag(String fromTag) { - this.fromTag = fromTag; - } - public String getEventId() { return eventId; } @@ -96,14 +71,6 @@ public class SubscribeInfo { this.eventType = eventType; } - public String getBranch() { - return branch; - } - - public void setBranch(String branch) { - this.branch = branch; - } - public ServerTransaction getTransaction() { return transaction; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 9e3b352a..6af6c661 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -74,7 +74,7 @@ public class CatalogEventLister implements ApplicationListener { } }else { // 获取所用订阅 - List platforms = redisCatchStorage.getAllSubscribePlatform(); + List platforms = subscribeHolder.getAllCatalogSubscribePlatform(); if (event.getDeviceChannels() != null) { if (platforms.size() > 0) { for (DeviceChannel deviceChannel : event.getDeviceChannels()) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index e1e7125e..4acec149 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -158,20 +158,14 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval)); subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); -// redisCatchStorage.updateSubscribe(key, subscribeInfo); }else if (subscribeInfo.getExpires() == 0) { dynamicTask.stop(key); -// redisCatchStorage.delSubscribe(key); subscribeHolder.removeMobilePositionSubscribe(platformId); } try { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); - Response response = responseXmlAck(evt, resultXml.toString(), parentPlatform); - ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME); - subscribeInfo.setToTag(toHeader.getTag()); - redisCatchStorage.updateSubscribe(key, subscribeInfo); - + responseXmlAck(evt, resultXml.toString(), parentPlatform); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { @@ -211,21 +205,14 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme .append("\r\n"); if (subscribeInfo.getExpires() > 0) { -// redisCatchStorage.updateSubscribe(key, subscribeInfo); subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo); }else if (subscribeInfo.getExpires() == 0) { -// redisCatchStorage.delSubscribe(key); subscribeHolder.removeCatalogSubscribe(platformId); } try { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); - Response response = responseXmlAck(evt, resultXml.toString(), parentPlatform); - ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME); - subscribeInfo.setToTag(toHeader.getTag()); -// redisCatchStorage.updateSubscribe(key, subscribeInfo); - subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo); - + responseXmlAck(evt, resultXml.toString(), parentPlatform); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index b0edc063..5829292c 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -204,18 +204,8 @@ public interface IRedisCatchStorage { void resetAllSN(); - void updateSubscribe(String key, SubscribeInfo subscribeInfo); - - SubscribeInfo getSubscribe(String key); - - void delSubscribe(String key); - MediaItem getStreamInfo(String app, String streamId, String mediaServerId); - List getAllSubscribe(); - - List getAllSubscribePlatform(); - void addCpuInfo(double cpuInfo); void addMemInfo(double memInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 0641348a..0541d935 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -490,21 +490,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { return (GPSMsgInfo)redis.get(key); } - @Override - public void updateSubscribe(String key, SubscribeInfo subscribeInfo) { - redis.set(key, subscribeInfo, subscribeInfo.getExpires()); - } - - @Override - public SubscribeInfo getSubscribe(String key) { - return (SubscribeInfo)redis.get(key); - } - - @Override - public void delSubscribe(String key) { - redis.del(key); - } - @Override public List getAllGpsMsgInfo() { String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_*"; @@ -535,32 +520,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { return result; } - @Override - public List getAllSubscribe() { - String scanKey = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_*"; - List result = new ArrayList<>(); - List keys = redis.scan(scanKey); - for (int i = 0; i < keys.size(); i++) { - String key = (String) keys.get(i); - SubscribeInfo subscribeInfo = (SubscribeInfo) redis.get(key); - result.add(subscribeInfo); - } - return result; - } - - @Override - public List getAllSubscribePlatform() { - String scanKey = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_*"; - List result = new ArrayList<>(); - List keys = redis.scan(scanKey); - for (int i = 0; i < keys.size(); i++) { - String key = (String) keys.get(i); - String platformId = key.substring(scanKey.length() - 1); - result.add(platformId); - } - return result; - } - @Override public void addCpuInfo(double cpuInfo) { String key = VideoManagerConstants.SYSTEM_INFO_CPU_PREFIX + userSetup.getServerId(); From 61e91afd9112e8650ce1ec15f575df587f4f27a6 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 11 Mar 2022 11:22:55 +0800 Subject: [PATCH 08/13] #390 --- .../iot/vmp/gb28181/event/online/OnlineEventListener.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java index 27bc4bcf..d3580d35 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java @@ -68,8 +68,6 @@ public class OnlineEventListener implements ApplicationListener { String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_" + event.getDevice().getDeviceId(); Device deviceInStore = storager.queryVideoDevice(device.getDeviceId()); device.setOnline(1); - // 处理上线监听 - storager.updateDevice(device); switch (event.getFrom()) { // 注册时触发的在线事件,先在redis中增加超时超时监听 case VideoManagerConstants.EVENT_ONLINE_REGISTER: @@ -98,7 +96,8 @@ public class OnlineEventListener implements ApplicationListener { break; } - + // 处理上线监听 + storager.updateDevice(device); List deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId()); eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON); // 上线添加订阅 From 1171cf1ea9a2c69916bfb49181ecb5bf66055b33 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 14 Mar 2022 11:36:01 +0800 Subject: [PATCH 09/13] =?UTF-8?q?=E4=BD=BF=E7=94=A8zlm=E6=96=B0=E7=89=B9?= =?UTF-8?q?=E6=80=A7=E6=94=AF=E6=8C=81=E5=AF=B9=E4=B8=8E=E5=9B=BD=E6=A0=87?= =?UTF-8?q?=E6=B5=81=E5=85=B3=E9=97=AD=E9=9F=B3=E9=A2=91=E4=BB=A5=E5=8A=A0?= =?UTF-8?q?=E5=BF=AB=E6=8B=89=E6=B5=81=E9=80=9F=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/media/zlm/ZLMHttpHookListener.java | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index aa0fa876..7a3c5f59 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -9,9 +9,12 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -81,7 +84,7 @@ public class ZLMHttpHookListener { private UserSetup userSetup; @Autowired - private MediaConfig mediaConfig; + private VideoStreamSessionManager sessionManager; /** * 服务器定时上报时间,上报间隔可配置,默认10s上报一次 @@ -204,15 +207,15 @@ public class ZLMHttpHookListener { }else { ret.put("enableMP4", userSetup.isRecordPushLive()); } - StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(stream); - - // 录像回放时不进行录像下载 - if (streamInfo != null) { - ret.put("enableMP4", false); - }else { - ret.put("enableMP4", userSetup.isRecordPushLive()); + List ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, stream); + if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) { + String deviceId = ssrcTransactionForAll.get(0).getDeviceId(); + String channelId = ssrcTransactionForAll.get(0).getChannelId(); + DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); + if (deviceChannel != null) { + ret.put("enable_audio", deviceChannel.isHasAudio()); + } } - return new ResponseEntity(ret.toString(), HttpStatus.OK); } @@ -347,8 +350,12 @@ public class ZLMHttpHookListener { redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); }else{ - streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); - redisCatchStorage.stopPlayback(streamInfo); + streamInfo = redisCatchStorage.queryPlayback(null, null, streamId, null); + if (streamInfo != null) { + redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(), + streamInfo.getStream(), null); + } + } }else { if (!"rtp".equals(app)){ @@ -440,18 +447,19 @@ public class ZLMHttpHookListener { ret.put("close", false); } else { cmder.streamByeCmd(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId(), - streamInfoForPlayCatch.getStream()); + streamInfoForPlayCatch.getStream(), null); redisCatchStorage.stopPlay(streamInfoForPlayCatch); storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId()); } }else{ - StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlaybackByStreamId(streamId); + StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, streamId, null); if (streamInfoForPlayBackCatch != null) { cmder.streamByeCmd(streamInfoForPlayBackCatch.getDeviceID(), - streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream()); - redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch); + streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream(), null); + redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch.getDeviceID(), + streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream(), null); }else { - StreamInfo streamInfoForDownload = redisCatchStorage.queryDownloadByStreamId(streamId); + StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, streamId, null); // 进行录像下载时无人观看不断流 if (streamInfoForDownload != null) { ret.put("close", false); From 354a39961ad26949f597e4c434b0cd470b7f78ee Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 14 Mar 2022 18:24:30 +0800 Subject: [PATCH 10/13] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=82=B9=E6=92=AD,=20?= =?UTF-8?q?=E7=BA=A7=E8=81=94=E7=82=B9=E6=92=AD=E7=BA=A7=E8=81=94=E5=BD=95?= =?UTF-8?q?=E5=83=8F=E3=80=82=E7=BA=A7=E8=81=94=E5=88=97=E8=A1=A8=E6=98=BE?= =?UTF-8?q?=E7=A4=BA=E8=AE=A2=E9=98=85=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/conf/SipPlatformRunner.java | 7 +- .../gb28181/bean/InviteStreamCallback.java | 5 + .../vmp/gb28181/bean/InviteStreamInfo.java | 61 ++++ .../iot/vmp/gb28181/bean/ParentPlatform.java | 39 +++ .../iot/vmp/gb28181/bean/SubscribeInfo.java | 2 + .../iot/vmp/gb28181/event/SipSubscribe.java | 8 +- .../subscribe/catalog/CatalogEventLister.java | 2 - .../transmit/SIPProcessorObserver.java | 4 +- .../gb28181/transmit/cmd/ISIPCommander.java | 9 +- .../transmit/cmd/impl/SIPCommander.java | 84 +++-- .../cmd/impl/SIPCommanderFroPlatform.java | 52 +-- .../request/impl/AckRequestProcessor.java | 97 ++++-- .../request/impl/ByeRequestProcessor.java | 8 +- .../request/impl/InviteRequestProcessor.java | 90 +++-- .../cmd/MediaStatusNotifyMessageHandler.java | 12 +- .../impl/RegisterResponseProcessor.java | 26 +- .../vmp/media/zlm/ZLMRTPServerFactory.java | 8 +- .../iot/vmp/service/IPlayService.java | 12 +- .../service/bean/InviteTimeOutCallback.java | 6 + .../iot/vmp/service/bean/PlayBackResult.java | 6 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 311 +++++++++--------- .../iot/vmp/storager/IRedisCatchStorage.java | 12 +- .../storager/dao/PlatformChannelMapper.java | 4 +- .../storager/dao/PlatformGbStreamMapper.java | 2 +- .../storager/impl/RedisCatchStorageImpl.java | 104 +++--- .../impl/VideoManagerStoragerImpl.java | 29 +- .../gb28181/platform/PlatformController.java | 16 +- .../vmanager/gb28181/play/PlayController.java | 4 +- .../gb28181/playback/DownloadController.java | 14 +- .../gb28181/playback/PlaybackController.java | 12 +- .../vmp/web/gb28181/ApiStreamController.java | 2 +- web_src/src/components/ParentPlatformList.vue | 26 +- .../components/dialog/chooseChannelForGb.vue | 4 +- .../dialog/chooseChannelForStream.vue | 4 +- web_src/static/css/iconfont.css | 22 +- web_src/static/css/iconfont.woff2 | Bin 52052 -> 52504 bytes 36 files changed, 694 insertions(+), 410 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamCallback.java create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamInfo.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/bean/InviteTimeOutCallback.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java index 4ebaf0bf..7f9f8476 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java @@ -60,12 +60,9 @@ public class SipPlatformRunner implements CommandLineRunner { // 取消订阅 sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{ - ParentPlatform platform = storager.queryParentPlatByServerGBId(parentPlatform.getServerGBId()); - sipCommanderForPlatform.register(platform, null, null); + // 发送平台未注册消息 + publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId()); }); - - // 发送平台未注册消息 - publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId()); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamCallback.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamCallback.java new file mode 100644 index 00000000..42a05198 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamCallback.java @@ -0,0 +1,5 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +public interface InviteStreamCallback { + void call(InviteStreamInfo inviteStreamInfo); +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamInfo.java new file mode 100644 index 00000000..3f3c5835 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteStreamInfo.java @@ -0,0 +1,61 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; + +public class InviteStreamInfo { + + public InviteStreamInfo(MediaServerItem mediaServerItem, JSONObject response, String callId, String app, String stream) { + this.mediaServerItem = mediaServerItem; + this.response = response; + this.callId = callId; + this.app = app; + this.stream = stream; + } + + private MediaServerItem mediaServerItem; + private JSONObject response; + private String callId; + private String app; + private String stream; + + public MediaServerItem getMediaServerItem() { + return mediaServerItem; + } + + public void setMediaServerItem(MediaServerItem mediaServerItem) { + this.mediaServerItem = mediaServerItem; + } + + public JSONObject getResponse() { + return response; + } + + public void setResponse(JSONObject response) { + this.response = response; + } + + public String getCallId() { + return callId; + } + + public void setCallId(String callId) { + this.callId = callId; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java index 0c061450..8df79394 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java @@ -114,6 +114,21 @@ public class ParentPlatform { */ private String catalogId; + /** + * 已被订阅目录信息 + */ + private boolean catalogSubscribe; + + /** + * 已被订阅报警信息 + */ + private boolean alarmSubscribe; + + /** + * 已被订阅GPS信息 + */ + private boolean gpsSubscribe; + public Integer getId() { return id; } @@ -290,4 +305,28 @@ public class ParentPlatform { public void setCatalogId(String catalogId) { this.catalogId = catalogId; } + + public boolean isCatalogSubscribe() { + return catalogSubscribe; + } + + public void setCatalogSubscribe(boolean catalogSubscribe) { + this.catalogSubscribe = catalogSubscribe; + } + + public boolean isAlarmSubscribe() { + return alarmSubscribe; + } + + public void setAlarmSubscribe(boolean alarmSubscribe) { + this.alarmSubscribe = alarmSubscribe; + } + + public boolean isGpsSubscribe() { + return gpsSubscribe; + } + + public void setGpsSubscribe(boolean gpsSubscribe) { + this.gpsSubscribe = gpsSubscribe; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java index 956e93eb..434a639a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java @@ -21,6 +21,8 @@ public class SubscribeInfo { this.eventType = eventHeader.getEventType(); this.transaction = evt.getServerTransaction(); this.dialog = evt.getDialog(); + CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); + this.callId = callIdHeader.getCallId(); } private String id; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index b347bbaa..bc775e46 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -49,10 +49,10 @@ public class SipSubscribe { errorTimeSubscribes.remove(key); } } - logger.info("okTimeSubscribes.size:{}",okTimeSubscribes.size()); - logger.info("okSubscribes.size:{}",okSubscribes.size()); - logger.info("errorTimeSubscribes.size:{}",errorTimeSubscribes.size()); - logger.info("errorSubscribes.size:{}",errorSubscribes.size()); + logger.debug("okTimeSubscribes.size:{}",okTimeSubscribes.size()); + logger.debug("okSubscribes.size:{}",okSubscribes.size()); + logger.debug("errorTimeSubscribes.size:{}",errorTimeSubscribes.size()); + logger.debug("errorSubscribes.size:{}",errorSubscribes.size()); } public interface Event { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 6af6c661..d511f421 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -117,8 +117,6 @@ public class CatalogEventLister implements ApplicationListener { List parentPlatforms = parentPlatformMap.get(gbId); if (parentPlatforms != null && parentPlatforms.size() > 0) { for (ParentPlatform platform : parentPlatforms) { - String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId(); -// SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key); SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId()); if (subscribeInfo == null) continue; logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index 30efa204..6439e8cc 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -95,14 +95,14 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { logger.debug("\n收到响应:\n{}", responseEvent.getResponse()); int status = response.getStatusCode(); - if (((status >= 200) && (status < 300)) || status == 401) { // Success! + if (((status >= 200) && (status < 300)) || status == Response.UNAUTHORIZED) { // Success! CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); String method = cseqHeader.getMethod(); ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); if (sipRequestProcessor != null) { sipRequestProcessor.process(responseEvent); } - if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { + if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); if (callIdHeader != null) { SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 67be2471..409eedbf 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -103,7 +104,7 @@ public interface ISIPCommander { * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss */ - void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); + void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime,InviteStreamCallback inviteStreamCallback, InviteStreamCallback event, SipSubscribe.Event errorEvent); /** * 请求历史媒体下载 @@ -114,13 +115,13 @@ public interface ISIPCommander { * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss * @param downloadSpeed 下载倍速参数 */ - void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); + void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, InviteStreamCallback event, SipSubscribe.Event errorEvent); /** * 视频流停止 */ - void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent); - void streamByeCmd(String deviceId, String channelId, String stream); + void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent); + void streamByeCmd(String deviceId, String channelId, String stream, String callId); /** * 回放暂停 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index b38a8c11..5df6314e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -6,6 +6,8 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; @@ -445,27 +447,13 @@ public class SIPCommander implements ISIPCommander { * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss */ @Override - public void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event - , SipSubscribe.Event errorEvent) { + public void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + String startTime, String endTime, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent, + SipSubscribe.Event errorEvent) { try { logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); - // 添加订阅 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", ssrcInfo.getStream()); - subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtmp"); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ - if (event != null) { - event.response(mediaServerItemInUse, json); - } - }); - StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); @@ -530,6 +518,21 @@ public class SIPCommander implements ISIPCommander { CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); + // 添加订阅 + JSONObject subscribeKey = new JSONObject(); + subscribeKey.put("app", "rtp"); + subscribeKey.put("stream", ssrcInfo.getStream()); + subscribeKey.put("regist", true); + subscribeKey.put("schema", "rtmp"); + subscribeKey.put("mediaServerId", mediaServerItem.getId()); + logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey); + subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, + (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + if (hookEvent != null) { + InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()); + hookEvent.call(inviteStreamInfo); + } + }); Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); transmitRequest(device, request, errorEvent, okEvent -> { @@ -537,6 +540,9 @@ public class SIPCommander implements ISIPCommander { streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), responseEvent.getClientTransaction()); streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), okEvent.dialog); }); + if (inviteStreamCallback != null) { + inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream())); + } } catch ( SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); } @@ -552,24 +558,11 @@ public class SIPCommander implements ISIPCommander { * @param downloadSpeed 下载倍速参数 */ @Override - public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, ZLMHttpHookSubscribe.Event event + public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, InviteStreamCallback event , SipSubscribe.Event errorEvent) { try { logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); - // 添加订阅 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", ssrcInfo.getStream()); - subscribeKey.put("regist", true); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ - event.response(mediaServerItemInUse, json); - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); - }); - StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); @@ -637,6 +630,19 @@ public class SIPCommander implements ISIPCommander { CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); + // 添加订阅 + JSONObject subscribeKey = new JSONObject(); + subscribeKey.put("app", "rtp"); + subscribeKey.put("stream", ssrcInfo.getStream()); + subscribeKey.put("regist", true); + subscribeKey.put("mediaServerId", mediaServerItem.getId()); + logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); + subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, + (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + event.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream())); + subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); + }); + Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); ClientTransaction transaction = transmitRequest(device, request, errorEvent); @@ -652,15 +658,15 @@ public class SIPCommander implements ISIPCommander { * 视频流停止, 不使用回调 */ @Override - public void streamByeCmd(String deviceId, String channelId, String stream) { - streamByeCmd(deviceId, channelId, stream, null); + public void streamByeCmd(String deviceId, String channelId, String stream, String callId) { + streamByeCmd(deviceId, channelId, stream, callId, null); } /** * 视频流停止 */ @Override - public void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent) { + public void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent) { try { SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream); ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream); @@ -672,7 +678,15 @@ public class SIPCommander implements ISIPCommander { } return; } - SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, stream); + SIPDialog dialog; + if (callId != null) { + dialog = streamSession.getDialogByCallId(deviceId, channelId, callId); + }else { + if (stream == null) return; + dialog = streamSession.getDialogByStream(deviceId, channelId, stream); + } + + if (dialog == null) { logger.warn("[ {} -> {}]停止视频流的时候发现对话已丢失", deviceId, channelId); return; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 329a5aa9..6f1d031b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.SerializeUtils; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipStackImpl; +import gov.nist.javax.sip.message.MessageFactoryImpl; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.stack.SIPDialog; import org.slf4j.Logger; @@ -77,11 +78,11 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @Override public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); + parentPlatform.setExpires("0"); if (parentPlatformCatch != null) { parentPlatformCatch.setParentPlatform(parentPlatform); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); } - parentPlatform.setExpires("0"); return register(parentPlatform, null, null, errorEvent, okEvent, false); } @@ -416,11 +417,13 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { private void sendNotify(ParentPlatform parentPlatform, String catalogXmlContent, SubscribeInfo subscribeInfo, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent ) throws NoSuchFieldException, IllegalAccessException, SipException, ParseException { + MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory(); + // 设置编码, 防止中文乱码 + messageFactory.setDefaultContentEncodingCharset("gb2312"); Dialog dialog = subscribeInfo.getDialog(); - Request notifyRequest = dialog.createRequest(Request.NOTIFY); - + if (dialog == null) return; + SIPRequest notifyRequest = (SIPRequest)dialog.createRequest(Request.NOTIFY); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); - notifyRequest.setContent(catalogXmlContent, contentTypeHeader); SubscriptionStateHeader subscriptionState = sipFactory.createHeaderFactory() @@ -511,7 +514,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } @Override - public boolean sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List deviceChannels, SubscribeInfo subscribeInfo, Integer index) { + public boolean sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List deviceChannels, + SubscribeInfo subscribeInfo, Integer index) { if (parentPlatform == null || deviceChannels == null || deviceChannels.size() == 0 @@ -579,24 +583,30 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { recordXml.append("" +recordInfo.getSn() + "\r\n"); recordXml.append("" + recordInfo.getDeviceId() + "\r\n"); recordXml.append("" + recordInfo.getSumNum() + "\r\n"); - recordXml.append("\r\n"); - for (RecordItem recordItem : recordInfo.getRecordList()) { - recordXml.append("\r\n"); - if (deviceChannel != null) { - recordXml.append("" + recordItem.getDeviceId() + "\r\n"); - recordXml.append("" + recordItem.getName() + "\r\n"); - recordXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getStartTime()) + "\r\n"); - recordXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getEndTime()) + "\r\n"); - recordXml.append("" + recordItem.getSecrecy() + "\r\n"); - recordXml.append("" + recordItem.getType() + "\r\n"); - if (!StringUtils.isEmpty(recordItem.getFileSize())) { - recordXml.append("" + recordItem.getFileSize() + "\r\n"); - } - if (!StringUtils.isEmpty(recordItem.getFilePath())) { - recordXml.append("" + recordItem.getFilePath() + "\r\n"); + if (recordInfo.getRecordList() == null ) { + recordXml.append("\r\n"); + }else { + recordXml.append("\r\n"); + if (recordInfo.getRecordList().size() > 0) { + for (RecordItem recordItem : recordInfo.getRecordList()) { + recordXml.append("\r\n"); + if (deviceChannel != null) { + recordXml.append("" + recordItem.getDeviceId() + "\r\n"); + recordXml.append("" + recordItem.getName() + "\r\n"); + recordXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getStartTime()) + "\r\n"); + recordXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(recordItem.getEndTime()) + "\r\n"); + recordXml.append("" + recordItem.getSecrecy() + "\r\n"); + recordXml.append("" + recordItem.getType() + "\r\n"); + if (!StringUtils.isEmpty(recordItem.getFileSize())) { + recordXml.append("" + recordItem.getFileSize() + "\r\n"); + } + if (!StringUtils.isEmpty(recordItem.getFilePath())) { + recordXml.append("" + recordItem.getFilePath() + "\r\n"); + } + } + recordXml.append("\r\n"); } } - recordXml.append("\r\n"); } recordXml.append("\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 8556730c..ec83fa85 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -27,10 +27,7 @@ import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; -import java.util.HashMap; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; /** * SIP命令类型: ACK请求 @@ -84,44 +81,72 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; - String deviceId = sendRtpItem.getDeviceId(); - StreamInfo streamInfo = null; - if (sendRtpItem.isPlay()) { - streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); - }else { - streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId); - } - if (streamInfo == null) { - streamInfo = new StreamInfo(); - streamInfo.setApp(sendRtpItem.getApp()); - streamInfo.setStream(sendRtpItem.getStreamId()); - } - redisCatchStorage.updateSendRTPSever(sendRtpItem); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + logger.info("收到ACK,开始向上级推流 rtp/{}", sendRtpItem.getStreamId()); Map param = new HashMap<>(); param.put("vhost","__defaultVhost__"); - param.put("app",streamInfo.getApp()); - param.put("stream",streamInfo.getStream()); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); param.put("ssrc", sendRtpItem.getSsrc()); param.put("dst_url",sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); param.put("is_udp", is_Udp); - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - if (jsonObject.getInteger("code") != 0) { - logger.info("监听流以等待流上线{}/{}", streamInfo.getApp(), streamInfo.getStream()); - // 监听流上线 - // 添加订阅 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", streamInfo.getStream()); - subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtmp"); - subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ - zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - }); - } + param.put("src_port", sendRtpItem.getLocalPort()); + zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + + + +// if (streamInfo == null) { // 流还没上来,对方就回复ack +// logger.info("监听流以等待流上线1 rtp/{}", sendRtpItem.getStreamId()); +// // 监听流上线 +// // 添加订阅 +// JSONObject subscribeKey = new JSONObject(); +// subscribeKey.put("app", "rtp"); +// subscribeKey.put("stream", sendRtpItem.getStreamId()); +// subscribeKey.put("regist", true); +// subscribeKey.put("schema", "rtmp"); +// subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); +// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, +// (MediaServerItem mediaServerItemInUse, JSONObject json)->{ +// Map param = new HashMap<>(); +// param.put("vhost","__defaultVhost__"); +// param.put("app",json.getString("app")); +// param.put("stream",json.getString("stream")); +// param.put("ssrc", sendRtpItem.getSsrc()); +// param.put("dst_url",sendRtpItem.getIp()); +// param.put("dst_port", sendRtpItem.getPort()); +// param.put("is_udp", is_Udp); +// param.put("src_port", sendRtpItem.getLocalPort()); +// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// }); +// }else { +// Map param = new HashMap<>(); +// param.put("vhost","__defaultVhost__"); +// param.put("app",streamInfo.getApp()); +// param.put("stream",streamInfo.getStream()); +// param.put("ssrc", sendRtpItem.getSsrc()); +// param.put("dst_url",sendRtpItem.getIp()); +// param.put("dst_port", sendRtpItem.getPort()); +// param.put("is_udp", is_Udp); +// param.put("src_port", sendRtpItem.getLocalPort()); +// +// JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// if (jsonObject.getInteger("code") != 0) { +// logger.info("监听流以等待流上线2 {}/{}", streamInfo.getApp(), streamInfo.getStream()); +// // 监听流上线 +// // 添加订阅 +// JSONObject subscribeKey = new JSONObject(); +// subscribeKey.put("app", "rtp"); +// subscribeKey.put("stream", streamInfo.getStream()); +// subscribeKey.put("regist", true); +// subscribeKey.put("schema", "rtmp"); +// subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); +// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, +// (MediaServerItem mediaServerItemInUse, JSONObject json)->{ +// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// }); +// } +// } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 2811c4f5..e487447a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -93,14 +93,16 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In param.put("app",sendRtpItem.getApp()); param.put("stream",streamId); param.put("ssrc",sendRtpItem.getSsrc()); - logger.info("停止向上级推流:" + streamId); + logger.info("收到bye:停止向上级推流:" + streamId); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null); int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); if (totalReaderCount <= 0) { - logger.info(streamId + "无其它观看者,通知设备停止推流"); - cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId); + logger.info("收到bye: {}无其它观看者,通知设备停止推流", streamId); + if (sendRtpItem.isPlay()) { + cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null); + } } } // 可能是设备主动停止 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index f82f7810..fef3412c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; @@ -91,6 +92,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private SIPProcessorObserver sipProcessorObserver; + @Autowired + private VideoStreamSessionManager sessionManager; + @Override public void afterPropertiesSet() throws Exception { @@ -233,6 +237,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getOrigin().getAddress(); + logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, port, ssrc); Device device = null; // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 @@ -266,13 +271,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setDialog(dialogByteArray); byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); sendRtpItem.setTransaction(transactionByteArray); - // 写入redis, 超时时回复 - redisCatchStorage.updateSendRTPSever(sendRtpItem); + Long finalStartTime = startTime; Long finalStopTime = stopTime; ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ - logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId()); + String app = responseJSON.getString("app"); + String stream = responseJSON.getString("stream"); + logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream); // * 0 等待设备推流上来 // * 1 下级已经推流,等待上级平台回复ack // * 2 推流中 @@ -325,46 +331,66 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements e.printStackTrace(); } }); + sendRtpItem.setApp("rtp"); if ("Playback".equals(sessionName)) { sendRtpItem.setPlay(false); - sendRtpItem.setStreamId(ssrc); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true); + sendRtpItem.setStreamId(ssrcInfo.getStream()); + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - playService.playBack(device.getDeviceId(), channelId, format.format(start), format.format(end),result -> { - if (result.getCode() != 0){ - logger.warn("录像回放失败"); - if (result.getEvent() != null) { - errorEvent.response(result.getEvent()); + playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, format.format(start), + format.format(end), null, result -> { + if (result.getCode() != 0){ + logger.warn("录像回放失败"); + if (result.getEvent() != null) { + errorEvent.response(result.getEvent()); + } + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); + try { + responseAck(evt, Response.REQUEST_TIMEOUT); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + }else { + if (result.getMediaServerItem() != null) { + hookEvent.response(result.getMediaServerItem(), result.getResponse()); + } } - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - try { - responseAck(evt, Response.REQUEST_TIMEOUT); - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } - }else { - if (result.getMediaServerItem() != null) { - hookEvent.response(result.getMediaServerItem(), result.getResponse()); - } - } - }); + }); }else { sendRtpItem.setPlay(true); - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); - if (streamInfo == null) { + SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); + if (playTransaction != null) { + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream()); + if (!streamReady) { + playTransaction = null; + } + } + if (playTransaction == null) { + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true); if (mediaServerItem.isRtpEnable()) { sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId)); + }else { + sendRtpItem.setStreamId(ssrcInfo.getStream()); } - sendRtpItem.setPlay(false); - playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent, errorEvent, ()->{ + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{ redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - }); + }, null); }else { - sendRtpItem.setStreamId(streamInfo.getStream()); - hookEvent.response(mediaServerItem, null); + sendRtpItem.setStreamId(playTransaction.getStream()); + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("app", sendRtpItem.getApp()); + jsonObject.put("stream", sendRtpItem.getStreamId()); + hookEvent.response(mediaServerItem, jsonObject); } } }else if (gbStream != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java index 3c83ec29..8235ade1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java @@ -18,6 +18,7 @@ import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; +import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; @@ -56,14 +57,15 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i } catch (ParseException e) { e.printStackTrace(); } + CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); String NotifyType =getText(rootElement, "NotifyType"); if (NotifyType.equals("121")){ logger.info("媒体播放完毕,通知关流"); - StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(device.getDeviceId(), "*"); - if (streamInfo != null) { - redisCatchStorage.stopPlayback(streamInfo); - cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream()); - } + String channelId =getText(rootElement, "DeviceID"); + redisCatchStorage.stopPlayback(device.getDeviceId(), channelId, null, callIdHeader.getCallId()); + cmder.streamByeCmd(device.getDeviceId(), channelId, null, callIdHeader.getCallId()); + // TODO 如果级联播放,需要给上级发送此通知 + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java index ffac1d00..1cb11287 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; @@ -40,6 +41,9 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { @Autowired private SIPProcessorObserver sipProcessorObserver; + @Autowired + private SubscribeHolder subscribeHolder; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -83,19 +87,19 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { // 注册/注销成功 logger.info(String.format("%s %s成功", platformGBId, action)); redisCatchStorage.delPlatformRegisterInfo(callId); - parentPlatform.setStatus("注册".equals(action)); + redisCatchStorage.delPlatformCatchInfo(platformGBId); // 取回Expires设置,避免注销过程中被置为0 - if (!parentPlatformCatch.getParentPlatform().getExpires().equals("0")) { - ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId); - String expires = parentPlatformTmp.getExpires(); - parentPlatform.setExpires(expires); - parentPlatform.setId(parentPlatformTmp.getId()); - redisCatchStorage.updatePlatformRegister(parentPlatform); - redisCatchStorage.updatePlatformKeepalive(parentPlatform); - parentPlatformCatch.setParentPlatform(parentPlatform); - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); - } + ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId); + parentPlatformTmp.setStatus("注册".equals(action)); + redisCatchStorage.updatePlatformRegister(parentPlatformTmp); + redisCatchStorage.updatePlatformKeepalive(parentPlatformTmp); + parentPlatformCatch.setParentPlatform(parentPlatformTmp); + redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); storager.updateParentPlatformStatus(platformGBId, "注册".equals(action)); + if ("注销".equals(action)) { + subscribeHolder.removeCatalogSubscribe(platformGBId); + subscribeHolder.removeMobilePositionSubscribe(platformGBId); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index a0b7e75b..5d1e8aff 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -45,12 +45,8 @@ public class ZLMRTPServerFactory { Map param = new HashMap<>(); int result = -1; - /** - * 不设置推流端口端则使用随机端口 - */ - if (StringUtils.isEmpty(mediaServerItem.getSendRtpPortRange())){ - param.put("port", 0); - }else { + // 不设置推流端口端则使用随机端口 + if (!StringUtils.isEmpty(mediaServerItem.getSendRtpPortRange())){ int newPort = getPortFromportRange(mediaServerItem); param.put("port", newPort); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 80ededa6..4cff4a68 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -2,10 +2,14 @@ package com.genersoft.iot.vmp.service; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; import com.genersoft.iot.vmp.service.bean.PlayBackCallback; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import org.springframework.http.ResponseEntity; import org.springframework.web.context.request.async.DeferredResult; @@ -17,13 +21,17 @@ public interface IPlayService { void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid); + void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + InviteTimeOutCallback timeoutCallback, String uuid); PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback); MediaServerItem getNewMediaServerItem(Device device); - void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String toString); + void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String toString); - DeferredResult> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback errorCallBack); + DeferredResult> playBack(String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack); + DeferredResult> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack); void zlmServerOffline(String mediaServerId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/InviteTimeOutCallback.java b/src/main/java/com/genersoft/iot/vmp/service/bean/InviteTimeOutCallback.java new file mode 100644 index 00000000..e30db5d9 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/InviteTimeOutCallback.java @@ -0,0 +1,6 @@ +package com.genersoft.iot.vmp.service.bean; + +public interface InviteTimeOutCallback { + + void run(int code, String msg); // code: 0 sip超时, 1 收流超时 +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java index 10a2759f..8029b5af 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java @@ -7,9 +7,9 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import javax.sip.RequestEvent; public class PlayBackResult { - private int code; - private T data; - private MediaServerItem mediaServerItem; + private int code; + private T data; + private MediaServerItem mediaServerItem; private JSONObject response; private SipSubscribe.EventResult event; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 2df78b76..9ee58673 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.PlayBackResult; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -27,6 +28,7 @@ import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; import gov.nist.javax.sip.stack.SIPDialog; +import jdk.nashorn.internal.ir.RuntimeNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -36,6 +38,9 @@ import org.springframework.stereotype.Service; import org.springframework.util.ResourceUtils; import org.springframework.web.context.request.async.DeferredResult; +import javax.sip.header.CallIdHeader; +import javax.sip.header.Header; +import javax.sip.message.Request; import java.io.FileNotFoundException; import java.util.*; @@ -79,6 +84,8 @@ public class PlayServiceImpl implements IPlayService { private UserSetup userSetup; + + @Override public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, @@ -141,67 +148,7 @@ public class PlayServiceImpl implements IPlayService { e.printStackTrace(); } }); - if (streamInfo == null) { - String streamId = null; - if (mediaServerItem.isRtpEnable()) { - streamId = String.format("%s_%s", device.getDeviceId(), channelId); - } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId); - // 超时处理 - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); - if (timeoutCallback != null) { - timeoutCallback.run(); - } - WVPResult wvpResult = new WVPResult(); - wvpResult.setCode(-1); - SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream()); - if (dialog != null) { - wvpResult.setMsg("收流超时,请稍候重试"); - // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 - cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); - }else { - wvpResult.setMsg("点播超时,请稍候重试"); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); - streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); - } - - msg.setData(wvpResult); - - // 回复之前所有的点播请求 - resultHolder.invokeAllResult(msg); - } - }, userSetup.getPlayTimeout()); - // 发送点播消息 - cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { - logger.info("收到订阅消息: " + response.toJSONString()); - timer.cancel(); - onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid); - if (hookEvent != null) { - hookEvent.response(mediaServerItem, response); - } - }, (event) -> { - timer.cancel(); - WVPResult wvpResult = new WVPResult(); - wvpResult.setCode(-1); - // 点播返回sip错误 - mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); - - wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); - msg.setData(wvpResult); - resultHolder.invokeAllResult(msg); - if (errorEvent != null) { - errorEvent.response(event); - } - }); - } else { + if (streamInfo != null) { String streamId = streamInfo.getStream(); if (streamId == null) { WVPResult wvpResult = new WVPResult(); @@ -227,67 +174,109 @@ public class PlayServiceImpl implements IPlayService { if (hookEvent != null) { hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo))); } - } else { - // TODO 点播前是否重置状态 + }else { redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); - String streamId2 = null; - if (mediaServerItem.isRtpEnable()) { - streamId2 = String.format("%s_%s", device.getDeviceId(), channelId); - } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2); - // 超时处理 - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); - if (timeoutCallback != null) { - timeoutCallback.run(); - } - WVPResult wvpResult = new WVPResult(); - wvpResult.setCode(-1); - SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream()); - if (dialog != null) { - wvpResult.setMsg("收流超时,请稍候重试"); - // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 - cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); - }else { - wvpResult.setMsg("点播超时,请稍候重试"); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); - streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); - } - - msg.setData(wvpResult); - // 回复之前所有的点播请求 - resultHolder.invokeAllResult(msg); - } - }, userSetup.getPlayTimeout()); - cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { - logger.info("收到订阅消息: " + response.toJSONString()); - onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid); - }, (event) -> { - mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); - WVPResult wvpResult = new WVPResult(); - wvpResult.setCode(-1); - wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); - msg.setData(wvpResult); - resultHolder.invokeAllResult(msg); - }); + streamInfo = null; } + + } + if (streamInfo == null) { + String streamId = null; + if (mediaServerItem.isRtpEnable()) { + streamId = String.format("%s_%s", device.getDeviceId(), channelId); + } + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId); + play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{ + if (hookEvent != null) { + hookEvent.response(mediaServerItem, response); + } + }, event -> { + // sip error错误 + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(-1); + wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); + msg.setData(wvpResult); + resultHolder.invokeAllResult(msg); + if (errorEvent != null) { + errorEvent.response(event); + } + }, (code, msgStr)->{ + // invite点播超时 + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(-1); + if (code == 0) { + wvpResult.setMsg("点播超时,请稍候重试"); + }else if (code == 1) { + wvpResult.setMsg("收流超时,请稍候重试"); + } + msg.setData(wvpResult); + // 回复之前所有的点播请求 + resultHolder.invokeAllResult(msg); + }, uuid); + } + return playResult; + } + + + + @Override + public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + InviteTimeOutCallback timeoutCallback, String uuid) { + + String streamId = null; + if (mediaServerItem.isRtpEnable()) { + streamId = String.format("%s_%s", device.getDeviceId(), channelId); + } + if (ssrcInfo == null) { + ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId); } - return playResult; + // 超时处理 + Timer timer = new Timer(); + SSRCInfo finalSsrcInfo = ssrcInfo; + timer.schedule(new TimerTask() { + @Override + public void run() { + logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", device.getDeviceId(), channelId)); + + SIPDialog dialog = streamSession.getDialogByStream(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + if (dialog != null) { + timeoutCallback.run(1, "收流超时"); + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 + cmder.streamByeCmd(device.getDeviceId(), channelId, finalSsrcInfo.getStream(), null); + }else { + timeoutCallback.run(0, "点播超时"); + mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + } + } + }, userSetup.getPlayTimeout()); + + cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { + logger.info("收到订阅消息: " + response.toJSONString()); + timer.cancel(); + // hook响应 + onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid); + hookEvent.response(mediaServerItemInuse, response); + }, (event) -> { + timer.cancel(); + mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + errorEvent.response(event); + }); } @Override public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { RequestMessage msg = new RequestMessage(); - msg.setId(uuid); + if (uuid != null) { + msg.setId(uuid); + } msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); if (streamInfo != null) { @@ -297,7 +286,6 @@ public class PlayServiceImpl implements IPlayService { storager.startPlay(deviceId, channelId, streamInfo.getStream()); } redisCatchStorage.startPlay(streamInfo); - msg.setData(JSON.toJSONString(streamInfo)); WVPResult wvpResult = new WVPResult(); wvpResult.setCode(0); @@ -329,9 +317,24 @@ public class PlayServiceImpl implements IPlayService { return mediaServerItem; } + @Override + public DeferredResult> playBack(String deviceId, String channelId, String startTime, + String endTime,InviteStreamCallback inviteStreamCallback, + PlayBackCallback callback) { + Device device = storager.queryVideoDevice(deviceId); + if (device == null) return null; + MediaServerItem newMediaServerItem = getNewMediaServerItem(device); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true); + + return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback); + } @Override - public DeferredResult> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback callback) { + public DeferredResult> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, + String deviceId, String channelId, String startTime, + String endTime, InviteStreamCallback infoCallBack, + PlayBackCallback playBackCallback) { + if (mediaServerItem == null || ssrcInfo == null) return null; String uuid = UUID.randomUUID().toString(); String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId; DeferredResult> result = new DeferredResult<>(30000L); @@ -341,8 +344,6 @@ public class PlayServiceImpl implements IPlayService { return result; } - MediaServerItem newMediaServerItem = getNewMediaServerItem(device); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true); resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId, uuid, result); RequestMessage msg = new RequestMessage(); msg.setId(uuid); @@ -356,63 +357,62 @@ public class PlayServiceImpl implements IPlayService { logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId)); playBackResult.setCode(-1); playBackResult.setData(msg); - callback.call(playBackResult); + playBackCallback.call(playBackResult); SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream()); // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 if (dialog != null) { // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 - cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null); }else { - mediaServerService.releaseSsrc(newMediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); } - cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null); // 回复之前所有的点播请求 - callback.call(playBackResult); + playBackCallback.call(playBackResult); } }, userSetup.getPlayTimeout()); - cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> { - logger.info("收到订阅消息: " + response.toJSONString()); - timer.cancel(); - StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); - if (streamInfo == null) { - logger.warn("设备回放API调用失败!"); - msg.setData("设备回放API调用失败!"); - playBackResult.setCode(-1); - playBackResult.setData(msg); - callback.call(playBackResult); - return; - } - redisCatchStorage.startPlayback(streamInfo); - msg.setData(JSON.toJSONString(streamInfo)); - playBackResult.setCode(0); - playBackResult.setData(msg); - playBackResult.setMediaServerItem(mediaServerItem); - playBackResult.setResponse(response); - callback.call(playBackResult); - }, event -> { - timer.cancel(); - msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)); - playBackResult.setCode(-1); - playBackResult.setData(msg); - playBackResult.setEvent(event); - callback.call(playBackResult); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - }); + cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack, + (InviteStreamInfo inviteStreamInfo) -> { + logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString()); + timer.cancel(); + StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId); + if (streamInfo == null) { + logger.warn("设备回放API调用失败!"); + msg.setData("设备回放API调用失败!"); + playBackResult.setCode(-1); + playBackResult.setData(msg); + playBackCallback.call(playBackResult); + return; + } + redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId()); + msg.setData(JSON.toJSONString(streamInfo)); + playBackResult.setCode(0); + playBackResult.setData(msg); + playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem()); + playBackResult.setResponse(inviteStreamInfo.getResponse()); + playBackCallback.call(playBackResult); + }, event -> { + timer.cancel(); + msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)); + playBackResult.setCode(-1); + playBackResult.setData(msg); + playBackResult.setEvent(event); + playBackCallback.call(playBackResult); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + }); return result; } - - @Override - public void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { + public void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) { RequestMessage msg = new RequestMessage(); msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId); msg.setId(uuid); - StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId); if (streamInfo != null) { - redisCatchStorage.startDownload(streamInfo); + redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId()); msg.setData(JSON.toJSONString(streamInfo)); resultHolder.invokeResult(msg); } else { @@ -449,7 +449,8 @@ public class PlayServiceImpl implements IPlayService { if (allSsrc.size() > 0) { for (SsrcTransaction ssrcTransaction : allSsrc) { if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) { - cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); + cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), + ssrcTransaction.getStream(), null); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 5829292c..50948533 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -47,17 +47,15 @@ public interface IRedisCatchStorage { StreamInfo queryPlayByStreamId(String steamId); - StreamInfo queryPlaybackByStreamId(String steamId); - StreamInfo queryPlayByDevice(String deviceId, String channelId); Map queryPlayByDeviceId(String deviceId); - boolean startPlayback(StreamInfo stream); + boolean startPlayback(StreamInfo stream, String callId); - boolean stopPlayback(StreamInfo streamInfo); + boolean stopPlayback(String deviceId, String channelId, String stream, String callId); - StreamInfo queryPlaybackByDevice(String deviceId, String code); + StreamInfo queryPlayback(String deviceId, String channelID, String stream, String callId); void updatePlatformCatchInfo(ParentPlatformCatch parentPlatformCatch); @@ -167,9 +165,9 @@ public interface IRedisCatchStorage { * 开始下载录像时存入 * @param streamInfo */ - boolean startDownload(StreamInfo streamInfo); + boolean startDownload(StreamInfo streamInfo, String callId); - StreamInfo queryDownloadByStreamId(String streamId); + StreamInfo queryDownload(String deviceId, String channelId, String stream, String callId); /** * 查找第三方系统留下的国标预设值 diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java index f1d23f18..e048f31e 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java @@ -55,7 +55,7 @@ public interface PlatformChannelMapper { int cleanChannelForGB(String platformId); @Select("SELECT dc.* FROM platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId WHERE dc.channelId='${channelId}' and pgc.platformId='${platformId}'") - DeviceChannel queryChannelInParentPlatform(String platformId, String channelId); + List queryChannelInParentPlatform(String platformId, String channelId); @Select(" select dc.channelId as id, dc.name as name, pgc.platformId as platformId, pgc.catalogId as parentId, 0 as childrenCount, 1 as type " + " from device_channel dc left join platform_gb_channel pgc on dc.id = pgc.deviceChannelId " + @@ -67,7 +67,7 @@ public interface PlatformChannelMapper { " left join device_channel dc on dc.id = pgc.deviceChannelId\n" + " left join device d on dc.deviceId = d.deviceId\n" + "where dc.channelId = #{channelId} and pgc.platformId=#{platformId}") - Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId); + List queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId); @Delete(" + diff --git a/web_src/src/components/dialog/chooseChannelForGb.vue b/web_src/src/components/dialog/chooseChannelForGb.vue index 50a495a5..78bfddf5 100644 --- a/web_src/src/components/dialog/chooseChannelForGb.vue +++ b/web_src/src/components/dialog/chooseChannelForGb.vue @@ -1,8 +1,8 @@