From 39199a1daa2d3faf4a874c57039bc771545a4ba7 Mon Sep 17 00:00:00 2001 From: gaofw189 Date: Mon, 13 Feb 2023 10:37:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96WVP=E4=BD=9C=E4=B8=BA?= =?UTF-8?q?=E4=B8=8B=E7=BA=A7=E5=B9=B3=E5=8F=B0=E6=9F=A5=E8=AF=A2=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E5=BD=95=E5=83=8F=E5=88=97=E8=A1=A8=E4=B8=8A=E6=8A=A5?= =?UTF-8?q?=E7=BC=93=E6=85=A2=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 11 ++ .../cmd/RecordInfoResponseMessageHandler.java | 175 +++++++++--------- .../impl/InviteResponseProcessor.java | 2 +- .../com/genersoft/iot/vmp/utils/UJson.java | 150 +++++++++++++++ .../iot/vmp/utils/redis/RedisUtil.java | 2 +- 5 files changed, 249 insertions(+), 91 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/utils/UJson.java diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index b92f6b16..f143fad1 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -138,4 +138,15 @@ public class VideoManagerConstants { public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_"; public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_"; + /** + * Redis Const + * 设备录像信息结果前缀 + */ + public static final String REDIS_RECORD_INFO_RES_PRE = "GB_RECORD_INFO_RES_"; + /** + * Redis Const + * 设备录像信息结果前缀 + */ + public static final String REDIS_RECORD_INFO_RES_COUNT_PRE = "GB_RECORD_INFO_RES_COUNT:"; + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 8b4ae2e1..6d8d8f56 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -1,16 +1,17 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.session.RecordDataCatch; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.utils.UJson; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import gov.nist.javax.sip.message.SIPRequest; -import org.dom4j.DocumentException; import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,11 +27,9 @@ import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @@ -48,9 +47,6 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent @Autowired private ResponseMessageHandler responseMessageHandler; - @Autowired - private RecordDataCatch recordDataCatch; - @Autowired private DeferredResultHolder deferredResultHolder; @@ -61,6 +57,8 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent @Autowired private ThreadPoolTaskExecutor taskExecutor; + private Long recordInfoTtl = 1800L; + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -68,93 +66,93 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { - boolean isEmpty = taskQueue.isEmpty(); try { // 回复200 OK responseAck((SIPRequest) evt.getRequest(), Response.OK); }catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage()); } - taskQueue.offer(new HandlerCatchData(evt, device, rootElement)); - if (isEmpty) { - taskExecutor.execute(()->{ - while (!taskQueue.isEmpty()) { - try { - HandlerCatchData take = taskQueue.poll(); - Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset()); - if (rootElement == null) { - logger.warn("[ 国标录像 ] content cannot be null, {}", evt.getRequest()); - continue; - } - String sn = getText(rootElementForCharset, "SN"); - String channelId = getText(rootElementForCharset, "DeviceID"); - RecordInfo recordInfo = new RecordInfo(); - recordInfo.setChannelId(channelId); - recordInfo.setDeviceId(take.getDevice().getDeviceId()); - recordInfo.setSn(sn); - recordInfo.setName(getText(rootElementForCharset, "Name")); - String sumNumStr = getText(rootElementForCharset, "SumNum"); - int sumNum = 0; - if (!ObjectUtils.isEmpty(sumNumStr)) { - sumNum = Integer.parseInt(sumNumStr); - } - recordInfo.setSumNum(sumNum); - Element recordListElement = rootElementForCharset.element("RecordList"); - if (recordListElement == null || sumNum == 0) { - logger.info("无录像数据"); - int count = recordDataCatch.put(take.getDevice().getDeviceId(),channelId, sn, sumNum, new ArrayList<>()); - recordInfo.setCount(count); - eventPublisher.recordEndEventPush(recordInfo); - releaseRequest(take.getDevice().getDeviceId(), sn); - } else { - Iterator recordListIterator = recordListElement.elementIterator(); - if (recordListIterator != null) { - List recordList = new ArrayList<>(); - // 遍历DeviceList - while (recordListIterator.hasNext()) { - Element itemRecord = recordListIterator.next(); - Element recordElement = itemRecord.element("DeviceID"); - if (recordElement == null) { - logger.info("记录为空,下一个..."); - continue; - } - RecordItem record = new RecordItem(); - record.setDeviceId(getText(itemRecord, "DeviceID")); - record.setName(getText(itemRecord, "Name")); - record.setFilePath(getText(itemRecord, "FilePath")); - record.setFileSize(getText(itemRecord, "FileSize")); - record.setAddress(getText(itemRecord, "Address")); + taskExecutor.execute(()->{ + try { - String startTimeStr = getText(itemRecord, "StartTime"); - record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr)); - - String endTimeStr = getText(itemRecord, "EndTime"); - record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr)); - - record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 - : Integer.parseInt(getText(itemRecord, "Secrecy"))); - record.setType(getText(itemRecord, "Type")); - record.setRecorderId(getText(itemRecord, "RecorderID")); - recordList.add(record); - } - recordInfo.setRecordList(recordList); - int count = recordDataCatch.put(take.getDevice().getDeviceId(),channelId, sn, sumNum, recordList);recordInfo.setCount(count); - logger.info("[国标录像], {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum); - // 发送消息,如果是上级查询此录像,则会通过这里通知给上级 - eventPublisher.recordEndEventPush(recordInfo); - } - if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){ - releaseRequest(take.getDevice().getDeviceId(), sn); + String sn = getText(rootElement, "SN"); + String channelId = getText(rootElement, "DeviceID"); + RecordInfo recordInfo = new RecordInfo(); + recordInfo.setChannelId(channelId); + recordInfo.setDeviceId(device.getDeviceId()); + recordInfo.setSn(sn); + recordInfo.setName(getText(rootElement, "Name")); + String sumNumStr = getText(rootElement, "SumNum"); + int sumNum = 0; + if (!ObjectUtils.isEmpty(sumNumStr)) { + sumNum = Integer.parseInt(sumNumStr); + } + recordInfo.setSumNum(sumNum); + Element recordListElement = rootElement.element("RecordList"); + if (recordListElement == null || sumNum == 0) { + logger.info("无录像数据"); + recordInfo.setCount(sumNum); + eventPublisher.recordEndEventPush(recordInfo); + releaseRequest(device.getDeviceId(), sn,recordInfo); + } else { + Iterator recordListIterator = recordListElement.elementIterator(); + if (recordListIterator != null) { + List recordList = new ArrayList<>(); + // 遍历DeviceList + while (recordListIterator.hasNext()) { + Element itemRecord = recordListIterator.next(); + Element recordElement = itemRecord.element("DeviceID"); + if (recordElement == null) { + logger.info("记录为空,下一个..."); + continue; } + RecordItem record = new RecordItem(); + record.setDeviceId(getText(itemRecord, "DeviceID")); + record.setName(getText(itemRecord, "Name")); + record.setFilePath(getText(itemRecord, "FilePath")); + record.setFileSize(getText(itemRecord, "FileSize")); + record.setAddress(getText(itemRecord, "Address")); + + String startTimeStr = getText(itemRecord, "StartTime"); + record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr)); + + String endTimeStr = getText(itemRecord, "EndTime"); + record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr)); + + record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 + : Integer.parseInt(getText(itemRecord, "Secrecy"))); + record.setType(getText(itemRecord, "Type")); + record.setRecorderId(getText(itemRecord, "RecorderID")); + recordList.add(record); } - } catch (DocumentException e) { - logger.error("xml解析异常: ", e); - } catch (Exception e) { - logger.warn("[国标录像] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest()); + Map map = recordList.stream() + .filter(record -> record.getDeviceId() != null) + .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson)); + // 获取任务结果数据 + String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn; + RedisUtil.hmset(resKey, map, recordInfoTtl); + String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn; + long incr = RedisUtil.incr(resCountKey, map.size()); + RedisUtil.expire(resCountKey, recordInfoTtl); + recordInfo.setRecordList(recordList); + recordInfo.setCount(Math.toIntExact(incr)); + eventPublisher.recordEndEventPush(recordInfo); + if (incr < sumNum) { + return; + } + // 已接收完成 + List resList = RedisUtil.hmget(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList()); + if (resList.size() < sumNum) { + return; + } + recordInfo.setRecordList(resList); + releaseRequest(device.getDeviceId(), sn,recordInfo); } } - }); - } + } catch (Exception e) { + logger.error("[国标录像] 发现未处理的异常, "+e.getMessage(), e); + } + }); } @Override @@ -162,15 +160,14 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent } - public void releaseRequest(String deviceId, String sn){ + public void releaseRequest(String deviceId, String sn,RecordInfo recordInfo){ String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn; // 对数据进行排序 - Collections.sort(recordDataCatch.getRecordInfo(deviceId, sn).getRecordList()); + Collections.sort(recordInfo.getRecordList()); RequestMessage msg = new RequestMessage(); msg.setKey(key); - msg.setData(recordDataCatch.getRecordInfo(deviceId, sn)); + msg.setData(recordInfo); deferredResultHolder.invokeAllResult(msg); - recordDataCatch.remove(deviceId, sn); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java index d0ba97eb..b5a9ee7b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java @@ -80,8 +80,8 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract { */ @Override public void process(ResponseEvent evt ){ + logger.debug("接收到消息:" + evt.getResponse()); try { - SIPResponse response = (SIPResponse)evt.getResponse(); int statusCode = response.getStatusCode(); // trying不会回复 diff --git a/src/main/java/com/genersoft/iot/vmp/utils/UJson.java b/src/main/java/com/genersoft/iot/vmp/utils/UJson.java new file mode 100644 index 00000000..a2d40339 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/utils/UJson.java @@ -0,0 +1,150 @@ +package com.genersoft.iot.vmp.utils; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; + +/** + * @author gaofuwang + * @version 1.0 + * @date 2022/3/11 10:17 + */ +public class UJson { + + private static Logger logger = LoggerFactory.getLogger(UJson.class); + public static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + + static { + JSON_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false); + } + + private ObjectNode node; + + public UJson(){ + this.node = JSON_MAPPER.createObjectNode(); + } + + public UJson(String json){ + if(StringUtils.isBlank(json)){ + this.node = JSON_MAPPER.createObjectNode(); + }else{ + try { + this.node = JSON_MAPPER.readValue(json, ObjectNode.class); + }catch (Exception e){ + logger.error(e.getMessage(), e); + this.node = JSON_MAPPER.createObjectNode(); + } + } + } + + public UJson(ObjectNode node){ + this.node = node; + } + + public String asText(String key){ + JsonNode jsonNode = node.get(key); + if(Objects.isNull(jsonNode)){ + return ""; + } + return jsonNode.asText(); + } + + public String asText(String key, String defaultVal){ + JsonNode jsonNode = node.get(key); + if(Objects.isNull(jsonNode)){ + return ""; + } + return jsonNode.asText(defaultVal); + } + + public UJson put(String key, String value){ + this.node.put(key, value); + return this; + } + + public UJson put(String key, Integer value){ + this.node.put(key, value); + return this; + } + + public static UJson json(){ + return new UJson(); + } + + public static UJson json(String json){ + return new UJson(json); + } + + public static T readJson(String json, Class clazz){ + if(StringUtils.isBlank(json)){ + return null; + } + try { + return JSON_MAPPER.readValue(json, clazz); + }catch (Exception e){ + logger.error(e.getMessage(), e); + return null; + } + } + + public static String writeJson(Object object) { + try{ + return JSON_MAPPER.writeValueAsString(object); + }catch (Exception e){ + logger.error(e.getMessage(), e); + return ""; + } + } + + @Override + public String toString() { + return node.toString(); + } + + public int asInt(String key, int defValue) { + JsonNode jsonNode = this.node.get(key); + if(Objects.isNull(jsonNode)){ + return defValue; + } + return jsonNode.asInt(defValue); + } + + public UJson getSon(String key) { + JsonNode sonNode = this.node.get(key); + if(Objects.isNull(sonNode)){ + return new UJson(); + } + return new UJson((ObjectNode) sonNode); + } + + public UJson set(String key, ObjectNode sonNode) { + this.node.set(key, sonNode); + return this; + } + + public UJson set(String key, UJson sonNode) { + this.node.set(key, sonNode.node); + return this; + } + + public Iterator> fields() { + return this.node.fields(); + } + + public ObjectNode getNode() { + return this.node; + } + + public UJson setAll(UJson json) { + this.node.setAll(json.node); + return this; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java index 50152cda..a50553d4 100644 --- a/src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java @@ -238,7 +238,7 @@ public class RedisUtil { * @param time 时间 * @return true / false */ - public static boolean hmset(String key, Map map, long time) { + public static boolean hmset(String key, Map map, long time) { if (redisTemplate == null) { redisTemplate = SpringBeanFactory.getBean("redisTemplate"); }