diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index c80cc88f..de93804c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -1,7 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.conf.CivilCodeFileConf; -import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -20,10 +18,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -39,8 +37,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); - private final List updateChannelOnlineList = new CopyOnWriteArrayList<>(); - private final List updateChannelOfflineList = new CopyOnWriteArrayList<>(); private final Map updateChannelMap = new ConcurrentHashMap<>(); private final Map addChannelMap = new ConcurrentHashMap<>(); @@ -59,275 +55,111 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent @Autowired private IDeviceChannelService deviceChannelService; - @Autowired - private DynamicTask dynamicTask; - - @Autowired - private CivilCodeFileConf civilCodeFileConf; - @Autowired private SipConfig sipConfig; - private final static String talkKey = "notify-request-for-catalog-task"; + @Transactional + public void process(List evtList) { + if (evtList.isEmpty()) { + return; + } + for (RequestEvent evt : evtList) { + try { + long start = System.currentTimeMillis(); + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - public void process(RequestEvent evt) { - try { - long start = System.currentTimeMillis(); - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null || !device.isOnLine()) { + logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" )); + return; + } + Element rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest()); + return; + } + Element deviceListElement = rootElement.element("DeviceList"); + if (deviceListElement == null) { + return; + } + Iterator deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null || !device.isOnLine()) { - logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" )); - return; - } - Element rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest()); - return; - } - Element deviceListElement = rootElement.element("DeviceList"); - if (deviceListElement == null) { - return; - } - Iterator deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - - // 遍历DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element eventElement = itemDevice.element("Event"); - String event; - if (eventElement == null) { - logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" )); - event = CatalogEvent.ADD; - }else { - event = eventElement.getText().toUpperCase(); - } - DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); - if (channel == null) { - logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); - continue; - } - if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { - channel.setParentId(null); - } - channel.setDeviceId(device.getDeviceId()); - logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); - switch (event) { - case CatalogEvent.ON: - // 上线 - logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - updateChannelOnlineList.add(channel); - if (updateChannelOnlineList.size() > 300) { - executeSaveForOnline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); - } - - break; - case CatalogEvent.OFF : - // 离线 - logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.VLOST: - // 视频丢失 - logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.DEFECT: - // 故障 - logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.ADD: - // 增加 - logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - // 判断此通道是否存在 - DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); - if (deviceChannel != null) { - logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - channel.setId(deviceChannel.getId()); - updateChannelMap.put(channel.getChannelId(), channel); - if (updateChannelMap.keySet().size() > 300) { - executeSaveForUpdate(); - } - }else { - addChannelMap.put(channel.getChannelId(), channel); - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); - } - - if (addChannelMap.keySet().size() > 300) { - executeSaveForAdd(); - } - } - - break; - case CatalogEvent.DEL: - // 删除 - logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - deleteChannelList.add(channel); - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); - } - if (deleteChannelList.size() > 300) { - executeSaveForDelete(); - } - break; - case CatalogEvent.UPDATE: - // 更新 - logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - // 判断此通道是否存在 - DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); - if (deviceChannelForUpdate != null) { - channel.setId(deviceChannelForUpdate.getId()); - channel.setUpdateTime(DateUtil.getNow()); - updateChannelMap.put(channel.getChannelId(), channel); - if (updateChannelMap.keySet().size() > 300) { - executeSaveForUpdate(); - } - }else { - addChannelMap.put(channel.getChannelId(), channel); - if (addChannelMap.keySet().size() > 300) { - executeSaveForAdd(); - } - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); - } - } - break; - default: - logger.warn("[ NotifyCatalog ] event not found : {}", event ); - - } - // 转发变化信息 - eventPublisher.catalogEventPublish(null, channel, event); - - if (!updateChannelMap.keySet().isEmpty() - || !addChannelMap.keySet().isEmpty() - || !updateChannelOnlineList.isEmpty() - || !updateChannelOfflineList.isEmpty() - || !deleteChannelList.isEmpty()) { - - if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 1000); + // 遍历DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element eventElement = itemDevice.element("Event"); + String event; + if (eventElement == null) { + logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" )); + event = CatalogEvent.ADD; + }else { + event = eventElement.getText().toUpperCase(); } + DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); + if (channel == null) { + logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); + continue; + } + if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { + channel.setParentId(null); + } + channel.setDeviceId(device.getDeviceId()); + logger.info("[收到目录订阅]:{}, {}/{}",event, device.getDeviceId(), channel.getChannelId()); + switch (event) { + case CatalogEvent.ON: + // 上线 + deviceChannelService.online(channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); + } + + break; + case CatalogEvent.OFF : + case CatalogEvent.VLOST: + case CatalogEvent.DEFECT: + // 离线 + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[目录订阅] 离线 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + }else { + deviceChannelService.offline(channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.DEL: + // 删除 + deviceChannelService.delete(channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); + } + break; + case CatalogEvent.ADD: + case CatalogEvent.UPDATE: + // 更新 + channel.setUpdateTime(DateUtil.getNow()); + deviceChannelService.updateChannel(deviceId,channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } + break; + default: + logger.warn("[ NotifyCatalog ] event not found : {}", event ); + + } + // 转发变化信息 + eventPublisher.catalogEventPublish(null, channel, event); } } + } catch (DocumentException e) { + logger.error("未处理的异常 ", e); } - } catch (DocumentException e) { - logger.error("未处理的异常 ", e); } } - - // TODO 同一个通道如果先发送更新再发送离线可能无法正常离线 - private void executeSave(){ - try { - executeSaveForAdd(); - } catch (Exception e) { - logger.error("[存储收到的增加通道] 异常: ", e ); - } - try { - executeSaveForOnline(); - } catch (Exception e) { - logger.error("[存储收到的通道上线] 异常: ", e ); - } - try { - executeSaveForOffline(); - } catch (Exception e) { - logger.error("[存储收到的通道离线] 异常: ", e ); - } - try { - executeSaveForUpdate(); - } catch (Exception e) { - logger.error("[存储收到的更新通道] 异常: ", e ); - } - try { - executeSaveForDelete(); - } catch (Exception e) { - logger.error("[存储收到的删除通道] 异常: ", e ); - } - - dynamicTask.stop(talkKey); - } - - private void executeSaveForUpdate(){ - if (!updateChannelMap.values().isEmpty()) { - logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size()); - ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); - deviceChannelService.batchUpdateChannel(deviceChannels); - updateChannelMap.clear(); - } - - } - - private void executeSaveForAdd(){ - if (!addChannelMap.values().isEmpty()) { - ArrayList deviceChannels = new ArrayList<>(addChannelMap.values()); - addChannelMap.clear(); - deviceChannelService.batchAddChannel(deviceChannels); - } - } - - private void executeSaveForDelete(){ - if (!deleteChannelList.isEmpty()) { - deviceChannelService.deleteChannels(deleteChannelList); - deleteChannelList.clear(); - } - } - - private void executeSaveForOnline(){ - if (!updateChannelOnlineList.isEmpty()) { - deviceChannelService.channelsOnline(updateChannelOnlineList); - updateChannelOnlineList.clear(); - } - } - - private void executeSaveForOffline(){ - if (!updateChannelOfflineList.isEmpty()) { - deviceChannelService.channelsOffline(updateChannelOfflineList); - updateChannelOfflineList.clear(); - } - } - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 1b7daf02..05a5336a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -1,8 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.conf.CivilCodeFileConf; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.SipConfig; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; @@ -19,8 +17,8 @@ import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; @@ -29,7 +27,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; /** * SIP命令类型: NOTIFY请求中的移动位置请求处理 @@ -40,10 +37,6 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class); - private final Map updateChannelMap = new ConcurrentHashMap<>(); - - private final List addMobilePositionList = new CopyOnWriteArrayList(); - @Autowired private UserSetting userSetting; @@ -57,180 +50,150 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor @Autowired private IDeviceChannelService deviceChannelService; - @Autowired - private DynamicTask dynamicTask; - - @Autowired - private CivilCodeFileConf civilCodeFileConf; - - @Autowired - private SipConfig sipConfig; - - private final static String talkKey = "notify-request-for-mobile-position-task"; - - @Async("taskExecutor") - public void process(RequestEvent evt) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - long startTime = System.currentTimeMillis(); - // 回复 200 OK - Element rootElement = getRootElement(evt); - if (rootElement == null) { - logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); - return; - } - Device device = redisCatchStorage.getDevice(deviceId); - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - List elements = rootElement.elements(); - String channelId = null; - for (Element element : elements) { - switch (element.getName()){ - case "DeviceID": - channelId = element.getStringValue(); - if (device == null) { - device = redisCatchStorage.getDevice(channelId); - if (device == null) { - // 根据通道id查询设备Id - List deviceList = deviceChannelService.getDeviceByChannelId(channelId); - if (!deviceList.isEmpty()) { - device = deviceList.get(0); - } - } - } - if (device == null) { - logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId); - return; - } - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(channelId); - // 兼容设备部分设备上报是通道编号与设备编号一致的情况 - if (deviceId.equals(channelId)) { - List deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); - if (deviceChannels.size() == 1) { - channelId = deviceChannels.get(0).getChannelId(); - } - } - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(channelId); - continue; - case "Time": - String timeVal = element.getStringValue(); - if (ObjectUtils.isEmpty(timeVal)) { - mobilePosition.setTime(DateUtil.getNow()); - } else { - mobilePosition.setTime(SipUtils.parseTime(timeVal)); - } - continue; - case "Longitude": - mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); - continue; - case "Latitude": - mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); - continue; - case "Speed": - String speedVal = element.getStringValue(); - if (NumericUtil.isDouble(speedVal)) { - mobilePosition.setSpeed(Double.parseDouble(speedVal)); - } else { - mobilePosition.setSpeed(0.0); - } - continue; - case "Direction": - String directionVal = element.getStringValue(); - if (NumericUtil.isDouble(directionVal)) { - mobilePosition.setDirection(Double.parseDouble(directionVal)); - } else { - mobilePosition.setDirection(0.0); - } - continue; - case "Altitude": - String altitudeVal = element.getStringValue(); - if (NumericUtil.isDouble(altitudeVal)) { - mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); - } else { - mobilePosition.setAltitude(0.0); - } - continue; - + @Transactional + public void process(List eventList) { + if (eventList.isEmpty()) { + return; + } + Map updateChannelMap = new ConcurrentHashMap<>(); + List addMobilePositionList = new ArrayList<>(); + for (RequestEvent evt : eventList) { + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + long startTime = System.currentTimeMillis(); + // 回复 200 OK + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); + return; + } + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null) { + logger.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId); + return; + } + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setDeviceName(device.getName()); + mobilePosition.setCreateTime(DateUtil.getNow()); + List elements = rootElement.elements(); + for (Element element : elements) { + switch (element.getName()){ + case "DeviceID": + String channelId = element.getStringValue(); + if (!deviceId.equals(channelId)) { + mobilePosition.setChannelId(channelId); + } + continue; + case "Time": + String timeVal = element.getStringValue(); + if (ObjectUtils.isEmpty(timeVal)) { + mobilePosition.setTime(DateUtil.getNow()); + } else { + mobilePosition.setTime(SipUtils.parseTime(timeVal)); + } + continue; + case "Longitude": + mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); + continue; + case "Latitude": + mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); + continue; + case "Speed": + String speedVal = element.getStringValue(); + if (NumericUtil.isDouble(speedVal)) { + mobilePosition.setSpeed(Double.parseDouble(speedVal)); + } else { + mobilePosition.setSpeed(0.0); + } + continue; + case "Direction": + String directionVal = element.getStringValue(); + if (NumericUtil.isDouble(directionVal)) { + mobilePosition.setDirection(Double.parseDouble(directionVal)); + } else { + mobilePosition.setDirection(0.0); + } + continue; + case "Altitude": + String altitudeVal = element.getStringValue(); + if (NumericUtil.isDouble(altitudeVal)) { + mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); + } else { + mobilePosition.setAltitude(0.0); + } + continue; + + } } - } // logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), // mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); - mobilePosition.setReportSource("Mobile Position"); + mobilePosition.setReportSource("Mobile Position"); - // 更新device channel 的经纬度 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - updateChannelMap.put(deviceId + channelId, deviceChannel); - addMobilePositionList.add(mobilePosition); - if(updateChannelMap.size() > 2000) { - executeSaveChannel(); - } - if (userSetting.isSavePositionHistory()) { - if(addMobilePositionList.size() > 2000) { - executeSaveMobilePosition(); + // 更新device channel 的经纬度 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel); + addMobilePositionList.add(mobilePosition); + + + // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 + try { + eventPublisher.mobilePositionEventPublish(mobilePosition); + }catch (Exception e) { + logger.error("[向上级转发移动位置失败] ", e); } + if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) { + List channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId()); + channels.forEach(channel -> { + // 发送redis消息。 通知位置信息的变化 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); + jsonObject.put("serial", channel.getDeviceId()); + jsonObject.put("code", channel.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + }); + }else { + // 发送redis消息。 通知位置信息的变化 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); + jsonObject.put("serial", mobilePosition.getDeviceId()); + jsonObject.put("code", mobilePosition.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + } + } catch (DocumentException e) { + logger.error("未处理的异常 ", e); } - -// deviceChannel = deviceChannelService.updateGps(deviceChannel, device); -// -// mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); -// mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); -// mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); -// mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - -// deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - - if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 3000); - } - - } catch (DocumentException e) { - logger.error("未处理的异常 ", e); } - - - } - - private void executeSave(){ - executeSaveChannel(); - executeSaveMobilePosition(); - dynamicTask.stop(talkKey); - } - - @Async("taskExecutor") - public void executeSaveChannel(){ - dynamicTask.execute(); - try { - logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size()); - ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); - deviceChannelService.batchUpdateChannelGPS(deviceChannels); + if(!updateChannelMap.isEmpty()) { + List channels = new ArrayList<>(updateChannelMap.values()); + logger.info("[移动位置订阅]更新通道位置: {}", channels.size()); + deviceChannelService.batchUpdateChannelGPS(channels); updateChannelMap.clear(); - }catch (Exception e) { - } - } - @Async("taskExecutor") - public void executeSaveMobilePosition(){ - if (userSetting.isSavePositionHistory()) { + if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { try { logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); deviceChannelService.batchAddMobilePosition(addMobilePositionList); - addMobilePositionList.clear(); }catch (Exception e) { logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size()); } + addMobilePositionList.clear(); } } - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index da348b78..8a426242 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -37,6 +37,7 @@ import javax.sip.SipException; import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -102,55 +103,63 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); return; - }else { + } else { responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); } - }catch (SipException | InvalidArgumentException | ParseException e) { + } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } boolean runed = !taskQueue.isEmpty(); taskQueue.offer(new HandlerCatchData(evt, null, null)); - if (!runed) { - taskExecutor.execute(()-> { -// logger.warn("开始处理"); - while (!taskQueue.isEmpty()) { - try { - HandlerCatchData take = taskQueue.poll(); - if (take == null) { - continue; - } - Element rootElement = getRootElement(take.getEvt()); - if (rootElement == null) { - logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest()); - continue; - } - String cmd = XmlUtil.getText(rootElement, "CmdType"); - - if (CmdType.CATALOG.equals(cmd)) { - logger.info("接收到Catalog通知"); - notifyRequestForCatalogProcessor.process(take.getEvt()); - } else if (CmdType.ALARM.equals(cmd)) { - logger.info("接收到Alarm通知"); - processNotifyAlarm(take.getEvt()); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { -// logger.info("接收到MobilePosition通知"); -// processNotifyMobilePosition(take.getEvt()); -// taskExecutor.execute(() -> { - notifyRequestForMobilePositionProcessor.process(take.getEvt()); -// }); - - } else { - logger.info("接收到消息:" + cmd); - } - } catch (DocumentException e) { - logger.error("处理NOTIFY消息时错误", e); - } + } + @Scheduled(fixedRate = 200) //每200毫秒执行一次 + public void executeTaskQueue(){ + if (taskQueue.isEmpty()) { + return; + } + try { + List catalogEventList = new ArrayList<>(); + List alarmEventList = new ArrayList<>(); + List mobilePositionEventList = new ArrayList<>(); + for (HandlerCatchData take : taskQueue) { + if (take == null) { + continue; } - }); + Element rootElement = getRootElement(take.getEvt()); + if (rootElement == null) { + logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest()); + continue; + } + String cmd = XmlUtil.getText(rootElement, "CmdType"); + + if (CmdType.CATALOG.equals(cmd)) { + catalogEventList.add(take.getEvt()); + } else if (CmdType.ALARM.equals(cmd)) { + alarmEventList.add(take.getEvt()); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + mobilePositionEventList.add(take.getEvt()); + } else { + logger.info("接收到消息:" + cmd); + } + } + taskQueue.clear(); + if (!alarmEventList.isEmpty()) { + processNotifyAlarm(alarmEventList); + } + if (!catalogEventList.isEmpty()) { + notifyRequestForCatalogProcessor.process(catalogEventList); + } + if (!mobilePositionEventList.isEmpty()) { + notifyRequestForMobilePositionProcessor.process(mobilePositionEventList); + } + } catch (DocumentException e) { + logger.error("处理NOTIFY消息时错误", e); } } + + /** * 处理MobilePosition移动位置Notify * @@ -253,95 +262,97 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements /*** * 处理alarm设备报警Notify - * - * @param evt */ - private void processNotifyAlarm(RequestEvent evt) { + private void processNotifyAlarm(List evtList) { if (!sipConfig.isAlarm()) { return; } - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + if (!evtList.isEmpty()) { + for (RequestEvent evt : evtList) { + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - Element rootElement = getRootElement(evt); - if (rootElement == null) { - logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest()); - return; - } - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getText().toString(); + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest()); + return; + } + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getText().toString(); - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null) { - logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId); - return; - } - rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); - return; - } - DeviceAlarm deviceAlarm = new DeviceAlarm(); - deviceAlarm.setDeviceId(deviceId); - deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); - deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); - String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); - if (alarmTime == null) { - logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); - return; - } - deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); - if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { - deviceAlarm.setAlarmDescription(""); - } else { - deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { - deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - } else { - deviceAlarm.setLongitude(0.00); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { - deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - } else { - deviceAlarm.setLatitude(0.00); - } - logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); - if ("4".equals(deviceAlarm.getAlarmMethod())) { - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setChannelId(channelId); - mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); - mobilePosition.setTime(deviceAlarm.getAlarmTime()); - mobilePosition.setLongitude(deviceAlarm.getLongitude()); - mobilePosition.setLatitude(deviceAlarm.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null) { + logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId); + return; + } + rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); + return; + } + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setDeviceId(deviceId); + deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); + deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); + String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); + if (alarmTime == null) { + logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); + return; + } + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); + if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { + deviceAlarm.setAlarmDescription(""); + } else { + deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { + deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); + } else { + deviceAlarm.setLongitude(0.00); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { + deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); + } else { + deviceAlarm.setLatitude(0.00); + } + logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); + if ("4".equals(deviceAlarm.getAlarmMethod())) { + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setChannelId(channelId); + mobilePosition.setCreateTime(DateUtil.getNow()); + mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); + mobilePosition.setTime(deviceAlarm.getAlarmTime()); + mobilePosition.setLongitude(deviceAlarm.getLongitude()); + mobilePosition.setLatitude(deviceAlarm.getLatitude()); + mobilePosition.setReportSource("GPS Alarm"); - // 更新device channel 的经纬度 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); + // 更新device channel 的经纬度 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannel = deviceChannelService.updateGps(deviceChannel, device); + deviceChannel = deviceChannelService.updateGps(deviceChannel, device); - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + } + + // 回复200 OK + if (redisCatchStorage.deviceIsOnline(deviceId)) { + publisher.deviceAlarmEventPublish(deviceAlarm); + } + } catch (DocumentException e) { + logger.error("未处理的异常 ", e); + } } - - // 回复200 OK - if (redisCatchStorage.deviceIsOnline(deviceId)) { - publisher.deviceAlarmEventPublish(deviceAlarm); - } - } catch (DocumentException e) { - logger.error("未处理的异常 ", e); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java index 7dd04e8a..a8cd5813 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java @@ -102,4 +102,9 @@ public interface IDeviceChannelService { void batchAddMobilePosition(List addMobilePositionList); + void online(DeviceChannel channel); + + void offline(DeviceChannel channel); + + void delete(DeviceChannel channel); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index f7666409..67e34a02 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import java.util.ArrayList; @@ -247,11 +248,27 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { return channelMapper.batchOnline(channels); } + @Override + public void online(DeviceChannel channel) { + channelMapper.online(channel.getDeviceId(), channel.getChannelId()); + } + @Override public int channelsOffline(List channels) { return channelMapper.batchOffline(channels); } + + @Override + public void offline(DeviceChannel channel) { + channelMapper.offline(channel.getDeviceId(), channel.getChannelId()); + } + + @Override + public void delete(DeviceChannel channel) { + channelMapper.del(channel.getDeviceId(), channel.getChannelId()); + } + @Override public DeviceChannel getOne(String deviceId, String channelId){ return channelMapper.queryChannel(deviceId, channelId); @@ -355,12 +372,45 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } @Override + @Transactional public void batchUpdateChannelGPS(List channelList) { - channelMapper.batchUpdate(channelList); + for (DeviceChannel deviceChannel : channelList) { + deviceChannel.setUpdateTime(DateUtil.getNow()); + if (deviceChannel.getGpsTime() == null) { + deviceChannel.setGpsTime(DateUtil.getNow()); + } + } + int count = 1000; + if (channelList.size() > count) { + for (int i = 0; i < channelList.size(); i+=count) { + int toIndex = i+count; + if ( i + count > channelList.size()) { + toIndex = channelList.size(); + } + List channels = channelList.subList(i, toIndex); + channelMapper.batchUpdatePosition(channels); + } + }else { + channelMapper.batchUpdatePosition(channelList); + } } @Override + @Transactional public void batchAddMobilePosition(List mobilePositions) { +// int count = 500; +// if (mobilePositions.size() > count) { +// for (int i = 0; i < mobilePositions.size(); i+=count) { +// int toIndex = i+count; +// if ( i + count > mobilePositions.size()) { +// toIndex = mobilePositions.size(); +// } +// List mobilePositionsSub = mobilePositions.subList(i, toIndex); +// deviceMobilePositionMapper.batchadd(mobilePositionsSub); +// } +// }else { +// deviceMobilePositionMapper.batchadd(mobilePositions); +// } deviceMobilePositionMapper.batchadd(mobilePositions); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index ae223367..10d0ee1c 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -401,6 +401,23 @@ public interface DeviceChannelMapper { " "}) int updatePosition(DeviceChannel deviceChannel); + @Update({""}) + int batchUpdatePosition(List deviceChannelList); + @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0") List getAllChannelInPlay(); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java index e3d89826..c28b16ec 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java @@ -46,6 +46,20 @@ public interface DeviceMobilePositionMapper { "#{item.createTime}) " + " " + "") + void batchadd2(List mobilePositions); + + @Insert("") void batchadd(List mobilePositions); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index e42ea68b..8b9cc079 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -570,7 +570,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void sendMobilePositionMsg(JSONObject jsonObject) { String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION; - logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString()); +// logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString()); redisTemplate.convertAndSend(key, jsonObject); }