From 604266d33c6b29a829282294b02d076e8fbe1b17 Mon Sep 17 00:00:00 2001 From: gaibu <1016771049@qq.com> Date: Tue, 27 Dec 2022 22:52:19 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3=20redis=20mq=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=B8=A2=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/job/RedisPendingMessageResendJob.java | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java index ff01e3e3a..f4ba050c0 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.framework.mq.job; import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; @@ -22,7 +23,9 @@ import java.util.Map; * 这个任务用于处理,crash 之后的消费者未消费完的消息 */ @Slf4j +@AllArgsConstructor public class RedisPendingMessageResendJob { + private static final String LOCK_KEY = "redis:pending:msg:lock"; private final List> listeners; @@ -30,13 +33,6 @@ public class RedisPendingMessageResendJob { private final String groupName; private final RedissonClient redissonClient; - public RedisPendingMessageResendJob(List> listeners, RedisMQTemplate redisTemplate, String groupName, RedissonClient redissonClient) { - this.listeners = listeners; - this.redisTemplate = redisTemplate; - this.groupName = groupName; - this.redissonClient = redissonClient; - } - /** * 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题 */ @@ -57,28 +53,26 @@ public class RedisPendingMessageResendJob { private void execute() { StreamOperations ops = redisTemplate.getRedisTemplate().opsForStream(); - listeners.forEach(listener -> { PendingMessagesSummary pendingMessagesSummary = ops.pending(listener.getStreamKey(), groupName); // 每个消费者的 pending 队列消息数量 Map pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); - pendingMessagesPerConsumer.entrySet().forEach(entry -> { - String consumerName = entry.getKey(); - Long pendingMessageCount = entry.getValue(); + pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> { log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount); // 从消费者的 pending 队列中读取消息 List> records = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0"))); - if (CollUtil.isNotEmpty(records)) { - for (MapRecord record : records) { - // 重新投递消息 - redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() - .ofObject(record.getValue()) // 设置内容 - .withStreamKey(listener.getStreamKey())); + if (CollUtil.isEmpty(records)) { + return; + } + for (MapRecord record : records) { + // 重新投递消息 + redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() + .ofObject(record.getValue()) // 设置内容 + .withStreamKey(listener.getStreamKey())); - // ack 消息消费完成 - redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record); - } + // ack 消息消费完成 + redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record); } }); });