diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java index 50206786..f9bfd688 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java @@ -440,4 +440,13 @@ public interface CommonGBChannelMapper { @SelectProvider(type = ChannelProvider.class, method = "queryListByStreamPushList") List queryListByStreamPushList(List streamPushList); + @Update(value = {" "}) + void updateGpsByDeviceIdForStreamPush(List channels); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java index ac13ffa0..85f8dcfc 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java @@ -81,4 +81,5 @@ public interface IGbChannelService { List queryListByStreamPushList(List streamPushList); + void updateGpsByDeviceIdForStreamPush(List channels); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index 0abf38ac..7a1f090e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -693,4 +693,9 @@ public class GbChannelServiceImpl implements IGbChannelService { public List queryListByStreamPushList(List streamPushList) { return commonGBChannelMapper.queryListByStreamPushList(streamPushList); } + + @Override + public void updateGpsByDeviceIdForStreamPush(List channels) { + commonGBChannelMapper.updateGpsByDeviceIdForStreamPush(channels); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java index 8fe40485..fd304222 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java @@ -1,24 +1,24 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; -import com.genersoft.iot.vmp.service.IMobilePositionService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; /** - * 接收来自redis的GPS更新通知 + * 接收来自redis的GPS更新通知, 此处只针对推流设备 + * * @author lin * 监听: SUBSCRIBE VM_MSG_GPS * 发布 PUBLISH VM_MSG_GPS '{"messageId":"1727228507555","id":"24212345671381000047","lng":116.30307666666667,"lat":40.03295833333333,"time":"2024-09-25T09:41:47","direction":"56.0","speed":0.0,"altitude":60.0,"unitNo":"100000000","memberNo":"10000047"}' @@ -31,34 +31,42 @@ public class RedisGpsMsgListener implements MessageListener { private IRedisCatchStorage redisCatchStorage; @Autowired - private IMobilePositionService mobilePositionService; + private IStreamPushService streamPushService; private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - @Override public void onMessage(@NotNull Message message, byte[] bytes) { - boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class); - log.info("[REDIS的位置变化通知], {}", JSON.toJSONString(gpsMsgInfo)); - // 只是放入redis缓存起来 - redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); - }catch (Exception e) { - log.warn("[REDIS的位置变化通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); - log.error("[REDIS的位置变化通知] 异常内容: ", e); - } - } - }); + } + + @Scheduled(fixedDelay = 200) //每400毫秒执行一次 + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { + return; + } + List messageDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + Message msg = taskQueue.poll(); + if (msg != null) { + messageDataList.add(msg); + } + } + if (messageDataList.isEmpty()) { + return; + } + for (Message msg : messageDataList) { + try { + GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class); + log.info("[REDIS的位置变化通知], {}", JSON.toJSONString(gpsMsgInfo)); + // 只是放入redis缓存起来 + redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); + } catch (Exception e) { + log.warn("[REDIS的位置变化通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(msg)); + log.error("[REDIS的位置变化通知] 异常内容: ", e); + } } } @@ -66,10 +74,11 @@ public class RedisGpsMsgListener implements MessageListener { * 定时将经纬度更新到数据库 */ @Scheduled(fixedDelay = 2 * 1000) //每2秒执行一次 - public void execute(){ + public void execute() { + // 需要查询到 List gpsMsgInfoList = redisCatchStorage.getAllGpsMsgInfo(); if (!gpsMsgInfoList.isEmpty()) { - mobilePositionService.updateStreamGPS(gpsMsgInfoList); + streamPushService.updateGPSFromGPSMsgInfo(gpsMsgInfoList); for (GPSMsgInfo msgInfo : gpsMsgInfoList) { msgInfo.setStored(true); redisCatchStorage.updateGpsMsgInfo(msgInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java index 23dae58c..5992b051 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.streamPush.service; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; @@ -97,4 +98,6 @@ public interface IStreamPushService { int delete(int id); void batchRemove(Set ids); + + void updateGPSFromGPSMsgInfo(List gpsMsgInfoList); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java index aeabcc83..4c699f41 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java @@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.ISendRtpServerService; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamPush.bean.StreamPush; @@ -583,4 +584,17 @@ public class StreamPushServiceImpl implements IStreamPushService { streamPushMapper.batchDel(streamPushList); gbChannelService.delete(ids); } + + @Override + public void updateGPSFromGPSMsgInfo(List gpsMsgInfoList) { + List channels = new ArrayList<>(); + for (GPSMsgInfo gpsMsgInfo : gpsMsgInfoList) { + CommonGBChannel channel = new CommonGBChannel(); + channel.setGbDeviceId(gpsMsgInfo.getId()); + channel.setGbLongitude(gpsMsgInfo.getLng()); + channel.setGbLatitude(gpsMsgInfo.getLat()); + channels.add(channel); + } + gbChannelService.updateGpsByDeviceIdForStreamPush(channels); + } } diff --git a/web_src/src/components/StreamPushList.vue b/web_src/src/components/StreamPushList.vue index 8e89c6bf..c01ca11d 100755 --- a/web_src/src/components/StreamPushList.vue +++ b/web_src/src/components/StreamPushList.vue @@ -46,7 +46,7 @@ - + @@ -58,6 +58,12 @@ + + + @@ -181,6 +187,12 @@ export default { if (res.data.code === 0) { that.total = res.data.data.total; that.pushList = res.data.data.list; + that.pushList.forEach(e => { + that.$set(e, "location", ""); + if (e.gbLongitude && e.gbLatitude) { + that.$set(e, "location", e.gbLongitude + "," + e.gbLatitude); + } + }); } }).catch(function (error) { diff --git a/web_src/src/components/channelList.vue b/web_src/src/components/channelList.vue index bc0c7ab5..028b4041 100755 --- a/web_src/src/components/channelList.vue +++ b/web_src/src/components/channelList.vue @@ -72,7 +72,11 @@ - + +