diff --git a/src/main/java/com/genersoft/iot/vmp/common/BatchLimit.java b/src/main/java/com/genersoft/iot/vmp/common/BatchLimit.java new file mode 100644 index 00000000..8cd7227a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/common/BatchLimit.java @@ -0,0 +1,5 @@ +package com.genersoft.iot.vmp.common; + +public class BatchLimit { + public static final int count = 50; +} diff --git a/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java b/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java index 3730cc54..4e7b3b29 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java @@ -1,12 +1,22 @@ package com.genersoft.iot.vmp.common; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.Gb28181CodeType; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.service.bean.CommonGbChannelType; +import com.genersoft.iot.vmp.service.impl.CommonGbChannelServiceImpl; +import com.genersoft.iot.vmp.utils.DateUtil; import io.swagger.v3.oas.annotations.media.Schema; +import org.apache.commons.lang3.math.NumberUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; public class CommonGbChannel { + private final static Logger logger = LoggerFactory.getLogger(CommonGbChannel.class); + /** * 国标字段:自增ID @@ -602,7 +612,134 @@ public class CommonGbChannel { } - public static CommonGbChannel getInstance(List syncKeys, DeviceChannel channel){ + public static CommonGbChannel getInstance(List syncKeys, DeviceChannel deviceChannel){ + CommonGbChannel commonGbChannel = new CommonGbChannel(); + commonGbChannel.setCommonGbDeviceID(deviceChannel.getChannelId()); + commonGbChannel.setCommonGbStatus(deviceChannel.isStatus()); + commonGbChannel.setType(CommonGbChannelType.GB28181); + commonGbChannel.setCreateTime(DateUtil.getNow()); + commonGbChannel.setUpdateTime(DateUtil.getNow()); + if (syncKeys == null || syncKeys.contains("commonGbName")) { + commonGbChannel.setCommonGbName(deviceChannel.getName()); + } + if (syncKeys == null || syncKeys.contains("commonGbManufacturer")) { + commonGbChannel.setCommonGbManufacturer(deviceChannel.getManufacture()); + } + if (syncKeys == null || syncKeys.contains("commonGbName")) { + commonGbChannel.setCommonGbModel(deviceChannel.getModel()); + } + if (syncKeys == null || syncKeys.contains("commonGbOwner")) { + commonGbChannel.setCommonGbOwner(deviceChannel.getOwner()); + } + if (syncKeys == null || syncKeys.contains("commonGbCivilCode")) { + if (deviceChannel.getCivilCode() != null) { + Gb28181CodeType channelIdType = SipUtils.getChannelIdType(deviceChannel.getCivilCode()); + if (channelIdType == Gb28181CodeType.CIVIL_CODE_PROVINCE + || channelIdType == Gb28181CodeType.CIVIL_CODE_CITY + || channelIdType == Gb28181CodeType.CIVIL_CODE_COUNTY + || channelIdType == Gb28181CodeType.CIVIL_CODE_GRASS_ROOTS + ) { + commonGbChannel.setCommonGbCivilCode(deviceChannel.getCivilCode()); + } else { + logger.warn("[不规范的CivilCode],deviceId: {}, channel: {}, civilCode: {}", + deviceChannel.getDeviceId(), + deviceChannel.getChannelId(), + deviceChannel.getCivilCode()); + } + commonGbChannel.setCommonGbCivilCode(deviceChannel.getCivilCode()); + } + } + if (syncKeys == null || syncKeys.contains("commonGbBlock")) { + commonGbChannel.setCommonGbBlock(deviceChannel.getBlock()); + } + if (syncKeys == null || syncKeys.contains("commonGbAddress")) { + commonGbChannel.setCommonGbAddress(deviceChannel.getAddress()); + } + if (syncKeys == null || syncKeys.contains("commonGbParental")) { + commonGbChannel.setCommonGbParental(deviceChannel.getParental()); + } + if (syncKeys == null || syncKeys.contains("commonGbParentID")) { + commonGbChannel.setCommonGbParentID(deviceChannel.getParentId()); + } + if (syncKeys == null || syncKeys.contains("commonGbSafetyWay")) { + commonGbChannel.setCommonGbSafetyWay(deviceChannel.getSafetyWay()); + } + if (syncKeys == null || syncKeys.contains("commonGbRegisterWay")) { + commonGbChannel.setCommonGbRegisterWay(deviceChannel.getRegisterWay()); + } + if (syncKeys == null || syncKeys.contains("commonGbCertNum")) { + commonGbChannel.setCommonGbCertNum(deviceChannel.getCertNum()); + } + if (syncKeys == null || syncKeys.contains("commonGbCertifiable")) { + commonGbChannel.setCommonGbCertifiable(deviceChannel.getCertifiable()); + } + if (syncKeys == null || syncKeys.contains("commonGbErrCode")) { + commonGbChannel.setCommonGbErrCode(deviceChannel.getErrCode()); + } + if (syncKeys == null || syncKeys.contains("commonGbEndTime")) { + commonGbChannel.setCommonGbEndTime(deviceChannel.getEndTime()); + } + + if (syncKeys == null || syncKeys.contains("commonGbSecrecy")) { + if (NumberUtils.isParsable(deviceChannel.getSecrecy())) { + commonGbChannel.setCommonGbSecrecy(Integer.parseInt(deviceChannel.getSecrecy())); + } + } + + if (syncKeys == null || syncKeys.contains("commonGbIPAddress")) { + commonGbChannel.setCommonGbIPAddress(deviceChannel.getIpAddress()); + } + + if (syncKeys == null || syncKeys.contains("commonGbPort")) { + commonGbChannel.setCommonGbPort(deviceChannel.getPort()); + } + + if (syncKeys == null || syncKeys.contains("commonGbPassword")) { + commonGbChannel.setCommonGbPassword(deviceChannel.getPassword()); + } + + if (syncKeys == null || syncKeys.contains("commonGbLongitude")) { + commonGbChannel.setCommonGbLongitude(deviceChannel.getLongitude()); + } + + if (syncKeys == null || syncKeys.contains("commonGbLatitude")) { + commonGbChannel.setCommonGbLatitude(deviceChannel.getLatitude()); + } + + if (syncKeys == null || syncKeys.contains("commonGbPtzType")) { + commonGbChannel.setCommonGbPtzType(deviceChannel.getPTZType()); + } + +// if (syncKeys == null || syncKeys.contains("commonGbPositionType")) { +//// commonGbChannel.setCommonGbPositionType(deviceChannel.getCommonGbPositionType()); +// } +// +// if (syncKeys == null || syncKeys.contains("commonGbRoomType")) { +// +// } +// if (syncKeys == null || syncKeys.contains("commonGbUseType")) { +// +// } +// if (syncKeys == null || syncKeys.contains("commonGbSupplyLightType")) { +// +// } +// if (syncKeys == null || syncKeys.contains("commonGbDirectionType")) { +// +// } +// if (syncKeys == null || syncKeys.contains("commonGbResolution")) { +// +// } +// if (syncKeys == null || syncKeys.contains("commonGbBusinessGroupID")) { +// commonGbChannel.setCommonGbBusinessGroupID(deviceChannel.getBusinessGroupId()); +// } +// +// if (syncKeys == null || syncKeys.contains("commonGbDownloadSpeed")) { +// +// } +// if (syncKeys == null || syncKeys.contains("commonGbSVCTimeSupportMode")) { +// +// } + return commonGbChannel; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java index 83317dec..136f85df 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IResourcePlayCallback; import com.genersoft.iot.vmp.service.IResourceService; +import com.genersoft.iot.vmp.service.bean.CommonGbChannelType; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.storager.dao.DeviceMapper; @@ -28,13 +29,11 @@ import java.net.URL; /** * 国标的资源实现类 */ -@Service("28181") +@Service(CommonGbChannelType.GB28181) public class GB28181ResourceServiceImpl implements IResourceService { private final Logger logger = LoggerFactory.getLogger(GB28181ResourceServiceImpl.class); - public static final String resourceType = "28181"; - @Autowired private DeviceMapper deviceMapper; @@ -46,7 +45,7 @@ public class GB28181ResourceServiceImpl implements IResourceService { @Override public boolean deleteChannel(CommonGbChannel commonGbChannel) { - if (!GB28181ResourceServiceImpl.resourceType.equals(commonGbChannel.getType())) { + if (!CommonGbChannelType.GB28181.equals(commonGbChannel.getType())) { logger.warn("[资源类-国标28181] 收到移除通道: {} 时发现类型不为28181", commonGbChannel.getCommonGbId()); return false; } @@ -56,7 +55,7 @@ public class GB28181ResourceServiceImpl implements IResourceService { @Override public void startPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) { assert callback != null; - if (!GB28181ResourceServiceImpl.resourceType.equals(commonGbChannel.getType())) { + if (!CommonGbChannelType.GB28181.equals(commonGbChannel.getType())) { logger.warn("[资源类-国标28181] 收到播放通道: {} 时发现类型不为28181", commonGbChannel.getCommonGbId()); callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); return; @@ -89,7 +88,7 @@ public class GB28181ResourceServiceImpl implements IResourceService { @Override public void stopPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) { - if (!GB28181ResourceServiceImpl.resourceType.equals(commonGbChannel.getType())) { + if (!CommonGbChannelType.GB28181.equals(commonGbChannel.getType())) { logger.warn("[资源类-国标28181] 收到停止播放通道: {} 时发现类型不为28181", commonGbChannel.getCommonGbId()); if (callback != null) { callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java index 5bc9e334..b8f31d8b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java @@ -601,6 +601,6 @@ public class DeviceChannel { } public void setCommonGbChannelId(int commonGbChannelId) { - commonGbChannelId = commonGbChannelId; + this.commonGbChannelId = commonGbChannelId; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java index c195cd84..df20fe2d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java @@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.service.IDeviceChannelService; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -113,7 +112,7 @@ public class CatalogDataCatch { if ( catalogData.getLastTime().isBefore(instantBefore5S)) { // 超过五秒收不到消息任务超时, 只更新这一部分数据, 收到数据与声明的总数一致,则重置通道数据,数据不全则只对收到的数据做更新操作 if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { - deviceChannelService.resetChannels(catalogData.getDevice(), catalogData.getChannelList()); + deviceChannelService.updateChannels(catalogData.getDevice(), catalogData.getChannelList()); String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"; catalogData.setErrorMsg(errorMsg); }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 51b81d9e..2f83f568 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -140,7 +140,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) { // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, // 目前支持设备通道上线通知时和设备上线时向上级通知 - boolean resetChannelsResult = deviceChannelService.resetChannels(take.getDevice(), catalogDataCatch.get(take.getDevice().getDeviceId())); + boolean resetChannelsResult = deviceChannelService.updateChannels(take.getDevice(), catalogDataCatch.get(take.getDevice().getDeviceId())); String errorMsg = null; if (!resetChannelsResult) { errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条"; diff --git a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java index a4abf428..8cceb475 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java @@ -77,4 +77,11 @@ public interface ICommonGbChannelService { void startPlay(CommonGbChannel channel, IResourcePlayCallback callback); void stopPlay(CommonGbChannel channel, IResourcePlayCallback callback); + + void batchAdd(List commonGbChannels); + + void batchUpdate(List commonGbChannels); + + void batchDelete(List allCommonChannelsForDelete); + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java index 016ce7e5..ad0b6ae3 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java @@ -91,6 +91,6 @@ public interface IDeviceChannelService { /** * 重置通道 */ - boolean resetChannels(Device device, List deviceChannelList); + boolean updateChannels(Device device, List deviceChannelList); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/CommonGbChannelType.java b/src/main/java/com/genersoft/iot/vmp/service/bean/CommonGbChannelType.java index e71be517..34521ffa 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/CommonGbChannelType.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/CommonGbChannelType.java @@ -1,6 +1,10 @@ package com.genersoft.iot.vmp.service.bean; +/** + * 资源数据类型 + */ public class CommonGbChannelType { + public final static String GB28181 = "28181"; public final static String PUSH = "push"; public final static String PROXY = "proxy"; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java index df390ac5..abd09d85 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.service.impl; +import com.genersoft.iot.vmp.common.BatchLimit; import com.genersoft.iot.vmp.common.CivilCodePo; import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.conf.CivilCodeFileConf; @@ -237,14 +238,13 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { // ====开始写入数据==== // 清理重复数据 TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - int limit = 50; if (!clearChannels.isEmpty()) { - if (clearChannels.size() <= limit) { + if (clearChannels.size() <= BatchLimit.count) { commonGbChannelMapper.deleteByDeviceIDs(clearChannels); } else { - for (int i = 0; i < clearChannels.size(); i += limit) { - int toIndex = i + limit; - if (i + limit > clearChannels.size()) { + for (int i = 0; i < clearChannels.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > clearChannels.size()) { toIndex = clearChannels.size(); } List clearChannelsSun = clearChannels.subList(i, toIndex); @@ -259,12 +259,12 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { // 写入通道数据 boolean result; if (!commonGbChannelList.isEmpty()) { - if (commonGbChannelList.size() <= limit) { + if (commonGbChannelList.size() <= BatchLimit.count) { result = commonGbChannelMapper.addAll(commonGbChannelList) > 0; } else { - for (int i = 0; i < commonGbChannelList.size(); i += limit) { - int toIndex = i + limit; - if (i + limit > commonGbChannelList.size()) { + for (int i = 0; i < commonGbChannelList.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > commonGbChannelList.size()) { toIndex = commonGbChannelList.size(); } List commonGbChannelListSub = commonGbChannelList.subList(i, toIndex); @@ -298,16 +298,16 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { }); } if (!allGroup.isEmpty()) { - if (allGroup.size() <= limit) { + if (allGroup.size() <= BatchLimit.count) { if (groupMapper.addAll(allGroup) <= 0) { dataSourceTransactionManager.rollback(transactionStatus); logger.info("[同步通用通道]来自国标设备,失败,添加分组信息失败, 国标编号: {}", gbDeviceId); return false; } } else { - for (int i = 0; i < allGroup.size(); i += limit) { - int toIndex = i + limit; - if (i + limit > allGroup.size()) { + for (int i = 0; i < allGroup.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > allGroup.size()) { toIndex = allGroup.size(); } List allGroupSub = allGroup.subList(i, toIndex); @@ -341,16 +341,16 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { }); } if (!allRegion.isEmpty()) { - if (allRegion.size() <= limit) { + if (allRegion.size() <= BatchLimit.count) { if (regionMapper.addAll(allRegion) <= 0) { dataSourceTransactionManager.rollback(transactionStatus); logger.info("[同步通用通道]来自国标设备,失败,添加行政区划信息失败, 国标编号: {}", gbDeviceId); return false; } } else { - for (int i = 0; i < allRegion.size(); i += limit) { - int toIndex = i + limit; - if (i + limit > allRegion.size()) { + for (int i = 0; i < allRegion.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > allRegion.size()) { toIndex = allRegion.size(); } List allRegionSub = allRegion.subList(i, toIndex); @@ -366,8 +366,6 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { if (!regionInForUpdate.isEmpty()) { regionMapper.updateAllForName(regionInForUpdate); } - - } dataSourceTransactionManager.commit(transactionStatus); return result; @@ -716,5 +714,71 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { resourceService.stopPlay(channel,callback); } + @Override + public void batchAdd(List commonGbChannels) { + if (commonGbChannels.isEmpty()) { + return; + } + if (commonGbChannels.size() > BatchLimit.count) { + for (int i = 0; i < commonGbChannels.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > commonGbChannels.size()) { + toIndex = commonGbChannels.size(); + } + if (commonGbChannelMapper.batchAdd(commonGbChannels.subList(i, toIndex)) < 0) { + throw new RuntimeException("batch add commonGbChannel fail"); + } + } + }else { + if (commonGbChannelMapper.batchAdd(commonGbChannels) < 0) { + throw new RuntimeException("batch add commonGbChannel fail"); + } + } + } + @Override + public void batchUpdate(List commonGbChannels) { + if (commonGbChannels.isEmpty()) { + return; + } + if (commonGbChannels.size() > BatchLimit.count) { + for (int i = 0; i < commonGbChannels.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > commonGbChannels.size()) { + toIndex = commonGbChannels.size(); + } + if (commonGbChannelMapper.batchUpdate(commonGbChannels.subList(i, toIndex)) < 0) { + throw new RuntimeException("batch update commonGbChannel fail"); + } + } + }else { + if (commonGbChannelMapper.batchUpdate(commonGbChannels) < 0) { + throw new RuntimeException("batch update commonGbChannel fail"); + } + } + // TODO 向国标级联发送catalog + } + + @Override + public void batchDelete(List channelsForDelete) { + if (channelsForDelete.isEmpty()) { + return; + } + if (channelsForDelete.size() > BatchLimit.count) { + for (int i = 0; i < channelsForDelete.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > channelsForDelete.size()) { + toIndex = channelsForDelete.size(); + } + if (commonGbChannelMapper.batchDelete(channelsForDelete.subList(i, toIndex)) < 0) { + throw new RuntimeException("batch update commonGbChannel fail"); + } + } + }else { + if (commonGbChannelMapper.batchDelete(channelsForDelete) < 0) { + throw new RuntimeException("batch update commonGbChannel fail"); + } + } + // TODO 向国标级联发送catalog + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 5eb64f2f..e7960b1d 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.service.impl; +import com.genersoft.iot.vmp.common.BatchLimit; +import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -38,18 +40,12 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { private final static Logger logger = LoggerFactory.getLogger(DeviceChannelServiceImpl.class); - @Autowired - private IRedisCatchStorage redisCatchStorage; - @Autowired private IInviteStreamService inviteStreamService; @Autowired private DeviceChannelMapper channelMapper; - @Autowired - private PlatformChannelMapper platformChannelMapper; - @Autowired private DeviceMapper deviceMapper; @@ -157,12 +153,11 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } } } - int limitCount = 50; - if (addChannels.size() > 0) { - if (addChannels.size() > limitCount) { - for (int i = 0; i < addChannels.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > addChannels.size()) { + if (!addChannels.isEmpty()) { + if (addChannels.size() > BatchLimit.count) { + for (int i = 0; i < addChannels.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > addChannels.size()) { toIndex = addChannels.size(); } channelMapper.batchAdd(addChannels.subList(i, toIndex)); @@ -171,11 +166,11 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { channelMapper.batchAdd(addChannels); } } - if (updateChannels.size() > 0) { - if (updateChannels.size() > limitCount) { - for (int i = 0; i < updateChannels.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > updateChannels.size()) { + if (!updateChannels.isEmpty()) { + if (updateChannels.size() > BatchLimit.count) { + for (int i = 0; i < updateChannels.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > updateChannels.size()) { toIndex = updateChannels.size(); } channelMapper.batchUpdate(updateChannels.subList(i, toIndex)); @@ -215,11 +210,10 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { deviceChannel.setUpdateTime(now); result.add(updateGps(deviceChannel, device)); }); - int limitCount = 50; - if (result.size() > limitCount) { - for (int i = 0; i < result.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > result.size()) { + if (result.size() > BatchLimit.count) { + for (int i = 0; i < result.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > result.size()) { toIndex = result.size(); } channelMapper.batchUpdate(result.subList(i, toIndex)); @@ -279,7 +273,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @Override @Transactional - public boolean resetChannels(Device device, List deviceChannelList) { + public boolean updateChannels(Device device, List deviceChannelList) { if (CollectionUtils.isEmpty(deviceChannelList)) { return false; } @@ -290,9 +284,10 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { // 存储需要更新的数据 List updateChannelsForInfo = new ArrayList<>(); - List updateChannelsForStatus = new ArrayList<>(); + List updateCommonChannelsForInfo = new ArrayList<>(); // 存储需要需要新增的数据库 List addChannels = new ArrayList<>(); + List addCommonChannels = new ArrayList<>(); StringBuilder stringBuilder = new StringBuilder(); Map subContMap = new HashMap<>(); @@ -307,28 +302,22 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } gbIdSet.add(deviceChannel.getChannelId()); if (allChannelMap.containsKey(deviceChannel.getChannelId())) { - deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId()); - deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio()); - deviceChannel.setCommonGbChannelId(allChannelMap.get(deviceChannel.getChannelId()).getCommonGbChannelId()); - deviceChannel.setCommonGbChannelId(allChannelMap.get(deviceChannel.getChannelId()).getCommonGbChannelId()); + DeviceChannel channelInDb = allChannelMap.get(deviceChannel.getChannelId()); + deviceChannel.setId(channelInDb.getId()); + deviceChannel.setStreamId(channelInDb.getStreamId()); + deviceChannel.setHasAudio(channelInDb.isHasAudio()); + deviceChannel.setCommonGbChannelId(channelInDb.getCommonGbChannelId()); deviceChannel.setUpdateTime(DateUtil.getNow()); // 同步时发现状态变化 - if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){ - // TODO 应该通知给commonChannel - updateChannelsForStatus.add(deviceChannel); -// List strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId()); -// if (!CollectionUtils.isEmpty(strings)){ -// strings.forEach(platformId->{ -// eventPublisher.catalogEventPublish(platformId, deviceChannel, deviceChannel.isStatus()? CatalogEvent.ON:CatalogEvent.OFF); -// }); -// } - }else { - updateChannelsForInfo.add(deviceChannel); - } + updateChannelsForInfo.add(deviceChannel); + updateCommonChannelsForInfo.add(CommonGbChannel.getInstance(null, deviceChannel)); + // 将需要更新的移除,剩下的都是需要删除的了 + allChannelMap.remove(deviceChannel.getChannelId()); }else { deviceChannel.setCreateTime(DateUtil.getNow()); deviceChannel.setUpdateTime(DateUtil.getNow()); addChannels.add(deviceChannel); + addCommonChannels.add(CommonGbChannel.getInstance(null, deviceChannel)); } channels.add(deviceChannel); if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) { @@ -360,54 +349,59 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { return false; } try { - int limitCount = 50; - int cleanChannelsResult = 0; - if (channels.size() > limitCount) { - for (int i = 0; i < channels.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > channels.size()) { - toIndex = channels.size(); - } - cleanChannelsResult += channelMapper.cleanChannelsNotInList(device.getDeviceId(), channels.subList(i, toIndex)); - } - } else { - cleanChannelsResult = channelMapper.cleanChannelsNotInList(device.getDeviceId(), channels); - } - boolean result = cleanChannelsResult < 0; - if (!result) { - if (!addChannels.isEmpty()) { - if (addChannels.size() > limitCount) { - for (int i = 0; i < addChannels.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > addChannels.size()) { - toIndex = addChannels.size(); - } - result = result || channelMapper.batchAdd(addChannels.subList(i, toIndex)) < 0; + // 此时allChannelMap剩余的就是需要移除的 + if (!allChannelMap.isEmpty()) { + if (allChannelMap.size() > BatchLimit.count) { + for (int i = 0; i < allChannelMap.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > allChannelMap.size()) { + toIndex = allChannelMap.size(); } - }else { - result = channelMapper.batchAdd(addChannels) < 0; + channelMapper.cleanChannelsInList(device.getDeviceId(), + new ArrayList<>(allChannelMap.values()).subList(i, toIndex)); } + } else { + channelMapper.cleanChannelsInList(device.getDeviceId(), new ArrayList<>(allChannelMap.values())); } - if (!updateChannelsForInfo.isEmpty()) { - if (updateChannelsForInfo.size() > limitCount) { - for (int i = 0; i < updateChannelsForInfo.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > updateChannelsForInfo.size()) { - toIndex = updateChannelsForInfo.size(); - } - result = result || channelMapper.batchUpdate(updateChannelsForInfo.subList(i, toIndex)) < 0; + List allCommonChannelsForDelete = new ArrayList<>(); + allChannelMap.values().stream().forEach((deviceChannel) -> { + allCommonChannelsForDelete.add(deviceChannel.getCommonGbChannelId()); + }); + // 通知通用通道批量移除 + commonGbChannelService.batchDelete(allCommonChannelsForDelete); + } + // addChannels 与 addCommonChannels 数量一致,这里使用同一个循环处理 + if (!addChannels.isEmpty()) { + // 对于新增的部分需要先添加通用通道,拿到ID后再添加国标通道 + commonGbChannelService.batchAdd(addCommonChannels); + for (int j = 0; j < addCommonChannels.size(); j++) { + addChannels.get(j).setCommonGbChannelId(addCommonChannels.get(j).getCommonGbId()); + } + if (addChannels.size() > BatchLimit.count) { + for (int i = 0; i < addChannels.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > addChannels.size()) { + toIndex = addChannels.size(); } - }else { - result = result || channelMapper.batchUpdate(updateChannelsForInfo) < 0; + channelMapper.batchAdd(addChannels.subList(i, toIndex)); } + }else { + channelMapper.batchAdd(addChannels); } } - if (result) { - //事务回滚 - TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); - } - if (device.isAutoSyncChannel()) { - commonGbChannelService.syncChannelFromGb28181Device(device.getDeviceId(), null, true, true); + if (!updateChannelsForInfo.isEmpty()) { + if (updateChannelsForInfo.size() > BatchLimit.count) { + for (int i = 0; i < updateChannelsForInfo.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > updateChannelsForInfo.size()) { + toIndex = updateChannelsForInfo.size(); + } + channelMapper.batchUpdate(updateChannelsForInfo.subList(i, toIndex)); + } + }else { + channelMapper.batchUpdate(updateChannelsForInfo); + } + commonGbChannelService.batchUpdate(updateCommonChannelsForInfo); } return true; }catch (Exception e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/GroupServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/GroupServiceImpl.java index 03848c75..e9a3a4c5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/GroupServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/GroupServiceImpl.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.service.impl; +import com.genersoft.iot.vmp.common.BatchLimit; import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.Gb28181CodeType; @@ -109,12 +110,11 @@ public class GroupServiceImpl implements IGroupService { groupList = queryAllChildGroup(groupParentList, group.getCommonGroupTopId(), groupParentList); } - int limitCount = 50; if (!groupList.isEmpty()) { - if (groupList.size() > limitCount) { - for (int i = 0; i < groupList.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > groupList.size()) { + if (groupList.size() > BatchLimit.count) { + for (int i = 0; i < groupList.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > groupList.size()) { toIndex = groupList.size(); } List subList = groupList.subList(i, toIndex); @@ -200,17 +200,16 @@ public class GroupServiceImpl implements IGroupService { channel.setCommonGbBusinessGroupID(group.getCommonGroupTopId()); channel.setCommonGbParentID(group.getCommonGroupDeviceId()); } - int limit = 50; - if (channels.size() <= limit) { + if (channels.size() <= com.genersoft.iot.vmp.common.BatchLimit.count) { if (commonGbChannelMapper.updateChanelForGroup(channels) <= 0) { logger.info("[添加通道到分组] 失败"); return false; } } else { TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - for (int i = 0; i < channels.size(); i += limit) { - int toIndex = i + limit; - if (i + limit > channels.size()) { + for (int i = 0; i < channels.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > channels.size()) { toIndex = channels.size(); } List channelsSub = channels.subList(i, toIndex); @@ -227,17 +226,16 @@ public class GroupServiceImpl implements IGroupService { @Override public boolean removeChannelsFromGroup(List channels) { - int limit = 50; - if (channels.size() <= limit) { + if (channels.size() <= BatchLimit.count) { if (commonGbChannelMapper.removeChannelsForGroup(channels) <= 0) { logger.info("[从分组移除通道] 失败"); return false; } } else { TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - for (int i = 0; i < channels.size(); i += limit) { - int toIndex = i + limit; - if (i + limit > channels.size()) { + for (int i = 0; i < channels.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > channels.size()) { toIndex = channels.size(); } List channelsSub = channels.subList(i, toIndex); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java index 7ede0921..3a302c0f 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.service.impl; +import com.genersoft.iot.vmp.common.BatchLimit; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; @@ -82,12 +83,11 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { int allCount = 0; boolean result = false; TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - int limitCount = 50; - if (channelReducesToAdd.size() > 0) { - if (channelReducesToAdd.size() > limitCount) { - for (int i = 0; i < channelReducesToAdd.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > channelReducesToAdd.size()) { + if (!channelReducesToAdd.isEmpty()) { + if (channelReducesToAdd.size() > BatchLimit.count) { + for (int i = 0; i < channelReducesToAdd.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > channelReducesToAdd.size()) { toIndex = channelReducesToAdd.size(); } int count = platformChannelMapper.addChannels(platformId, channelReducesToAdd.subList(i, toIndex)); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RegionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RegionServiceImpl.java index 45100ae3..5a306988 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RegionServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RegionServiceImpl.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.service.impl; +import com.genersoft.iot.vmp.common.BatchLimit; import com.genersoft.iot.vmp.common.CivilCodePo; import com.genersoft.iot.vmp.conf.CivilCodeFileConf; import com.genersoft.iot.vmp.conf.exception.ControllerException; @@ -57,11 +58,10 @@ public class RegionServiceImpl implements IRegionService { // 查询所有从属的地区,从属地区的编号一定是父节点编号开头的,基于这个获取所有的子节点 List regionList = regionMapper.queryAllChildByDeviceId(regionDeviceId); - int limitCount = 50; - if (regionList.size() > limitCount) { - for (int i = 0; i < regionList.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > regionList.size()) { + if (regionList.size() > BatchLimit.count) { + for (int i = 0; i < regionList.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > regionList.size()) { toIndex = regionList.size(); } List subList = regionList.subList(i, toIndex); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonGbChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonGbChannelMapper.java index 3dc137c7..21682605 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonGbChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/CommonGbChannelMapper.java @@ -436,4 +436,138 @@ public interface CommonGbChannelMapper { "" + ""}) void updateChannelToRegion(@Param("param") UpdateCommonChannelToRegion param); + + @Insert("") + int batchAdd(@Param("commonGbChannels") List commonGbChannels); + + @Update({""}) + int batchUpdate(@Param("commonGbChannels") List commonGbChannels); + + + @Delete(value = {" "}) + int batchDelete(@Param("ids") List ids); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 05243b30..dc1358b3 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -203,7 +203,8 @@ public interface DeviceChannelMapper { "(channel_id, device_id, name, manufacture, model, owner, civil_code, block, sub_count, " + " address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, secrecy, " + " ip_address,port,password,ptz_type,status,stream_id,longitude,latitude,longitude_gcj02,latitude_gcj02,"+ - " longitude_wgs84,latitude_wgs84,has_audio,create_time,update_time,business_group_id,gps_time)"+ + " longitude_wgs84,latitude_wgs84,has_audio,create_time,update_time,business_group_id,gps_time," + + " common_gb_channel_id)"+ "values " + " " + "(#{item.channelId}, #{item.deviceId}, #{item.name}, #{item.manufacture}, #{item.model}, " + @@ -213,7 +214,7 @@ public interface DeviceChannelMapper { "#{item.ipAddress}, #{item.port}, #{item.password}, #{item.PTZType}, #{item.status}, " + "#{item.streamId}, #{item.longitude}, #{item.latitude},#{item.longitudeGcj02}, " + "#{item.latitudeGcj02},#{item.longitudeWgs84}, #{item.latitudeWgs84}, #{item.hasAudio}, now(), now(), " + - "#{item.businessGroupId}, #{item.gpsTime}) " + + "#{item.businessGroupId}, #{item.gpsTime}, #{item.commonGbChannelId}) " + " " + "") int batchAdd(@Param("addChannels") List addChannels); @@ -224,7 +225,7 @@ public interface DeviceChannelMapper { "(channel_id,device_id,name,manufacture,model,owner,civil_code,block,sub_count,"+ " address,parental,parent_id,safety_way,register_way,cert_num,certifiable,err_code,secrecy,"+ " ip_address,port,password,ptz_type,status,stream_id,longitude,latitude,longitude_gcj02,latitude_gcj02,"+ - " longitude_wgs84,latitude_wgs84,has_audio,create_time,update_time,business_group_id,gps_time)"+ + " longitude_wgs84,latitude_wgs84,has_audio,create_time,update_time,business_group_id,gps_time,common_gb_channel_id)"+ "values " + " " + "(#{item.channelId}, #{item.deviceId}, #{item.name}, #{item.manufacture}, #{item.model}, " + @@ -234,7 +235,7 @@ public interface DeviceChannelMapper { "#{item.ipAddress}, #{item.port}, #{item.password}, #{item.PTZType}, #{item.status}, " + "#{item.streamId}, #{item.longitude}, #{item.latitude},#{item.longitudeGcj02}, " + "#{item.latitudeGcj02},#{item.longitudeWgs84}, #{item.latitudeWgs84}, #{item.hasAudio}, now(), now(), " + - "#{item.businessGroupId}, #{item.gpsTime}) " + + "#{item.businessGroupId}, #{item.gpsTime}, #{item.commonGbChannelId}) " + " " + "ON DUPLICATE KEY UPDATE " + "update_time=VALUES(update_time), " + @@ -268,7 +269,8 @@ public interface DeviceChannelMapper { "latitude_wgs84=VALUES(latitude_wgs84), " + "has_audio=VALUES(has_audio), " + "business_group_id=VALUES(business_group_id), " + - "gps_time=VALUES(gps_time)" + + "gps_time=VALUES(gps_time)," + + "common_gb_channel_id=VALUES(commonGbChannelId)" + "") int batchAddOrUpdate(List addChannels); @@ -311,6 +313,7 @@ public interface DeviceChannelMapper { ", latitude_wgs84=#{item.latitudeWgs84}" + ", business_group_id=#{item.businessGroupId}" + ", gps_time=#{item.gpsTime}" + + ", common_gb_channel_id=#{item.commonGbChannelId}" + "WHERE id=#{item.id}" + "WHERE device_id=#{item.deviceId} AND channel_id=#{item.channelId}" + "" + @@ -327,10 +330,10 @@ public interface DeviceChannelMapper { "wvp_device_channel " + "WHERE " + "device_id = #{deviceId} " + - " AND channel_id NOT IN " + - " #{item.channelId}" + + " AND id IN " + + " #{item.id}" + " "}) - int cleanChannelsNotInList(@Param("deviceId") String deviceId, @Param("channels") List channels); + int cleanChannelsInList(@Param("deviceId") String deviceId, @Param("channels") List channels); @Update(" update wvp_device_channel" + " set sub_count = (select *" +