diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java index df3345ee..2dc66b38 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.conf.redis; import com.alibaba.fastjson2.support.spring.data.redis.GenericFastJsonRedisSerializer; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -25,4 +26,20 @@ public class RedisTemplateConfig { redisTemplate.setConnectionFactory(redisConnectionFactory); return redisTemplate; } + + @Bean + public RedisTemplate getRedisTemplateForMobilePosition(RedisConnectionFactory redisConnectionFactory) { + RedisTemplate redisTemplate = new RedisTemplate<>(); + // 使用fastJson序列化 + GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer(); + // value值的序列化采用fastJsonRedisSerializer + redisTemplate.setValueSerializer(fastJsonRedisSerializer); + redisTemplate.setHashValueSerializer(fastJsonRedisSerializer); + + // key的序列化采用StringRedisSerializer + redisTemplate.setKeySerializer(new StringRedisSerializer()); + redisTemplate.setHashKeySerializer(new StringRedisSerializer()); + redisTemplate.setConnectionFactory(redisConnectionFactory); + return redisTemplate; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 013d95eb..96e4ecac 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.service.IDeviceChannelService; +import com.genersoft.iot.vmp.service.IMobilePositionService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; import org.dom4j.DocumentException; @@ -20,15 +21,11 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -54,6 +51,9 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor @Autowired private IDeviceChannelService deviceChannelService; + @Autowired + private IMobilePositionService mobilePositionService; + public void process(RequestEvent evt) { if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { @@ -64,13 +64,10 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor } @Scheduled(fixedRate = 200) //每200毫秒执行一次 - @Transactional public void executeTaskQueue() { if (taskQueue.isEmpty()) { return; } - Map updateChannelMap = new ConcurrentHashMap<>(); - List addMobilePositionList = new ArrayList<>(); for (HandlerCatchData take : taskQueue) { if (take == null) { continue; @@ -150,16 +147,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor // mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); mobilePosition.setReportSource("Mobile Position"); - // 更新device channel 的经纬度 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel); - addMobilePositionList.add(mobilePosition); - - + mobilePositionService.add(mobilePosition); // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 try { eventPublisher.mobilePositionEventPublish(mobilePosition); @@ -199,21 +187,6 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor } } taskQueue.clear(); - if(!updateChannelMap.isEmpty()) { - List channels = new ArrayList<>(updateChannelMap.values()); - logger.info("[移动位置订阅]更新通道位置: {}", channels.size()); - deviceChannelService.batchUpdateChannel(channels); - updateChannelMap.clear(); - } - if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { - try { - logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); - deviceChannelService.batchAddMobilePosition(addMobilePositionList); - }catch (Exception e) { - logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size()); - } - addMobilePositionList.clear(); - } } @Scheduled(fixedRate = 10000) public void execute(){ diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java b/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java new file mode 100644 index 00000000..9af64150 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java @@ -0,0 +1,13 @@ +package com.genersoft.iot.vmp.service; + + +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; + +import java.util.List; + +public interface IMobilePositionService { + + void add(List mobilePositionList); + + void add(MobilePosition mobilePosition); +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java new file mode 100644 index 00000000..277493ad --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java @@ -0,0 +1,95 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.service.IMobilePositionService; +import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; +import com.genersoft.iot.vmp.storager.dao.DeviceMobilePositionMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +@Service +public class MobilePositionServiceImpl implements IMobilePositionService { + + @Autowired + private DeviceChannelMapper channelMapper; + + @Autowired + private DeviceMobilePositionMapper mobilePositionMapper; + + @Autowired + private UserSetting userSetting; + + @Autowired + private RedisTemplate redisTemplate; + + private final static Logger logger = LoggerFactory.getLogger(MobilePositionServiceImpl.class); + + private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list"; + + @Override + public void add(MobilePosition mobilePosition) { + List list = new ArrayList<>(); + list.add(mobilePosition); + add(list); + } + + @Override + public void add(List mobilePositionList) { + redisTemplate.opsForList().leftPushAll(REDIS_MOBILE_POSITION_LIST, mobilePositionList); + } + + private List get(int length) { + Long size = redisTemplate.opsForList().size(REDIS_MOBILE_POSITION_LIST); + if (size == null || size == 0) { + return new ArrayList<>(); + } + List mobilePositions; + if (size > length) { + mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, length); + }else { + mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, size); + } + return mobilePositions; + } + + + + @Scheduled(fixedRate = 1000) + @Transactional + public void executeTaskQueue() { + int countLimit = 3000; + List mobilePositions = get(countLimit); + if (mobilePositions == null || mobilePositions.isEmpty()) { + return; + } + if (userSetting.getSavePositionHistory()) { + mobilePositionMapper.batchadd(mobilePositions); + } + logger.info("[移动位置订阅]更新通道位置: {}", mobilePositions.size()); + Map updateChannelMap = new HashMap<>(); + for (MobilePosition mobilePosition : mobilePositions) { + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(mobilePosition.getDeviceId()); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + updateChannelMap.put(mobilePosition.getDeviceId() + mobilePosition.getChannelId(), deviceChannel); + } + List channels = new ArrayList<>(updateChannelMap.values()); + channelMapper.batchUpdatePosition(channels); + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index be78a520..15552211 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -401,23 +401,6 @@ public interface DeviceChannelMapper { " "}) int updatePosition(DeviceChannel deviceChannel); - @Update({""}) - int batchUpdatePosition(List deviceChannelList); - @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0") List getAllChannelInPlay(); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java index c28b16ec..51243106 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java @@ -49,7 +49,7 @@ public interface DeviceMobilePositionMapper { void batchadd2(List mobilePositions); @Insert("") void batchadd(List mobilePositions);