diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index 0389df06..041408ec 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -93,12 +93,10 @@ public class SubscribeHolder { dynamicTask.stop(taskOverdueKey); } - public List getAllCatalogSubscribePlatform() { - List platforms = new ArrayList<>(); - if(catalogMap.size() > 0) { - for (Integer key : catalogMap.keySet()) { - platforms.add(catalogMap.get(key).getId()); - } + public List getAllCatalogSubscribePlatform() { + List platforms = new ArrayList<>(); + if(!catalogMap.isEmpty()) { + platforms.addAll(catalogMap.keySet()); } return platforms; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 0370e0e3..510d365f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -62,7 +62,7 @@ public class CatalogEventLister implements ApplicationListener { }else { // 如果事件没有要通知的上级,那么需要自己查询到所有要通知的上级进行通知 - List platforms = subscribeHolder.getAllCatalogSubscribePlatform(); + List platforms = subscribeHolder.getAllCatalogSubscribePlatform(); if (event.getChannels() != null) { if (!platforms.isEmpty()) { for (CommonGbChannel channel : event.getChannels()) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventType.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventType.java new file mode 100755 index 00000000..44268ebc --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventType.java @@ -0,0 +1,24 @@ +package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog; + + +public enum CatalogEventType{ + + + ON("ON"), // 上线 + OFF("OFF"), // 离线 + VLOST("VLOST"), // 视频丢失 + DEFECT("DEFECT"), // 故障 + ADD("ADD"), // 增加 + DEL("DEL"), // 删除 + UPDATE("UPDATE"), // 更新 + ; + + private final String val; + CatalogEventType(String val) { + this.val = val; + } + + public String getVal() { + return val; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java index d983d410..b64141e1 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.google.common.primitives.Bytes; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; -import org.apache.commons.lang3.ArrayUtils; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -31,9 +30,9 @@ import java.util.Arrays; import java.util.List; /** - * @description:处理接收IPCamera发来的SIP协议请求消息 - * @author: songww - * @date: 2020年5月3日 下午4:42:22 + * 处理接收IPCamera发来的SIP协议请求消息 + * songww + * 2020年5月3日 下午4:42:22 */ public abstract class SIPRequestProcessorParent { @@ -60,7 +59,7 @@ public abstract class SIPRequestProcessorParent { return null; } - class ResponseAckExtraParam{ + static class ResponseAckExtraParam{ String content; ContentTypeHeader contentTypeHeader; SipURI sipURI; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index 793872c8..60f0e195 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -277,9 +277,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); updateChannelMap.clear(); deviceChannelService.batchUpdateChannel(deviceChannels); - if (device.isAutoSyncChannel()) { - commonGbChannelService.updateChannelFromGb28181DeviceInList(device, deviceChannels); - } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java index 9eb9cac4..49f6cc05 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java @@ -311,4 +311,18 @@ public class SipUtils { } } } + + public static SubscribeInfo buildVirtuallyCatalogSubSubscribe(ParentPlatform platform) { + // 模拟一条订阅信息 + SubscribeInfo subscribeInfo = new SubscribeInfo(); + subscribeInfo.setId(platform.getServerGBId()); + subscribeInfo.setExpires(-1); + subscribeInfo.setEventType("Catalog"); + int random = (int) Math.floor(Math.random() * 10000); + subscribeInfo.setEventId(random + ""); + subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + platform.getServerIP()); + subscribeInfo.setSimulatedFromTag(UUID.randomUUID().toString().replace("-", "")); + subscribeInfo.setSimulatedToTag(UUID.randomUUID().toString().replace("-", "")); + return subscribeInfo; + } } \ No newline at end of file diff --git a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java index ba3c150f..5192e9f1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java @@ -28,16 +28,6 @@ public interface ICommonGbChannelService { boolean checkChannelInPlatform(String channelId, String platformServerId); - /** - * 从国标设备中同步通道 - * - * @param gbDeviceId 国标设备编号 - * @param syncKeys 要同步的字段 - */ - boolean syncChannelFromGb28181Device(String gbDeviceId, List syncKeys, Boolean syncGroup, Boolean syncRegion); - - CommonGbChannel getCommonChannelFromDeviceChannel(DeviceChannel deviceChannel, List syncKeys); - PageInfo getChannelsInRegion(String regionDeviceId, String query, int page, int count); List getChannelsInBusinessGroup(String businessGroupID); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformChannelService.java index 492964ad..0043420a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlatformChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformChannelService.java @@ -24,7 +24,7 @@ public interface IPlatformChannelService { /** * 在一个给定的范围内查出分享了这个通道的上级平台 */ - List querySharePlatformListByChannelId(int commonGbId, List platforms); + List querySharePlatformListByChannelId(int commonGbId, List platforms); /** * 查询关联了上级平台的所有通道 @@ -36,5 +36,13 @@ public interface IPlatformChannelService { */ CommonGbChannel queryChannelByPlatformIdAndChannelDeviceId(Integer platformId, String channelId); - List queryCommonGbChannellList(Integer id); + /** + * 获取指定平台关联的通道 + */ + List queryCommonGbChannellList(Integer platformId); + + /** + * 获取指定平台范围已经共享的通道 + */ + List queryChannelListInRange(Integer platformId, List channelList); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java index 17cbae4a..ef94ff6a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java @@ -1,9 +1,12 @@ package com.genersoft.iot.vmp.service; +import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; import com.github.pagehelper.PageInfo; +import java.util.List; + /** * 国标平台的业务类 * @author lin @@ -67,4 +70,14 @@ public interface IPlatformService { * 根据ID查询上级平台 */ ParentPlatform query(Integer platformId); + + /** + * 开启所有开启了共享所有通道的上级 + */ + List queryAllWithShareAll(); + + /** + * 获取指定范围内共享了指定通道的上级平台 + */ + List querySharePlatform(List channel, List platformIdList); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java index f2b512bc..85559de5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java @@ -4,13 +4,12 @@ import com.genersoft.iot.vmp.common.BatchLimit; import com.genersoft.iot.vmp.common.CivilCodePo; import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.conf.CivilCodeFileConf; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.Gb28181CodeType; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEventType; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.service.ICommonGbChannelService; -import com.genersoft.iot.vmp.service.IResourcePlayCallback; -import com.genersoft.iot.vmp.service.IResourceService; +import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.dao.CommonChannelMapper; import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; @@ -32,6 +31,9 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; import java.util.*; @Service @@ -60,25 +62,78 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { @Autowired private CivilCodeFileConf civilCodeFileConf; + @Autowired + private IPlatformService platformService; + + @Autowired + private IPlatformChannelService platformChannelService; + @Autowired private Map resourceServiceMap; + @Autowired + private SubscribeHolder subscribeHolder; + + @Autowired + private ISIPCommanderForPlatform sipCommanderForPlatform; + @Override public CommonGbChannel getChannel(String channelId) { return commonGbChannelMapper.queryByDeviceID(channelId); } + /** + * 发送catalog消息 + */ + private void sendCatalogEvent(List channelList, CatalogEventType catalogEventType) { + // 获取开启了目录订阅且关联了这些通道的 + List allCatalogSubscribePlatformList = subscribeHolder.getAllCatalogSubscribePlatform(); + // 获取所有开启了共享所有通道的上级与订阅通道的上级平台 + List platformList = platformService.querySharePlatform(channelList, allCatalogSubscribePlatformList); + + for (ParentPlatform platform : platformList) { + SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(platform.getId()); + if (catalogSubscribe == null) { + catalogSubscribe = SipUtils.buildVirtuallyCatalogSubSubscribe(platform); + } + // 获取关联的通道 + List channelListForShare = platformChannelService.queryChannelListInRange(platform.getId(), channelList); + logger.warn("[发送Catalog事件] 类型: {}, 平台:{}, 通道个数: {}", + catalogEventType.getVal(), platform.getServerGBId(), channelListForShare.size()); + try { + if (catalogEventType.equals(CatalogEventType.ADD) || catalogEventType.equals(CatalogEventType.UPDATE)) { + sipCommanderForPlatform.sendNotifyForCatalogAddOrUpdate(catalogEventType.getVal(), platform, channelListForShare, catalogSubscribe, 0); + }else { + sipCommanderForPlatform.sendNotifyForCatalogOther(catalogEventType.getVal(), platform, channelListForShare, catalogSubscribe, 0); + } + } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | + IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + @Override public int add(CommonGbChannel channel) { - // TODO 给标记了共享所有通道的上级·平台发送数据 - return commonGbChannelMapper.add(channel); + int result = commonGbChannelMapper.add(channel); + if (result == 0) { + return 0; + } + List channelList = new ArrayList<>(); + sendCatalogEvent(channelList, CatalogEventType.ADD); + return result; } @Override public int delete(String channelId) { - - return commonGbChannelMapper.deleteByDeviceID(channelId); + int result = commonGbChannelMapper.deleteByDeviceID(channelId); + if (result == 0) { + return 0; + } + List channelList = new ArrayList<>(); + sendCatalogEvent(channelList, CatalogEventType.DEL); + return result; } @Override @@ -86,8 +141,13 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { assert channel.getCommonGbId() >= 0; assert channel.getCommonGbDeviceID() != null; assert channel.getCommonGbName() != null; - // TODO 如果状态变化,需要发送消息给级联的上级 - return commonGbChannelMapper.update(channel); + int result = commonGbChannelMapper.update(channel); + if (result == 0) { + return 0; + } + List channelList = new ArrayList<>(); + sendCatalogEvent(channelList, CatalogEventType.UPDATE); + return result; } @Override @@ -95,464 +155,6 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { return commonGbChannelMapper.checkChannelInPlatform(channelId, platformServerId) > 0; } - @Override - public boolean syncChannelFromGb28181Device(String gbDeviceId, List syncKeys, Boolean syncGroup, Boolean syncRegion) { - logger.info("[同步通用通道]来自国标设备,国标编号: {}", gbDeviceId); - List deviceChannels = deviceChannelMapper.queryAllChannels(gbDeviceId); - if (deviceChannels.isEmpty()) { - logger.info("[同步通用通道]来自国标设备,结束, 通道数为0, 国标编号: {}", gbDeviceId); - return false; - } - List commonGbChannelList = new ArrayList<>(); - // 存储得到的10到13位为215的业务分组数据 - Map businessGroupMap = new HashMap<>(); - // 存储得到的10到13位为216的虚拟组织 数据 - Map virtuallyGroupMap = new HashMap<>(); - // 存储得到的行政区划数据 - Map regionMap = new HashMap<>(); - // 存储得到的所有行政区划, 后续检验civilCode是否已传输对应的行政区划数据,从而确定是否需要自动创建节点。 - Set civilCodeSet = new HashSet<>(); - List clearChannels = new ArrayList<>(); - // 对数据进行分类 - deviceChannels.stream().forEach(deviceChannel -> { - if (deviceChannel.getCommonGbChannelId() > 0) { - clearChannels.add(deviceChannel.getChannelId()); - } - Gb28181CodeType channelIdType = SipUtils.getChannelIdType(deviceChannel.getChannelId()); - if (channelIdType != null) { - if ( - ( - channelIdType == Gb28181CodeType.CIVIL_CODE_PROVINCE - || channelIdType == Gb28181CodeType.CIVIL_CODE_CITY - || channelIdType == Gb28181CodeType.CIVIL_CODE_COUNTY - || channelIdType == Gb28181CodeType.CIVIL_CODE_GRASS_ROOTS - ) - && - !regionMap.containsKey(deviceChannel.getChannelId()) - ) { - CivilCodePo parentCivilCodePo = civilCodeFileConf.getParentCode(deviceChannel.getChannelId()); - String civilCode = null; - if (parentCivilCodePo != null) { - civilCode = parentCivilCodePo.getCode(); - } - // 行政区划条目 - Region region = Region.getInstance(deviceChannel.getChannelId(), deviceChannel.getName(), - civilCode); - regionMap.put(deviceChannel.getChannelId(), region); - } - if (channelIdType == Gb28181CodeType.BUSINESS_GROUP - && !businessGroupMap.containsKey(deviceChannel.getChannelId())) { - Group group = Group.getInstance(deviceChannel.getChannelId(), deviceChannel.getName(), - null, deviceChannel.getChannelId()); - businessGroupMap.put(deviceChannel.getChannelId(), group); - } - if (channelIdType == Gb28181CodeType.VIRTUAL_ORGANIZATION - && !virtuallyGroupMap.containsKey(deviceChannel.getChannelId())) { - Group group = Group.getInstance(deviceChannel.getChannelId(), deviceChannel.getName(), deviceChannel.getParentId(), deviceChannel.getBusinessGroupId()); - virtuallyGroupMap.put(deviceChannel.getChannelId(), group); - } - }else { - if (!StringUtils.isEmpty(deviceChannel.getCivilCode())) { - civilCodeSet.add(deviceChannel.getCivilCode()); - } - CommonGbChannel commonGbChannel = getCommonChannelFromDeviceChannel(deviceChannel, syncKeys); - commonGbChannelList.add(commonGbChannel); - } - }); - if (!commonGbChannelList.isEmpty()) { - // 检查是否存在已存在通道与将写入通道相同的情况 - List commonGbChannelInDbList = commonGbChannelMapper.queryInList(commonGbChannelList); - if (!commonGbChannelInDbList.isEmpty()) { - // 这里可以控制新数据覆盖旧数据还是丢弃重复的新数据 - // 目前使用新数据覆盖旧数据,后续分局实际业务需求再做修改 - commonGbChannelInDbList.stream().forEach(commonGbChannel->{ - clearChannels.add(commonGbChannel.getCommonGbDeviceID()); - }); - } - } - - // 检测分组境况 - if (businessGroupMap.isEmpty()) { - virtuallyGroupMap.clear(); - }else { - // 检查业务分组与虚拟组织 - if (!virtuallyGroupMap.isEmpty()) { - for (String key : virtuallyGroupMap.keySet()) { - Group virtuallyGroup = virtuallyGroupMap.get(key); - if (virtuallyGroup.getCommonGroupTopId() == null - || !businessGroupMap.containsKey(virtuallyGroup.getCommonGroupTopId()) - ) { - virtuallyGroupMap.remove(key); - continue; - } - if (virtuallyGroup.getCommonGroupParentId() != null && !virtuallyGroupMap.containsKey(virtuallyGroup.getCommonGroupParentId())) { - virtuallyGroup.setCommonGroupParentId(null); - } - } - if (virtuallyGroupMap.isEmpty()) { - businessGroupMap.clear(); - } - } - } - // 检测行政区划信息是否完整 - for (String civilCode : civilCodeSet) { - if (!regionMap.containsKey(civilCode)) { - logger.warn("[通道信息中缺少地区信息]补充地区信息 国标编号: {}, civilCode: {}", gbDeviceId, civilCode ); - Region region = civilCodeFileConf.createRegion(civilCode); - if (region != null) { - regionMap.put(region.getCommonRegionDeviceId(), region); - }else { - logger.warn("[获取地区信息]失败 国标编号: {}, civilCode: {}", gbDeviceId, civilCode ); - } - } - } - // 对待写入的数据做处理 - if (!commonGbChannelList.isEmpty()) { - commonGbChannelList.stream().forEach(commonGbChannel -> { - if (commonGbChannel.getCommonGbParentID() != null - && !virtuallyGroupMap.containsKey(commonGbChannel.getCommonGbParentID())) { - commonGbChannel.setCommonGbParentID(null); - } - if (commonGbChannel.getCommonGbBusinessGroupID() != null - && !businessGroupMap.containsKey(commonGbChannel.getCommonGbBusinessGroupID())) { - commonGbChannel.setCommonGbBusinessGroupID(null); - } - if (commonGbChannel.getCommonGbCivilCode() != null - && !regionMap.containsKey(commonGbChannel.getCommonGbCivilCode())) { - commonGbChannel.setCommonGbCivilCode(null); - } - }); - } - // ====开始写入数据==== - // 清理重复数据 - TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - if (!clearChannels.isEmpty()) { - if (clearChannels.size() <= BatchLimit.count) { - commonGbChannelMapper.deleteByDeviceIDs(clearChannels); - } else { - for (int i = 0; i < clearChannels.size(); i += BatchLimit.count) { - int toIndex = i + BatchLimit.count; - if (i + BatchLimit.count > clearChannels.size()) { - toIndex = clearChannels.size(); - } - List clearChannelsSun = clearChannels.subList(i, toIndex); - int currentResult = commonGbChannelMapper.deleteByDeviceIDs(clearChannelsSun); - if (currentResult <= 0) { - dataSourceTransactionManager.rollback(transactionStatus); - return false; - } - } - } - } - // 写入通道数据 - boolean result; - if (!commonGbChannelList.isEmpty()) { - if (commonGbChannelList.size() <= BatchLimit.count) { - result = commonGbChannelMapper.addAll(commonGbChannelList) > 0; - } else { - for (int i = 0; i < commonGbChannelList.size(); i += BatchLimit.count) { - int toIndex = i + BatchLimit.count; - if (i + BatchLimit.count > commonGbChannelList.size()) { - toIndex = commonGbChannelList.size(); - } - List commonGbChannelListSub = commonGbChannelList.subList(i, toIndex); - int currentResult = commonGbChannelMapper.addAll(commonGbChannelListSub); - if (currentResult <= 0) { - dataSourceTransactionManager.rollback(transactionStatus); - logger.info("[同步通用通道]来自国标设备,失败, 写入数据库失败, 国标编号: {}", gbDeviceId); - return false; - } - } - result = true; - } - }else { - result = true; - } - deviceChannelMapper.updateCommonChannelId(gbDeviceId); - // 写入分组数据 - List allGroup = new ArrayList<>(businessGroupMap.values()); - allGroup.addAll(virtuallyGroupMap.values()); - if (!allGroup.isEmpty()) { - // 这里也采取只插入新数据的方式 - List groupInDBList = groupMapper.queryInList(allGroup); - if (!groupInDBList.isEmpty()) { - groupInDBList.stream().forEach(groupInDB -> { - for (int i = 0; i < allGroup.size(); i++) { - if (groupInDB.getCommonGroupDeviceId().equalsIgnoreCase(allGroup.get(i).getCommonGroupDeviceId())) { - allGroup.remove(i); - break; - } - } - }); - } - if (!allGroup.isEmpty()) { - if (allGroup.size() <= BatchLimit.count) { - if (groupMapper.addAll(allGroup) <= 0) { - dataSourceTransactionManager.rollback(transactionStatus); - logger.info("[同步通用通道]来自国标设备,失败,添加分组信息失败, 国标编号: {}", gbDeviceId); - return false; - } - } else { - for (int i = 0; i < allGroup.size(); i += BatchLimit.count) { - int toIndex = i + BatchLimit.count; - if (i + BatchLimit.count > allGroup.size()) { - toIndex = allGroup.size(); - } - List allGroupSub = allGroup.subList(i, toIndex); - if (groupMapper.addAll(allGroupSub) <= 0) { - dataSourceTransactionManager.rollback(transactionStatus); - logger.info("[同步通用通道]来自国标设备,失败,添加分组信息失败, 国标编号: {}", gbDeviceId); - return false; - } - } - } - } - } - // 写入地区 - List allRegion = new ArrayList<>(regionMap.values()); - - if (!allRegion.isEmpty()) { - // 这里也采取只插入新数据的方式 - List regionInDBList = regionMapper.queryInList(allRegion); - List regionInForUpdate = new ArrayList<>(); - if (!regionInDBList.isEmpty()) { - regionInDBList.stream().forEach(regionInDB -> { - for (int i = 0; i < allRegion.size(); i++) { - if (regionInDB.getCommonRegionDeviceId().equalsIgnoreCase(allRegion.get(i).getCommonRegionDeviceId())) { - if (!regionInDB.getCommonRegionName().equals(allRegion.get(i).getCommonRegionName())) { - regionInForUpdate.add(allRegion.get(i)); - } - allRegion.remove(i); - break; - } - } - }); - } - if (!allRegion.isEmpty()) { - if (allRegion.size() <= BatchLimit.count) { - if (regionMapper.addAll(allRegion) <= 0) { - dataSourceTransactionManager.rollback(transactionStatus); - logger.info("[同步通用通道]来自国标设备,失败,添加行政区划信息失败, 国标编号: {}", gbDeviceId); - return false; - } - } else { - for (int i = 0; i < allRegion.size(); i += BatchLimit.count) { - int toIndex = i + BatchLimit.count; - if (i + BatchLimit.count > allRegion.size()) { - toIndex = allRegion.size(); - } - List allRegionSub = allRegion.subList(i, toIndex); - if (regionMapper.addAll(allRegionSub) <= 0) { - dataSourceTransactionManager.rollback(transactionStatus); - logger.info("[同步通用通道]来自国标设备,失败,添加行政区划信息失败, 国标编号: {}", gbDeviceId); - return false; - } - } - } - } - // 对于名称变化的地区进行修改 - if (!regionInForUpdate.isEmpty()) { - regionMapper.updateAllForName(regionInForUpdate); - } - } - dataSourceTransactionManager.commit(transactionStatus); - return result; - } - - private String getTopGroupId(Map businessGroupMap, Map virtuallyGroupMap, String commonGroupId, int depth) { - if (depth >= 16) { - return null; - } - Group group = virtuallyGroupMap.get(commonGroupId); - if (group == null) { - return null; - } - Gb28181CodeType channelIdType = SipUtils.getChannelIdType(group.getCommonGroupParentId()); - if (channelIdType == Gb28181CodeType.BUSINESS_GROUP) { - if (businessGroupMap.containsKey(group.getCommonGroupParentId())) { - return group.getCommonGroupParentId(); - }else { - return null; - } - } - depth ++; - return getTopGroupId(businessGroupMap, virtuallyGroupMap, group.getCommonGroupParentId(), depth); - } - - @Override - public CommonGbChannel getCommonChannelFromDeviceChannel(DeviceChannel deviceChannel, List syncKeys) { - if (deviceChannel == null) { - return null; - } - CommonGbChannel commonGbChannel = new CommonGbChannel(); - commonGbChannel.setCommonGbDeviceID(deviceChannel.getChannelId()); - commonGbChannel.setCommonGbStatus(deviceChannel.isStatus()); - commonGbChannel.setType(CommonGbChannelType.GB28181); - commonGbChannel.setCreateTime(DateUtil.getNow()); - commonGbChannel.setUpdateTime(DateUtil.getNow()); - if (syncKeys == null || syncKeys.isEmpty()) { - commonGbChannel.setCommonGbName(deviceChannel.getName()); - commonGbChannel.setCommonGbManufacturer(deviceChannel.getManufacture()); - commonGbChannel.setCommonGbModel(deviceChannel.getModel()); - commonGbChannel.setCommonGbOwner(deviceChannel.getOwner()); - if (deviceChannel.getCivilCode() != null) { - Gb28181CodeType channelIdType = SipUtils.getChannelIdType(deviceChannel.getCivilCode()); - if (channelIdType == Gb28181CodeType.CIVIL_CODE_PROVINCE - || channelIdType == Gb28181CodeType.CIVIL_CODE_CITY - || channelIdType == Gb28181CodeType.CIVIL_CODE_COUNTY - || channelIdType == Gb28181CodeType.CIVIL_CODE_GRASS_ROOTS - ){ - commonGbChannel.setCommonGbCivilCode(deviceChannel.getCivilCode()); - }else { - logger.warn("[不规范的CivilCode],deviceId: {}, channel: {}, civilCode: {}", - deviceChannel.getDeviceId(), - deviceChannel.getChannelId(), - deviceChannel.getCivilCode()); - } - } - - commonGbChannel.setCommonGbCivilCode(deviceChannel.getCivilCode()); - commonGbChannel.setCommonGbBlock(deviceChannel.getBlock()); - commonGbChannel.setCommonGbAddress(deviceChannel.getAddress()); - commonGbChannel.setCommonGbParental(0); - // 不符合国标的parentId,可以在未分组中找到并重新设置分组信息 - Gb28181CodeType parentIdIdType = SipUtils.getChannelIdType(deviceChannel.getParentId()); - if (parentIdIdType == Gb28181CodeType.VIRTUAL_ORGANIZATION) { - commonGbChannel.setCommonGbParentID(deviceChannel.getParentId()); - } - - commonGbChannel.setCommonGbSafetyWay(deviceChannel.getSafetyWay()); - commonGbChannel.setCommonGbRegisterWay(deviceChannel.getRegisterWay()); - commonGbChannel.setCommonGbCertNum(deviceChannel.getCertNum()); - commonGbChannel.setCommonGbCertifiable(deviceChannel.getCertifiable()); - commonGbChannel.setCommonGbErrCode(deviceChannel.getErrCode()); - commonGbChannel.setCommonGbEndTime(deviceChannel.getEndTime()); - if (NumberUtils.isParsable(deviceChannel.getSecrecy())) { - commonGbChannel.setCommonGbSecrecy(Integer.parseInt(deviceChannel.getSecrecy())); - } - commonGbChannel.setCommonGbIPAddress(deviceChannel.getIpAddress()); - commonGbChannel.setCommonGbPort(deviceChannel.getPort()); - commonGbChannel.setCommonGbPassword(deviceChannel.getPassword()); - commonGbChannel.setCommonGbLongitude(deviceChannel.getLongitude()); - commonGbChannel.setCommonGbLatitude(deviceChannel.getLatitude()); - commonGbChannel.setCommonGbPtzType(deviceChannel.getPTZType()); -// commonGbChannel.setCommonGbPositionType(deviceChannel.getCommonGbPositionType()); - commonGbChannel.setCommonGbBusinessGroupID(deviceChannel.getBusinessGroupId()); - } else { - for (String key : syncKeys) { - switch (key) { - case "commonGbName": - commonGbChannel.setCommonGbName(deviceChannel.getName()); - break; - case "commonGbManufacturer": - commonGbChannel.setCommonGbManufacturer(deviceChannel.getManufacture()); - break; - case "commonGbModel": - commonGbChannel.setCommonGbModel(deviceChannel.getModel()); - break; - case "commonGbOwner": - commonGbChannel.setCommonGbOwner(deviceChannel.getOwner()); - break; - case "commonGbCivilCode": - if (deviceChannel.getCivilCode() == null) { - break; - } - Gb28181CodeType channelIdType = SipUtils.getChannelIdType(deviceChannel.getCivilCode()); - if (channelIdType == Gb28181CodeType.CIVIL_CODE_PROVINCE - || channelIdType == Gb28181CodeType.CIVIL_CODE_CITY - || channelIdType == Gb28181CodeType.CIVIL_CODE_COUNTY - || channelIdType == Gb28181CodeType.CIVIL_CODE_GRASS_ROOTS - ){ - commonGbChannel.setCommonGbCivilCode(deviceChannel.getCivilCode()); - }else { - logger.warn("[不规范的CivilCode],deviceId: {}, channel: {}, civilCode: {}", - deviceChannel.getDeviceId(), - deviceChannel.getChannelId(), - deviceChannel.getCivilCode()); - } - commonGbChannel.setCommonGbCivilCode(deviceChannel.getCivilCode()); - break; - case "commonGbBlock": - commonGbChannel.setCommonGbBlock(deviceChannel.getBlock()); - break; - case "commonGbAddress": - commonGbChannel.setCommonGbAddress(deviceChannel.getAddress()); - break; - case "commonGbParental": - commonGbChannel.setCommonGbParental(deviceChannel.getParental()); - break; - case "commonGbParentID": - commonGbChannel.setCommonGbParentID(deviceChannel.getParentId()); - break; - case "commonGbSafetyWay": - commonGbChannel.setCommonGbSafetyWay(deviceChannel.getSafetyWay()); - break; - case "commonGbRegisterWay": - commonGbChannel.setCommonGbRegisterWay(deviceChannel.getRegisterWay()); - break; - case "commonGbCertNum": - commonGbChannel.setCommonGbCertNum(deviceChannel.getCertNum()); - break; - case "commonGbCertifiable": - commonGbChannel.setCommonGbCertifiable(deviceChannel.getCertifiable()); - break; - case "commonGbErrCode": - commonGbChannel.setCommonGbErrCode(deviceChannel.getErrCode()); - break; - case "commonGbEndTime": - commonGbChannel.setCommonGbEndTime(deviceChannel.getEndTime()); - break; - case "commonGbSecrecy": - if (NumberUtils.isParsable(deviceChannel.getSecrecy())) { - commonGbChannel.setCommonGbSecrecy(Integer.parseInt(deviceChannel.getSecrecy())); - } - break; - case "commonGbIPAddress": - commonGbChannel.setCommonGbIPAddress(deviceChannel.getIpAddress()); - break; - case "commonGbPort": - commonGbChannel.setCommonGbPort(deviceChannel.getPort()); - break; - case "commonGbPassword": - commonGbChannel.setCommonGbPassword(deviceChannel.getPassword()); - break; - case "commonGbLongitude": - commonGbChannel.setCommonGbLongitude(deviceChannel.getLongitude()); - break; - case "commonGbLatitude": - commonGbChannel.setCommonGbLatitude(deviceChannel.getLatitude()); - break; - case "commonGbPtzType": - commonGbChannel.setCommonGbPtzType(deviceChannel.getPTZType()); - break; - case "commonGbPositionType": -// commonGbChannel.setCommonGbPositionType(deviceChannel.getCommonGbPositionType()); - break; - case "commonGbRoomType": - break; - case "commonGbUseType": - break; - case "commonGbSupplyLightType": - break; - case "commonGbDirectionType": - break; - case "commonGbResolution": - break; - case "commonGbBusinessGroupID": - commonGbChannel.setCommonGbBusinessGroupID(deviceChannel.getBusinessGroupId()); - break; - case "commonGbDownloadSpeed": - break; - case "commonGbSVCTimeSupportMode": - break; - - } - } - } - - return commonGbChannel; - } - @Override public List getChannelsInBusinessGroup(String businessGroupID) { return null; @@ -740,7 +342,7 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { throw new RuntimeException("batch update commonGbChannel fail"); } } - // TODO 向国标级联发送catalog + sendCatalogEvent(commonGbChannels, CatalogEventType.UPDATE); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 8b0c85f0..2bf183d0 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -255,12 +255,21 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @Override public void batchUpdateChannel(List channels) { + if (channels.isEmpty()) { + return; + } channelMapper.batchUpdate(channels); for (DeviceChannel channel : channels) { if (channel.getParentId() != null) { channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId()); } } + List commonGbChannelList = new ArrayList<>(); + for (DeviceChannel channel : channels) { + CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(null, channel); + commonGbChannelList.add(commonGbChannel); + } + commonGbChannelService.batchUpdate(commonGbChannelList); } @Override @@ -269,6 +278,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { channels.stream().forEach(channel->{ commonGbChannelList.add(CommonGbChannel.getInstance(null, channel)); }); + commonGbChannelService.batchAdd(commonGbChannelList); channelMapper.batchAdd(channels); for (DeviceChannel channel : channels) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java index 556efdcc..1c938523 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java @@ -121,7 +121,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { } @Override - public List querySharePlatformListByChannelId(int commonGbId, List platforms) { + public List querySharePlatformListByChannelId(int commonGbId, List platforms) { return platformChannelMapper.querySharePlatformListByChannelId(commonGbId, platforms); } @@ -151,11 +151,16 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { @Override public CommonGbChannel queryChannelByPlatformIdAndChannelDeviceId(Integer platformId, String channelId) { - return null; + return platformChannelMapper.queryChannelByPlatformIdAndChannelDeviceId(platformId, channelId); } @Override public List queryCommonGbChannellList(Integer platformId) { return platformChannelMapper.queryCommonGbChannellList(platformId); } + + @Override + public List queryChannelListInRange(Integer platformId, List channelList) { + return platformChannelMapper.queryChannelListInRange(platformId, channelList); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index 6de44c2a..67ec0f44 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -275,15 +275,7 @@ public class PlatformServiceImpl implements IPlatformService { @Override public void addSimulatedSubscribeInfo(ParentPlatform parentPlatform) { // 自动添加一条模拟的订阅信息 - SubscribeInfo subscribeInfo = new SubscribeInfo(); - subscribeInfo.setId(parentPlatform.getServerGBId()); - subscribeInfo.setExpires(-1); - subscribeInfo.setEventType("Catalog"); - int random = (int) Math.floor(Math.random() * 10000); - subscribeInfo.setEventId(random + ""); - subscribeInfo.setSimulatedCallId(UUID.randomUUID().toString().replace("-", "") + "@" + parentPlatform.getServerIP()); - subscribeInfo.setSimulatedFromTag(UUID.randomUUID().toString().replace("-", "")); - subscribeInfo.setSimulatedToTag(UUID.randomUUID().toString().replace("-", "")); + SubscribeInfo subscribeInfo = SipUtils.buildVirtuallyCatalogSubSubscribe(parentPlatform); subscribeHolder.putCatalogSubscribe(parentPlatform.getId(), subscribeInfo); } @@ -473,4 +465,14 @@ public class PlatformServiceImpl implements IPlatformService { public ParentPlatform query(Integer platformId) { return platformMapper.getParentPlatById(platformId); } + + @Override + public List queryAllWithShareAll() { + return platformMapper.queryAllWithShareAll(); + } + + @Override + public List querySharePlatform(List channel, List platformIdList) { + return platformMapper.querySharePlatform(channel, platformIdList); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonChannelPlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonChannelPlatformMapper.java index 6f704a73..89cc5ad2 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonChannelPlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonChannelPlatformMapper.java @@ -46,13 +46,35 @@ public interface CommonChannelPlatformMapper { @Param("commonGbChannelIds") List commonGbChannelIds); @Select("") - List querySharePlatformListByChannelId(@Param("commonGbId") int commonGbId, @Param("platforms") List platforms); + List querySharePlatformListByChannelId(@Param("commonGbId") int commonGbId, @Param("platforms") List platforms); @Select("") List queryCommonGbChannellList(@Param("platformId") Integer platformId); + + + @Select("") + CommonGbChannel queryChannelByPlatformIdAndChannelDeviceId(@Param("platformId") Integer platformId, + @Param("channelId") String channelId); + + @Select("") + List queryChannelListInRange(@Param("platformId") Integer platformId, + @Param("channelList") List channelList); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java index f8d0651c..b9ad4e70 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.storager.dao; +import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo; import org.apache.ibatis.annotations.*; @@ -104,4 +105,21 @@ public interface ParentPlatformMapper { "union " + "select 'stream' as name, count(pgs.platform_id) count from wvp_platform_gb_stream pgs left join wvp_gb_stream gs on pgs.gb_stream_id = gs.gb_stream_id where pgs.platform_id=#{platform_id} and gs.gb_id =#{gbId}") List getChannelSource(@Param("platform_id") String platform_id, @Param("gbId") String gbId); + + @Select("SELECT * FROM wvp_platform WHERE share_all_channel=true") + List queryAllWithShareAll(); + + + @Select("") + List querySharePlatform(@Param("channelList") List channelList, + @Param("platformIdList") List platformIdList); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/channel/CommonChannelController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/channel/CommonChannelController.java index 093e328b..44d91b7e 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/channel/CommonChannelController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/channel/CommonChannelController.java @@ -51,22 +51,6 @@ public class CommonChannelController { private UserSetting userSetting; - /** - * 从下级设备中同步通道 TODO 存疑 放在国标的接口里可能更合适 - * - * @param deviceId 设备编号 - */ - @GetMapping("/sync/device") - @Operation(summary = "从下级设备中同步通道") - @Parameter(name = "deviceId", description = "设备编号") - @Parameter(name = "syncKeys", description = "选择性同步的字段") - public boolean syncFromDevice(String deviceId, String[] syncKeys, - @RequestParam(required = false) Boolean syncGroup, - @RequestParam(required = false) Boolean syncRegion) { - return commonGbChannelService.syncChannelFromGb28181Device(deviceId, Lists.newArrayList(syncKeys), syncGroup, syncRegion); - } - - @Operation(summary = "更新通道信息") @Parameter(name = "CommonGbChannel", description = "commonGbChannel", required = true) @ResponseBody