优化自动同步国标通道到通用通道,分组信息以及行政区划自动同步

结构优化
648540858 2023-10-06 23:39:38 +08:00
parent 6255e6f89f
commit 29352cc9cd
6 changed files with 203 additions and 110 deletions

View File

@ -48,17 +48,17 @@ public interface IGroupService {
/**
*
*/
boolean updateChannelsToBusinessGroup(int id, List<CommonGbChannel> channels);
boolean updateChannelsToGroup(int id, List<CommonGbChannel> channels);
/**
*
*/
boolean updateChannelsToBusinessGroup(String deviceId, List<CommonGbChannel> channels);
boolean updateChannelsToGroup(String deviceId, List<CommonGbChannel> channels);
/**
*
*/
boolean removeChannelsFromBusinessGroup(List<CommonGbChannel> channels);
boolean removeChannelsFromGroup(List<CommonGbChannel> channels);
}

View File

@ -111,10 +111,10 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
Set<String> parentIdSet = new HashSet<>();
// 存储得到的所有行政区划, 后续检验civilCode是否已传输对应的行政区划数据从而确定是否需要自动创建节点。
Set<String> civilCodeSet = new HashSet<>();
List<DeviceChannel> clearChannels = new ArrayList<>();
List<String> clearChannels = new ArrayList<>();
deviceChannels.stream().forEach(deviceChannel -> {
if (deviceChannel.getCommonGbChannelId() > 0) {
clearChannels.add(deviceChannel);
clearChannels.add(deviceChannel.getChannelId());
}
Gb28181CodeType channelIdType = SipUtils.getChannelIdType(deviceChannel.getChannelId());
if (channelIdType != null) {
@ -155,6 +155,15 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
commonGbChannelList.add(commonGbChannel);
}
});
// 检查是否存在已存在通道与将写入通道相同的情况
List<CommonGbChannel> commonGbChannelInDbList = commonGbChannelMapper.queryInList(commonGbChannelList);
if (!commonGbChannelInDbList.isEmpty()) {
// 这里可以控制新数据覆盖旧数据还是丢弃重复的新数据
// 目前使用新数据覆盖旧数据,后续分局实际业务需求再做修改
commonGbChannelInDbList.stream().forEach(commonGbChannel->{
clearChannels.add(commonGbChannel.getCommonGbDeviceID());
});
}
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
int limit = 50;
if (!clearChannels.isEmpty()) {
@ -166,7 +175,7 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
if (i + limit > clearChannels.size()) {
toIndex = clearChannels.size();
}
List<DeviceChannel> clearChannelsSun = clearChannels.subList(i, toIndex);
List<String> clearChannelsSun = clearChannels.subList(i, toIndex);
int currentResult = commonGbChannelMapper.deleteByDeviceIDs(clearChannelsSun);
if (currentResult <= 0) {
dataSourceTransactionManager.rollback(transactionStatus);
@ -206,7 +215,6 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
}else {
virtuallyGroup.setCommonGroupTopId(topGroupId);
}
}
}
@ -252,11 +260,7 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
// 处理存在错误的parentId
if (!errorParentIdList.isEmpty()) {
if (errorParentIdList.size() <= limit) {
if (commonGbChannelMapper.clearParentIds(errorParentIdList) <= 0) {
dataSourceTransactionManager.rollback(transactionStatus);
logger.info("[同步通用通道]来自国标设备,失败, 处理错误的ParentId失败, 国标编号: {}", gbDeviceId);
return false;
}
commonGbChannelMapper.clearParentIds(errorParentIdList);
} else {
for (int i = 0; i < errorParentIdList.size(); i += limit) {
int toIndex = i + limit;
@ -264,17 +268,27 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
toIndex = errorParentIdList.size();
}
List<String> errorParentIdListSub = errorParentIdList.subList(i, toIndex);
if (commonGbChannelMapper.clearParentIds(errorParentIdListSub) <= 0) {
dataSourceTransactionManager.rollback(transactionStatus);
logger.info("[同步通用通道]来自国标设备,失败, 处理错误的ParentId失败, 国标编号: {}", gbDeviceId);
return false;
}
commonGbChannelMapper.clearParentIds(errorParentIdListSub);
}
}
}
// 分组信息写入数据库
List<Group> allGroup = new ArrayList<>(businessGroupMap.values());
allGroup.addAll(virtuallyGroupMap.values());
if (!allGroup.isEmpty()) {
// 这里也采取只插入新数据的方式
List<Group> groupInDBList = groupMapper.queryInList(allGroup);
if (!groupInDBList.isEmpty()) {
groupInDBList.stream().forEach(groupInDB -> {
for (int i = 0; i < allGroup.size(); i++) {
if (groupInDB.getCommonGroupDeviceId().equalsIgnoreCase(allGroup.get(i).getCommonGroupDeviceId())) {
allGroup.remove(i);
break;
}
}
});
}
if (!allGroup.isEmpty()) {
if (allGroup.size() <= limit) {
if (groupMapper.addAll(allGroup) <= 0) {
dataSourceTransactionManager.rollback(transactionStatus);
@ -295,15 +309,39 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
}
}
}
}
}
List<String> errorCivilCodeList = new ArrayList<>();
// 检测行政区划信息是否完整
for (String civilCode : civilCodeSet) {
if (!regionMap.containsKey(civilCode)) {
logger.warn("[通道信息中缺少地区信息]补充地区信息 国标编号: {} civilCode {}", gbDeviceId, civilCode );
Region region = civilCodeFileConf.createRegion(civilCode);
if (region != null) {
regionMap.put(region.getCommonRegionDeviceId(), region);
}else {
logger.warn("[获取地区信息]失败 国标编号: {} civilCode {}", gbDeviceId, civilCode );
errorCivilCodeList.add(civilCode);
}
}
}
if (!errorCivilCodeList.isEmpty()) {
if (errorCivilCodeList.size() <= limit) {
commonGbChannelMapper.clearCivilCodes(errorCivilCodeList);
} else {
for (int i = 0; i < errorCivilCodeList.size(); i += limit) {
int toIndex = i + limit;
if (i + limit > errorCivilCodeList.size()) {
toIndex = errorCivilCodeList.size();
}
List<String> errorCivilCodeListSub = errorParentIdList.subList(i, toIndex);
commonGbChannelMapper.clearCivilCodes(errorCivilCodeListSub);
}
}
}
// 行政区划信息写入数据库
List<Region> allRegion = new ArrayList<>(regionMap.values());
if (!allRegion.isEmpty()) {
@ -543,7 +581,15 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
@Override
public void deleteGbChannelsFromList(List<DeviceChannel> channelList) {
commonGbChannelMapper.deleteByDeviceIDs(channelList);
if (channelList.isEmpty()) {
return;
}
List<String> channelIdList = new ArrayList<>(channelList.size());
for (DeviceChannel deviceChannel : channelList) {
channelIdList.add(deviceChannel.getChannelId());
}
commonGbChannelMapper.deleteByDeviceIDs(channelIdList);
}
@Override

View File

@ -24,7 +24,7 @@ public class GroupServiceImpl implements IGroupService {
private CommonGbChannelMapper commonGbChannelDao;
@Autowired
private GroupMapper businessGroupDao;
private GroupMapper groupMapper;
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@ -35,105 +35,129 @@ public class GroupServiceImpl implements IGroupService {
@Override
public List<Group> getNodes(String parentId) {
return businessGroupDao.getNodes(parentId);
return groupMapper.getNodes(parentId);
}
@Override
public List<CommonGbChannel> getChannels(int id) {
Group businessGroup = businessGroupDao.query(id);
if (businessGroup == null) {
Group group = groupMapper.query(id);
if (group == null) {
return null;
}
return commonGbChannelDao.getChannels(businessGroup.getCommonBusinessGroupPath());
return commonGbChannelDao.getChannels(group.getCommonGroupDeviceId());
}
@Override
public List<CommonGbChannel> getChannels(String deviceId) {
Group businessGroup = businessGroupDao.queryByDeviceId(deviceId);
if (businessGroup == null) {
Group group = groupMapper.queryByDeviceId(deviceId);
if (group == null) {
return null;
}
return commonGbChannelDao.getChannels(businessGroup.getCommonBusinessGroupPath());
return commonGbChannelDao.getChannels(group.getCommonGroupDeviceId());
}
@Override
public boolean add(Group businessGroup) {
return businessGroupDao.add(businessGroup) > 0;
public boolean add(Group group) {
return groupMapper.add(group) > 0;
}
@Override
public boolean remove(int id) {
return businessGroupDao.remove(id) > 0;
return groupMapper.remove(id) > 0;
}
@Override
public boolean remove(String deviceId) {
return businessGroupDao.removeByDeviceId(deviceId) > 0;
return groupMapper.removeByDeviceId(deviceId) > 0;
}
@Override
public boolean update(Group businessGroup) {
if (businessGroup.getCommonBusinessGroupId() == 0) {
public boolean update(Group group) {
if (group.getCommonGroupId() == 0) {
return false;
}
Group businessGroupInDb = businessGroupDao.query(businessGroup.getCommonBusinessGroupId());
if (businessGroupInDb == null) {
return groupMapper.update(group) > 0;
}
@Override
public boolean updateChannelsToGroup(int id, List<CommonGbChannel> channels) {
if (channels.isEmpty()) {
return false;
}
Group group = groupMapper.query(id);
if (group == null) {
return false;
}
return updateChannelsToGroup(group, channels);
}
@Override
public boolean updateChannelsToGroup(String deviceId, List<CommonGbChannel> channels) {
if (channels.isEmpty()) {
return false;
}
Group group = groupMapper.queryByDeviceId(deviceId);
if (group == null) {
return false;
}
return updateChannelsToGroup(group, channels);
}
private boolean updateChannelsToGroup(Group group, List<CommonGbChannel> channels) {
for (CommonGbChannel channel : channels) {
channel.setCommonGbBusinessGroupID(group.getCommonGroupTopId());
channel.setCommonGbParentID(group.getCommonGroupDeviceId());
}
int limit = 50;
if (channels.size() <= limit) {
if (commonGbChannelDao.updateChanelForGroup(channels) <= 0) {
logger.info("[添加通道到分组] 失败");
return false;
}
} else {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
boolean result = false;
if (!businessGroupInDb.getCommonBusinessGroupPath().equals(businessGroup.getCommonBusinessGroupPath())) {
// 需要更新通道信息
int updateCount = commonGbChannelDao.updateBusinessGroupPath(businessGroupInDb.getCommonBusinessGroupPath(), businessGroup.getCommonBusinessGroupPath());
if (updateCount > 0) {
dataSourceTransactionManager.rollback(transactionStatus);
return false;
} else {
result = businessGroupDao.update(businessGroup) > 0;
for (int i = 0; i < channels.size(); i += limit) {
int toIndex = i + limit;
if (i + limit > channels.size()) {
toIndex = channels.size();
}
List<CommonGbChannel> channelsSub = channels.subList(i, toIndex);
if (commonGbChannelDao.updateChanelForGroup(channelsSub) <= 0) {
dataSourceTransactionManager.rollback(transactionStatus);
logger.info("[添加通道到分组] 失败");
return false;
}
} else {
result = businessGroupDao.update(businessGroup) > 0;
}
dataSourceTransactionManager.commit(transactionStatus);
return result;
}
return true;
}
@Override
public boolean updateChannelsToBusinessGroup(int id, List<CommonGbChannel> channels) {
if (channels.isEmpty()) {
public boolean removeChannelsFromGroup(List<CommonGbChannel> channels) {
int limit = 50;
if (channels.size() <= limit) {
if (commonGbChannelDao.removeChannelsForGroup(channels) <= 0) {
logger.info("[从分组移除通道] 失败");
return false;
}
Group businessGroup = businessGroupDao.query(id);
if (businessGroup == null) {
} else {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
for (int i = 0; i < channels.size(); i += limit) {
int toIndex = i + limit;
if (i + limit > channels.size()) {
toIndex = channels.size();
}
List<CommonGbChannel> channelsSub = channels.subList(i, toIndex);
if (commonGbChannelDao.removeChannelsForGroup(channelsSub) <= 0) {
dataSourceTransactionManager.rollback(transactionStatus);
logger.info("[从分组移除通道] 失败");
return false;
}
for (CommonGbChannel channel : channels) {
channel.setCommonGbBusinessGroupID(businessGroup.getCommonBusinessGroupPath());
}
// TODO 增加对数量的判断,分批处理
return commonGbChannelDao.updateChanelForBusinessGroup(channels) > 1;
dataSourceTransactionManager.commit(transactionStatus);
}
@Override
public boolean updateChannelsToBusinessGroup(String deviceId, List<CommonGbChannel> channels) {
if (channels.isEmpty()) {
return false;
}
Group businessGroup = businessGroupDao.queryByDeviceId(deviceId);
if (businessGroup == null) {
return false;
}
for (CommonGbChannel channel : channels) {
channel.setCommonGbBusinessGroupID(businessGroup.getCommonBusinessGroupPath());
}
// TODO 增加对数量的判断,分批处理
return commonGbChannelDao.updateChanelForBusinessGroup(channels) > 1;
}
@Override
public boolean removeChannelsFromBusinessGroup(List<CommonGbChannel> channels) {
// TODO 增加对数量的判断,分批处理
return commonGbChannelDao.removeChannelsForBusinessGroup(channels) > 1;
return true;
}
}

View File

@ -11,8 +11,8 @@ import java.util.List;
@Repository
public interface CommonGbChannelMapper {
@Select(value = "select * from wvp_common_gb_channel where common_gb_business_group_id = '#{commonBusinessGroupPath}'")
List<CommonGbChannel> getChannels(String commonBusinessGroupPath);
@Select(value = "select * from wvp_common_gb_channel where common_gb_business_group_id = '#{commonGroupId}'")
List<CommonGbChannel> getChannels(String commonGroupId);
@Update(value = "<script>" +
"<foreach collection='channels' item='item' separator=';'>" +
@ -55,7 +55,7 @@ public interface CommonGbChannelMapper {
" WHERE common_gb_id=#{item.commonGbId}" +
"</foreach>" +
"</script>")
int updateChanelForBusinessGroup(List<CommonGbChannel> channels);
int updateChanelForGroup(List<CommonGbChannel> channels);
@Delete(value = "<script>" +
@ -63,10 +63,7 @@ public interface CommonGbChannelMapper {
"delete from wvp_common_gb_channel WHERE common_gb_id=#{item.commonGbId}" +
"</foreach>" +
"</script>")
int removeChannelsForBusinessGroup(List<CommonGbChannel> channels);
@Update("update wvp_common_gb_channel set common_gb_business_group_id = #{newPath} where common_gb_business_group_id = #{oldPath}")
int updateBusinessGroupPath(String oldPath, String newPath);
int removeChannelsForGroup(List<CommonGbChannel> channels);
@Select("select * from wvp_common_gb_channel where common_gb_device_id=#{channelId}")
CommonGbChannel queryByDeviceID(String channelId);
@ -284,10 +281,11 @@ public interface CommonGbChannelMapper {
int addAll(List<CommonGbChannel> commonGbChannelList);
@Delete("<script> "+
"DELETE from wvp_common_gb_channel WHERE common_gb_id in" +
"<foreach collection='clearChannels' item='item' open='(' separator=',' close=')' > #{item.commonGbChannelId}</foreach>" +
"DELETE from wvp_common_gb_channel WHERE common_gb_device_id in (" +
"<foreach collection='clearChannels' item='item' separator=',' > #{item}</foreach>" +
" )"+
"</script>")
int deleteByDeviceIDs(List<DeviceChannel> clearChannels);
int deleteByDeviceIDs(List<String> clearChannels);
@Update("<script> "+
"UPDATE wvp_common_gb_channel SET commonGbStatus = true WHERE common_gb_id in" +
@ -307,4 +305,16 @@ public interface CommonGbChannelMapper {
"<foreach collection='errorParentIdList' item='item' open='(' separator=',' close=')' > #{item}</foreach>" +
"</script>")
int clearParentIds(List<String> errorParentIdList);
@Update("<script> "+
"UPDATE wvp_common_gb_channel SET common_gb_civilCode = null WHERE common_gb_civilCode in" +
"<foreach collection='errorCivilCodeList' item='item' open='(' separator=',' close=')' > #{item}</foreach>" +
"</script>")
void clearCivilCodes(List<String> errorCivilCodeList);
@Select("<script> "+
"SELECT * FROM wvp_common_gb_channel WHERE common_gb_device_id in" +
"<foreach collection='commonGbChannelList' item='item' open='(' separator=',' close=')' > #{item.commonGbDeviceID}</foreach>" +
"</script>")
List<CommonGbChannel> queryInList(List<CommonGbChannel> commonGbChannelList);
}

View File

@ -44,6 +44,7 @@ public interface DeviceMapper {
"on_line," +
"media_server_id," +
"switch_primary_sub_stream," +
"auto_sync_channel," +
"(SELECT count(0) FROM wvp_device_channel WHERE device_id=wvp_device.device_id) as channel_count "+
" FROM wvp_device WHERE device_id = #{deviceId}")
Device getDeviceByDeviceId(String deviceId);
@ -74,6 +75,7 @@ public interface DeviceMapper {
"subscribe_cycle_for_alarm,"+
"ssrc_check,"+
"as_message_channel,"+
"auto_sync_channel,"+
"geo_coord_sys,"+
"on_line"+
") VALUES (" +
@ -102,6 +104,7 @@ public interface DeviceMapper {
"#{subscribeCycleForAlarm}," +
"#{ssrcCheck}," +
"#{asMessageChannel}," +
"#{autoSyncChannel}," +
"#{geoCoordSys}," +
"#{onLine}" +
")")
@ -160,6 +163,7 @@ public interface DeviceMapper {
"on_line,"+
"media_server_id,"+
"switch_primary_sub_stream switchPrimarySubStream,"+
"auto_sync_channel,"+
"(SELECT count(0) FROM wvp_device_channel WHERE device_id=de.device_id) as channel_count " +
"FROM wvp_device de" +
"<if test=\"onLine != null\"> where on_line=${onLine}</if>"+
@ -197,6 +201,7 @@ public interface DeviceMapper {
"subscribe_cycle_for_alarm,"+
"ssrc_check,"+
"as_message_channel,"+
"auto_sync_channel,"+
"geo_coord_sys,"+
"on_line"+
" FROM wvp_device WHERE on_line = true")
@ -227,6 +232,7 @@ public interface DeviceMapper {
"subscribe_cycle_for_alarm,"+
"ssrc_check,"+
"as_message_channel,"+
"auto_sync_channel,"+
"geo_coord_sys,"+
"on_line"+
" FROM wvp_device WHERE ip = #{host} AND port=#{port}")
@ -250,6 +256,7 @@ public interface DeviceMapper {
"<if test=\"asMessageChannel != null\">, as_message_channel=#{asMessageChannel}</if>" +
"<if test=\"geoCoordSys != null\">, geo_coord_sys=#{geoCoordSys}</if>" +
"<if test=\"switchPrimarySubStream != null\">, switch_primary_sub_stream=#{switchPrimarySubStream}</if>" +
"<if test=\"autoSyncChannel != null\">, auto_sync_channel=#{autoSyncChannel}</if>" +
"<if test=\"mediaServerId != null\">, media_server_id=#{mediaServerId}</if>" +
"WHERE device_id=#{deviceId}"+
" </script>"})
@ -265,6 +272,7 @@ public interface DeviceMapper {
"charset,"+
"ssrc_check,"+
"as_message_channel,"+
"auto_sync_channel,"+
"geo_coord_sys,"+
"on_line,"+
"media_server_id,"+
@ -279,6 +287,7 @@ public interface DeviceMapper {
"#{charset}," +
"#{ssrcCheck}," +
"#{asMessageChannel}," +
"#{autoSyncChannel}," +
"#{geoCoordSys}," +
"#{onLine}," +
"#{mediaServerId}," +

View File

@ -69,15 +69,19 @@ public interface GroupMapper {
"common_group_create_time, " +
"common_group_update_time " +
") values " +
"<foreach collection='allGroup' index='index' item='item' separator=','> " +
"( " +
"<foreach collection='allGroup' index='index' item='item' separator=',' open='(' close=')'> " +
"#{item.commonGroupDeviceId}, " +
"#{item.commonGroupName}, " +
"#{item.commonGroupParentId}, " +
"#{item.commonGroupCreateTime}, " +
"#{item.commonGroupUpdateTime} " +
")" +
"</foreach>" +
"</script>")
int addAll(List<Group> allGroup);
@Select("<script> "+
"SELECT * FROM wvp_common_group WHERE common_group_device_id in" +
"<foreach collection='allGroup' item='item' open='(' separator=',' close=')' > #{item.commonGroupDeviceId}</foreach>" +
"</script>")
List<Group> queryInList(List<Group> allGroup);
}