优化通道同步添加对SN的判断,精简代码

pull/437/head
648540858 2022-04-17 19:48:05 +08:00
parent 25102229f6
commit 0dc1807f62
11 changed files with 121 additions and 106 deletions

View File

@ -4,6 +4,7 @@ import java.util.Date;
import java.util.List;
public class CatalogData {
private int sn; // 命令序列号
private int total;
private List<DeviceChannel> channelList;
private Date lastTime;
@ -15,6 +16,15 @@ public class CatalogData {
}
private CatalogDataStatus status;
public int getSn() {
return sn;
}
public void setSn(int sn) {
this.sn = sn;
}
public int getTotal() {
return total;
}

View File

@ -54,6 +54,7 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
@Autowired
private SIPCommander cmder;
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
@ -76,7 +77,7 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
if (deviceInStore == null) { //第一次上线
logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
cmder.deviceInfoQuery(device);
cmder.catalogQuery(device, null);
deviceService.sync(device);
}
break;
// 设备主动发送心跳触发的在线事件

View File

@ -26,28 +26,35 @@ public class CatalogDataCatch {
@Autowired
private IVideoManagerStorage storager;
public void addReady(String key) {
CatalogData catalogData = data.get(key);
public void addReady(Device device, int sn ) {
CatalogData catalogData = data.get(device.getDeviceId());
if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
catalogData = new CatalogData();
catalogData.setChannelList(new ArrayList<>());
catalogData.setDevice(device);
catalogData.setSn(sn);
catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
catalogData.setLastTime(new Date(System.currentTimeMillis()));
data.put(key, catalogData);
data.put(device.getDeviceId(), catalogData);
}
}
public void put(String key, int total, Device device, List<DeviceChannel> deviceChannelList) {
CatalogData catalogData = data.get(key);
public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList) {
CatalogData catalogData = data.get(deviceId);
if (catalogData == null) {
catalogData = new CatalogData();
catalogData.setSn(sn);
catalogData.setTotal(total);
catalogData.setDevice(device);
catalogData.setChannelList(new ArrayList<>());
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
catalogData.setLastTime(new Date(System.currentTimeMillis()));
data.put(key, catalogData);
data.put(deviceId, catalogData);
}else {
// 同一个设备的通道同步请求只考虑一个,其他的直接忽略
if (catalogData.getSn() != sn) {
return;
}
catalogData.setTotal(total);
catalogData.setDevice(device);
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
@ -56,20 +63,20 @@ public class CatalogDataCatch {
}
}
public List<DeviceChannel> get(String key) {
CatalogData catalogData = data.get(key);
public List<DeviceChannel> get(String deviceId) {
CatalogData catalogData = data.get(deviceId);
if (catalogData == null) return null;
return catalogData.getChannelList();
}
public int getTotal(String key) {
CatalogData catalogData = data.get(key);
public int getTotal(String deviceId) {
CatalogData catalogData = data.get(deviceId);
if (catalogData == null) return 0;
return catalogData.getTotal();
}
public SyncStatus getSyncStatus(String key) {
CatalogData catalogData = data.get(key);
public SyncStatus getSyncStatus(String deviceId) {
CatalogData catalogData = data.get(deviceId);
if (catalogData == null) return null;
SyncStatus syncStatus = new SyncStatus();
syncStatus.setCurrent(catalogData.getChannelList().size());
@ -78,10 +85,6 @@ public class CatalogDataCatch {
return syncStatus;
}
public void del(String key) {
data.remove(key);
}
@Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
private void timerTask(){
Set<String> keys = data.keySet();
@ -92,23 +95,30 @@ public class CatalogDataCatch {
Calendar calendarBefore30S = Calendar.getInstance();
calendarBefore30S.setTime(new Date());
calendarBefore30S.set(Calendar.SECOND, calendarBefore30S.get(Calendar.SECOND) - 30);
for (String key : keys) {
CatalogData catalogData = data.get(key);
if (catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
for (String deviceId : keys) {
CatalogData catalogData = data.get(deviceId);
if ( catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
if (catalogData.getTotal() != catalogData.getChannelList().size()) {
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
catalogData.setErrorMsg(errorMsg);
}
}else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) {
String errorMsg = "同步失败,等待回复超时";
catalogData.setErrorMsg(errorMsg);
}
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
catalogData.setErrorMsg(errorMsg);
}
if (catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒如果标记为end则删除
data.remove(key);
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒如果标记为end则删除
data.remove(deviceId);
}
}
}
public void setChannelSyncEnd(String key, String errorMsg) {
CatalogData catalogData = data.get(key);
public void setChannelSyncEnd(String deviceId, String errorMsg) {
CatalogData catalogData = data.get(deviceId);
if (catalogData == null)return;
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
catalogData.setErrorMsg(errorMsg);

View File

@ -250,7 +250,7 @@ public interface ISIPCommander {
*
* @param device
*/
boolean catalogQuery(Device device, SipSubscribe.Event errorEvent);
boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent);
/**
*

View File

@ -1208,14 +1208,14 @@ public class SIPCommander implements ISIPCommander {
* @param device
*/
@Override
public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) {
public boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent) {
try {
StringBuffer catalogXml = new StringBuffer(200);
String charset = device.getCharset();
catalogXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
catalogXml.append("<Query>\r\n");
catalogXml.append("<CmdType>Catalog</CmdType>\r\n");
catalogXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
catalogXml.append("<SN>" + sn + "</SN>\r\n");
catalogXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
catalogXml.append("</Query>\r\n");

View File

@ -86,23 +86,17 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
rootElement = getRootElement(evt, device.getCharset());
Element deviceListElement = rootElement.element("DeviceList");
Element sumNumElement = rootElement.element("SumNum");
if (sumNumElement == null || deviceListElement == null) {
Element snElement = rootElement.element("SN");
if (snElement == null || sumNumElement == null || deviceListElement == null) {
responseAck(evt, Response.BAD_REQUEST, "xml error");
return;
}
int sumNum = Integer.parseInt(sumNumElement.getText());
if (sumNum == 0) {
// 数据已经完整接收
storager.cleanChannelsForDevice(device.getDeviceId());
RequestMessage msg = new RequestMessage();
msg.setKey(key);
WVPResult<Object> result = new WVPResult<>();
result.setCode(0);
result.setData(device);
msg.setData(result);
result.setMsg("更新成功共0条");
deferredResultHolder.invokeAllResult(msg);
catalogDataCatch.del(key);
catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
}else {
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
if (deviceListIterator != null) {
@ -123,24 +117,18 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
channelList.add(deviceChannel);
}
int sn = Integer.parseInt(snElement.getText());
logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(key) == null ? 0 :catalogDataCatch.get(key).size(), sumNum);
catalogDataCatch.put(key, sumNum, device, channelList);
if (catalogDataCatch.get(key).size() == sumNum) {
catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList);
if (catalogDataCatch.get(device.getDeviceId()).size() == sumNum) {
// 数据已经完整接收
boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
RequestMessage msg = new RequestMessage();
msg.setKey(key);
WVPResult<Object> result = new WVPResult<>();
result.setCode(0);
result.setData(device);
if (resetChannelsResult || sumNum ==0) {
result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(device.getDeviceId()));
if (!resetChannelsResult) {
String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(device.getDeviceId()).size() + "条";
catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), errorMsg);
}else {
result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条");
catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
}
msg.setData(result);
deferredResultHolder.invokeAllResult(msg);
catalogDataCatch.del(key);
}
}
// 回复200 OK
@ -228,21 +216,18 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
}
public SyncStatus getChannelSyncProgress(String deviceId) {
String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
if (catalogDataCatch.get(key) == null) {
if (catalogDataCatch.get(deviceId) == null) {
return null;
}else {
return catalogDataCatch.getSyncStatus(key);
return catalogDataCatch.getSyncStatus(deviceId);
}
}
public void setChannelSyncReady(String deviceId) {
String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
catalogDataCatch.addReady(key);
public void setChannelSyncReady(Device device, int sn) {
catalogDataCatch.addReady(device, sn);
}
public void setChannelSyncEnd(String deviceId, String errorMsg) {
String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
catalogDataCatch.setChannelSyncEnd(key, errorMsg);
catalogDataCatch.setChannelSyncEnd(deviceId, errorMsg);
}
}

View File

@ -44,15 +44,8 @@ public interface IDeviceService {
SyncStatus getChannelSyncStatus(String deviceId);
/**
*
* @param deviceId ID
*
* @param device
*/
void setChannelSyncReady(String deviceId);
/**
*
* @param deviceId ID
* @param errorMsg
*/
void setChannelSyncEnd(String deviceId, String errorMsg);
void sync(Device device);
}

View File

@ -100,12 +100,16 @@ public class DeviceServiceImpl implements IDeviceService {
}
@Override
public void setChannelSyncReady(String deviceId) {
catalogResponseMessageHandler.setChannelSyncReady(deviceId);
}
@Override
public void setChannelSyncEnd(String deviceId, String errorMsg) {
catalogResponseMessageHandler.setChannelSyncEnd(deviceId, errorMsg);
public void sync(Device device) {
if (catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId()) != null) {
logger.info("开启同步时发现同步已经存在");
return;
}
int sn = (int)((Math.random()*9+1)*100000);
catalogResponseMessageHandler.setChannelSyncReady(device, sn);
sipCommander.catalogQuery(device, sn, event -> {
String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
});
}
}

View File

@ -238,12 +238,15 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
@Override
public boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
if (deviceChannelList == null) {
return false;
}
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
// 数据去重
List<DeviceChannel> channels = new ArrayList<>();
StringBuilder stringBuilder = new StringBuilder();
Map<String, Integer> subContMap = new HashMap<>();
if (deviceChannelList.size() > 1) {
if (deviceChannelList != null && deviceChannelList.size() > 1) {
// 数据去重
Set<String> gbIdSet = new HashSet<>();
for (DeviceChannel deviceChannel : deviceChannelList) {
@ -300,6 +303,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
dataSourceTransactionManager.commit(transactionStatus); //手动提交
return true;
}catch (Exception e) {
e.printStackTrace();
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
@ -415,10 +419,9 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
boolean result = false;
try {
if (platformChannelMapper.delChannelForDeviceId(deviceId) <0 // 删除与国标平台的关联
|| deviceChannelMapper.cleanChannelsByDeviceId(deviceId) < 0 // 删除他的通道
|| deviceMapper.del(deviceId) < 0 // 移除设备信息
) {
platformChannelMapper.delChannelForDeviceId(deviceId);
deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
if ( deviceMapper.del(deviceId) < 0 ) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
}

View File

@ -172,12 +172,8 @@ public class DeviceQuery {
wvpResult.setData(syncStatus);
return wvpResult;
}
SyncStatus syncStatusReady = new SyncStatus();
deviceService.setChannelSyncReady(deviceId);
cmder.catalogQuery(device, event -> {
String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
deviceService.setChannelSyncEnd(deviceId, errorMsg);
});
deviceService.sync(device);
WVPResult<SyncStatus> wvpResult = new WVPResult<>();
wvpResult.setCode(0);
wvpResult.setMsg("开始同步");

View File

@ -61,23 +61,36 @@ export default {
if (!this.syncFlag) {
this.syncFlag = true;
}
if (res.data.data == null) {
this.syncStatus = "success"
this.percentage = 100;
this.msg = '同步成功';
}else if (res.data.data.total == 0){
this.msg = `等待同步中`;
this.timmer = setTimeout(this.getProgress, 300)
}else if (res.data.data.errorMsg !== null ){
this.msg = res.data.data.errorMsg;
this.syncStatus = "exception"
}else {
this.total = res.data.data.total;
this.current = res.data.data.current;
this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100;
this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`;
this.timmer = setTimeout(this.getProgress, 300)
if (res.data.data != null) {
if (res.data.data.total == 0) {
if (res.data.data.errorMsg !== null ){
this.msg = res.data.data.errorMsg;
this.syncStatus = "exception"
}else {
this.msg = `等待同步中`;
this.timmer = setTimeout(this.getProgress, 300)
}
}else {
if (res.data.data.total == res.data.data.current) {
this.syncStatus = "success"
this.percentage = 100;
this.msg = '同步成功';
}else {
if (res.data.data.errorMsg !== null ){
this.msg = res.data.data.errorMsg;
this.syncStatus = "exception"
}else {
this.total = res.data.data.total;
this.current = res.data.data.current;
this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100;
this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`;
this.timmer = setTimeout(this.getProgress, 300)
}
}
}
}
}else {
if (this.syncFlag) {
this.syncStatus = "success"