优化Catalog查询内存占用高的问题
parent
6fc8db8bd0
commit
fee8d2f8cd
|
@ -1,9 +1,14 @@
|
|||
package com.genersoft.iot.vmp.gb28181.bean;
|
||||
|
||||
import lombok.Data;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @author lin
|
||||
|
@ -15,15 +20,16 @@ public class CatalogData {
|
|||
*/
|
||||
private int sn;
|
||||
private int total;
|
||||
private List<DeviceChannel> channelList;
|
||||
private List<Region> regionListList;
|
||||
private List<Group> groupListListList;
|
||||
private Instant lastTime;
|
||||
private Instant time;
|
||||
private Device device;
|
||||
private String errorMsg;
|
||||
private Set<String> redisKeysForChannel = new HashSet<>();
|
||||
private Set<String> redisKeysForRegion = new HashSet<>();
|
||||
private Set<String> redisKeysForGroup = new HashSet<>();
|
||||
|
||||
public enum CatalogDataStatus{
|
||||
ready, runIng, end
|
||||
}
|
||||
private CatalogDataStatus status;
|
||||
|
||||
}
|
||||
|
|
|
@ -5,11 +5,13 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
|
|||
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce;
|
||||
import com.genersoft.iot.vmp.gb28181.dao.provider.DeviceChannelProvider;
|
||||
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
||||
import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
|
||||
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
|
||||
import org.apache.ibatis.annotations.*;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 用于存储设备通道信息
|
||||
|
|
|
@ -471,7 +471,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
|
|||
return false;
|
||||
}
|
||||
List<DeviceChannel> allChannels = channelMapper.queryAllChannelsForRefresh(deviceDbId);
|
||||
Map<String,DeviceChannel> allChannelMap = new ConcurrentHashMap<>();
|
||||
Map<String,DeviceChannel> allChannelMap = new HashMap<>();
|
||||
if (!allChannels.isEmpty()) {
|
||||
for (DeviceChannel deviceChannel : allChannels) {
|
||||
allChannelMap.put(deviceChannel.getDeviceDbId() + deviceChannel.getDeviceId(), deviceChannel);
|
||||
|
@ -486,14 +486,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
|
|||
StringBuilder stringBuilder = new StringBuilder();
|
||||
Map<String, Integer> subContMap = new HashMap<>();
|
||||
|
||||
// 数据去重
|
||||
Set<String> gbIdSet = new HashSet<>();
|
||||
for (DeviceChannel deviceChannel : deviceChannelList) {
|
||||
if (gbIdSet.contains(deviceChannel.getDeviceDbId() + deviceChannel.getDeviceId())) {
|
||||
stringBuilder.append(deviceChannel.getDeviceId()).append(",");
|
||||
continue;
|
||||
}
|
||||
gbIdSet.add(deviceChannel.getDeviceDbId() + deviceChannel.getDeviceId());
|
||||
DeviceChannel channelInDb = allChannelMap.get(deviceChannel.getDeviceDbId() + deviceChannel.getDeviceId());
|
||||
if (channelInDb != null) {
|
||||
deviceChannel.setStreamId(channelInDb.getStreamId());
|
||||
|
|
|
@ -1,181 +0,0 @@
|
|||
package com.genersoft.iot.vmp.gb28181.session;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Component
|
||||
public class CatalogDataCatch {
|
||||
|
||||
public static Map<String, CatalogData> data = new ConcurrentHashMap<>();
|
||||
|
||||
@Autowired
|
||||
private IDeviceChannelService deviceChannelService;
|
||||
|
||||
public void addReady(Device device, int sn ) {
|
||||
CatalogData catalogData = data.get(device.getDeviceId());
|
||||
if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
|
||||
catalogData = new CatalogData();
|
||||
catalogData.setChannelList(Collections.synchronizedList(new ArrayList<>()));
|
||||
catalogData.setDevice(device);
|
||||
catalogData.setSn(sn);
|
||||
catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
|
||||
catalogData.setLastTime(Instant.now());
|
||||
data.put(device.getDeviceId(), catalogData);
|
||||
}
|
||||
}
|
||||
|
||||
public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList,
|
||||
List<Region> regionList, List<Group> groupList) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if (catalogData == null) {
|
||||
catalogData = new CatalogData();
|
||||
catalogData.setSn(sn);
|
||||
catalogData.setTotal(total);
|
||||
catalogData.setDevice(device);
|
||||
catalogData.setChannelList(deviceChannelList);
|
||||
catalogData.setRegionListList(regionList);
|
||||
catalogData.setGroupListListList(groupList);
|
||||
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
|
||||
catalogData.setLastTime(Instant.now());
|
||||
data.put(deviceId, catalogData);
|
||||
}else {
|
||||
// 同一个设备的通道同步请求只考虑一个,其他的直接忽略
|
||||
if (catalogData.getSn() != sn) {
|
||||
return;
|
||||
}
|
||||
catalogData.setTotal(total);
|
||||
catalogData.setDevice(device);
|
||||
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
|
||||
|
||||
if (deviceChannelList != null && !deviceChannelList.isEmpty()) {
|
||||
if (catalogData.getChannelList() != null) {
|
||||
catalogData.getChannelList().addAll(deviceChannelList);
|
||||
}
|
||||
}
|
||||
if (regionList != null && !regionList.isEmpty()) {
|
||||
if (catalogData.getRegionListList() != null) {
|
||||
catalogData.getRegionListList().addAll(regionList);
|
||||
}else {
|
||||
catalogData.setRegionListList(regionList);
|
||||
}
|
||||
}
|
||||
if (groupList != null && !groupList.isEmpty()) {
|
||||
if (catalogData.getGroupListListList() != null) {
|
||||
catalogData.getGroupListListList().addAll(groupList);
|
||||
}else {
|
||||
catalogData.setGroupListListList(groupList);
|
||||
}
|
||||
}
|
||||
catalogData.setLastTime(Instant.now());
|
||||
}
|
||||
}
|
||||
|
||||
public List<DeviceChannel> getDeviceChannelList(String deviceId) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if (catalogData == null) {
|
||||
return null;
|
||||
}
|
||||
return catalogData.getChannelList();
|
||||
}
|
||||
|
||||
public List<Region> getRegionList(String deviceId) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if (catalogData == null) {
|
||||
return null;
|
||||
}
|
||||
return catalogData.getRegionListList();
|
||||
}
|
||||
|
||||
public List<Group> getGroupList(String deviceId) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if (catalogData == null) {
|
||||
return null;
|
||||
}
|
||||
return catalogData.getGroupListListList();
|
||||
}
|
||||
|
||||
public int getTotal(String deviceId) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if (catalogData == null) {
|
||||
return 0;
|
||||
}
|
||||
return catalogData.getTotal();
|
||||
}
|
||||
|
||||
public SyncStatus getSyncStatus(String deviceId) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if (catalogData == null) {
|
||||
return null;
|
||||
}
|
||||
SyncStatus syncStatus = new SyncStatus();
|
||||
syncStatus.setCurrent(catalogData.getChannelList().size());
|
||||
syncStatus.setTotal(catalogData.getTotal());
|
||||
syncStatus.setErrorMsg(catalogData.getErrorMsg());
|
||||
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
|
||||
syncStatus.setSyncIng(false);
|
||||
}else {
|
||||
syncStatus.setSyncIng(true);
|
||||
}
|
||||
return syncStatus;
|
||||
}
|
||||
|
||||
public boolean isSyncRunning(String deviceId) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if (catalogData == null) {
|
||||
return false;
|
||||
}
|
||||
return !catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end);
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
|
||||
private void timerTask(){
|
||||
Set<String> keys = data.keySet();
|
||||
|
||||
Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5));
|
||||
Instant instantBefore30S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(30));
|
||||
|
||||
for (String deviceId : keys) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if ( catalogData.getLastTime().isBefore(instantBefore5S)) {
|
||||
// 超过五秒收不到消息任务超时, 只更新这一部分数据, 收到数据与声明的总数一致,则重置通道数据,数据不全则只对收到的数据做更新操作
|
||||
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
|
||||
if (catalogData.getTotal() == catalogData.getChannelList().size()) {
|
||||
deviceChannelService.resetChannels(catalogData.getDevice().getId(), catalogData.getChannelList());
|
||||
}else {
|
||||
deviceChannelService.updateChannels(catalogData.getDevice(), catalogData.getChannelList());
|
||||
}
|
||||
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
|
||||
catalogData.setErrorMsg(errorMsg);
|
||||
if (catalogData.getTotal() != catalogData.getChannelList().size()) {
|
||||
|
||||
}
|
||||
}else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) {
|
||||
String errorMsg = "同步失败,等待回复超时";
|
||||
catalogData.setErrorMsg(errorMsg);
|
||||
}
|
||||
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
|
||||
}
|
||||
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().isBefore(instantBefore30S)) { // 超过三十秒,如果标记为end则删除
|
||||
data.remove(deviceId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void setChannelSyncEnd(String deviceId, String errorMsg) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if (catalogData == null) {
|
||||
return;
|
||||
}
|
||||
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
|
||||
catalogData.setErrorMsg(errorMsg);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,260 @@
|
|||
package com.genersoft.iot.vmp.gb28181.session;
|
||||
|
||||
import com.genersoft.iot.vmp.common.InviteInfo;
|
||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IGroupService;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IRegionService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.data.redis.core.Cursor;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.core.ScanOptions;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.DelayQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class CatalogDataManager implements CommandLineRunner {
|
||||
|
||||
@Autowired
|
||||
private IDeviceChannelService deviceChannelService;
|
||||
|
||||
@Autowired
|
||||
private IRegionService regionService;
|
||||
|
||||
@Autowired
|
||||
private IGroupService groupService;
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<Object, Object> redisTemplate;
|
||||
|
||||
private final Map<String, CatalogData> dataMap = new ConcurrentHashMap<>();
|
||||
|
||||
private final String key = "VMP_CATALOG_DATA";
|
||||
|
||||
public void addReady(Device device, int sn ) {
|
||||
CatalogData catalogData = dataMap.get(device.getDeviceId());
|
||||
if (catalogData != null) {
|
||||
Set<String> redisKeysForChannel = catalogData.getRedisKeysForChannel();
|
||||
if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) {
|
||||
for (String deleteKey : redisKeysForChannel) {
|
||||
redisTemplate.opsForHash().delete(key, deleteKey);
|
||||
}
|
||||
}
|
||||
Set<String> redisKeysForRegion = catalogData.getRedisKeysForRegion();
|
||||
if (redisKeysForRegion != null && !redisKeysForRegion.isEmpty()) {
|
||||
for (String deleteKey : redisKeysForRegion) {
|
||||
redisTemplate.opsForHash().delete(key, deleteKey);
|
||||
}
|
||||
}
|
||||
Set<String> redisKeysForGroup = catalogData.getRedisKeysForGroup();
|
||||
if (redisKeysForGroup != null && !redisKeysForGroup.isEmpty()) {
|
||||
for (String deleteKey : redisKeysForGroup) {
|
||||
redisTemplate.opsForHash().delete(key, deleteKey);
|
||||
}
|
||||
}
|
||||
dataMap.remove(device.getDeviceId());
|
||||
}
|
||||
catalogData = new CatalogData();
|
||||
catalogData.setDevice(device);
|
||||
catalogData.setSn(sn);
|
||||
catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
|
||||
catalogData.setTime(Instant.now());
|
||||
dataMap.put(device.getDeviceId(), catalogData);
|
||||
}
|
||||
|
||||
public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList,
|
||||
List<Region> regionList, List<Group> groupList) {
|
||||
CatalogData catalogData = dataMap.get(device.getDeviceId());
|
||||
if (catalogData == null ) {
|
||||
log.warn("[缓存-Catalog] 未找到缓存对象,可能已经结束");
|
||||
return;
|
||||
}
|
||||
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
|
||||
catalogData.setTotal(total);
|
||||
catalogData.setTime(Instant.now());
|
||||
|
||||
if (deviceChannelList != null && !deviceChannelList.isEmpty()) {
|
||||
for (DeviceChannel deviceChannel : deviceChannelList) {
|
||||
String keyForChannel = "CHANNEL:" + deviceId + ":" + deviceChannel.getDeviceId() + ":" + sn;
|
||||
redisTemplate.opsForHash().put(key, keyForChannel, deviceChannel);
|
||||
catalogData.getRedisKeysForChannel().add(keyForChannel);
|
||||
}
|
||||
}
|
||||
|
||||
if (regionList != null && !regionList.isEmpty()) {
|
||||
for (Region region : regionList) {
|
||||
String keyForRegion = "REGION:" + deviceId + ":" + region.getDeviceId() + ":" + sn;
|
||||
redisTemplate.opsForHash().put(key, keyForRegion, region);
|
||||
catalogData.getRedisKeysForRegion().add(keyForRegion);
|
||||
}
|
||||
}
|
||||
|
||||
if (groupList != null && !groupList.isEmpty()) {
|
||||
for (Group group : groupList) {
|
||||
String keyForGroup = "GROUP:" + deviceId + ":" + group.getDeviceId() + ":" + sn;
|
||||
redisTemplate.opsForHash().put(key, keyForGroup, group);
|
||||
catalogData.getRedisKeysForGroup().add(keyForGroup);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public List<DeviceChannel> getDeviceChannelList(String deviceId) {
|
||||
List<DeviceChannel> result = new ArrayList<>();
|
||||
CatalogData catalogData = dataMap.get(deviceId);
|
||||
if (catalogData == null ) {
|
||||
log.warn("[Redis-Catalog] 未找到缓存对象,可能已经结束");
|
||||
return result;
|
||||
}
|
||||
for (String objectKey : catalogData.getRedisKeysForChannel()) {
|
||||
DeviceChannel deviceChannel = (DeviceChannel) redisTemplate.opsForHash().get(key, objectKey);
|
||||
if (deviceChannel != null) {
|
||||
result.add(deviceChannel);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public List<Region> getRegionList(String deviceId) {
|
||||
List<Region> result = new ArrayList<>();
|
||||
CatalogData catalogData = dataMap.get(deviceId);
|
||||
if (catalogData == null ) {
|
||||
log.warn("[Redis-Catalog] 未找到缓存对象,可能已经结束");
|
||||
return result;
|
||||
}
|
||||
for (String objectKey : catalogData.getRedisKeysForRegion()) {
|
||||
Region region = (Region) redisTemplate.opsForHash().get(key, objectKey);
|
||||
if (region != null) {
|
||||
result.add(region);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public List<Group> getGroupList(String deviceId) {
|
||||
List<Group> result = new ArrayList<>();
|
||||
CatalogData catalogData = dataMap.get(deviceId);
|
||||
if (catalogData == null ) {
|
||||
log.warn("[Redis-Catalog] 未找到缓存对象,可能已经结束");
|
||||
return result;
|
||||
}
|
||||
for (String objectKey : catalogData.getRedisKeysForGroup()) {
|
||||
Group group = (Group) redisTemplate.opsForHash().get(key, objectKey);
|
||||
if (group != null) {
|
||||
result.add(group);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public SyncStatus getSyncStatus(String deviceId) {
|
||||
CatalogData catalogData = dataMap.get(deviceId);
|
||||
if (catalogData == null) {
|
||||
return null;
|
||||
}
|
||||
SyncStatus syncStatus = new SyncStatus();
|
||||
syncStatus.setCurrent(catalogData.getRedisKeysForChannel().size());
|
||||
syncStatus.setTotal(catalogData.getTotal());
|
||||
syncStatus.setErrorMsg(catalogData.getErrorMsg());
|
||||
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
|
||||
syncStatus.setSyncIng(false);
|
||||
}else {
|
||||
syncStatus.setSyncIng(true);
|
||||
}
|
||||
return syncStatus;
|
||||
}
|
||||
|
||||
public boolean isSyncRunning(String deviceId) {
|
||||
CatalogData catalogData = dataMap.get(deviceId);
|
||||
if (catalogData == null) {
|
||||
return false;
|
||||
}
|
||||
return !catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
// 启动时清理旧的数据
|
||||
redisTemplate.delete(key);
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
|
||||
private void timerTask(){
|
||||
if (dataMap.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
Set<String> keys = dataMap.keySet();
|
||||
|
||||
Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5));
|
||||
Instant instantBefore30S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(30));
|
||||
for (String dataKey : keys) {
|
||||
CatalogData catalogData = dataMap.get(dataKey);
|
||||
if ( catalogData.getTime().isBefore(instantBefore5S)) {
|
||||
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
|
||||
String deviceId = catalogData.getDevice().getDeviceId();
|
||||
int sn = catalogData.getSn();
|
||||
List<DeviceChannel> deviceChannelList = getDeviceChannelList(deviceId);
|
||||
if (catalogData.getTotal() == deviceChannelList.size()) {
|
||||
deviceChannelService.resetChannels(catalogData.getDevice().getId(), deviceChannelList);
|
||||
}else {
|
||||
deviceChannelService.updateChannels(catalogData.getDevice(), deviceChannelList);
|
||||
}
|
||||
List<Region> regionList = getRegionList(deviceId);
|
||||
if ( regionList!= null && !regionList.isEmpty()) {
|
||||
regionService.batchAdd(regionList);
|
||||
}
|
||||
List<Group> groupList = getGroupList(deviceId);
|
||||
if (groupList != null && !groupList.isEmpty()) {
|
||||
groupService.batchAdd(groupList);
|
||||
}
|
||||
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + deviceChannelList.size() + "条";
|
||||
catalogData.setErrorMsg(errorMsg);
|
||||
}else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) {
|
||||
String errorMsg = "同步失败,等待回复超时";
|
||||
catalogData.setErrorMsg(errorMsg);
|
||||
}
|
||||
}
|
||||
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getTime().isBefore(instantBefore30S)) { // 超过三十秒,如果标记为end则删除
|
||||
dataMap.remove(dataKey);
|
||||
Set<String> redisKeysForChannel = catalogData.getRedisKeysForChannel();
|
||||
if (redisKeysForChannel != null && !redisKeysForChannel.isEmpty()) {
|
||||
for (String deleteKey : redisKeysForChannel) {
|
||||
redisTemplate.opsForHash().delete(key, deleteKey);
|
||||
}
|
||||
}
|
||||
Set<String> redisKeysForRegion = catalogData.getRedisKeysForRegion();
|
||||
if (redisKeysForRegion != null && !redisKeysForRegion.isEmpty()) {
|
||||
for (String deleteKey : redisKeysForRegion) {
|
||||
redisTemplate.opsForHash().delete(key, deleteKey);
|
||||
}
|
||||
}
|
||||
Set<String> redisKeysForGroup = catalogData.getRedisKeysForGroup();
|
||||
if (redisKeysForGroup != null && !redisKeysForGroup.isEmpty()) {
|
||||
for (String deleteKey : redisKeysForGroup) {
|
||||
redisTemplate.opsForHash().delete(key, deleteKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void setChannelSyncEnd(String deviceId, String errorMsg) {
|
||||
CatalogData catalogData = dataMap.get(deviceId);
|
||||
if (catalogData == null) {
|
||||
return;
|
||||
}
|
||||
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
|
||||
catalogData.setErrorMsg(errorMsg);
|
||||
catalogData.setTime(Instant.now());
|
||||
}
|
||||
}
|
|
@ -5,7 +5,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
|
|||
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IGroupService;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IRegionService;
|
||||
import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch;
|
||||
import com.genersoft.iot.vmp.gb28181.session.CatalogDataManager;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
|
||||
|
@ -15,9 +15,7 @@ import org.dom4j.DocumentException;
|
|||
import org.dom4j.Element;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
|
@ -30,7 +28,6 @@ import java.util.ArrayList;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* 目录查询的回复
|
||||
|
@ -56,15 +53,10 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
|
|||
private IGroupService groupService;
|
||||
|
||||
@Autowired
|
||||
private CatalogDataCatch catalogDataCatch;
|
||||
|
||||
@Qualifier("taskExecutor")
|
||||
@Autowired
|
||||
private ThreadPoolTaskExecutor taskExecutor;
|
||||
private CatalogDataManager catalogDataCatch;
|
||||
|
||||
@Autowired
|
||||
private SipConfig sipConfig;
|
||||
private AtomicBoolean processing = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
|
@ -72,7 +64,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
|
|||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void handForDevice(RequestEvent evt, Device device, Element element) {
|
||||
taskQueue.offer(new HandlerCatchData(evt, device, element));
|
||||
// 回复200 OK
|
||||
|
@ -83,7 +74,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
|
|||
}
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = 200)
|
||||
@Scheduled(fixedDelay = 50)
|
||||
@Transactional
|
||||
public void executeTaskQueue(){
|
||||
if (taskQueue.isEmpty()) {
|
||||
|
@ -170,11 +161,12 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
|
|||
int sn = Integer.parseInt(snElement.getText());
|
||||
catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(),
|
||||
channelList, regionList, groupList);
|
||||
log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()) == null ? 0 : catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size(), sumNum);
|
||||
if (catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() == sumNum) {
|
||||
List<DeviceChannel> deviceChannelList = catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId());
|
||||
log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), deviceChannelList.size(), sumNum);
|
||||
if (deviceChannelList.size() == sumNum) {
|
||||
// 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理,
|
||||
// 目前支持设备通道上线通知时和设备上线时向上级通知
|
||||
boolean resetChannelsResult = saveData(take.getDevice());
|
||||
boolean resetChannelsResult = saveData(take.getDevice(), sn);
|
||||
if (!resetChannelsResult) {
|
||||
String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId()).size() + "条";
|
||||
catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg);
|
||||
|
@ -193,7 +185,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
|
|||
}
|
||||
|
||||
@Transactional
|
||||
public boolean saveData(Device device) {
|
||||
public boolean saveData(Device device, int sn) {
|
||||
|
||||
boolean result = true;
|
||||
List<DeviceChannel> deviceChannelList = catalogDataCatch.getDeviceChannelList(device.getDeviceId());
|
||||
|
@ -219,19 +211,11 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
|
|||
}
|
||||
|
||||
public SyncStatus getChannelSyncProgress(String deviceId) {
|
||||
if (catalogDataCatch.getDeviceChannelList(deviceId) == null) {
|
||||
return null;
|
||||
} else {
|
||||
return catalogDataCatch.getSyncStatus(deviceId);
|
||||
}
|
||||
return catalogDataCatch.getSyncStatus(deviceId);
|
||||
}
|
||||
|
||||
public boolean isSyncRunning(String deviceId) {
|
||||
if (catalogDataCatch.getDeviceChannelList(deviceId) == null) {
|
||||
return false;
|
||||
} else {
|
||||
return catalogDataCatch.isSyncRunning(deviceId);
|
||||
}
|
||||
return catalogDataCatch.isSyncRunning(deviceId);
|
||||
}
|
||||
|
||||
public void setChannelSyncReady(Device device, int sn) {
|
||||
|
|
|
@ -165,7 +165,8 @@ create table wvp_device_channel
|
|||
constraint uk_wvp_device_channel_unique_device_channel unique (device_db_id, device_id),
|
||||
constraint uk_wvp_unique_channel unique (gb_device_id),
|
||||
constraint uk_wvp_unique_stream_push_id unique (stream_push_id),
|
||||
constraint uk_wvp_unique_stream_proxy_id unique (stream_proxy_id)
|
||||
constraint uk_wvp_unique_stream_proxy_id unique (stream_proxy_id),
|
||||
index(device_db_id)
|
||||
);
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue