diff --git a/sql/2.6.9更新.sql b/sql/2.6.9更新.sql index 2e047948..514b39e4 100644 --- a/sql/2.6.9更新.sql +++ b/sql/2.6.9更新.sql @@ -1,2 +1,5 @@ alter table wvp_device_channel - change stream_id stream_id varying(255) \ No newline at end of file + change stream_id stream_id varying(255) + +alter table wvp_platform + add auto_push_channel bool default false diff --git a/sql/初始化.sql b/sql/初始化.sql index 93eef4ed..64c404f5 100644 --- a/sql/初始化.sql +++ b/sql/初始化.sql @@ -194,6 +194,7 @@ create table wvp_platform ( create_time character varying(50), update_time character varying(50), as_message_channel bool default false, + auto_push_channel bool default false, constraint uk_platform_unique_server_gb_id unique (server_gb_id) ); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java index fbc95ed9..1638c711 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java @@ -186,6 +186,9 @@ public class ParentPlatform { @Schema(description = "是否作为消息通道") private boolean asMessageChannel; + @Schema(description = "是否作为消息通道") + private boolean autoPushChannel; + public Integer getId() { return id; } @@ -425,4 +428,12 @@ public class ParentPlatform { public void setAsMessageChannel(boolean asMessageChannel) { this.asMessageChannel = asMessageChannel; } + + public boolean isAutoPushChannel() { + return autoPushChannel; + } + + public void setAutoPushChannel(boolean autoPushChannel) { + this.autoPushChannel = autoPushChannel; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index d385d9af..d932a208 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -32,11 +32,13 @@ public class SubscribeHolder { public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) { catalogMap.put(platformId, subscribeInfo); - // 添加订阅到期 - String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; - // 添加任务处理订阅过期 - dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()), - subscribeInfo.getExpires() * 1000); + if (subscribeInfo.getExpires() > 0) { + // 添加订阅到期 + String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; + // 添加任务处理订阅过期 + dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()), + subscribeInfo.getExpires() * 1000); + } } public SubscribeInfo getCatalogSubscribe(String platformId) { @@ -63,11 +65,13 @@ public class SubscribeHolder { dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(platformId), subscribeInfo.getGpsInterval() * 1000); String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId; - // 添加任务处理订阅过期 - dynamicTask.startDelay(taskOverdueKey, () -> { - removeMobilePositionSubscribe(subscribeInfo.getId()); - }, - subscribeInfo.getExpires() * 1000); + if (subscribeInfo.getExpires() > 0) { + // 添加任务处理订阅过期 + dynamicTask.startDelay(taskOverdueKey, () -> { + removeMobilePositionSubscribe(subscribeInfo.getId()); + }, + subscribeInfo.getExpires() * 1000); + } } public SubscribeInfo getMobilePositionSubscribe(String platformId) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java index 07176f21..e5c50455 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java @@ -18,6 +18,9 @@ public class SubscribeInfo { } + public SubscribeInfo() { + } + private String id; private SIPRequest request; @@ -33,6 +36,21 @@ public class SubscribeInfo { private String sn; private int gpsInterval; + /** + * 模拟的FromTag + */ + private String simulatedFromTag; + + /** + * 模拟的ToTag + */ + private String simulatedToTag; + + /** + * 模拟的CallID + */ + private String simulatedCallId; + public String getId() { return id; } @@ -96,4 +114,28 @@ public class SubscribeInfo { public void setGpsInterval(int gpsInterval) { this.gpsInterval = gpsInterval; } + + public String getSimulatedFromTag() { + return simulatedFromTag; + } + + public void setSimulatedFromTag(String simulatedFromTag) { + this.simulatedFromTag = simulatedFromTag; + } + + public String getSimulatedCallId() { + return simulatedCallId; + } + + public void setSimulatedCallId(String simulatedCallId) { + this.simulatedCallId = simulatedCallId; + } + + public String getSimulatedToTag() { + return simulatedToTag; + } + + public void setSimulatedToTag(String simulatedToTag) { + this.simulatedToTag = simulatedToTag; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index 9a00c179..a3f1a213 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -227,11 +227,11 @@ public class SIPRequestHeaderPlarformProvider { SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(), parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort()); Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI); - FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getResponse().getToTag()); + FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getResponse() != null ? subscribeInfo.getResponse().getToTag(): subscribeInfo.getSimulatedToTag()); // to SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain()); Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI); - ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, subscribeInfo.getRequest().getFromTag()); + ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, subscribeInfo.getRequest() != null ?subscribeInfo.getRequest().getFromTag(): subscribeInfo.getSimulatedFromTag()); // Forwards MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70); @@ -241,7 +241,7 @@ public class SIPRequestHeaderPlarformProvider { // 设置编码, 防止中文乱码 messageFactory.setDefaultContentEncodingCharset("gb2312"); - CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(subscribeInfo.getRequest().getCallIdHeader().getCallId()); + CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(subscribeInfo.getRequest() != null ? subscribeInfo.getRequest().getCallIdHeader().getCallId(): subscribeInfo.getSimulatedCallId()); request = (SIPRequest) messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 06a08e24..473ca91d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -148,13 +148,13 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport()); - Request request = headerProviderPlatformProvider.createMessageRequest( - parentPlatform, - keepaliveXml.toString(), - SipUtils.getNewFromTag(), - SipUtils.getNewViaTag(), - callIdHeader); - sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, errorEvent, okEvent); + Request request = headerProviderPlatformProvider.createMessageRequest( + parentPlatform, + keepaliveXml.toString(), + SipUtils.getNewFromTag(), + SipUtils.getNewViaTag(), + callIdHeader); + sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, errorEvent, okEvent); return callIdHeader.getCallId(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index 4ff4e980..8aa3b0df 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.service.IPlatformService; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; @@ -50,6 +51,10 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme @Autowired private SIPSender sipSender; + + @Autowired + private IPlatformService platformService; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -191,5 +196,8 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } + if (subscribeHolder.getCatalogSubscribe(platformId) == null && platform.isAutoPushChannel()) { + platformService.addSimulatedSubscribeInfo(platform); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java index e9cddff8..7b249d07 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java @@ -55,4 +55,6 @@ public interface IPlatformService { * @param platformId 平台 */ void sendNotifyMobilePosition(String platformId); + + void addSimulatedSubscribeInfo(ParentPlatform parentPlatform); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java index 054b6ffd..26b4f3e2 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java @@ -18,6 +18,7 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import java.util.ArrayList; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index d295ed4c..b67fcc5b 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -16,17 +17,22 @@ import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.utils.DateUtil; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; +import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.sip.InvalidArgumentException; +import javax.sip.PeerUnavailableException; import javax.sip.SipException; +import javax.sip.SipFactory; +import javax.sip.address.Address; +import javax.sip.address.SipURI; +import javax.sip.header.*; +import javax.sip.message.Request; import java.text.ParseException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** * @author lin @@ -182,6 +188,7 @@ public class PlatformServiceImpl implements IPlatformService { } } + return false; } @@ -256,6 +263,31 @@ public class PlatformServiceImpl implements IPlatformService { }, (parentPlatform.getKeepTimeout())*1000); } + if (parentPlatform.isAutoPushChannel()) { + if (subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()) == null) { + addSimulatedSubscribeInfo(parentPlatform); + } + }else { + SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()); + if (catalogSubscribe != null && catalogSubscribe.getExpires() == -1) { + subscribeHolder.removeCatalogSubscribe(parentPlatform.getServerGBId()); + } + } + } + + @Override + public void addSimulatedSubscribeInfo(ParentPlatform parentPlatform) { + // 自动添加一条模拟的订阅信息 + SubscribeInfo subscribeInfo = new SubscribeInfo(); + subscribeInfo.setId(parentPlatform.getServerGBId()); + subscribeInfo.setExpires(-1); + subscribeInfo.setEventType("Catalog"); + int random = (int) Math.floor(Math.random() * 10000); + subscribeInfo.setEventId(random + ""); + subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP()); + subscribeInfo.setSimulatedFromTag(UUID.randomUUID().toString().replace("-", "")); + subscribeInfo.setSimulatedToTag(UUID.randomUUID().toString().replace("-", "")); + subscribeHolder.putCatalogSubscribe(parentPlatform.getServerGBId(), subscribeInfo); } private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){ diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index dcaab9e3..bc34162b 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -440,7 +440,7 @@ public class StreamPushServiceImpl implements IStreamPushService { } } - if (streamPushItemListFroPlatform.size() > 0) { + if (!streamPushItemListFroPlatform.isEmpty()) { platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform); // 发送通知 for (String platformId : platformForEvent.keySet()) { diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java index 10958a87..9dc05034 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java @@ -16,10 +16,10 @@ import java.util.List; public interface ParentPlatformMapper { @Insert("INSERT INTO wvp_platform (enable, name, server_gb_id, server_gb_domain, server_ip, server_port,device_gb_id,device_ip,"+ - "device_port,username,password,expires,keep_timeout,transport,character_set,ptz,rtcp,as_message_channel,"+ + "device_port,username,password,expires,keep_timeout,transport,character_set,ptz,rtcp,as_message_channel,auto_push_channel,"+ "status,start_offline_push,catalog_id,administrative_division,catalog_group,create_time,update_time) " + " VALUES (#{enable}, #{name}, #{serverGBId}, #{serverGBDomain}, #{serverIP}, #{serverPort}, #{deviceGBId}, #{deviceIp}, " + - " #{devicePort}, #{username}, #{password}, #{expires}, #{keepTimeout}, #{transport}, #{characterSet}, #{ptz}, #{rtcp}, #{asMessageChannel}, " + + " #{devicePort}, #{username}, #{password}, #{expires}, #{keepTimeout}, #{transport}, #{characterSet}, #{ptz}, #{rtcp}, #{asMessageChannel}, #{autoPushChannel}, " + " #{status}, #{startOfflinePush}, #{catalogId}, #{administrativeDivision}, #{catalogGroup}, #{createTime}, #{updateTime})") int addParentPlatform(ParentPlatform parentPlatform); @@ -42,6 +42,7 @@ public interface ParentPlatformMapper { "ptz=#{ptz}, " + "rtcp=#{rtcp}, " + "as_message_channel=#{asMessageChannel}, " + + "auto_push_channel=#{autoPushChannel}, " + "status=#{status}, " + "start_offline_push=#{startOfflinePush}, " + "catalog_group=#{catalogGroup}, " + diff --git a/web_src/src/components/dialog/platformEdit.vue b/web_src/src/components/dialog/platformEdit.vue index 0158cf74..f3f4255b 100755 --- a/web_src/src/components/dialog/platformEdit.vue +++ b/web_src/src/components/dialog/platformEdit.vue @@ -91,9 +91,10 @@ - + - + + {{ @@ -141,6 +142,7 @@ export default { ptz: true, rtcp: false, asMessageChannel: false, + autoPushChannel: false, name: null, serverGBId: null, serverGBDomain: null, @@ -208,6 +210,7 @@ export default { this.platform.ptz = platform.ptz; this.platform.rtcp = platform.rtcp; this.platform.asMessageChannel = platform.asMessageChannel; + this.platform.autoPushChannel = platform.autoPushChannel; this.platform.name = platform.name; this.platform.serverGBId = platform.serverGBId; this.platform.serverGBDomain = platform.serverGBDomain; @@ -284,6 +287,7 @@ export default { ptz: true, rtcp: false, asMessageChannel: false, + autoPushChannel: false, name: null, serverGBId: null, administrativeDivision: null,