From 29352cc9cd882e952f5eb3b42a41a1158437e8b7 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 6 Oct 2023 23:39:38 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=87=AA=E5=8A=A8=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=9B=BD=E6=A0=87=E9=80=9A=E9=81=93=E5=88=B0=E9=80=9A?= =?UTF-8?q?=E7=94=A8=E9=80=9A=E9=81=93=EF=BC=8C=E5=88=86=E7=BB=84=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=E4=BB=A5=E5=8F=8A=E8=A1=8C=E6=94=BF=E5=8C=BA=E5=88=92?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/service/IGroupService.java | 6 +- .../impl/CommonGbChannelServiceImpl.java | 110 +++++++++---- .../vmp/service/impl/GroupServiceImpl.java | 148 ++++++++++-------- .../storager/dao/CommonGbChannelMapper.java | 30 ++-- .../iot/vmp/storager/dao/DeviceMapper.java | 9 ++ .../iot/vmp/storager/dao/GroupMapper.java | 10 +- 6 files changed, 203 insertions(+), 110 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/IGroupService.java b/src/main/java/com/genersoft/iot/vmp/service/IGroupService.java index 92b2c17b..6c798dd3 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IGroupService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IGroupService.java @@ -48,17 +48,17 @@ public interface IGroupService { /** * 设置国标设备到相关的分组中 */ - boolean updateChannelsToBusinessGroup(int id, List channels); + boolean updateChannelsToGroup(int id, List channels); /** * 设置国标设备到相关的分组中 */ - boolean updateChannelsToBusinessGroup(String deviceId, List channels); + boolean updateChannelsToGroup(String deviceId, List channels); /** * 移除分组分组中的通道 */ - boolean removeChannelsFromBusinessGroup(List channels); + boolean removeChannelsFromGroup(List channels); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java index 2f0fd926..8aef26b7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java @@ -111,10 +111,10 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { Set parentIdSet = new HashSet<>(); // 存储得到的所有行政区划, 后续检验civilCode是否已传输对应的行政区划数据,从而确定是否需要自动创建节点。 Set civilCodeSet = new HashSet<>(); - List clearChannels = new ArrayList<>(); + List clearChannels = new ArrayList<>(); deviceChannels.stream().forEach(deviceChannel -> { if (deviceChannel.getCommonGbChannelId() > 0) { - clearChannels.add(deviceChannel); + clearChannels.add(deviceChannel.getChannelId()); } Gb28181CodeType channelIdType = SipUtils.getChannelIdType(deviceChannel.getChannelId()); if (channelIdType != null) { @@ -155,6 +155,15 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { commonGbChannelList.add(commonGbChannel); } }); + // 检查是否存在已存在通道与将写入通道相同的情况 + List commonGbChannelInDbList = commonGbChannelMapper.queryInList(commonGbChannelList); + if (!commonGbChannelInDbList.isEmpty()) { + // 这里可以控制新数据覆盖旧数据还是丢弃重复的新数据 + // 目前使用新数据覆盖旧数据,后续分局实际业务需求再做修改 + commonGbChannelInDbList.stream().forEach(commonGbChannel->{ + clearChannels.add(commonGbChannel.getCommonGbDeviceID()); + }); + } TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); int limit = 50; if (!clearChannels.isEmpty()) { @@ -166,7 +175,7 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { if (i + limit > clearChannels.size()) { toIndex = clearChannels.size(); } - List clearChannelsSun = clearChannels.subList(i, toIndex); + List clearChannelsSun = clearChannels.subList(i, toIndex); int currentResult = commonGbChannelMapper.deleteByDeviceIDs(clearChannelsSun); if (currentResult <= 0) { dataSourceTransactionManager.rollback(transactionStatus); @@ -206,7 +215,6 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { }else { virtuallyGroup.setCommonGroupTopId(topGroupId); } - } } @@ -252,11 +260,7 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { // 处理存在错误的parentId if (!errorParentIdList.isEmpty()) { if (errorParentIdList.size() <= limit) { - if (commonGbChannelMapper.clearParentIds(errorParentIdList) <= 0) { - dataSourceTransactionManager.rollback(transactionStatus); - logger.info("[同步通用通道]来自国标设备,失败, 处理错误的ParentId失败, 国标编号: {}", gbDeviceId); - return false; - } + commonGbChannelMapper.clearParentIds(errorParentIdList); } else { for (int i = 0; i < errorParentIdList.size(); i += limit) { int toIndex = i + limit; @@ -264,46 +268,80 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { toIndex = errorParentIdList.size(); } List errorParentIdListSub = errorParentIdList.subList(i, toIndex); - if (commonGbChannelMapper.clearParentIds(errorParentIdListSub) <= 0) { - dataSourceTransactionManager.rollback(transactionStatus); - logger.info("[同步通用通道]来自国标设备,失败, 处理错误的ParentId失败, 国标编号: {}", gbDeviceId); - return false; - } + commonGbChannelMapper.clearParentIds(errorParentIdListSub); } } } // 分组信息写入数据库 List allGroup = new ArrayList<>(businessGroupMap.values()); allGroup.addAll(virtuallyGroupMap.values()); - if (allGroup.size() <= limit) { - if (groupMapper.addAll(allGroup) <= 0) { - dataSourceTransactionManager.rollback(transactionStatus); - logger.info("[同步通用通道]来自国标设备,失败,添加分组信息失败, 国标编号: {}", gbDeviceId); - return false; + if (!allGroup.isEmpty()) { + // 这里也采取只插入新数据的方式 + List groupInDBList = groupMapper.queryInList(allGroup); + if (!groupInDBList.isEmpty()) { + groupInDBList.stream().forEach(groupInDB -> { + for (int i = 0; i < allGroup.size(); i++) { + if (groupInDB.getCommonGroupDeviceId().equalsIgnoreCase(allGroup.get(i).getCommonGroupDeviceId())) { + allGroup.remove(i); + break; + } + } + }); } - } else { - for (int i = 0; i < allGroup.size(); i += limit) { - int toIndex = i + limit; - if (i + limit > allGroup.size()) { - toIndex = allGroup.size(); - } - List allGroupSub = allGroup.subList(i, toIndex); - if (groupMapper.addAll(allGroupSub) <= 0) { - dataSourceTransactionManager.rollback(transactionStatus); - logger.info("[同步通用通道]来自国标设备,失败,添加分组信息失败, 国标编号: {}", gbDeviceId); - return false; + if (!allGroup.isEmpty()) { + if (allGroup.size() <= limit) { + if (groupMapper.addAll(allGroup) <= 0) { + dataSourceTransactionManager.rollback(transactionStatus); + logger.info("[同步通用通道]来自国标设备,失败,添加分组信息失败, 国标编号: {}", gbDeviceId); + return false; + } + } else { + for (int i = 0; i < allGroup.size(); i += limit) { + int toIndex = i + limit; + if (i + limit > allGroup.size()) { + toIndex = allGroup.size(); + } + List allGroupSub = allGroup.subList(i, toIndex); + if (groupMapper.addAll(allGroupSub) <= 0) { + dataSourceTransactionManager.rollback(transactionStatus); + logger.info("[同步通用通道]来自国标设备,失败,添加分组信息失败, 国标编号: {}", gbDeviceId); + return false; + } + } } } } + List errorCivilCodeList = new ArrayList<>(); // 检测行政区划信息是否完整 for (String civilCode : civilCodeSet) { if (!regionMap.containsKey(civilCode)) { logger.warn("[通道信息中缺少地区信息]补充地区信息 国标编号: {}, civilCode: {}", gbDeviceId, civilCode ); Region region = civilCodeFileConf.createRegion(civilCode); - regionMap.put(region.getCommonRegionDeviceId(), region); + if (region != null) { + regionMap.put(region.getCommonRegionDeviceId(), region); + }else { + logger.warn("[获取地区信息]失败 国标编号: {}, civilCode: {}", gbDeviceId, civilCode ); + errorCivilCodeList.add(civilCode); + } + } } + if (!errorCivilCodeList.isEmpty()) { + if (errorCivilCodeList.size() <= limit) { + commonGbChannelMapper.clearCivilCodes(errorCivilCodeList); + } else { + for (int i = 0; i < errorCivilCodeList.size(); i += limit) { + int toIndex = i + limit; + if (i + limit > errorCivilCodeList.size()) { + toIndex = errorCivilCodeList.size(); + } + List errorCivilCodeListSub = errorParentIdList.subList(i, toIndex); + commonGbChannelMapper.clearCivilCodes(errorCivilCodeListSub); + } + } + } + // 行政区划信息写入数据库 List allRegion = new ArrayList<>(regionMap.values()); if (!allRegion.isEmpty()) { @@ -543,7 +581,15 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { @Override public void deleteGbChannelsFromList(List channelList) { - commonGbChannelMapper.deleteByDeviceIDs(channelList); + if (channelList.isEmpty()) { + return; + } + List channelIdList = new ArrayList<>(channelList.size()); + for (DeviceChannel deviceChannel : channelList) { + channelIdList.add(deviceChannel.getChannelId()); + } + commonGbChannelMapper.deleteByDeviceIDs(channelIdList); + } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/GroupServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/GroupServiceImpl.java index 6c02cc57..0a4efcbe 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/GroupServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/GroupServiceImpl.java @@ -24,7 +24,7 @@ public class GroupServiceImpl implements IGroupService { private CommonGbChannelMapper commonGbChannelDao; @Autowired - private GroupMapper businessGroupDao; + private GroupMapper groupMapper; @Autowired DataSourceTransactionManager dataSourceTransactionManager; @@ -35,105 +35,129 @@ public class GroupServiceImpl implements IGroupService { @Override public List getNodes(String parentId) { - return businessGroupDao.getNodes(parentId); + return groupMapper.getNodes(parentId); } @Override public List getChannels(int id) { - Group businessGroup = businessGroupDao.query(id); - if (businessGroup == null) { + Group group = groupMapper.query(id); + if (group == null) { return null; } - return commonGbChannelDao.getChannels(businessGroup.getCommonBusinessGroupPath()); + return commonGbChannelDao.getChannels(group.getCommonGroupDeviceId()); } @Override public List getChannels(String deviceId) { - Group businessGroup = businessGroupDao.queryByDeviceId(deviceId); - if (businessGroup == null) { + Group group = groupMapper.queryByDeviceId(deviceId); + if (group == null) { return null; } - return commonGbChannelDao.getChannels(businessGroup.getCommonBusinessGroupPath()); + return commonGbChannelDao.getChannels(group.getCommonGroupDeviceId()); } @Override - public boolean add(Group businessGroup) { - return businessGroupDao.add(businessGroup) > 0; + public boolean add(Group group) { + return groupMapper.add(group) > 0; } @Override public boolean remove(int id) { - return businessGroupDao.remove(id) > 0; + return groupMapper.remove(id) > 0; } @Override public boolean remove(String deviceId) { - return businessGroupDao.removeByDeviceId(deviceId) > 0; + return groupMapper.removeByDeviceId(deviceId) > 0; } @Override - public boolean update(Group businessGroup) { - if (businessGroup.getCommonBusinessGroupId() == 0) { + public boolean update(Group group) { + if (group.getCommonGroupId() == 0) { return false; } - Group businessGroupInDb = businessGroupDao.query(businessGroup.getCommonBusinessGroupId()); - if (businessGroupInDb == null) { + return groupMapper.update(group) > 0; + } + + @Override + public boolean updateChannelsToGroup(int id, List channels) { + if (channels.isEmpty()) { return false; } - TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - boolean result = false; - if (!businessGroupInDb.getCommonBusinessGroupPath().equals(businessGroup.getCommonBusinessGroupPath())) { - // 需要更新通道信息 - int updateCount = commonGbChannelDao.updateBusinessGroupPath(businessGroupInDb.getCommonBusinessGroupPath(), businessGroup.getCommonBusinessGroupPath()); - if (updateCount > 0) { - dataSourceTransactionManager.rollback(transactionStatus); + Group group = groupMapper.query(id); + if (group == null) { + return false; + } + return updateChannelsToGroup(group, channels); + } + + @Override + public boolean updateChannelsToGroup(String deviceId, List channels) { + if (channels.isEmpty()) { + return false; + } + Group group = groupMapper.queryByDeviceId(deviceId); + if (group == null) { + return false; + } + return updateChannelsToGroup(group, channels); + } + + private boolean updateChannelsToGroup(Group group, List channels) { + for (CommonGbChannel channel : channels) { + channel.setCommonGbBusinessGroupID(group.getCommonGroupTopId()); + channel.setCommonGbParentID(group.getCommonGroupDeviceId()); + } + int limit = 50; + if (channels.size() <= limit) { + if (commonGbChannelDao.updateChanelForGroup(channels) <= 0) { + logger.info("[添加通道到分组] 失败"); return false; - } else { - result = businessGroupDao.update(businessGroup) > 0; } } else { - result = businessGroupDao.update(businessGroup) > 0; + TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); + for (int i = 0; i < channels.size(); i += limit) { + int toIndex = i + limit; + if (i + limit > channels.size()) { + toIndex = channels.size(); + } + List channelsSub = channels.subList(i, toIndex); + if (commonGbChannelDao.updateChanelForGroup(channelsSub) <= 0) { + dataSourceTransactionManager.rollback(transactionStatus); + logger.info("[添加通道到分组] 失败"); + return false; + } + } + dataSourceTransactionManager.commit(transactionStatus); } - dataSourceTransactionManager.commit(transactionStatus); - return result; + return true; } @Override - public boolean updateChannelsToBusinessGroup(int id, List channels) { - if (channels.isEmpty()) { - return false; + public boolean removeChannelsFromGroup(List channels) { + int limit = 50; + if (channels.size() <= limit) { + if (commonGbChannelDao.removeChannelsForGroup(channels) <= 0) { + logger.info("[从分组移除通道] 失败"); + return false; + } + } else { + TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); + for (int i = 0; i < channels.size(); i += limit) { + int toIndex = i + limit; + if (i + limit > channels.size()) { + toIndex = channels.size(); + } + List channelsSub = channels.subList(i, toIndex); + if (commonGbChannelDao.removeChannelsForGroup(channelsSub) <= 0) { + dataSourceTransactionManager.rollback(transactionStatus); + logger.info("[从分组移除通道] 失败"); + return false; + } + } + dataSourceTransactionManager.commit(transactionStatus); } - Group businessGroup = businessGroupDao.query(id); - if (businessGroup == null) { - return false; - } - for (CommonGbChannel channel : channels) { - channel.setCommonGbBusinessGroupID(businessGroup.getCommonBusinessGroupPath()); - } - // TODO 增加对数量的判断,分批处理 - return commonGbChannelDao.updateChanelForBusinessGroup(channels) > 1; - } - - @Override - public boolean updateChannelsToBusinessGroup(String deviceId, List channels) { - if (channels.isEmpty()) { - return false; - } - Group businessGroup = businessGroupDao.queryByDeviceId(deviceId); - if (businessGroup == null) { - return false; - } - for (CommonGbChannel channel : channels) { - channel.setCommonGbBusinessGroupID(businessGroup.getCommonBusinessGroupPath()); - } - // TODO 增加对数量的判断,分批处理 - return commonGbChannelDao.updateChanelForBusinessGroup(channels) > 1; - } - - @Override - public boolean removeChannelsFromBusinessGroup(List channels) { - // TODO 增加对数量的判断,分批处理 - return commonGbChannelDao.removeChannelsForBusinessGroup(channels) > 1; + return true; } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonGbChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonGbChannelMapper.java index 4cd58260..8ff0fbe5 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonGbChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonGbChannelMapper.java @@ -11,8 +11,8 @@ import java.util.List; @Repository public interface CommonGbChannelMapper { - @Select(value = "select * from wvp_common_gb_channel where common_gb_business_group_id = '#{commonBusinessGroupPath}'") - List getChannels(String commonBusinessGroupPath); + @Select(value = "select * from wvp_common_gb_channel where common_gb_business_group_id = '#{commonGroupId}'") + List getChannels(String commonGroupId); @Update(value = "") - int updateChanelForBusinessGroup(List channels); + int updateChanelForGroup(List channels); @Delete(value = "") - int removeChannelsForBusinessGroup(List channels); - - @Update("update wvp_common_gb_channel set common_gb_business_group_id = #{newPath} where common_gb_business_group_id = #{oldPath}") - int updateBusinessGroupPath(String oldPath, String newPath); + int removeChannelsForGroup(List channels); @Select("select * from wvp_common_gb_channel where common_gb_device_id=#{channelId}") CommonGbChannel queryByDeviceID(String channelId); @@ -284,10 +281,11 @@ public interface CommonGbChannelMapper { int addAll(List commonGbChannelList); @Delete("") - int deleteByDeviceIDs(List clearChannels); + int deleteByDeviceIDs(List clearChannels); @Update("") int clearParentIds(List errorParentIdList); + + @Update("") + void clearCivilCodes(List errorCivilCodeList); + + @Select("") + List queryInList(List commonGbChannelList); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java index 30ffd2bb..54fe0152 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java @@ -44,6 +44,7 @@ public interface DeviceMapper { "on_line," + "media_server_id," + "switch_primary_sub_stream," + + "auto_sync_channel," + "(SELECT count(0) FROM wvp_device_channel WHERE device_id=wvp_device.device_id) as channel_count "+ " FROM wvp_device WHERE device_id = #{deviceId}") Device getDeviceByDeviceId(String deviceId); @@ -74,6 +75,7 @@ public interface DeviceMapper { "subscribe_cycle_for_alarm,"+ "ssrc_check,"+ "as_message_channel,"+ + "auto_sync_channel,"+ "geo_coord_sys,"+ "on_line"+ ") VALUES (" + @@ -102,6 +104,7 @@ public interface DeviceMapper { "#{subscribeCycleForAlarm}," + "#{ssrcCheck}," + "#{asMessageChannel}," + + "#{autoSyncChannel}," + "#{geoCoordSys}," + "#{onLine}" + ")") @@ -160,6 +163,7 @@ public interface DeviceMapper { "on_line,"+ "media_server_id,"+ "switch_primary_sub_stream switchPrimarySubStream,"+ + "auto_sync_channel,"+ "(SELECT count(0) FROM wvp_device_channel WHERE device_id=de.device_id) as channel_count " + "FROM wvp_device de" + " where on_line=${onLine}"+ @@ -197,6 +201,7 @@ public interface DeviceMapper { "subscribe_cycle_for_alarm,"+ "ssrc_check,"+ "as_message_channel,"+ + "auto_sync_channel,"+ "geo_coord_sys,"+ "on_line"+ " FROM wvp_device WHERE on_line = true") @@ -227,6 +232,7 @@ public interface DeviceMapper { "subscribe_cycle_for_alarm,"+ "ssrc_check,"+ "as_message_channel,"+ + "auto_sync_channel,"+ "geo_coord_sys,"+ "on_line"+ " FROM wvp_device WHERE ip = #{host} AND port=#{port}") @@ -250,6 +256,7 @@ public interface DeviceMapper { ", as_message_channel=#{asMessageChannel}" + ", geo_coord_sys=#{geoCoordSys}" + ", switch_primary_sub_stream=#{switchPrimarySubStream}" + + ", auto_sync_channel=#{autoSyncChannel}" + ", media_server_id=#{mediaServerId}" + "WHERE device_id=#{deviceId}"+ " "}) @@ -265,6 +272,7 @@ public interface DeviceMapper { "charset,"+ "ssrc_check,"+ "as_message_channel,"+ + "auto_sync_channel,"+ "geo_coord_sys,"+ "on_line,"+ "media_server_id,"+ @@ -279,6 +287,7 @@ public interface DeviceMapper { "#{charset}," + "#{ssrcCheck}," + "#{asMessageChannel}," + + "#{autoSyncChannel}," + "#{geoCoordSys}," + "#{onLine}," + "#{mediaServerId}," + diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GroupMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GroupMapper.java index 76a4d2eb..47956bae 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GroupMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GroupMapper.java @@ -69,15 +69,19 @@ public interface GroupMapper { "common_group_create_time, " + "common_group_update_time " + ") values " + - " " + - "( " + + " " + "#{item.commonGroupDeviceId}, " + "#{item.commonGroupName}, " + "#{item.commonGroupParentId}, " + "#{item.commonGroupCreateTime}, " + "#{item.commonGroupUpdateTime} " + - ")" + "" + "") int addAll(List allGroup); + + @Select("") + List queryInList(List allGroup); }