From 46f3790492492bfaee83cf33039a445de9787b6d Mon Sep 17 00:00:00 2001 From: gaibu <1016771049@qq.com> Date: Fri, 23 Dec 2022 10:28:59 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3=20redis=20mq=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=B8=A2=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/config/YudaoMQAutoConfiguration.java | 21 ++++-- ...java => RedisPendingMessageResendJob.java} | 74 ++++++++++--------- 2 files changed, 52 insertions(+), 43 deletions(-) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/{PendingMessageScheduler.java => RedisPendingMessageResendJob.java} (52%) diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java index 9c70a1246..e306389be 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java @@ -8,9 +8,11 @@ import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; -import cn.iocoder.yudao.framework.mq.scheduler.PendingMessageScheduler; +import cn.iocoder.yudao.framework.mq.scheduler.RedisPendingMessageResendJob; import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RedissonClient; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisServerCommands; @@ -25,6 +27,7 @@ import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX; import org.springframework.data.redis.stream.StreamMessageListenerContainer; +import org.springframework.scheduling.annotation.EnableScheduling; import java.util.List; import java.util.Properties; @@ -35,6 +38,7 @@ import java.util.Properties; * @author 芋道源码 */ @Slf4j +@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息 @AutoConfiguration(after = YudaoRedisAutoConfiguration.class) public class YudaoMQAutoConfiguration { @@ -70,17 +74,19 @@ public class YudaoMQAutoConfiguration { } /** - * - * @return + * 创建 Redis Stream 重新消费的任务 */ @Bean - public PendingMessageScheduler pendingMessageScheduler(){ - return new PendingMessageScheduler(); + public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners, + RedisMQTemplate redisTemplate, + @Value("${spring.application.name}") String groupName, + RedissonClient redissonClient) { + return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient); } /** * 创建 Redis Stream 集群消费的容器 - * + *

* Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html */ @Bean(initMethod = "start", destroyMethod = "stop") @@ -108,7 +114,8 @@ public class YudaoMQAutoConfiguration { // 创建 listener 对应的消费者分组 try { redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); - } catch (Exception ignore) {} + } catch (Exception ignore) { + } // 设置 listener 对应的 redisTemplate listener.setRedisMQTemplate(redisMQTemplate); // 创建 Consumer 对象 diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java similarity index 52% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java index 1d5825a10..2f64ab498 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/PendingMessageScheduler.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/scheduler/RedisPendingMessageResendJob.java @@ -1,12 +1,11 @@ package cn.iocoder.yudao.framework.mq.scheduler; +import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.PendingMessagesSummary; @@ -14,51 +13,53 @@ import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.core.StreamOperations; -import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; /** - * 这个定时器用于处理,crash 之后的消费者未消费完的消息 + * 这个任务用于处理,crash 之后的消费者未消费完的消息 */ @Slf4j -@EnableScheduling -public class PendingMessageScheduler { +public class RedisPendingMessageResendJob { private static final String LOCK_KEY = "redis:pending:msg:lock"; - @Autowired - private List> listeners; - @Autowired - private RedisMQTemplate redisTemplate; - @Value("${spring.application.name}") - private String groupName; - @Autowired - private RedissonClient redissonClient; + + private final List> listeners; + private final RedisMQTemplate redisTemplate; + private final String groupName; + private final RedissonClient redissonClient; + + public RedisPendingMessageResendJob(List> listeners, RedisMQTemplate redisTemplate, String groupName, RedissonClient redissonClient) { + this.listeners = listeners; + this.redisTemplate = redisTemplate; + this.groupName = groupName; + this.redissonClient = redissonClient; + } /** - * 一分钟执行一次 + * 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题 */ - @Scheduled(fixedRate = 60 * 1000) - public void processPendingMessage() { - final RLock lock = redissonClient.getLock(LOCK_KEY); - try { - // 尝试加锁,最多等待 30 秒,上锁以后 60 秒自动解锁 - boolean lockFlag = lock.tryLock(30, 60, TimeUnit.SECONDS); - if (lockFlag) { + @Scheduled(cron = "35 * * * * ?") + public void messageResend() { + RLock lock = redissonClient.getLock(LOCK_KEY); + log.info("[messageResend][尝试获取锁]"); + // 尝试加锁 + if (lock.tryLock()) { + try { execute(); + } catch (Exception ex) { + log.error("[messageResend][执行异常]", ex); + } finally { + lock.unlock(); } - } catch (InterruptedException e) { - log.error("获取锁失败", e); } - } private void execute() { StreamOperations ops = redisTemplate.getRedisTemplate().opsForStream(); - for (AbstractStreamMessageListener listener : listeners) { + listeners.forEach(listener -> { PendingMessagesSummary pendingMessagesSummary = ops.pending(listener.getStreamKey(), groupName); // 每个消费者的pending消息数量 Map pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); @@ -69,17 +70,18 @@ public class PendingMessageScheduler { // 从消费者的pending队列中读取消息 List> retVal = ops.read(Consumer.from(groupName, consumerName), StreamOffset.create(listener.getStreamKey(), ReadOffset.from("0"))); + if (CollUtil.isNotEmpty(retVal)) { + for (MapRecord record : retVal) { + // 重新投递消息 + redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() + .ofObject(record.getValue()) // 设置内容 + .withStreamKey(listener.getStreamKey())); - for (MapRecord record : retVal) { - // 重新投递消息 - redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() - .ofObject(record.getValue()) // 设置内容 - .withStreamKey(listener.getStreamKey())); - - // ack 消息消费完成 - redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record); + // ack 消息消费完成 + redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, record); + } } }); - } + }); } }