优化大量notify 移动位置订阅的入库

pull/1489/head
648540858 2024-05-13 17:22:36 +08:00
parent 588b1da35a
commit 5564cfb384
6 changed files with 132 additions and 51 deletions

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.conf.redis; package com.genersoft.iot.vmp.conf.redis;
import com.alibaba.fastjson2.support.spring.data.redis.GenericFastJsonRedisSerializer; import com.alibaba.fastjson2.support.spring.data.redis.GenericFastJsonRedisSerializer;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisConnectionFactory;
@ -25,4 +26,20 @@ public class RedisTemplateConfig {
redisTemplate.setConnectionFactory(redisConnectionFactory); redisTemplate.setConnectionFactory(redisConnectionFactory);
return redisTemplate; return redisTemplate;
} }
@Bean
public RedisTemplate<String, MobilePosition> getRedisTemplateForMobilePosition(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, MobilePosition> redisTemplate = new RedisTemplate<>();
// 使用fastJson序列化
GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
// value值的序列化采用fastJsonRedisSerializer
redisTemplate.setValueSerializer(fastJsonRedisSerializer);
redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);
// key的序列化采用StringRedisSerializer
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setConnectionFactory(redisConnectionFactory);
return redisTemplate;
}
} }

View File

@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
@ -20,15 +21,11 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.header.FromHeader; import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
/** /**
@ -54,6 +51,9 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
@Autowired @Autowired
private IDeviceChannelService deviceChannelService; private IDeviceChannelService deviceChannelService;
@Autowired
private IMobilePositionService mobilePositionService;
public void process(RequestEvent evt) { public void process(RequestEvent evt) {
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
@ -64,13 +64,10 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
} }
@Scheduled(fixedRate = 200) //每200毫秒执行一次 @Scheduled(fixedRate = 200) //每200毫秒执行一次
@Transactional
public void executeTaskQueue() { public void executeTaskQueue() {
if (taskQueue.isEmpty()) { if (taskQueue.isEmpty()) {
return; return;
} }
Map<String, DeviceChannel> updateChannelMap = new ConcurrentHashMap<>();
List<MobilePosition> addMobilePositionList = new ArrayList<>();
for (HandlerCatchData take : taskQueue) { for (HandlerCatchData take : taskQueue) {
if (take == null) { if (take == null) {
continue; continue;
@ -150,16 +147,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
// mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); // mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
mobilePosition.setReportSource("Mobile Position"); mobilePosition.setReportSource("Mobile Position");
// 更新device channel 的经纬度 mobilePositionService.add(mobilePosition);
DeviceChannel deviceChannel = new DeviceChannel();
deviceChannel.setDeviceId(device.getDeviceId());
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel);
addMobilePositionList.add(mobilePosition);
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
try { try {
eventPublisher.mobilePositionEventPublish(mobilePosition); eventPublisher.mobilePositionEventPublish(mobilePosition);
@ -199,21 +187,6 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
} }
} }
taskQueue.clear(); taskQueue.clear();
if(!updateChannelMap.isEmpty()) {
List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values());
logger.info("[移动位置订阅]更新通道位置: {}", channels.size());
deviceChannelService.batchUpdateChannel(channels);
updateChannelMap.clear();
}
if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) {
try {
logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size());
deviceChannelService.batchAddMobilePosition(addMobilePositionList);
}catch (Exception e) {
logger.info("[移动位置订阅] b添加通道轨迹点位保存失败 {}", addMobilePositionList.size());
}
addMobilePositionList.clear();
}
} }
@Scheduled(fixedRate = 10000) @Scheduled(fixedRate = 10000)
public void execute(){ public void execute(){

View File

@ -0,0 +1,13 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import java.util.List;
public interface IMobilePositionService {
void add(List<MobilePosition> mobilePositionList);
void add(MobilePosition mobilePosition);
}

View File

@ -0,0 +1,95 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMobilePositionMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class MobilePositionServiceImpl implements IMobilePositionService {
@Autowired
private DeviceChannelMapper channelMapper;
@Autowired
private DeviceMobilePositionMapper mobilePositionMapper;
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<String, MobilePosition> redisTemplate;
private final static Logger logger = LoggerFactory.getLogger(MobilePositionServiceImpl.class);
private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list";
@Override
public void add(MobilePosition mobilePosition) {
List<MobilePosition> list = new ArrayList<>();
list.add(mobilePosition);
add(list);
}
@Override
public void add(List<MobilePosition> mobilePositionList) {
redisTemplate.opsForList().leftPushAll(REDIS_MOBILE_POSITION_LIST, mobilePositionList);
}
private List<MobilePosition> get(int length) {
Long size = redisTemplate.opsForList().size(REDIS_MOBILE_POSITION_LIST);
if (size == null || size == 0) {
return new ArrayList<>();
}
List<MobilePosition> mobilePositions;
if (size > length) {
mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, length);
}else {
mobilePositions = redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, size);
}
return mobilePositions;
}
@Scheduled(fixedRate = 1000)
@Transactional
public void executeTaskQueue() {
int countLimit = 3000;
List<MobilePosition> mobilePositions = get(countLimit);
if (mobilePositions == null || mobilePositions.isEmpty()) {
return;
}
if (userSetting.getSavePositionHistory()) {
mobilePositionMapper.batchadd(mobilePositions);
}
logger.info("[移动位置订阅]更新通道位置: {}", mobilePositions.size());
Map<String, DeviceChannel> updateChannelMap = new HashMap<>();
for (MobilePosition mobilePosition : mobilePositions) {
DeviceChannel deviceChannel = new DeviceChannel();
deviceChannel.setDeviceId(mobilePosition.getDeviceId());
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
updateChannelMap.put(mobilePosition.getDeviceId() + mobilePosition.getChannelId(), deviceChannel);
}
List<DeviceChannel> channels = new ArrayList<>(updateChannelMap.values());
channelMapper.batchUpdatePosition(channels);
}
}

View File

@ -401,23 +401,6 @@ public interface DeviceChannelMapper {
" </script>"}) " </script>"})
int updatePosition(DeviceChannel deviceChannel); int updatePosition(DeviceChannel deviceChannel);
@Update({"<script>" +
"<foreach collection='deviceChannelList' item='item' separator=';'>" +
" UPDATE" +
" wvp_device_channel" +
" SET gps_time=#{item.gpsTime}" +
"<if test='item.longitude != null'>, longitude=#{item.longitude}</if>" +
"<if test='item.latitude != null'>, latitude=#{item.latitude}</if>" +
"<if test='item.longitudeGcj02 != null'>, longitude_gcj02=#{item.longitudeGcj02}</if>" +
"<if test='item.latitudeGcj02 != null'>, latitude_gcj02=#{item.latitudeGcj02}</if>" +
"<if test='item.longitudeWgs84 != null'>, longitude_wgs84=#{item.longitudeWgs84}</if>" +
"<if test='item.latitudeWgs84 != null'>, latitude_wgs84=#{item.latitudeWgs84}</if>" +
"WHERE device_id=#{item.deviceId} " +
" <if test='item.channelId != null' > AND channel_id=#{item.channelId}</if>" +
"</foreach>" +
"</script>"})
int batchUpdatePosition(List<DeviceChannel> deviceChannelList);
@Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0") @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0")
List<DeviceChannel> getAllChannelInPlay(); List<DeviceChannel> getAllChannelInPlay();

View File

@ -49,7 +49,7 @@ public interface DeviceMobilePositionMapper {
void batchadd2(List<MobilePosition> mobilePositions); void batchadd2(List<MobilePosition> mobilePositions);
@Insert("<script> " + @Insert("<script> " +
"<foreach collection='mobilePositions' index='index' item='item' separator=','> " + "<foreach collection='mobilePositions' index='index' item='item' separator=';'> " +
"insert into wvp_device_mobile_position " + "insert into wvp_device_mobile_position " +
"(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," + "(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
"longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+ "longitude_gcj02,latitude_gcj02,longitude_wgs84,latitude_wgs84,create_time)"+
@ -57,7 +57,7 @@ public interface DeviceMobilePositionMapper {
"(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " + "(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
"#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," + "#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
"#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " + "#{item.reportSource}, #{item.longitudeGcj02}, #{item.latitudeGcj02}, #{item.longitudeWgs84}, #{item.latitudeWgs84}, " +
"#{item.createTime}); " + "#{item.createTime}) " +
"</foreach> " + "</foreach> " +
"</script>") "</script>")
void batchadd(List<MobilePosition> mobilePositions); void batchadd(List<MobilePosition> mobilePositions);