diff --git a/src/main/java/cn/iocoder/dashboard/framework/redis/config/RedisConfig.java b/src/main/java/cn/iocoder/dashboard/framework/redis/config/RedisConfig.java index f069738e5..f398625ea 100644 --- a/src/main/java/cn/iocoder/dashboard/framework/redis/config/RedisConfig.java +++ b/src/main/java/cn/iocoder/dashboard/framework/redis/config/RedisConfig.java @@ -83,12 +83,15 @@ public class RedisConfig { redisTemplate.getRequiredConnectionFactory(), containerOptions); // 第二步,注册监听器,消费对应的 Stream 主题 - String consumerName = buildConsumerName(); +// String consumerName = buildConsumerName(); + String consumerName = "110"; listeners.forEach(listener -> { // 创建 listener 对应的消费者分组 try { redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); } catch (Exception ignore) {} + // 设置 listener 对应的 redisTemplate + listener.setRedisTemplate(redisTemplate); // 创建 Consumer 对象 Consumer consumer = Consumer.from(listener.getGroup(), consumerName); // 设置 Consumer 消费进度,以最小消费进度为准 diff --git a/src/main/java/cn/iocoder/dashboard/framework/redis/core/pubsub/AbstractChannelMessageListener.java b/src/main/java/cn/iocoder/dashboard/framework/redis/core/pubsub/AbstractChannelMessageListener.java index 23b40e228..2abf03d4d 100644 --- a/src/main/java/cn/iocoder/dashboard/framework/redis/core/pubsub/AbstractChannelMessageListener.java +++ b/src/main/java/cn/iocoder/dashboard/framework/redis/core/pubsub/AbstractChannelMessageListener.java @@ -1,11 +1,10 @@ package cn.iocoder.dashboard.framework.redis.core.pubsub; -import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.TypeUtil; import cn.iocoder.dashboard.util.json.JsonUtils; import lombok.SneakyThrows; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; -import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; import java.lang.reflect.Type; @@ -62,21 +61,11 @@ public abstract class AbstractChannelMessageListener i */ @SuppressWarnings("unchecked") private Class getMessageClass() { - Class targetClass = getClass(); - while (targetClass.getSuperclass() != null) { - // 如果不是 AbstractMessageListener 父类,继续向上查找 - if (targetClass.getSuperclass() != AbstractChannelMessageListener.class) { - targetClass = targetClass.getSuperclass(); - continue; - } - // 如果是 AbstractMessageListener 父类,则解析泛型 - Type[] types = ((ParameterizedTypeImpl) targetClass.getGenericSuperclass()).getActualTypeArguments(); - if (ArrayUtil.isEmpty(types)) { - throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); - } - return (Class) types[0]; + Type type = TypeUtil.getTypeArgument(getClass(), 0); + if (type == null) { + throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); } - throw new IllegalStateException(String.format("类型(%s) 找不到 AbstractMessageListener 父类", getClass().getName())); + return (Class) type; } } diff --git a/src/main/java/cn/iocoder/dashboard/framework/redis/core/stream/AbstractStreamMessageListener.java b/src/main/java/cn/iocoder/dashboard/framework/redis/core/stream/AbstractStreamMessageListener.java index 7621d3638..5fb01aa15 100644 --- a/src/main/java/cn/iocoder/dashboard/framework/redis/core/stream/AbstractStreamMessageListener.java +++ b/src/main/java/cn/iocoder/dashboard/framework/redis/core/stream/AbstractStreamMessageListener.java @@ -1,12 +1,14 @@ package cn.iocoder.dashboard.framework.redis.core.stream; -import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.TypeUtil; +import cn.iocoder.dashboard.util.json.JsonUtils; import lombok.Getter; +import lombok.Setter; import lombok.SneakyThrows; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.stream.StreamListener; -import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; import java.lang.reflect.Type; @@ -33,9 +35,14 @@ public abstract class AbstractStreamMessageListener /** * Redis 消费者分组,默认使用 spring.application.name 名字 */ - @Value("spring.application.name") + @Value("${spring.application.name}") @Getter private String group; + /** + * + */ + @Setter + private RedisTemplate redisTemplate; @SneakyThrows protected AbstractStreamMessageListener() { @@ -45,10 +52,16 @@ public abstract class AbstractStreamMessageListener @Override public void onMessage(ObjectRecord message) { - System.out.println(message); - if (true) { -// throw new IllegalStateException("测试下"); - } + // 消费消息 + T messageObj = JsonUtils.parseObject(message.getValue(), messageType); + this.onMessage(messageObj); + // ack 消息消费完成 + redisTemplate.opsForStream().acknowledge(group, message); + // TODO 芋艿:需要额外考虑以下几个点: + // 1. 处理异常的情况 + // 2. 发送日志;以及事务的结合 + // 3. 消费日志;以及通用的幂等性 + // 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638 } /** @@ -64,23 +77,12 @@ public abstract class AbstractStreamMessageListener * @return 消息类型 */ @SuppressWarnings("unchecked") - // TODO 芋艿:稍后重构 private Class getMessageClass() { - Class targetClass = getClass(); - while (targetClass.getSuperclass() != null) { - // 如果不是 AbstractMessageListener 父类,继续向上查找 - if (targetClass.getSuperclass() != AbstractStreamMessageListener.class) { - targetClass = targetClass.getSuperclass(); - continue; - } - // 如果是 AbstractMessageListener 父类,则解析泛型 - Type[] types = ((ParameterizedTypeImpl) targetClass.getGenericSuperclass()).getActualTypeArguments(); - if (ArrayUtil.isEmpty(types)) { - throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); - } - return (Class) types[0]; + Type type = TypeUtil.getTypeArgument(getClass(), 0); + if (type == null) { + throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); } - throw new IllegalStateException(String.format("类型(%s) 找不到 AbstractStreamMessageListener 父类", getClass().getName())); + return (Class) type; } } diff --git a/src/main/java/cn/iocoder/dashboard/modules/system/mq/consumer/mail/SysMailSendConsumer.java b/src/main/java/cn/iocoder/dashboard/modules/system/mq/consumer/mail/SysMailSendConsumer.java index 91b796f39..3a0f22ee6 100644 --- a/src/main/java/cn/iocoder/dashboard/modules/system/mq/consumer/mail/SysMailSendConsumer.java +++ b/src/main/java/cn/iocoder/dashboard/modules/system/mq/consumer/mail/SysMailSendConsumer.java @@ -2,14 +2,16 @@ package cn.iocoder.dashboard.modules.system.mq.consumer.mail; import cn.iocoder.dashboard.framework.redis.core.stream.AbstractStreamMessageListener; import cn.iocoder.dashboard.modules.system.mq.message.mail.SysMailSendMessage; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Component +@Slf4j public class SysMailSendConsumer extends AbstractStreamMessageListener { @Override public void onMessage(SysMailSendMessage message) { - + log.info("[onMessage][消息内容({})]", message); } } diff --git a/src/main/java/cn/iocoder/dashboard/modules/system/mq/consumer/sms/SysSmsSendConsumer.java b/src/main/java/cn/iocoder/dashboard/modules/system/mq/consumer/sms/SysSmsSendConsumer.java index 3dde73618..e3b18ca75 100644 --- a/src/main/java/cn/iocoder/dashboard/modules/system/mq/consumer/sms/SysSmsSendConsumer.java +++ b/src/main/java/cn/iocoder/dashboard/modules/system/mq/consumer/sms/SysSmsSendConsumer.java @@ -2,14 +2,16 @@ package cn.iocoder.dashboard.modules.system.mq.consumer.sms; import cn.iocoder.dashboard.framework.redis.core.stream.AbstractStreamMessageListener; import cn.iocoder.dashboard.modules.system.mq.message.sms.SysSmsSendMessage; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Component +@Slf4j public class SysSmsSendConsumer extends AbstractStreamMessageListener { @Override public void onMessage(SysSmsSendMessage message) { - System.out.println(message); + log.info("[onMessage][消息内容({})]", message); } }