From 2113e8cf271e0d189d4ff9dd2d4d5dd7cba6e3ab Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 24 Apr 2024 20:33:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96notify=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E4=B8=AD=E7=9B=AE=E5=BD=95=E7=9A=84=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../NotifyRequestForCatalogProcessor.java | 30 +- ...tifyRequestForMobilePositionProcessor.java | 31 +- .../request/impl/NotifyRequestProcessor.java | 289 ++++++------------ .../impl/DeviceChannelServiceImpl.java | 18 +- 4 files changed, 165 insertions(+), 203 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index a0ce17c0..6185cda4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -18,6 +19,7 @@ import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -28,6 +30,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -46,6 +49,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent private final Map addChannelMap = new ConcurrentHashMap<>(); private final List deleteChannelList = new CopyOnWriteArrayList<>(); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + @Autowired private UserSetting userSetting; @@ -65,11 +70,24 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent private SipConfig sipConfig; @Transactional - public void process(List evtList) { - if (evtList.isEmpty()) { + public void process(RequestEvent evt) { + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + logger.error("[notify-目录订阅] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); return; } - for (RequestEvent evt : evtList) { + taskQueue.offer(new HandlerCatchData(evt, null, null)); + } + + @Scheduled(fixedRate = 400) //每400毫秒执行一次 + public void executeTaskQueue(){ + if (taskQueue.isEmpty()) { + return; + } + for (HandlerCatchData take : taskQueue) { + if (take == null) { + continue; + } + RequestEvent evt = take.getEvt(); try { long start = System.currentTimeMillis(); FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); @@ -221,6 +239,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent logger.error("未处理的异常 ", e); } } + taskQueue.clear(); if (!updateChannelMap.keySet().isEmpty() || !addChannelMap.keySet().isEmpty() || !updateChannelOnlineList.isEmpty() @@ -295,4 +314,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent updateChannelOfflineList.clear(); } } + + @Scheduled(fixedRate = 10000) //每1秒执行一次 + public void execute(){ + logger.info("[待处理Notify-目录订阅消息数量]: {}", taskQueue.size()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 05a5336a..9f6acc96 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -17,6 +18,7 @@ import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; @@ -27,6 +29,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; /** * SIP命令类型: NOTIFY请求中的移动位置请求处理 @@ -37,6 +40,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Autowired private UserSetting userSetting; @@ -50,14 +54,28 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor @Autowired private IDeviceChannelService deviceChannelService; + public void process(RequestEvent evt) { + + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + logger.error("[notify-移动位置] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); + return; + } + taskQueue.offer(new HandlerCatchData(evt, null, null)); + } + + @Scheduled(fixedRate = 200) //每200毫秒执行一次 @Transactional - public void process(List eventList) { - if (eventList.isEmpty()) { + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { return; } Map updateChannelMap = new ConcurrentHashMap<>(); List addMobilePositionList = new ArrayList<>(); - for (RequestEvent evt : eventList) { + for (HandlerCatchData take : taskQueue) { + if (take == null) { + continue; + } + RequestEvent evt = take.getEvt(); try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); @@ -180,10 +198,11 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor logger.error("未处理的异常 ", e); } } + taskQueue.clear(); if(!updateChannelMap.isEmpty()) { List channels = new ArrayList<>(updateChannelMap.values()); logger.info("[移动位置订阅]更新通道位置: {}", channels.size()); - deviceChannelService.batchUpdateChannelGPS(channels); + deviceChannelService.batchUpdateChannel(channels); updateChannelMap.clear(); } if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { @@ -196,4 +215,8 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor addMobilePositionList.clear(); } } + @Scheduled(fixedRate = 10000) + public void execute(){ + logger.info("[待处理Notify-移动位置订阅消息数量]: {}", taskQueue.size()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index dcf823bf..8da07a15 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -1,12 +1,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.conf.SipConfig; -import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; @@ -14,9 +11,7 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -24,9 +19,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; @@ -35,9 +27,6 @@ import javax.sip.SipException; import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; /** * SIP命令类型: NOTIFY请求,这是作为上级发送订阅请求后,设备才会响应的 @@ -48,15 +37,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class); - @Autowired - private UserSetting userSetting; - - @Autowired - private IVideoManagerStorage storager; - - @Autowired - private EventPublisher eventPublisher; - @Autowired private SipConfig sipConfig; @@ -80,14 +60,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Autowired private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - private int maxQueueCount = 30000; - @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -97,187 +69,122 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Override public void process(RequestEvent evt) { try { - if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { - responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); - logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); - return; - } else { + responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("处理NOTIFY消息时未获取到消息体,{}", evt.getRequest()); responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + return; } + String cmd = XmlUtil.getText(rootElement, "CmdType"); + if (CmdType.CATALOG.equals(cmd)) { + notifyRequestForCatalogProcessor.process(evt); + } else if (CmdType.ALARM.equals(cmd)) { + processNotifyAlarm(evt); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + notifyRequestForMobilePositionProcessor.process(evt); + } else { + logger.info("接收到消息:" + cmd); + } } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); - } - taskQueue.offer(new HandlerCatchData(evt, null, null)); - } - @Scheduled(fixedRate = 200) //每200毫秒执行一次 - public void executeTaskQueue(){ - if (taskQueue.isEmpty()) { - return; - } - try { - List catalogEventList = new ArrayList<>(); - List alarmEventList = new ArrayList<>(); - List mobilePositionEventList = new ArrayList<>(); - for (HandlerCatchData take : taskQueue) { - if (take == null) { - continue; - } - Element rootElement = getRootElement(take.getEvt()); - if (rootElement == null) { - logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest()); - continue; - } - String cmd = XmlUtil.getText(rootElement, "CmdType"); - - if (CmdType.CATALOG.equals(cmd)) { - catalogEventList.add(take.getEvt()); - } else if (CmdType.ALARM.equals(cmd)) { - alarmEventList.add(take.getEvt()); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - mobilePositionEventList.add(take.getEvt()); - } else { - logger.info("接收到消息:" + cmd); - } - } - taskQueue.clear(); - if (!alarmEventList.isEmpty()) { - processNotifyAlarm(alarmEventList); - } - if (!catalogEventList.isEmpty()) { - notifyRequestForCatalogProcessor.process(catalogEventList); - } - if (!mobilePositionEventList.isEmpty()) { - notifyRequestForMobilePositionProcessor.process(mobilePositionEventList); - } } catch (DocumentException e) { - logger.error("处理NOTIFY消息时错误", e); + throw new RuntimeException(e); } - } + } /*** * 处理alarm设备报警Notify */ - private void processNotifyAlarm(List evtList) { + private void processNotifyAlarm(RequestEvent evt) { if (!sipConfig.isAlarm()) { return; } - if (!evtList.isEmpty()) { - for (RequestEvent evt : evtList) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - Element rootElement = getRootElement(evt); - if (rootElement == null) { - logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest()); - return; - } - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getText().toString(); - - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null) { - logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId); - return; - } - rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); - return; - } - DeviceAlarm deviceAlarm = new DeviceAlarm(); - deviceAlarm.setDeviceId(deviceId); - deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); - deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); - String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); - if (alarmTime == null) { - logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); - return; - } - deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); - if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { - deviceAlarm.setAlarmDescription(""); - } else { - deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { - deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - } else { - deviceAlarm.setLongitude(0.00); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { - deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - } else { - deviceAlarm.setLatitude(0.00); - } - logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); - if ("4".equals(deviceAlarm.getAlarmMethod())) { - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setChannelId(channelId); - mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); - mobilePosition.setTime(deviceAlarm.getAlarmTime()); - mobilePosition.setLongitude(deviceAlarm.getLongitude()); - mobilePosition.setLatitude(deviceAlarm.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); - - // 更新device channel 的经纬度 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - - deviceChannel = deviceChannelService.updateGps(deviceChannel, device); - - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - } - - // 回复200 OK - if (redisCatchStorage.deviceIsOnline(deviceId)) { - publisher.deviceAlarmEventPublish(deviceAlarm); - } - } catch (DocumentException e) { - logger.error("未处理的异常 ", e); - } + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest()); + return; } + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getText().toString(); + + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null) { + logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId); + return; + } + rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); + return; + } + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setDeviceId(deviceId); + deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); + deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); + String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); + if (alarmTime == null) { + logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); + return; + } + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); + if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { + deviceAlarm.setAlarmDescription(""); + } else { + deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { + deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); + } else { + deviceAlarm.setLongitude(0.00); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { + deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); + } else { + deviceAlarm.setLatitude(0.00); + } + logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); + if ("4".equals(deviceAlarm.getAlarmMethod())) { + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setChannelId(channelId); + mobilePosition.setCreateTime(DateUtil.getNow()); + mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); + mobilePosition.setTime(deviceAlarm.getAlarmTime()); + mobilePosition.setLongitude(deviceAlarm.getLongitude()); + mobilePosition.setLatitude(deviceAlarm.getLatitude()); + mobilePosition.setReportSource("GPS Alarm"); + + // 更新device channel 的经纬度 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + + deviceChannel = deviceChannelService.updateGps(deviceChannel, device); + + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + + deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + } + + // 回复200 OK + if (redisCatchStorage.deviceIsOnline(deviceId)) { + publisher.deviceAlarmEventPublish(deviceAlarm); + } + } catch (DocumentException e) { + logger.error("未处理的异常 ", e); } } - public void setCmder(SIPCommander cmder) { - } - public void setStorager(IVideoManagerStorage storager) { - this.storager = storager; - } - - public void setPublisher(EventPublisher publisher) { - this.publisher = publisher; - } - - public void setRedis(RedisUtil redis) { - } - - public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) { - } - - public IRedisCatchStorage getRedisCatchStorage() { - return redisCatchStorage; - } - - public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { - this.redisCatchStorage = redisCatchStorage; - } - - @Scheduled(fixedRate = 10000) //每1秒执行一次 - public void execute(){ - logger.info("[待处理Notify消息数量]: {}", taskQueue.size()); - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 67e34a02..58e7b02a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -275,15 +275,23 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } @Override - public void batchUpdateChannel(List channels) { + public synchronized void batchUpdateChannel(List channels) { String now = DateUtil.getNow(); for (DeviceChannel channel : channels) { channel.setUpdateTime(now); } - channelMapper.batchUpdate(channels); - for (DeviceChannel channel : channels) { - if (channel.getParentId() != null) { - channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId()); + int limitCount = 1000; + if (!channels.isEmpty()) { + if (channels.size() > limitCount) { + for (int i = 0; i < channels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > channels.size()) { + toIndex = channels.size(); + } + channelMapper.batchUpdate(channels.subList(i, toIndex)); + } + }else { + channelMapper.batchUpdate(channels); } } }