优化预置位查询

结构优化
648540858 2023-12-21 13:16:03 +08:00
parent 52c56bfdf2
commit 35fd5eee63
6 changed files with 121 additions and 75 deletions

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.bean;
import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* *
@ -13,7 +14,7 @@ public class PresetData {
*/ */
private int sn; private int sn;
private int total; private int total;
private List<PresetItem> presetItemList; private Map<Integer, PresetItem> presetItems;
private Instant lastTime; private Instant lastTime;
private String errorMsg; private String errorMsg;
@ -36,12 +37,12 @@ public class PresetData {
this.total = total; this.total = total;
} }
public List<PresetItem> getPresetItemList() { public Map<Integer, PresetItem> getPresetItems() {
return presetItemList; return presetItems;
} }
public void setPresetItemList(List<PresetItem> presetItemList) { public void setPresetItems(Map<Integer, PresetItem> presetItems) {
this.presetItemList = presetItemList; this.presetItems = presetItems;
} }
public Instant getLastTime() { public Instant getLastTime() {

View File

@ -20,4 +20,6 @@ public class PresetItem {
public void setPresetName(String presetName) { public void setPresetName(String presetName) {
this.presetName = presetName; this.presetName = presetName;
} }
} }

View File

@ -3,7 +3,10 @@ package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IDeviceChannelService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -19,6 +22,8 @@ import java.util.concurrent.TimeUnit;
@Component @Component
public class PresetDataCatch { public class PresetDataCatch {
private final Logger logger = LoggerFactory.getLogger(PresetDataCatch.class);
public static Map<Integer, PresetData> data = new ConcurrentHashMap<>(); public static Map<Integer, PresetData> data = new ConcurrentHashMap<>();
@ -30,7 +35,7 @@ public class PresetDataCatch {
PresetData presetData = data.get(sn); PresetData presetData = data.get(sn);
if (presetData == null || presetData.getStatus().equals(DataStatus.end)) { if (presetData == null || presetData.getStatus().equals(DataStatus.end)) {
presetData = new PresetData(); presetData = new PresetData();
presetData.setPresetItemList(Collections.synchronizedList(new ArrayList<>())); presetData.setPresetItems(new ConcurrentHashMap<>());
presetData.setSn(sn); presetData.setSn(sn);
presetData.setStatus(DataStatus.ready); presetData.setStatus(DataStatus.ready);
presetData.setLastTime(Instant.now()); presetData.setLastTime(Instant.now());
@ -44,7 +49,7 @@ public class PresetDataCatch {
presetData = new PresetData(); presetData = new PresetData();
presetData.setSn(sn); presetData.setSn(sn);
presetData.setTotal(total); presetData.setTotal(total);
presetData.setPresetItemList(presetItemList); presetData.setPresetItems(new ConcurrentHashMap<>());
presetData.setStatus(DataStatus.runIng); presetData.setStatus(DataStatus.runIng);
presetData.setLastTime(Instant.now()); presetData.setLastTime(Instant.now());
data.put(sn, presetData); data.put(sn, presetData);
@ -55,9 +60,22 @@ public class PresetDataCatch {
} }
presetData.setTotal(total); presetData.setTotal(total);
presetData.setStatus(DataStatus.runIng); presetData.setStatus(DataStatus.runIng);
presetData.getPresetItemList().addAll(presetItemList);
presetData.setLastTime(Instant.now()); presetData.setLastTime(Instant.now());
} }
if (!presetItemList.isEmpty()) {
for (PresetItem presetItem : presetItemList) {
presetData.getPresetItems().put(presetItem.getPresetID(), presetItem);
}
}
// presetData.getPresetItems().sort((a, b) ->{
// if (a.getPresetID() > b.getPresetID()) {
// return 1;
// }else if (a.getPresetID() < b.getPresetID()) {
// return -1;
// }else {
// return 0;
// }
// });
} }
public List<PresetItem> get(int sn) { public List<PresetItem> get(int sn) {
@ -65,7 +83,7 @@ public class PresetDataCatch {
if (presetData == null) { if (presetData == null) {
return null; return null;
} }
return presetData.getPresetItemList(); return new ArrayList<>(presetData.getPresetItems().values());
} }
public int getTotal(int sn) { public int getTotal(int sn) {
@ -82,7 +100,7 @@ public class PresetDataCatch {
return null; return null;
} }
SyncStatus syncStatus = new SyncStatus(); SyncStatus syncStatus = new SyncStatus();
syncStatus.setCurrent(presetData.getPresetItemList().size()); syncStatus.setCurrent(presetData.getPresetItems().size());
syncStatus.setTotal(presetData.getTotal()); syncStatus.setTotal(presetData.getTotal());
syncStatus.setErrorMsg(presetData.getErrorMsg()); syncStatus.setErrorMsg(presetData.getErrorMsg());
if (presetData.getStatus().equals(DataStatus.end)) { if (presetData.getStatus().equals(DataStatus.end)) {
@ -112,14 +130,15 @@ public class PresetDataCatch {
PresetData presetData = data.get(sn); PresetData presetData = data.get(sn);
String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + sn; String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + sn;
if ( presetData.getLastTime().isBefore(instantBefore5S)) { if ( presetData.getLastTime().isBefore(instantBefore5S)) {
logger.info("[预置位接收等待超时] 直接返回已经收到的数据, {}/{}", presetData.getPresetItems().size(), presetData.getTotal());
// 超过五秒收不到消息任务超时, 只更新这一部分数据, 收到数据与声明的总数一致,则重置通道数据,数据不全则只对收到的数据做更新操作 // 超过五秒收不到消息任务超时, 只更新这一部分数据, 收到数据与声明的总数一致,则重置通道数据,数据不全则只对收到的数据做更新操作
if (presetData.getStatus().equals(DataStatus.runIng)) { if (presetData.getStatus().equals(DataStatus.runIng)) {
RequestMessage requestMessage = new RequestMessage(); RequestMessage requestMessage = new RequestMessage();
requestMessage.setKey(key); requestMessage.setKey(key);
requestMessage.setData(presetData.getPresetItemList()); requestMessage.setData(presetData.getPresetItems().values());
deferredResultHolder.invokeAllResult(requestMessage); deferredResultHolder.invokeAllResult(requestMessage);
String errorMsg = "更新成功,共" + presetData.getTotal() + "条,已更新" + presetData.getPresetItemList().size() + "条"; String errorMsg = "更新成功,共" + presetData.getTotal() + "条,已更新" + presetData.getPresetItems().size() + "条";
presetData.setErrorMsg(errorMsg); presetData.setErrorMsg(errorMsg);
}else if (presetData.getStatus().equals(DataStatus.ready)) { }else if (presetData.getStatus().equals(DataStatus.ready)) {
String errorMsg = "同步失败,等待回复超时"; String errorMsg = "同步失败,等待回复超时";

View File

@ -16,6 +16,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
@ -26,6 +28,8 @@ import java.text.ParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@ -47,6 +51,12 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
@Autowired @Autowired
private PresetDataCatch presetDataCatch; private PresetDataCatch presetDataCatch;
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
private AtomicBoolean processing = new AtomicBoolean(false);
@Override @Override
@ -56,46 +66,52 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
@Override @Override
public void handForDevice(RequestEvent evt, Device device, Element element) { public void handForDevice(RequestEvent evt, Device device, Element element) {
SIPRequest request = (SIPRequest) evt.getRequest(); SIPRequest request = (SIPRequest) evt.getRequest();
String deviceId = SipUtils.getUserIdFromFromHeader(request); taskQueue.offer(new HandlerCatchData(evt, device, element));
try { try {
Element rootElement = getRootElement(evt, device.getCharset()); responseAck(request, Response.OK);
if (rootElement == null) {
logger.warn("[ 设备预置位查询应答 ] content cannot be null, {}", evt.getRequest());
try {
responseAck(request, Response.BAD_REQUEST);
} catch (InvalidArgumentException | ParseException | SipException e) { } catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage()); logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
} }
return; if (processing.compareAndSet(false, true)) {
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) {
HandlerCatchData take = taskQueue.poll();
Element rootElement = null;
try {
rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset());
} catch (DocumentException e) {
logger.error("[设备预置位查询回复] xml解析 失败: ", e);
continue;
}
if (rootElement == null) {
logger.warn("[ 设备预置位查询回复 ] content cannot be null, {}", evt.getRequest());
continue;
} }
Element presetListNumElement = rootElement.element("PresetList"); Element presetListNumElement = rootElement.element("PresetList");
String snStr = getText(rootElement, "SN"); if (presetListNumElement == null) {
//该字段可能为通道或则设备的id logger.warn("[ 设备预置位查询回复 ] PresetList cannot be null, {}", evt.getRequest());
String channelId = getText(rootElement, "DeviceID");
if (channelId == null) {
}
String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId + channelId;
if (snStr == null || presetListNumElement == null) {
try {
responseAck(request, Response.BAD_REQUEST, "xml error");
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
}
return; return;
} }
int sumNum = Integer.parseInt(presetListNumElement.attributeValue("Num")); String snStr = getText(rootElement, "SN");
if (snStr == null ) {
logger.warn("[ 设备预置位查询回复 ] sn cannot be null, {}", evt.getRequest());
return;
}
String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + snStr;
String totalStr = getText(rootElement, "SumNum");
int sn = Integer.parseInt(snStr); int sn = Integer.parseInt(snStr);
int sumNum = Integer.parseInt(totalStr);
List<PresetItem> presetItems = new ArrayList<>(); List<PresetItem> presetItems = new ArrayList<>();
if (sumNum == 0) { if (sumNum == 0) {
presetDataCatch.setChannelSyncEnd(sn, null ); presetDataCatch.setChannelSyncEnd(sn, null );
}else { }else {
int i = 0,j = 0;
for (Iterator<Element> presetIterator = presetListNumElement.elementIterator(); presetIterator.hasNext(); ) { for (Iterator<Element> presetIterator = presetListNumElement.elementIterator(); presetIterator.hasNext(); ) {
Element itemListElement = presetIterator.next(); Element itemListElement = presetIterator.next();
PresetItem presetItem = new PresetItem(); PresetItem presetItem = new PresetItem();
i++;
for (Iterator<Element> itemListIterator = itemListElement.elementIterator(); itemListIterator.hasNext(); ) { for (Iterator<Element> itemListIterator = itemListElement.elementIterator(); itemListIterator.hasNext(); ) {
// 遍历item // 遍历item
Element itemOne = itemListIterator.next(); Element itemOne = itemListIterator.next();
@ -106,23 +122,24 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
} else { } else {
presetItem.setPresetName(textTrim); presetItem.setPresetName(textTrim);
} }
presetItems.add(presetItem);
j++;
} }
} }
presetDataCatch.put(sn, sumNum, presetItems); presetDataCatch.put(sn, sumNum, presetItems);
if (sumNum == presetDataCatch.get(sn).size()) { if (presetDataCatch.get(sn).size() == sumNum) {
logger.warn("[ 设备预置位查询成功 ] 共{}条", presetDataCatch.get(sn).size());
RequestMessage requestMessage = new RequestMessage(); RequestMessage requestMessage = new RequestMessage();
requestMessage.setKey(key); requestMessage.setKey(key);
requestMessage.setId(sn + "");
requestMessage.setData(presetDataCatch.get(sn)); requestMessage.setData(presetDataCatch.get(sn));
deferredResultHolder.invokeAllResult(requestMessage); deferredResultHolder.invokeResult(requestMessage);
presetDataCatch.setChannelSyncEnd(sn, null);
} }
} }
try {
responseAck(request, Response.OK);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
} }
} catch (DocumentException e) { processing.set(false);
logger.error("[解析xml]失败: ", e); });
} }
} }

View File

@ -4,6 +4,7 @@ package com.genersoft.iot.vmp.vmanager.gb28181.ptz;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.PresetItem; import com.genersoft.iot.vmp.gb28181.bean.PresetItem;
import com.genersoft.iot.vmp.gb28181.session.PresetDataCatch;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@ -44,6 +45,9 @@ public class PtzController {
@Autowired @Autowired
private DeferredResultHolder resultHolder; private DeferredResultHolder resultHolder;
@Autowired
private PresetDataCatch presetDataCatch;
/*** /***
* *
* @param deviceId id * @param deviceId id
@ -153,7 +157,7 @@ public class PtzController {
int sn = SipUtils.getNewSn(); int sn = SipUtils.getNewSn();
String msgId = sn + ""; String msgId = sn + "";
String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + sn; String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + sn;
DeferredResult<List<PresetItem>> result = new DeferredResult<> (3 * 1000L); DeferredResult<List<PresetItem>> result = new DeferredResult<> (30 * 1000L);
result.onTimeout(()->{ result.onTimeout(()->{
logger.warn(String.format("获取设备预置位超时")); logger.warn(String.format("获取设备预置位超时"));
// 释放rtpserver // 释放rtpserver
@ -169,6 +173,7 @@ public class PtzController {
} }
resultHolder.put(key, msgId, result); resultHolder.put(key, msgId, result);
try { try {
presetDataCatch.addReady(sn);
cmder.presetQuery(device, channelId, sn, event -> { cmder.presetQuery(device, channelId, sn, event -> {
RequestMessage msg = new RequestMessage(); RequestMessage msg = new RequestMessage();
msg.setId(msgId); msg.setId(msgId);

View File

@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.PresetQuerySipReq; import com.genersoft.iot.vmp.gb28181.bean.PresetQuerySipReq;
import com.genersoft.iot.vmp.gb28181.session.PresetDataCatch;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
@ -46,7 +47,7 @@ public class ApiDeviceController {
@Autowired @Autowired
private SIPCommander cmder; private SIPCommander cmder;
@Autowired @Autowired
private IDeviceService deviceService; private PresetDataCatch presetDataCatch;
@Autowired @Autowired
private DeferredResultHolder resultHolder; private DeferredResultHolder resultHolder;
@ -233,6 +234,7 @@ public class ApiDeviceController {
resultHolder.put(key, msgId, deferredResultEx); resultHolder.put(key, msgId, deferredResultEx);
try { try {
presetDataCatch.addReady(sn);
cmder.presetQuery(device, code, sn, event -> { cmder.presetQuery(device, code, sn, event -> {
RequestMessage msg = new RequestMessage(); RequestMessage msg = new RequestMessage();
msg.setId(msgId); msg.setId(msgId);