添加字段控制是否自动同步国标通道

结构优化
648540858 2023-09-25 18:20:05 +08:00
parent ea271bf4b9
commit 527dd66c0d
12 changed files with 258 additions and 176 deletions

View File

@ -91,7 +91,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
if (event.getDeviceChannels() != null) {
deviceChannelList.addAll(event.getDeviceChannels());
}
if (event.getGbStreams() != null && event.getGbStreams().size() > 0){
if (event.getGbStreams() != null && !event.getGbStreams().isEmpty()){
for (GbStream gbStream : event.getGbStreams()) {
if (gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) {
continue;

View File

@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.CatalogData;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
@ -20,7 +21,7 @@ public class CatalogDataCatch {
public static Map<String, CatalogData> data = new ConcurrentHashMap<>();
@Autowired
private IVideoManagerStorage storager;
private IDeviceChannelService deviceChannelService;
public void addReady(Device device, int sn ) {
CatalogData catalogData = data.get(device.getDeviceId());
@ -112,7 +113,7 @@ public class CatalogDataCatch {
if ( catalogData.getLastTime().isBefore(instantBefore5S)) {
// 超过五秒收不到消息任务超时, 只更新这一部分数据, 收到数据与声明的总数一致,则重置通道数据,数据不全则只对收到的数据做更新操作
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
deviceChannelService.resetChannels(catalogData.getDevice(), catalogData.getChannelList());
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
catalogData.setErrorMsg(errorMsg);
}else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) {

View File

@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.ICommonGbChannelService;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.dom4j.DocumentException;
@ -58,6 +59,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
@Autowired
private IDeviceChannelService deviceChannelService;
@Autowired
private ICommonGbChannelService commonGbChannelService;
@Autowired
private DynamicTask dynamicTask;
@ -123,7 +127,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
updateChannelOnlineList.add(channel);
if (updateChannelOnlineList.size() > 300) {
executeSaveForOnline();
executeSaveForOnline(device);
}
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
@ -139,7 +143,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
}else {
updateChannelOfflineList.add(channel);
if (updateChannelOfflineList.size() > 300) {
executeSaveForOffline();
executeSaveForOffline(device);
}
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
@ -155,7 +159,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
}else {
updateChannelOfflineList.add(channel);
if (updateChannelOfflineList.size() > 300) {
executeSaveForOffline();
executeSaveForOffline(device);
}
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
@ -171,7 +175,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
}else {
updateChannelOfflineList.add(channel);
if (updateChannelOfflineList.size() > 300) {
executeSaveForOffline();
executeSaveForOffline(device);
}
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
@ -188,7 +192,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
channel.setId(deviceChannel.getId());
updateChannelMap.put(channel.getChannelId(), channel);
if (updateChannelMap.keySet().size() > 300) {
executeSaveForUpdate();
executeSaveForUpdate(device);
}
}else {
addChannelMap.put(channel.getChannelId(), channel);
@ -198,7 +202,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
}
if (addChannelMap.keySet().size() > 300) {
executeSaveForAdd();
executeSaveForAdd(device);
}
}
@ -212,7 +216,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false);
}
if (deleteChannelList.size() > 300) {
executeSaveForDelete();
executeSaveForDelete(device);
}
break;
case CatalogEvent.UPDATE:
@ -224,12 +228,12 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
channel.setId(deviceChannelForUpdate.getId());
updateChannelMap.put(channel.getChannelId(), channel);
if (updateChannelMap.keySet().size() > 300) {
executeSaveForUpdate();
executeSaveForUpdate(device);
}
}else {
addChannelMap.put(channel.getChannelId(), channel);
if (addChannelMap.keySet().size() > 300) {
executeSaveForAdd();
executeSaveForAdd(device);
}
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
@ -251,7 +255,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
|| deleteChannelList.size() > 0) {
if (!dynamicTask.contains(talkKey)) {
dynamicTask.startDelay(talkKey, this::executeSave, 1000);
dynamicTask.startDelay(talkKey, ()-> executeSave(device), 1000);
}
}
}
@ -261,49 +265,64 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
}
}
private void executeSave(){
executeSaveForAdd();
executeSaveForUpdate();
executeSaveForDelete();
executeSaveForOnline();
executeSaveForOffline();
private void executeSave(Device device){
executeSaveForAdd(device);
executeSaveForUpdate(device);
executeSaveForDelete(device);
executeSaveForOnline(device);
executeSaveForOffline(device);
dynamicTask.stop(talkKey);
}
private void executeSaveForUpdate(){
if (updateChannelMap.values().size() > 0) {
private void executeSaveForUpdate(Device device){
if (!updateChannelMap.values().isEmpty()) {
ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(updateChannelMap.values());
updateChannelMap.clear();
deviceChannelService.batchUpdateChannel(deviceChannels);
if (device.isAutoSyncChannel()) {
commonGbChannelService.updateChannelFromGb28181DeviceInList(device, deviceChannels);
}
}
}
private void executeSaveForAdd(){
if (addChannelMap.values().size() > 0) {
private void executeSaveForAdd(Device device){
if (!addChannelMap.values().isEmpty()) {
ArrayList<DeviceChannel> deviceChannels = new ArrayList<>(addChannelMap.values());
addChannelMap.clear();
deviceChannelService.batchAddChannel(deviceChannels);
if (device.isAutoSyncChannel()) {
commonGbChannelService.addChannelFromGb28181DeviceInList(device, deviceChannels);
}
}
}
private void executeSaveForDelete(){
if (deleteChannelList.size() > 0) {
private void executeSaveForDelete(Device device){
if (!deleteChannelList.isEmpty()) {
deviceChannelService.deleteChannels(deleteChannelList);
if (device.isAutoSyncChannel()) {
commonGbChannelService.deleteGbChannelsFromList(deleteChannelList);
}
deleteChannelList.clear();
}
}
private void executeSaveForOnline(){
if (updateChannelOnlineList.size() > 0) {
private void executeSaveForOnline(Device device){
if (!updateChannelOnlineList.isEmpty()) {
deviceChannelService.channelsOnline(updateChannelOnlineList);
if (device.isAutoSyncChannel()) {
commonGbChannelService.channelsOnlineFromList(deleteChannelList);
}
updateChannelOnlineList.clear();
}
}
private void executeSaveForOffline(){
if (updateChannelOfflineList.size() > 0) {
private void executeSaveForOffline(Device device){
if (!updateChannelOfflineList.isEmpty()) {
deviceChannelService.channelsOffline(updateChannelOfflineList);
if (device.isAutoSyncChannel()) {
commonGbChannelService.channelsOfflineFromList(deleteChannelList);
}
updateChannelOfflineList.clear();
}
}

View File

@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.DocumentException;
@ -49,6 +50,9 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
@Autowired
private IVideoManagerStorage storager;
@Autowired
private IDeviceChannelService deviceChannelService;
@Autowired
private CatalogDataCatch catalogDataCatch;
@ -136,7 +140,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) {
// 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理,
// 目前支持设备通道上线通知时和设备上线时向上级通知
boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId()));
boolean resetChannelsResult = deviceChannelService.resetChannels(take.getDevice(), catalogDataCatch.get(take.getDevice().getDeviceId()));
if (!resetChannelsResult) {
String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条";
catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg);

View File

@ -1,8 +1,10 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.common.CommonGbChannel;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import java.util.ArrayList;
import java.util.List;
public interface ICommonGbChannelService {
@ -25,9 +27,21 @@ public interface ICommonGbChannelService {
* @param gbDeviceId
* @param syncKeys
*/
boolean SyncChannelFromGb28181Device(String gbDeviceId, List<String> syncKeys, Boolean syncGroup, Boolean syncRegion);
boolean syncChannelFromGb28181Device(String gbDeviceId, List<String> syncKeys, Boolean syncGroup, Boolean syncRegion);
CommonGbChannel getCommonChannelFromDeviceChannel(DeviceChannel deviceChannel, List<String> syncKeys);
List<CommonGbChannel> getChannelsInRegion(String civilCode);
List<CommonGbChannel> getChannelsInBusinessGroup(String businessGroupID);
void updateChannelFromGb28181DeviceInList(Device device, List<DeviceChannel> deviceChannels);
void addChannelFromGb28181DeviceInList(Device device, List<DeviceChannel> deviceChannels);
void deleteGbChannelsFromList(List<DeviceChannel> deleteChannelList);
void channelsOnlineFromList(List<DeviceChannel> deleteChannelList);
void channelsOfflineFromList(List<DeviceChannel> deleteChannelList);
}

View File

@ -87,4 +87,9 @@ public interface IDeviceChannelService {
*
*/
void batchAddChannel(List<DeviceChannel> deviceChannels);
/**
*
*/
boolean resetChannels(Device device, List<DeviceChannel> deviceChannelList);
}

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.CommonGbChannel;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.service.ICommonGbChannelService;
import com.genersoft.iot.vmp.service.bean.CommonGbChannelType;
@ -75,7 +76,7 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
}
@Override
public boolean SyncChannelFromGb28181Device(String gbDeviceId, List<String> syncKeys, Boolean syncGroup, Boolean syncRegion) {
public boolean syncChannelFromGb28181Device(String gbDeviceId, List<String> syncKeys, Boolean syncGroup, Boolean syncRegion) {
logger.info("同步通用通道]来自国标设备,国标编号: {}", gbDeviceId);
List<DeviceChannel> deviceChannels = deviceChannelMapper.queryAllChannels(gbDeviceId);
if (deviceChannels.isEmpty()) {
@ -133,7 +134,8 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
return result;
}
private CommonGbChannel getCommonChannelFromDeviceChannel(DeviceChannel deviceChannel, List<String> syncKeys) {
@Override
public CommonGbChannel getCommonChannelFromDeviceChannel(DeviceChannel deviceChannel, List<String> syncKeys) {
if (deviceChannel == null) {
return null;
}
@ -278,4 +280,29 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
public List<CommonGbChannel> getChannelsInBusinessGroup(String businessGroupID) {
return null;
}
@Override
public void updateChannelFromGb28181DeviceInList(Device device, List<DeviceChannel> deviceChannels) {
}
@Override
public void addChannelFromGb28181DeviceInList(Device device, List<DeviceChannel> deviceChannels) {
}
@Override
public void deleteGbChannelsFromList(List<DeviceChannel> deleteChannelList) {
commonGbChannelMapper.deleteByDeviceIDs(deleteChannelList);
}
@Override
public void channelsOnlineFromList(List<DeviceChannel> deleteChannelList) {
}
@Override
public void channelsOfflineFromList(List<DeviceChannel> deleteChannelList) {
}
}

View File

@ -4,12 +4,16 @@ import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
import com.genersoft.iot.vmp.service.ICommonGbChannelService;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
@ -17,10 +21,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@ -40,9 +47,18 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
@Autowired
private DeviceChannelMapper channelMapper;
@Autowired
private PlatformChannelMapper platformChannelMapper;
@Autowired
private DeviceMapper deviceMapper;
@Autowired
EventPublisher eventPublisher;
@Autowired
ICommonGbChannelService commonGbChannelService;
@Override
public DeviceChannel updateGps(DeviceChannel deviceChannel, Device device) {
if (deviceChannel.getLongitude()*deviceChannel.getLatitude() > 0) {
@ -261,5 +277,139 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
}
}
@Override
@Transactional
public boolean resetChannels(Device device, List<DeviceChannel> deviceChannelList) {
if (CollectionUtils.isEmpty(deviceChannelList)) {
return false;
}
List<DeviceChannel> allChannels = channelMapper.queryAllChannels(device.getDeviceId());
Map<String,DeviceChannel> allChannelMap = new ConcurrentHashMap<>();
if (allChannels.size() > 0) {
for (DeviceChannel deviceChannel : allChannels) {
allChannelMap.put(deviceChannel.getChannelId(), deviceChannel);
}
}
// 数据去重
List<DeviceChannel> channels = new ArrayList<>();
List<DeviceChannel> updateChannels = new ArrayList<>();
List<DeviceChannel> addChannels = new ArrayList<>();
StringBuilder stringBuilder = new StringBuilder();
Map<String, Integer> subContMap = new HashMap<>();
// 数据去重
Set<String> gbIdSet = new HashSet<>();
for (DeviceChannel deviceChannel : deviceChannelList) {
if (gbIdSet.contains(deviceChannel.getChannelId())) {
stringBuilder.append(deviceChannel.getChannelId()).append(",");
continue;
}
gbIdSet.add(deviceChannel.getChannelId());
if (allChannelMap.containsKey(deviceChannel.getChannelId())) {
deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId());
deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio());
deviceChannel.setCommonGbChannelId(allChannelMap.get(deviceChannel.getChannelId()).getCommonGbChannelId());
if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){
List<String> strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId());
if (!CollectionUtils.isEmpty(strings)){
strings.forEach(platformId->{
eventPublisher.catalogEventPublish(platformId, deviceChannel, deviceChannel.isStatus()? CatalogEvent.ON:CatalogEvent.OFF);
});
}
}
deviceChannel.setUpdateTime(DateUtil.getNow());
updateChannels.add(deviceChannel);
}else {
deviceChannel.setCreateTime(DateUtil.getNow());
deviceChannel.setUpdateTime(DateUtil.getNow());
addChannels.add(deviceChannel);
}
channels.add(deviceChannel);
if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
if (subContMap.get(deviceChannel.getParentId()) == null) {
subContMap.put(deviceChannel.getParentId(), 1);
}else {
Integer count = subContMap.get(deviceChannel.getParentId());
subContMap.put(deviceChannel.getParentId(), count++);
}
}
}
if (channels.size() > 0) {
for (DeviceChannel channel : channels) {
if (subContMap.get(channel.getChannelId()) != null){
Integer count = subContMap.get(channel.getChannelId());
if (count > 0) {
channel.setSubCount(count);
channel.setParental(1);
}
}
}
}
if (stringBuilder.length() > 0) {
logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
}
if(CollectionUtils.isEmpty(channels)){
logger.info("通道重设,数据为空={}" , deviceChannelList);
return false;
}
try {
int limitCount = 50;
int cleanChannelsResult = 0;
if (channels.size() > limitCount) {
for (int i = 0; i < channels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > channels.size()) {
toIndex = channels.size();
}
cleanChannelsResult += channelMapper.cleanChannelsNotInList(device.getDeviceId(), channels.subList(i, toIndex));
}
} else {
cleanChannelsResult = channelMapper.cleanChannelsNotInList(device.getDeviceId(), channels);
}
boolean result = cleanChannelsResult < 0;
if (!result && addChannels.size() > 0) {
if (addChannels.size() > limitCount) {
for (int i = 0; i < addChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > addChannels.size()) {
toIndex = addChannels.size();
}
result = result || channelMapper.batchAdd(addChannels.subList(i, toIndex)) < 0;
}
}else {
result = result || channelMapper.batchAdd(addChannels) < 0;
}
}
if (!result && updateChannels.size() > 0) {
if (updateChannels.size() > limitCount) {
for (int i = 0; i < updateChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > updateChannels.size()) {
toIndex = updateChannels.size();
}
result = result || channelMapper.batchUpdate(updateChannels.subList(i, toIndex)) < 0;
}
}else {
result = result || channelMapper.batchUpdate(updateChannels) < 0;
}
}
if (result) {
//事务回滚
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
if (device.isAutoSyncChannel()) {
commonGbChannelService.syncChannelFromGb28181Device(device.getDeviceId(), null, true, true);
}
return true;
}catch (Exception e) {
logger.error("未处理的异常 ", e);
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return false;
}
}
}

View File

@ -323,13 +323,6 @@ public interface IVideoManagerStorage {
*/
StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId);
/**
* catlog
* @param deviceId
* @param deviceChannelList
*/
boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList);
/**
*
* @param platformId

View File

@ -288,4 +288,5 @@ public interface CommonGbChannelMapper {
"<foreach collection='clearChannels' item='item' open='(' separator=',' close=')' > #{item.commonGbChannelId}</foreach>" +
"</script>")
int deleteByDeviceIDs(List<DeviceChannel> clearChannels);
}

View File

@ -108,138 +108,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
return deviceMapper.getDeviceByDeviceId(deviceId) != null;
}
@Override
public boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
if (CollectionUtils.isEmpty(deviceChannelList)) {
return false;
}
List<DeviceChannel> allChannels = deviceChannelMapper.queryAllChannels(deviceId);
Map<String,DeviceChannel> allChannelMap = new ConcurrentHashMap<>();
if (allChannels.size() > 0) {
for (DeviceChannel deviceChannel : allChannels) {
allChannelMap.put(deviceChannel.getChannelId(), deviceChannel);
}
}
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
// 数据去重
List<DeviceChannel> channels = new ArrayList<>();
List<DeviceChannel> updateChannels = new ArrayList<>();
List<DeviceChannel> addChannels = new ArrayList<>();
StringBuilder stringBuilder = new StringBuilder();
Map<String, Integer> subContMap = new HashMap<>();
// 数据去重
Set<String> gbIdSet = new HashSet<>();
for (DeviceChannel deviceChannel : deviceChannelList) {
if (gbIdSet.contains(deviceChannel.getChannelId())) {
stringBuilder.append(deviceChannel.getChannelId()).append(",");
continue;
}
gbIdSet.add(deviceChannel.getChannelId());
if (allChannelMap.containsKey(deviceChannel.getChannelId())) {
deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId());
deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio());
if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){
List<String> strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId());
if (!CollectionUtils.isEmpty(strings)){
strings.forEach(platformId->{
eventPublisher.catalogEventPublish(platformId, deviceChannel, deviceChannel.isStatus()?CatalogEvent.ON:CatalogEvent.OFF);
});
}
}
deviceChannel.setUpdateTime(DateUtil.getNow());
updateChannels.add(deviceChannel);
}else {
deviceChannel.setCreateTime(DateUtil.getNow());
deviceChannel.setUpdateTime(DateUtil.getNow());
addChannels.add(deviceChannel);
}
channels.add(deviceChannel);
if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
if (subContMap.get(deviceChannel.getParentId()) == null) {
subContMap.put(deviceChannel.getParentId(), 1);
}else {
Integer count = subContMap.get(deviceChannel.getParentId());
subContMap.put(deviceChannel.getParentId(), count++);
}
}
}
if (channels.size() > 0) {
for (DeviceChannel channel : channels) {
if (subContMap.get(channel.getChannelId()) != null){
Integer count = subContMap.get(channel.getChannelId());
if (count > 0) {
channel.setSubCount(count);
channel.setParental(1);
}
}
}
}
if (stringBuilder.length() > 0) {
logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
}
if(CollectionUtils.isEmpty(channels)){
logger.info("通道重设,数据为空={}" , deviceChannelList);
return false;
}
try {
int limitCount = 50;
int cleanChannelsResult = 0;
if (channels.size() > limitCount) {
for (int i = 0; i < channels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > channels.size()) {
toIndex = channels.size();
}
cleanChannelsResult += this.deviceChannelMapper.cleanChannelsNotInList(deviceId, channels.subList(i, toIndex));
}
} else {
cleanChannelsResult = this.deviceChannelMapper.cleanChannelsNotInList(deviceId, channels);
}
boolean result = cleanChannelsResult < 0;
if (!result && addChannels.size() > 0) {
if (addChannels.size() > limitCount) {
for (int i = 0; i < addChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > addChannels.size()) {
toIndex = addChannels.size();
}
result = result || deviceChannelMapper.batchAdd(addChannels.subList(i, toIndex)) < 0;
}
}else {
result = result || deviceChannelMapper.batchAdd(addChannels) < 0;
}
}
if (!result && updateChannels.size() > 0) {
if (updateChannels.size() > limitCount) {
for (int i = 0; i < updateChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > updateChannels.size()) {
toIndex = updateChannels.size();
}
result = result || deviceChannelMapper.batchUpdate(updateChannels.subList(i, toIndex)) < 0;
}
}else {
result = result || deviceChannelMapper.batchUpdate(updateChannels) < 0;
}
}
if (result) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
}
dataSourceTransactionManager.commit(transactionStatus); //手动提交
return true;
}catch (Exception e) {
logger.error("未处理的异常 ", e);
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
}
@Override
public void deviceChannelOnline(String deviceId, String channelId) {

View File

@ -67,6 +67,6 @@ public class CommonChannelController {
System.out.println("syncKeys===" + Arrays.toString(syncKeys));
System.out.println("syncGroup===" + syncGroup);
System.out.println("syncRegion===" + syncRegion);
return commonGbChannelService.SyncChannelFromGb28181Device(deviceId, Lists.newArrayList(syncKeys), syncGroup, syncRegion);
return commonGbChannelService.syncChannelFromGb28181Device(deviceId, Lists.newArrayList(syncKeys), syncGroup, syncRegion);
}
}