From fee8d2f8cda5abf80d3680f6b219aa55c2ac894b Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Sat, 26 Oct 2024 23:15:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96Catalog=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E5=86=85=E5=AD=98=E5=8D=A0=E7=94=A8=E9=AB=98=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/CatalogData.java | 14 +- .../vmp/gb28181/dao/DeviceChannelMapper.java | 2 + .../impl/DeviceChannelServiceImpl.java | 9 +- .../vmp/gb28181/session/CatalogDataCatch.java | 181 ------------ .../gb28181/session/CatalogDataManager.java | 260 ++++++++++++++++++ .../cmd/CatalogResponseMessageHandler.java | 36 +-- 数据库/2.7.3/初始化-mysql-2.7.3.sql | 3 +- 7 files changed, 285 insertions(+), 220 deletions(-) delete mode 100755 src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java create mode 100755 src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java index 4666b707..2c37a1de 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java @@ -1,9 +1,14 @@ package com.genersoft.iot.vmp.gb28181.bean; import lombok.Data; +import org.jetbrains.annotations.NotNull; import java.time.Instant; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; /** * @author lin @@ -15,15 +20,16 @@ public class CatalogData { */ private int sn; private int total; - private List channelList; - private List regionListList; - private List groupListListList; - private Instant lastTime; + private Instant time; private Device device; private String errorMsg; + private Set redisKeysForChannel = new HashSet<>(); + private Set redisKeysForRegion = new HashSet<>(); + private Set redisKeysForGroup = new HashSet<>(); public enum CatalogDataStatus{ ready, runIng, end } private CatalogDataStatus status; + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java index 37b99687..ea4b9717 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java @@ -5,11 +5,13 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce; import com.genersoft.iot.vmp.gb28181.dao.provider.DeviceChannelProvider; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; import java.util.List; +import java.util.Map; /** * 用于存储设备通道信息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java index d37edafb..71dd1323 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java @@ -471,7 +471,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { return false; } List allChannels = channelMapper.queryAllChannelsForRefresh(deviceDbId); - Map allChannelMap = new ConcurrentHashMap<>(); + Map allChannelMap = new HashMap<>(); if (!allChannels.isEmpty()) { for (DeviceChannel deviceChannel : allChannels) { allChannelMap.put(deviceChannel.getDeviceDbId() + deviceChannel.getDeviceId(), deviceChannel); @@ -486,14 +486,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { StringBuilder stringBuilder = new StringBuilder(); Map subContMap = new HashMap<>(); - // 数据去重 - Set gbIdSet = new HashSet<>(); for (DeviceChannel deviceChannel : deviceChannelList) { - if (gbIdSet.contains(deviceChannel.getDeviceDbId() + deviceChannel.getDeviceId())) { - stringBuilder.append(deviceChannel.getDeviceId()).append(","); - continue; - } - gbIdSet.add(deviceChannel.getDeviceDbId() + deviceChannel.getDeviceId()); DeviceChannel channelInDb = allChannelMap.get(deviceChannel.getDeviceDbId() + deviceChannel.getDeviceId()); if (channelInDb != null) { deviceChannel.setStreamId(channelInDb.getStreamId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java deleted file mode 100755 index 34a8eceb..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java +++ /dev/null @@ -1,181 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.session; - -import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.time.Instant; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -@Component -public class CatalogDataCatch { - - public static Map data = new ConcurrentHashMap<>(); - - @Autowired - private IDeviceChannelService deviceChannelService; - - public void addReady(Device device, int sn ) { - CatalogData catalogData = data.get(device.getDeviceId()); - if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) { - catalogData = new CatalogData(); - catalogData.setChannelList(Collections.synchronizedList(new ArrayList<>())); - catalogData.setDevice(device); - catalogData.setSn(sn); - catalogData.setStatus(CatalogData.CatalogDataStatus.ready); - catalogData.setLastTime(Instant.now()); - data.put(device.getDeviceId(), catalogData); - } - } - - public void put(String deviceId, int sn, int total, Device device, List deviceChannelList, - List regionList, List groupList) { - CatalogData catalogData = data.get(deviceId); - if (catalogData == null) { - catalogData = new CatalogData(); - catalogData.setSn(sn); - catalogData.setTotal(total); - catalogData.setDevice(device); - catalogData.setChannelList(deviceChannelList); - catalogData.setRegionListList(regionList); - catalogData.setGroupListListList(groupList); - catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); - catalogData.setLastTime(Instant.now()); - data.put(deviceId, catalogData); - }else { - // 同一个设备的通道同步请求只考虑一个,其他的直接忽略 - if (catalogData.getSn() != sn) { - return; - } - catalogData.setTotal(total); - catalogData.setDevice(device); - catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); - - if (deviceChannelList != null && !deviceChannelList.isEmpty()) { - if (catalogData.getChannelList() != null) { - catalogData.getChannelList().addAll(deviceChannelList); - } - } - if (regionList != null && !regionList.isEmpty()) { - if (catalogData.getRegionListList() != null) { - catalogData.getRegionListList().addAll(regionList); - }else { - catalogData.setRegionListList(regionList); - } - } - if (groupList != null && !groupList.isEmpty()) { - if (catalogData.getGroupListListList() != null) { - catalogData.getGroupListListList().addAll(groupList); - }else { - catalogData.setGroupListListList(groupList); - } - } - catalogData.setLastTime(Instant.now()); - } - } - - public List getDeviceChannelList(String deviceId) { - CatalogData catalogData = data.get(deviceId); - if (catalogData == null) { - return null; - } - return catalogData.getChannelList(); - } - - public List getRegionList(String deviceId) { - CatalogData catalogData = data.get(deviceId); - if (catalogData == null) { - return null; - } - return catalogData.getRegionListList(); - } - - public List getGroupList(String deviceId) { - CatalogData catalogData = data.get(deviceId); - if (catalogData == null) { - return null; - } - return catalogData.getGroupListListList(); - } - - public int getTotal(String deviceId) { - CatalogData catalogData = data.get(deviceId); - if (catalogData == null) { - return 0; - } - return catalogData.getTotal(); - } - - public SyncStatus getSyncStatus(String deviceId) { - CatalogData catalogData = data.get(deviceId); - if (catalogData == null) { - return null; - } - SyncStatus syncStatus = new SyncStatus(); - syncStatus.setCurrent(catalogData.getChannelList().size()); - syncStatus.setTotal(catalogData.getTotal()); - syncStatus.setErrorMsg(catalogData.getErrorMsg()); - if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) { - syncStatus.setSyncIng(false); - }else { - syncStatus.setSyncIng(true); - } - return syncStatus; - } - - public boolean isSyncRunning(String deviceId) { - CatalogData catalogData = data.get(deviceId); - if (catalogData == null) { - return false; - } - return !catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end); - } - - @Scheduled(fixedDelay = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 - private void timerTask(){ - Set keys = data.keySet(); - - Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5)); - Instant instantBefore30S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(30)); - - for (String deviceId : keys) { - CatalogData catalogData = data.get(deviceId); - if ( catalogData.getLastTime().isBefore(instantBefore5S)) { - // 超过五秒收不到消息任务超时, 只更新这一部分数据, 收到数据与声明的总数一致,则重置通道数据,数据不全则只对收到的数据做更新操作 - if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { - if (catalogData.getTotal() == catalogData.getChannelList().size()) { - deviceChannelService.resetChannels(catalogData.getDevice().getId(), catalogData.getChannelList()); - }else { - deviceChannelService.updateChannels(catalogData.getDevice(), catalogData.getChannelList()); - } - String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"; - catalogData.setErrorMsg(errorMsg); - if (catalogData.getTotal() != catalogData.getChannelList().size()) { - - } - }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) { - String errorMsg = "同步失败,等待回复超时"; - catalogData.setErrorMsg(errorMsg); - } - catalogData.setStatus(CatalogData.CatalogDataStatus.end); - } - if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().isBefore(instantBefore30S)) { // 超过三十秒,如果标记为end则删除 - data.remove(deviceId); - } - } - } - - - public void setChannelSyncEnd(String deviceId, String errorMsg) { - CatalogData catalogData = data.get(deviceId); - if (catalogData == null) { - return; - } - catalogData.setStatus(CatalogData.CatalogDataStatus.end); - catalogData.setErrorMsg(errorMsg); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java new file mode 100755 index 00000000..ac5119e3 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java @@ -0,0 +1,260 @@ +package com.genersoft.iot.vmp.gb28181.session; + +import com.genersoft.iot.vmp.common.InviteInfo; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; +import com.genersoft.iot.vmp.gb28181.service.IGroupService; +import com.genersoft.iot.vmp.gb28181.service.IRegionService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.ScanOptions; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.time.Instant; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +public class CatalogDataManager implements CommandLineRunner { + + @Autowired + private IDeviceChannelService deviceChannelService; + + @Autowired + private IRegionService regionService; + + @Autowired + private IGroupService groupService; + + @Autowired + private RedisTemplate redisTemplate; + + private final Map dataMap = new ConcurrentHashMap<>(); + + private final String key = "VMP_CATALOG_DATA"; + + public void addReady(Device device, int sn ) { + CatalogData catalogData = dataMap.get(device.getDeviceId()); + if (catalogData != null) { + Set redisKeysForChannel = catalogData.getRedisKeysForChannel(); + if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) { + for (String deleteKey : redisKeysForChannel) { + redisTemplate.opsForHash().delete(key, deleteKey); + } + } + Set redisKeysForRegion = catalogData.getRedisKeysForRegion(); + if (redisKeysForRegion != null && !redisKeysForRegion.isEmpty()) { + for (String deleteKey : redisKeysForRegion) { + redisTemplate.opsForHash().delete(key, deleteKey); + } + } + Set redisKeysForGroup = catalogData.getRedisKeysForGroup(); + if (redisKeysForGroup != null && !redisKeysForGroup.isEmpty()) { + for (String deleteKey : redisKeysForGroup) { + redisTemplate.opsForHash().delete(key, deleteKey); + } + } + dataMap.remove(device.getDeviceId()); + } + catalogData = new CatalogData(); + catalogData.setDevice(device); + catalogData.setSn(sn); + catalogData.setStatus(CatalogData.CatalogDataStatus.ready); + catalogData.setTime(Instant.now()); + dataMap.put(device.getDeviceId(), catalogData); + } + + public void put(String deviceId, int sn, int total, Device device, List deviceChannelList, + List regionList, List groupList) { + CatalogData catalogData = dataMap.get(device.getDeviceId()); + if (catalogData == null ) { + log.warn("[缓存-Catalog] 未找到缓存对象,可能已经结束"); + return; + } + catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); + catalogData.setTotal(total); + catalogData.setTime(Instant.now()); + + if (deviceChannelList != null && !deviceChannelList.isEmpty()) { + for (DeviceChannel deviceChannel : deviceChannelList) { + String keyForChannel = "CHANNEL:" + deviceId + ":" + deviceChannel.getDeviceId() + ":" + sn; + redisTemplate.opsForHash().put(key, keyForChannel, deviceChannel); + catalogData.getRedisKeysForChannel().add(keyForChannel); + } + } + + if (regionList != null && !regionList.isEmpty()) { + for (Region region : regionList) { + String keyForRegion = "REGION:" + deviceId + ":" + region.getDeviceId() + ":" + sn; + redisTemplate.opsForHash().put(key, keyForRegion, region); + catalogData.getRedisKeysForRegion().add(keyForRegion); + } + } + + if (groupList != null && !groupList.isEmpty()) { + for (Group group : groupList) { + String keyForGroup = "GROUP:" + deviceId + ":" + group.getDeviceId() + ":" + sn; + redisTemplate.opsForHash().put(key, keyForGroup, group); + catalogData.getRedisKeysForGroup().add(keyForGroup); + } + } + } + + public List getDeviceChannelList(String deviceId) { + List result = new ArrayList<>(); + CatalogData catalogData = dataMap.get(deviceId); + if (catalogData == null ) { + log.warn("[Redis-Catalog] 未找到缓存对象,可能已经结束"); + return result; + } + for (String objectKey : catalogData.getRedisKeysForChannel()) { + DeviceChannel deviceChannel = (DeviceChannel) redisTemplate.opsForHash().get(key, objectKey); + if (deviceChannel != null) { + result.add(deviceChannel); + } + } + return result; + } + + public List getRegionList(String deviceId) { + List result = new ArrayList<>(); + CatalogData catalogData = dataMap.get(deviceId); + if (catalogData == null ) { + log.warn("[Redis-Catalog] 未找到缓存对象,可能已经结束"); + return result; + } + for (String objectKey : catalogData.getRedisKeysForRegion()) { + Region region = (Region) redisTemplate.opsForHash().get(key, objectKey); + if (region != null) { + result.add(region); + } + } + return result; + } + + public List getGroupList(String deviceId) { + List result = new ArrayList<>(); + CatalogData catalogData = dataMap.get(deviceId); + if (catalogData == null ) { + log.warn("[Redis-Catalog] 未找到缓存对象,可能已经结束"); + return result; + } + for (String objectKey : catalogData.getRedisKeysForGroup()) { + Group group = (Group) redisTemplate.opsForHash().get(key, objectKey); + if (group != null) { + result.add(group); + } + } + return result; + } + + public SyncStatus getSyncStatus(String deviceId) { + CatalogData catalogData = dataMap.get(deviceId); + if (catalogData == null) { + return null; + } + SyncStatus syncStatus = new SyncStatus(); + syncStatus.setCurrent(catalogData.getRedisKeysForChannel().size()); + syncStatus.setTotal(catalogData.getTotal()); + syncStatus.setErrorMsg(catalogData.getErrorMsg()); + if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) { + syncStatus.setSyncIng(false); + }else { + syncStatus.setSyncIng(true); + } + return syncStatus; + } + + public boolean isSyncRunning(String deviceId) { + CatalogData catalogData = dataMap.get(deviceId); + if (catalogData == null) { + return false; + } + return !catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end); + } + + @Override + public void run(String... args) throws Exception { + // 启动时清理旧的数据 + redisTemplate.delete(key); + } + + @Scheduled(fixedDelay = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 + private void timerTask(){ + if (dataMap.isEmpty()) { + return; + } + Set keys = dataMap.keySet(); + + Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5)); + Instant instantBefore30S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(30)); + for (String dataKey : keys) { + CatalogData catalogData = dataMap.get(dataKey); + if ( catalogData.getTime().isBefore(instantBefore5S)) { + if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { + String deviceId = catalogData.getDevice().getDeviceId(); + int sn = catalogData.getSn(); + List deviceChannelList = getDeviceChannelList(deviceId); + if (catalogData.getTotal() == deviceChannelList.size()) { + deviceChannelService.resetChannels(catalogData.getDevice().getId(), deviceChannelList); + }else { + deviceChannelService.updateChannels(catalogData.getDevice(), deviceChannelList); + } + List regionList = getRegionList(deviceId); + if ( regionList!= null && !regionList.isEmpty()) { + regionService.batchAdd(regionList); + } + List groupList = getGroupList(deviceId); + if (groupList != null && !groupList.isEmpty()) { + groupService.batchAdd(groupList); + } + String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + deviceChannelList.size() + "条"; + catalogData.setErrorMsg(errorMsg); + }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) { + String errorMsg = "同步失败,等待回复超时"; + catalogData.setErrorMsg(errorMsg); + } + } + if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getTime().isBefore(instantBefore30S)) { // 超过三十秒,如果标记为end则删除 + dataMap.remove(dataKey); + Set redisKeysForChannel = catalogData.getRedisKeysForChannel(); + if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) { + for (String deleteKey : redisKeysForChannel) { + redisTemplate.opsForHash().delete(key, deleteKey); + } + } + Set redisKeysForRegion = catalogData.getRedisKeysForRegion(); + if (redisKeysForRegion != null && !redisKeysForRegion.isEmpty()) { + for (String deleteKey : redisKeysForRegion) { + redisTemplate.opsForHash().delete(key, deleteKey); + } + } + Set redisKeysForGroup = catalogData.getRedisKeysForGroup(); + if (redisKeysForGroup != null && !redisKeysForGroup.isEmpty()) { + for (String deleteKey : redisKeysForGroup) { + redisTemplate.opsForHash().delete(key, deleteKey); + } + } + } + } + } + + + public void setChannelSyncEnd(String deviceId, String errorMsg) { + CatalogData catalogData = dataMap.get(deviceId); + if (catalogData == null) { + return; + } + catalogData.setStatus(CatalogData.CatalogDataStatus.end); + catalogData.setErrorMsg(errorMsg); + catalogData.setTime(Instant.now()); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 14b415aa..971b80f3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -5,7 +5,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IGroupService; import com.genersoft.iot.vmp.gb28181.service.IRegionService; -import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch; +import com.genersoft.iot.vmp.gb28181.session.CatalogDataManager; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; @@ -15,9 +15,7 @@ import org.dom4j.DocumentException; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -30,7 +28,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; /** * 目录查询的回复 @@ -56,15 +53,10 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp private IGroupService groupService; @Autowired - private CatalogDataCatch catalogDataCatch; - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; + private CatalogDataManager catalogDataCatch; @Autowired private SipConfig sipConfig; - private AtomicBoolean processing = new AtomicBoolean(false); @Override public void afterPropertiesSet() throws Exception { @@ -72,7 +64,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } @Override - @Transactional public void handForDevice(RequestEvent evt, Device device, Element element) { taskQueue.offer(new HandlerCatchData(evt, device, element)); // 回复200 OK @@ -83,7 +74,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } } - @Scheduled(fixedDelay = 200) + @Scheduled(fixedDelay = 50) @Transactional public void executeTaskQueue(){ if (taskQueue.isEmpty()) { @@ -170,11 +161,12 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp int sn = Integer.parseInt(snElement.getText()); catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList, regionList, groupList); - log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()) == null ? 0 : catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size(), sumNum); - if (catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() == sumNum) { + List deviceChannelList = catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()); + log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), deviceChannelList.size(), sumNum); + if (deviceChannelList.size() == sumNum) { // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, // 目前支持设备通道上线通知时和设备上线时向上级通知 - boolean resetChannelsResult = saveData(take.getDevice()); + boolean resetChannelsResult = saveData(take.getDevice(), sn); if (!resetChannelsResult) { String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() + "条"; catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); @@ -193,7 +185,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } @Transactional - public boolean saveData(Device device) { + public boolean saveData(Device device, int sn) { boolean result = true; List deviceChannelList = catalogDataCatch.getDeviceChannelList(device.getDeviceId()); @@ -219,19 +211,11 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } public SyncStatus getChannelSyncProgress(String deviceId) { - if (catalogDataCatch.getDeviceChannelList(deviceId) == null) { - return null; - } else { - return catalogDataCatch.getSyncStatus(deviceId); - } + return catalogDataCatch.getSyncStatus(deviceId); } public boolean isSyncRunning(String deviceId) { - if (catalogDataCatch.getDeviceChannelList(deviceId) == null) { - return false; - } else { - return catalogDataCatch.isSyncRunning(deviceId); - } + return catalogDataCatch.isSyncRunning(deviceId); } public void setChannelSyncReady(Device device, int sn) { diff --git a/数据库/2.7.3/初始化-mysql-2.7.3.sql b/数据库/2.7.3/初始化-mysql-2.7.3.sql index 6fbb9a4e..62842dc9 100644 --- a/数据库/2.7.3/初始化-mysql-2.7.3.sql +++ b/数据库/2.7.3/初始化-mysql-2.7.3.sql @@ -165,7 +165,8 @@ create table wvp_device_channel constraint uk_wvp_device_channel_unique_device_channel unique (device_db_id, device_id), constraint uk_wvp_unique_channel unique (gb_device_id), constraint uk_wvp_unique_stream_push_id unique (stream_push_id), - constraint uk_wvp_unique_stream_proxy_id unique (stream_proxy_id) + constraint uk_wvp_unique_stream_proxy_id unique (stream_proxy_id), + index(device_db_id) );