优化通用通道同步,待完成

结构优化
648540858 2023-11-27 11:04:43 +08:00
parent a9e5883492
commit d88c635d8a
2 changed files with 192 additions and 14 deletions

View File

@ -120,8 +120,6 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
Map<String, Group> virtuallyGroupMap = new HashMap<>();
// 存储得到的行政区划数据
Map<String, Region> regionMap = new HashMap<>();
// // 存储得到的所有parentId, 后续检验parentId是否已传输对应的分组/行政区划数据,从而确定是否需要自动创建节点。
// Set<String> parentIdSet = new HashSet<>();
// 存储得到的所有行政区划, 后续检验civilCode是否已传输对应的行政区划数据从而确定是否需要自动创建节点。
Set<String> civilCodeSet = new HashSet<>();
List<String> clearChannels = new ArrayList<>();

View File

@ -1,24 +1,25 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.BatchLimit;
import com.genersoft.iot.vmp.common.CommonGbChannel;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.Gb28181CodeType;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.service.ICommonGbChannelService;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.Group;
import com.genersoft.iot.vmp.service.bean.Region;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -55,6 +56,15 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
@Autowired
ICommonGbChannelService commonGbChannelService;
@Autowired
private CivilCodeFileConf civilCodeFileConf;
@Autowired
private GroupMapper groupMapper;
@Autowired
private RegionMapper regionMapper;
@Override
public DeviceChannel updateGps(DeviceChannel deviceChannel, Device device) {
@ -289,15 +299,23 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
List<DeviceChannel> addChannels = new ArrayList<>();
List<CommonGbChannel> addCommonChannels = new ArrayList<>();
StringBuilder stringBuilder = new StringBuilder();
Map<String, Integer> subContMap = new HashMap<>();
// 存储得到的10到13位为215的业务分组数据
Map<String, Group> businessGroupMap = new HashMap<>();
// 存储得到的10到13位为216的虚拟组织 数据
Map<String, Group> virtuallyGroupMap = new HashMap<>();
// 存储得到的行政区划数据
Map<String, Region> regionMap = new HashMap<>();
// 存储得到的所有行政区划, 后续检验civilCode是否已传输对应的行政区划数据从而确定是否需要自动创建节点。
Set<String> civilCodeSet = new HashSet<>();
List<String> clearChannels = new ArrayList<>();
Set<String> gbIdSet = new HashSet<>();
for (DeviceChannel deviceChannel : deviceChannelList) {
// 数据去重
if (gbIdSet.contains(deviceChannel.getChannelId())) {
stringBuilder.append(deviceChannel.getChannelId()).append(",");
logger.info("[目录查询]收到的数据存在重复: {}" , deviceChannel.getChannelId());
continue;
}
gbIdSet.add(deviceChannel.getChannelId());
@ -317,7 +335,47 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
deviceChannel.setCreateTime(DateUtil.getNow());
deviceChannel.setUpdateTime(DateUtil.getNow());
addChannels.add(deviceChannel);
addCommonChannels.add(CommonGbChannel.getInstance(null, deviceChannel));
Gb28181CodeType channelIdType = SipUtils.getChannelIdType(deviceChannel.getChannelId());
if (channelIdType != null) {
if (
(
channelIdType == Gb28181CodeType.CIVIL_CODE_PROVINCE
|| channelIdType == Gb28181CodeType.CIVIL_CODE_CITY
|| channelIdType == Gb28181CodeType.CIVIL_CODE_COUNTY
|| channelIdType == Gb28181CodeType.CIVIL_CODE_GRASS_ROOTS
)
&&
!regionMap.containsKey(deviceChannel.getChannelId())
) {
CivilCodePo parentCivilCodePo = civilCodeFileConf.getParentCode(deviceChannel.getChannelId());
String civilCode = null;
if (parentCivilCodePo != null) {
civilCode = parentCivilCodePo.getCode();
}
// 行政区划条目
Region region = Region.getInstance(deviceChannel.getChannelId(), deviceChannel.getName(),
civilCode);
regionMap.put(deviceChannel.getChannelId(), region);
}
if (channelIdType == Gb28181CodeType.BUSINESS_GROUP
&& !businessGroupMap.containsKey(deviceChannel.getChannelId())) {
Group group = Group.getInstance(deviceChannel.getChannelId(), deviceChannel.getName(),
null, deviceChannel.getChannelId());
businessGroupMap.put(deviceChannel.getChannelId(), group);
}
if (channelIdType == Gb28181CodeType.VIRTUAL_ORGANIZATION
&& !virtuallyGroupMap.containsKey(deviceChannel.getChannelId())) {
Group group = Group.getInstance(deviceChannel.getChannelId(), deviceChannel.getName(), deviceChannel.getParentId(), deviceChannel.getBusinessGroupId());
virtuallyGroupMap.put(deviceChannel.getChannelId(), group);
}
}else {
if (!StringUtils.isEmpty(deviceChannel.getCivilCode())) {
civilCodeSet.add(deviceChannel.getCivilCode());
}
addCommonChannels.add(CommonGbChannel.getInstance(null, deviceChannel));
}
}
channels.add(deviceChannel);
if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
@ -341,9 +399,60 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
}
}
if (stringBuilder.length() > 0) {
logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
// 检测分组境况
if (businessGroupMap.isEmpty()) {
virtuallyGroupMap.clear();
}else {
// 检查业务分组与虚拟组织
if (!virtuallyGroupMap.isEmpty()) {
for (String key : virtuallyGroupMap.keySet()) {
Group virtuallyGroup = virtuallyGroupMap.get(key);
if (virtuallyGroup.getCommonGroupTopId() == null
|| !businessGroupMap.containsKey(virtuallyGroup.getCommonGroupTopId())
) {
virtuallyGroupMap.remove(key);
continue;
}
if (virtuallyGroup.getCommonGroupParentId() != null && !virtuallyGroupMap.containsKey(virtuallyGroup.getCommonGroupParentId())) {
virtuallyGroup.setCommonGroupParentId(null);
}
}
if (virtuallyGroupMap.isEmpty()) {
businessGroupMap.clear();
}
}
}
// 检测行政区划信息是否完整
for (String civilCode : civilCodeSet) {
if (!regionMap.containsKey(civilCode)) {
logger.warn("[通道信息中缺少地区信息]补充地区信息 civilCode {}", civilCode );
Region region = civilCodeFileConf.createRegion(civilCode);
if (region != null) {
regionMap.put(region.getCommonRegionDeviceId(), region);
}else {
logger.warn("[获取地区信息]失败 civilCode {}", civilCode );
}
}
}
// 对待写入的数据做处理
if (!addCommonChannels.isEmpty()) {
addCommonChannels.stream().forEach(commonGbChannel -> {
if (commonGbChannel.getCommonGbParentID() != null
&& !virtuallyGroupMap.containsKey(commonGbChannel.getCommonGbParentID())) {
commonGbChannel.setCommonGbParentID(null);
}
if (commonGbChannel.getCommonGbBusinessGroupID() != null
&& !businessGroupMap.containsKey(commonGbChannel.getCommonGbBusinessGroupID())) {
commonGbChannel.setCommonGbBusinessGroupID(null);
}
if (commonGbChannel.getCommonGbCivilCode() != null
&& !regionMap.containsKey(commonGbChannel.getCommonGbCivilCode())) {
commonGbChannel.setCommonGbCivilCode(null);
}
});
}
if(CollectionUtils.isEmpty(channels)){
logger.info("通道重设,数据为空={}" , deviceChannelList);
return false;
@ -403,6 +512,77 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
}
commonGbChannelService.batchUpdate(updateCommonChannelsForInfo);
}
// 写入分组数据
List<Group> allGroup = new ArrayList<>(businessGroupMap.values());
allGroup.addAll(virtuallyGroupMap.values());
if (!allGroup.isEmpty()) {
// 这里也采取只插入新数据的方式
List<Group> groupInDBList = groupMapper.queryInList(allGroup);
if (!groupInDBList.isEmpty()) {
groupInDBList.stream().forEach(groupInDB -> {
for (int i = 0; i < allGroup.size(); i++) {
if (groupInDB.getCommonGroupDeviceId().equalsIgnoreCase(allGroup.get(i).getCommonGroupDeviceId())) {
allGroup.remove(i);
break;
}
}
});
}
if (!allGroup.isEmpty()) {
if (allGroup.size() <= BatchLimit.count) {
groupMapper.addAll(allGroup);
} else {
for (int i = 0; i < allGroup.size(); i += BatchLimit.count) {
int toIndex = i + BatchLimit.count;
if (i + BatchLimit.count > allGroup.size()) {
toIndex = allGroup.size();
}
groupMapper.addAll(allGroup.subList(i, toIndex));
}
}
}
}
// 写入地区
List<Region> allRegion = new ArrayList<>(regionMap.values());
if (!allRegion.isEmpty()) {
// 这里也采取只插入新数据的方式
List<Region> regionInDBList = regionMapper.queryInList(allRegion);
List<Region> regionInForUpdate = new ArrayList<>();
if (!regionInDBList.isEmpty()) {
regionInDBList.stream().forEach(regionInDB -> {
for (int i = 0; i < allRegion.size(); i++) {
if (regionInDB.getCommonRegionDeviceId().equalsIgnoreCase(allRegion.get(i).getCommonRegionDeviceId())) {
if (!regionInDB.getCommonRegionName().equals(allRegion.get(i).getCommonRegionName())) {
regionInForUpdate.add(allRegion.get(i));
}
allRegion.remove(i);
break;
}
}
});
}
if (!allRegion.isEmpty()) {
if (allRegion.size() <= BatchLimit.count) {
if (regionMapper.addAll(allRegion) <= 0) {
regionMapper.addAll(allRegion);
}
} else {
for (int i = 0; i < allRegion.size(); i += BatchLimit.count) {
int toIndex = i + BatchLimit.count;
if (i + BatchLimit.count > allRegion.size()) {
toIndex = allRegion.size();
}
List<Region> allRegionSub = allRegion.subList(i, toIndex);
regionMapper.addAll(allRegionSub);
}
}
}
// 对于名称变化的地区进行修改
if (!regionInForUpdate.isEmpty()) {
regionMapper.updateAllForName(regionInForUpdate);
}
}
return true;
}catch (Exception e) {
logger.error("未处理的异常 ", e);