diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java index e49b01de..661e370a 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java @@ -15,7 +15,6 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; @@ -216,10 +215,11 @@ public class RedisRpcConfig implements MessageListener { return callbacks.size(); } - @Scheduled(fixedRate = 1000) //每1秒执行一次 - public void execute(){ - System.out.println("callbacks的长度: " + callbacks.size()); - System.out.println("队列的长度: " + topicSubscribers.size()); - System.out.println("HOOK监听的长度: " + hookSubscribe.size()); - } +// @Scheduled(fixedRate = 1000) //每1秒执行一次 +// public void execute(){ +// logger.info("callbacks的长度: " + callbacks.size()); +// logger.info("队列的长度: " + topicSubscribers.size()); +// logger.info("HOOK监听的长度: " + hookSubscribe.size()); +// logger.info(""); +// } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java index 2593ec67..8290a458 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java @@ -238,7 +238,7 @@ public class DeviceChannel { * 是否含有音频 */ @Schema(description = "是否含有音频") - private boolean hasAudio; + private Boolean hasAudio; /** * 标记通道的类型,0->国标通道 1->直播流通道 2->业务分组/虚拟组织/行政区划 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 8e2f5038..d61a818f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -831,6 +831,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } }); // 添加回复的拒绝或者错误的通知 + // redis消息例如: PUBLISH VM_MSG_STREAM_PUSH_RESPONSE '{"code":1,"msg":"失败","app":"1","stream":"2"}' redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> { if (response.getCode() != 0) { dynamicTask.stop(sendRtpItem.getCallId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index cd97786d..cde70eb4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -295,9 +295,10 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent private void executeSaveForUpdate(){ if (!updateChannelMap.values().isEmpty()) { + logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size()); ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); - updateChannelMap.clear(); deviceChannelService.batchUpdateChannel(deviceChannels); + updateChannelMap.clear(); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java new file mode 100755 index 00000000..89f57c2a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -0,0 +1,214 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; + +import com.genersoft.iot.vmp.conf.CivilCodeFileConf; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.service.IDeviceChannelService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.utils.DateUtil; +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import javax.sip.RequestEvent; +import javax.sip.header.FromHeader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * SIP命令类型: NOTIFY请求中的移动位置请求处理 + */ +@Component +public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessorParent { + + + private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class); + + private final Map updateChannelMap = new ConcurrentHashMap<>(); + + private final List addMobilePositionList = new CopyOnWriteArrayList(); + + + @Autowired + private UserSetting userSetting; + + @Autowired + private EventPublisher eventPublisher; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IDeviceChannelService deviceChannelService; + + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private CivilCodeFileConf civilCodeFileConf; + + @Autowired + private SipConfig sipConfig; + + private final static String talkKey = "notify-request-for-mobile-position-task"; + + public void process(RequestEvent evt) { + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + + // 回复 200 OK + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); + return; + } + + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setCreateTime(DateUtil.getNow()); + + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getTextTrim().toString(); + Device device = redisCatchStorage.getDevice(deviceId); + + if (device == null) { + device = redisCatchStorage.getDevice(channelId); + if (device == null) { + // 根据通道id查询设备Id + List deviceList = deviceChannelService.getDeviceByChannelId(channelId); + if (deviceList.size() > 0) { + device = deviceList.get(0); + } + } + } + if (device == null) { + logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId); + return; + } + // 兼容设备部分设备上报是通道编号与设备编号一致的情况 + if (deviceId.equals(channelId)) { + List deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); + if (deviceChannels.size() == 1) { + channelId = deviceChannels.get(0).getChannelId(); + } + } + if (!ObjectUtils.isEmpty(device.getName())) { + mobilePosition.setDeviceName(device.getName()); + } + + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setChannelId(channelId); + String time = XmlUtil.getText(rootElement, "Time"); + if (ObjectUtils.isEmpty(time)) { + mobilePosition.setTime(DateUtil.getNow()); + } else { + mobilePosition.setTime(SipUtils.parseTime(time)); + } + + mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); + mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { + mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed"))); + } else { + mobilePosition.setSpeed(0.0); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) { + mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction"))); + } else { + mobilePosition.setDirection(0.0); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) { + mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude"))); + } else { + mobilePosition.setAltitude(0.0); + } + logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), + mobilePosition.getLongitude(), mobilePosition.getLatitude()); + mobilePosition.setReportSource("Mobile Position"); + + // 更新device channel 的经纬度 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + updateChannelMap.put(channelId, deviceChannel); + addMobilePositionList.add(mobilePosition); + if(updateChannelMap.size() > 300) { + executeSaveChannel(); + } + if (userSetting.isSavePositionHistory()) { + if(addMobilePositionList.size() > 300) { + executeSaveMobilePosition(); + } + } + +// deviceChannel = deviceChannelService.updateGps(deviceChannel, device); +// +// mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); +// mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); +// mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); +// mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + +// deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + + if (!dynamicTask.contains(talkKey)) { + dynamicTask.startDelay(talkKey, this::executeSave, 1000); + } + + } catch (DocumentException e) { + logger.error("未处理的异常 ", e); + } + + + } + + private void executeSave(){ + executeSaveChannel(); + executeSaveMobilePosition(); + dynamicTask.stop(talkKey); + } + + private void executeSaveChannel(){ + try { + logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size()); + ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); + deviceChannelService.batchUpdateChannelGPS(deviceChannels); + updateChannelMap.clear(); + }catch (Exception e) { + + } + } + + private void executeSaveMobilePosition(){ + if (userSetting.isSavePositionHistory()) { + try { + logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); + deviceChannelService.batchAddMobilePosition(addMobilePositionList); + addMobilePositionList.clear(); + }catch (Exception e) { + logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size()); + } + } + } + + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index edfee495..84f44b58 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -228,12 +228,12 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannel = deviceChannelService.updateGps(deviceChannel, device); - - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); +// deviceChannel = deviceChannelService.updateGps(deviceChannel, device); +// +// mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); +// mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); +// mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); +// mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java index 70702bb6..6596f53a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java @@ -532,16 +532,17 @@ public class XmlUtil { String status = getText(itemDevice, "Status"); if (status != null) { // ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR的兼容性处理 - if (status.equals("ON") || status.equals("On") || status.equals("ONLINE") || status.equals("OK")) { + if (status.equalsIgnoreCase("ON") || status.equalsIgnoreCase("On") || status.equalsIgnoreCase("ONLINE") || status.equalsIgnoreCase("OK")) { deviceChannel.setStatus(true); } - if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) { + if (status.equalsIgnoreCase("OFF") || status.equalsIgnoreCase("Off") || status.equalsIgnoreCase("OFFLINE")) { deviceChannel.setStatus(false); } }else { deviceChannel.setStatus(true); } - +// logger.info("状态字符串: {}", status); +// logger.info("状态结果: {}", deviceChannel.isStatus()); // 经度 String longitude = getText(itemDevice, "Longitude"); if (NumericUtil.isDouble(longitude)) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java index c690f11b..7dd04e8a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java @@ -98,4 +98,8 @@ public interface IDeviceChannelService { void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition); + void batchUpdateChannelGPS(List channelList); + + void batchAddMobilePosition(List addMobilePositionList); + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 632be91e..37603812 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -353,4 +353,14 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { redisCatchStorage.sendMobilePositionMsg(jsonObject); } } + + @Override + public void batchUpdateChannelGPS(List channelList) { + + } + + @Override + public void batchAddMobilePosition(List mobilePositions) { + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 2c2674fd..f4c99232 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -565,6 +565,7 @@ public class DeviceServiceImpl implements IDeviceService { removeMobilePositionSubscribe(deviceInStore, result->{ // 开启订阅 deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); + deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); addMobilePositionSubscribe(deviceInStore); // 因为是异步执行,需要在这里更新下数据 deviceMapper.updateCustom(deviceInStore); @@ -573,12 +574,14 @@ public class DeviceServiceImpl implements IDeviceService { }else { // 开启订阅 deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); + deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); addMobilePositionSubscribe(deviceInStore); } }else if (device.getSubscribeCycleForMobilePosition() == 0) { // 取消订阅 deviceInStore.setSubscribeCycleForMobilePosition(0); + deviceInStore.setMobilePositionSubmissionInterval(0); removeMobilePositionSubscribe(deviceInStore, null); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 69f947e2..3f478442 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,4 +2,4 @@ spring: application: name: wvp profiles: - active: local2 \ No newline at end of file + active: local \ No newline at end of file