From d29b0beb9b528c379c78407730221250cf11965f Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sun, 5 Dec 2021 23:00:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=20mq=20starter=20=E7=BB=84?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=E5=A2=9E=E5=8A=A0=20RedisMQTemplate=20?= =?UTF-8?q?=E6=A8=A1=E6=9D=BF=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/InfConfigRefreshMessage.java | 4 +-- .../mq/producer/config/InfConfigProducer.java | 7 +++-- .../message/dept/SysDeptRefreshMessage.java | 8 ++++-- .../dict/SysDictDataRefreshMessage.java | 6 +++-- .../mq/message/mail/SysMailSendMessage.java | 6 +++-- .../permission/SysMenuRefreshMessage.java | 8 ++++-- .../permission/SysRoleMenuRefreshMessage.java | 8 ++++-- .../permission/SysRoleRefreshMessage.java | 8 ++++-- .../sms/SysSmsChannelRefreshMessage.java | 8 ++++-- .../sms/SysSmsTemplateRefreshMessage.java | 8 ++++-- .../mq/producer/dept/SysDeptProducer.java | 7 +++-- .../mq/producer/dict/SysDictDataProducer.java | 7 +++-- .../producer/permission/SysMenuProducer.java | 7 +++-- .../permission/SysPermissionProducer.java | 7 +++-- .../producer/permission/SysRoleProducer.java | 7 +++-- .../mq/producer/sms/SysSmsProducer.java | 11 +++----- .../mq/message/sms/SysSmsSendMessage.java | 6 +++-- .../mq/producer/sms/SysSmsCoreProducer.java | 7 +++-- .../mq/config/YudaoMQAutoConfiguration.java | 11 ++++++-- ...MessageUtils.java => RedisMQTemplate.java} | 20 +++++++------- .../mq/core/message/AbstractRedisMessage.java | 26 +++++++++++++++++++ ...ssage.java => AbstractChannelMessage.java} | 7 ++--- .../AbstractChannelMessageListener.java | 2 +- ...essage.java => AbstractStreamMessage.java} | 6 ++--- .../stream/AbstractStreamMessageListener.java | 4 +-- .../yudao/framework/tenant/package-info.java | 6 +++++ 26 files changed, 137 insertions(+), 75 deletions(-) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/{util/RedisMessageUtils.java => RedisMQTemplate.java} (61%) create mode 100644 yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/{ChannelMessage.java => AbstractChannelMessage.java} (53%) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/{StreamMessage.java => AbstractStreamMessage.java} (67%) diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/message/config/InfConfigRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/message/config/InfConfigRefreshMessage.java index 433048143..4846d9b90 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/message/config/InfConfigRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/message/config/InfConfigRefreshMessage.java @@ -1,13 +1,13 @@ package cn.iocoder.yudao.adminserver.modules.infra.mq.message.config; -import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; /** * 配置数据刷新 Message */ @Data -public class InfConfigRefreshMessage implements ChannelMessage { +public class InfConfigRefreshMessage extends AbstractChannelMessage { @Override public String getChannel() { diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/producer/config/InfConfigProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/producer/config/InfConfigProducer.java index 39a8e76b3..2a0dc66fd 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/producer/config/InfConfigProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/infra/mq/producer/config/InfConfigProducer.java @@ -1,8 +1,7 @@ package cn.iocoder.yudao.adminserver.modules.infra.mq.producer.config; import cn.iocoder.yudao.adminserver.modules.infra.mq.message.config.InfConfigRefreshMessage; -import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; -import org.springframework.data.redis.core.StringRedisTemplate; +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -14,14 +13,14 @@ import javax.annotation.Resource; public class InfConfigProducer { @Resource - private StringRedisTemplate stringRedisTemplate; + private RedisMQTemplate redisMQTemplate; /** * 发送 {@link InfConfigRefreshMessage} 消息 */ public void sendConfigRefreshMessage() { InfConfigRefreshMessage message = new InfConfigRefreshMessage(); - RedisMessageUtils.sendChannelMessage(stringRedisTemplate, message); + redisMQTemplate.send(message); } } diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dept/SysDeptRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dept/SysDeptRefreshMessage.java index 724547d68..96362a8fe 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dept/SysDeptRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dept/SysDeptRefreshMessage.java @@ -1,13 +1,17 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.dept; -import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; +import lombok.EqualsAndHashCode; /** * 部门数据刷新 Message + * + * @author 芋道源码 */ @Data -public class SysDeptRefreshMessage implements ChannelMessage { +@EqualsAndHashCode(callSuper = true) +public class SysDeptRefreshMessage extends AbstractChannelMessage { @Override public String getChannel() { diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dict/SysDictDataRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dict/SysDictDataRefreshMessage.java index 7b735deb9..4415d60bc 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dict/SysDictDataRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/dict/SysDictDataRefreshMessage.java @@ -1,13 +1,15 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.dict; -import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; +import lombok.EqualsAndHashCode; /** * 字典数据数据刷新 Message */ @Data -public class SysDictDataRefreshMessage implements ChannelMessage { +@EqualsAndHashCode(callSuper = true) +public class SysDictDataRefreshMessage extends AbstractChannelMessage { @Override public String getChannel() { diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/mail/SysMailSendMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/mail/SysMailSendMessage.java index bb9f62170..a0e258796 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/mail/SysMailSendMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/mail/SysMailSendMessage.java @@ -1,7 +1,8 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.mail; -import cn.iocoder.yudao.framework.mq.core.stream.StreamMessage; +import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage; import lombok.Data; +import lombok.EqualsAndHashCode; import javax.validation.constraints.NotNull; import java.util.Map; @@ -12,7 +13,8 @@ import java.util.Map; * @author 芋道源码 */ @Data -public class SysMailSendMessage implements StreamMessage { +@EqualsAndHashCode(callSuper = true) +public class SysMailSendMessage extends AbstractStreamMessage { /** * 邮箱地址 diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysMenuRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysMenuRefreshMessage.java index 1fa2a3879..3f0ef3da9 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysMenuRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysMenuRefreshMessage.java @@ -1,13 +1,17 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.permission; -import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; +import lombok.EqualsAndHashCode; /** * 菜单数据刷新 Message + * + * @author 芋道源码 */ @Data -public class SysMenuRefreshMessage implements ChannelMessage { +@EqualsAndHashCode(callSuper = true) +public class SysMenuRefreshMessage extends AbstractChannelMessage { @Override public String getChannel() { diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleMenuRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleMenuRefreshMessage.java index 8b9f50c91..8ddcb17d4 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleMenuRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleMenuRefreshMessage.java @@ -1,13 +1,17 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.permission; -import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; +import lombok.EqualsAndHashCode; /** * 角色与菜单数据刷新 Message + * + * @author 芋道源码 */ @Data -public class SysRoleMenuRefreshMessage implements ChannelMessage { +@EqualsAndHashCode(callSuper = true) +public class SysRoleMenuRefreshMessage extends AbstractChannelMessage { @Override public String getChannel() { diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleRefreshMessage.java index 8d8d1e01a..9e89fa186 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/permission/SysRoleRefreshMessage.java @@ -1,13 +1,17 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.permission; -import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; +import lombok.EqualsAndHashCode; /** * 角色数据刷新 Message + * + * @author 芋道源码 */ @Data -public class SysRoleRefreshMessage implements ChannelMessage { +@EqualsAndHashCode(callSuper = true) +public class SysRoleRefreshMessage extends AbstractChannelMessage { @Override public String getChannel() { diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsChannelRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsChannelRefreshMessage.java index a37295615..22c9341d5 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsChannelRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsChannelRefreshMessage.java @@ -1,13 +1,17 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.sms; -import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; +import lombok.EqualsAndHashCode; /** * 短信渠道的数据刷新 Message + * + * @author 芋道源码 */ @Data -public class SysSmsChannelRefreshMessage implements ChannelMessage { +@EqualsAndHashCode(callSuper = true) +public class SysSmsChannelRefreshMessage extends AbstractChannelMessage { @Override public String getChannel() { diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsTemplateRefreshMessage.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsTemplateRefreshMessage.java index c8bb00af2..8401829cb 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsTemplateRefreshMessage.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/message/sms/SysSmsTemplateRefreshMessage.java @@ -1,13 +1,17 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.message.sms; -import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; +import lombok.EqualsAndHashCode; /** * 短信模板的数据刷新 Message + * + * @author 芋道源码 */ @Data -public class SysSmsTemplateRefreshMessage implements ChannelMessage { +@EqualsAndHashCode(callSuper = true) +public class SysSmsTemplateRefreshMessage extends AbstractChannelMessage { @Override public String getChannel() { diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dept/SysDeptProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dept/SysDeptProducer.java index 948796e22..c4bbb7419 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dept/SysDeptProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dept/SysDeptProducer.java @@ -1,8 +1,7 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.producer.dept; -import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import cn.iocoder.yudao.adminserver.modules.system.mq.message.dept.SysDeptRefreshMessage; -import org.springframework.data.redis.core.StringRedisTemplate; +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -14,14 +13,14 @@ import javax.annotation.Resource; public class SysDeptProducer { @Resource - private StringRedisTemplate stringRedisTemplate; + private RedisMQTemplate redisMQTemplate; /** * 发送 {@link SysDeptRefreshMessage} 消息 */ public void sendDeptRefreshMessage() { SysDeptRefreshMessage message = new SysDeptRefreshMessage(); - RedisMessageUtils.sendChannelMessage(stringRedisTemplate, message); + redisMQTemplate.send(message); } } diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dict/SysDictDataProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dict/SysDictDataProducer.java index ea0183722..4bce9db1c 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dict/SysDictDataProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/dict/SysDictDataProducer.java @@ -1,8 +1,7 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.producer.dict; -import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import cn.iocoder.yudao.adminserver.modules.system.mq.message.dict.SysDictDataRefreshMessage; -import org.springframework.data.redis.core.StringRedisTemplate; +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -14,14 +13,14 @@ import javax.annotation.Resource; public class SysDictDataProducer { @Resource - private StringRedisTemplate stringRedisTemplate; + private RedisMQTemplate redisMQTemplate; /** * 发送 {@link SysDictDataRefreshMessage} 消息 */ public void sendDictDataRefreshMessage() { SysDictDataRefreshMessage message = new SysDictDataRefreshMessage(); - RedisMessageUtils.sendChannelMessage(stringRedisTemplate, message); + redisMQTemplate.send(message); } } diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysMenuProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysMenuProducer.java index 6b3493469..1f4296530 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysMenuProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysMenuProducer.java @@ -1,8 +1,7 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.producer.permission; -import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysMenuRefreshMessage; -import org.springframework.data.redis.core.StringRedisTemplate; +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -14,14 +13,14 @@ import javax.annotation.Resource; public class SysMenuProducer { @Resource - private StringRedisTemplate stringRedisTemplate; + private RedisMQTemplate redisMQTemplate; /** * 发送 {@link SysMenuRefreshMessage} 消息 */ public void sendMenuRefreshMessage() { SysMenuRefreshMessage message = new SysMenuRefreshMessage(); - RedisMessageUtils.sendChannelMessage(stringRedisTemplate, message); + redisMQTemplate.send(message); } } diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysPermissionProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysPermissionProducer.java index d9a1bfcc2..def201962 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysPermissionProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysPermissionProducer.java @@ -1,8 +1,7 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.producer.permission; -import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysRoleMenuRefreshMessage; -import org.springframework.data.redis.core.StringRedisTemplate; +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -14,14 +13,14 @@ import javax.annotation.Resource; public class SysPermissionProducer { @Resource - private StringRedisTemplate stringRedisTemplate; + private RedisMQTemplate redisMQTemplate; /** * 发送 {@link SysRoleMenuRefreshMessage} 消息 */ public void sendRoleMenuRefreshMessage() { SysRoleMenuRefreshMessage message = new SysRoleMenuRefreshMessage(); - RedisMessageUtils.sendChannelMessage(stringRedisTemplate, message); + redisMQTemplate.send(message); } } diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysRoleProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysRoleProducer.java index 6888f27bc..c3ab839a1 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysRoleProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/permission/SysRoleProducer.java @@ -1,8 +1,7 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.producer.permission; -import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import cn.iocoder.yudao.adminserver.modules.system.mq.message.permission.SysRoleRefreshMessage; -import org.springframework.data.redis.core.StringRedisTemplate; +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -16,14 +15,14 @@ import javax.annotation.Resource; public class SysRoleProducer { @Resource - private StringRedisTemplate stringRedisTemplate; + private RedisMQTemplate redisMQTemplate; /** * 发送 {@link SysRoleRefreshMessage} 消息 */ public void sendRoleRefreshMessage() { SysRoleRefreshMessage message = new SysRoleRefreshMessage(); - RedisMessageUtils.sendChannelMessage(stringRedisTemplate, message); + redisMQTemplate.send(message); } } diff --git a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/sms/SysSmsProducer.java b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/sms/SysSmsProducer.java index 739603301..acc4fee7a 100644 --- a/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/sms/SysSmsProducer.java +++ b/yudao-admin-server/src/main/java/cn/iocoder/yudao/adminserver/modules/system/mq/producer/sms/SysSmsProducer.java @@ -1,15 +1,12 @@ package cn.iocoder.yudao.adminserver.modules.system.mq.producer.sms; -import cn.iocoder.yudao.framework.common.core.KeyValue; -import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsChannelRefreshMessage; import cn.iocoder.yudao.adminserver.modules.system.mq.message.sms.SysSmsTemplateRefreshMessage; +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import lombok.extern.slf4j.Slf4j; -import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.List; /** * Sms 短信相关消息的 Producer @@ -22,14 +19,14 @@ import java.util.List; public class SysSmsProducer { @Resource - private StringRedisTemplate stringRedisTemplate; + private RedisMQTemplate redisMQTemplate; /** * 发送 {@link SysSmsChannelRefreshMessage} 消息 */ public void sendSmsChannelRefreshMessage() { SysSmsChannelRefreshMessage message = new SysSmsChannelRefreshMessage(); - RedisMessageUtils.sendChannelMessage(stringRedisTemplate, message); + redisMQTemplate.send(message); } /** @@ -37,7 +34,7 @@ public class SysSmsProducer { */ public void sendSmsTemplateRefreshMessage() { SysSmsTemplateRefreshMessage message = new SysSmsTemplateRefreshMessage(); - RedisMessageUtils.sendChannelMessage(stringRedisTemplate, message); + redisMQTemplate.send(message); } } diff --git a/yudao-core-service/src/main/java/cn/iocoder/yudao/coreservice/modules/system/mq/message/sms/SysSmsSendMessage.java b/yudao-core-service/src/main/java/cn/iocoder/yudao/coreservice/modules/system/mq/message/sms/SysSmsSendMessage.java index db4d1140c..19c7d044c 100644 --- a/yudao-core-service/src/main/java/cn/iocoder/yudao/coreservice/modules/system/mq/message/sms/SysSmsSendMessage.java +++ b/yudao-core-service/src/main/java/cn/iocoder/yudao/coreservice/modules/system/mq/message/sms/SysSmsSendMessage.java @@ -1,8 +1,9 @@ package cn.iocoder.yudao.coreservice.modules.system.mq.message.sms; import cn.iocoder.yudao.framework.common.core.KeyValue; -import cn.iocoder.yudao.framework.mq.core.stream.StreamMessage; +import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage; import lombok.Data; +import lombok.EqualsAndHashCode; import javax.validation.constraints.NotNull; import java.util.List; @@ -13,7 +14,8 @@ import java.util.List; * @author 芋道源码 */ @Data -public class SysSmsSendMessage implements StreamMessage { +@EqualsAndHashCode(callSuper = true) +public class SysSmsSendMessage extends AbstractStreamMessage { /** * 短信日志编号 diff --git a/yudao-core-service/src/main/java/cn/iocoder/yudao/coreservice/modules/system/mq/producer/sms/SysSmsCoreProducer.java b/yudao-core-service/src/main/java/cn/iocoder/yudao/coreservice/modules/system/mq/producer/sms/SysSmsCoreProducer.java index bd4d7c36a..9ecf64706 100644 --- a/yudao-core-service/src/main/java/cn/iocoder/yudao/coreservice/modules/system/mq/producer/sms/SysSmsCoreProducer.java +++ b/yudao-core-service/src/main/java/cn/iocoder/yudao/coreservice/modules/system/mq/producer/sms/SysSmsCoreProducer.java @@ -2,9 +2,8 @@ package cn.iocoder.yudao.coreservice.modules.system.mq.producer.sms; import cn.iocoder.yudao.coreservice.modules.system.mq.message.sms.SysSmsSendMessage; import cn.iocoder.yudao.framework.common.core.KeyValue; -import cn.iocoder.yudao.framework.mq.core.util.RedisMessageUtils; +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import lombok.extern.slf4j.Slf4j; -import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -21,7 +20,7 @@ import java.util.List; public class SysSmsCoreProducer { @Resource - private StringRedisTemplate stringRedisTemplate; + private RedisMQTemplate redisMQTemplate; /** * 发送 {@link SysSmsSendMessage} 消息 @@ -36,7 +35,7 @@ public class SysSmsCoreProducer { Long channelId, String apiTemplateId, List> templateParams) { SysSmsSendMessage message = new SysSmsSendMessage().setLogId(logId).setMobile(mobile); message.setChannelId(channelId).setApiTemplateId(apiTemplateId).setTemplateParams(templateParams); - RedisMessageUtils.sendStreamMessage(stringRedisTemplate, message); + redisMQTemplate.send(message); } } diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java index 8b3366dd7..cd6739c5a 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java @@ -1,12 +1,12 @@ package cn.iocoder.yudao.framework.mq.config; import cn.hutool.system.SystemUtil; +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.AutoConfigureAfter; -import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -15,6 +15,7 @@ import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.stream.StreamMessageListenerContainer; @@ -31,6 +32,13 @@ import java.util.List; @Slf4j public class YudaoMQAutoConfiguration { + @Bean + public RedisMQTemplate redisMQTemplate(StringRedisTemplate stringRedisTemplate) { + return new RedisMQTemplate(stringRedisTemplate); + } + + // ========== 消费者相关 ========== + /** * 创建 Redis Pub/Sub 广播消费的容器 */ @@ -71,7 +79,6 @@ public class YudaoMQAutoConfiguration { // 第二步,注册监听器,消费对应的 Stream 主题 String consumerName = buildConsumerName(); -// String consumerName = "110"; listeners.forEach(listener -> { // 创建 listener 对应的消费者分组 try { diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/util/RedisMessageUtils.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java similarity index 61% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/util/RedisMessageUtils.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java index 57c925fa7..45d1874ea 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/util/RedisMessageUtils.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java @@ -1,37 +1,39 @@ -package cn.iocoder.yudao.framework.mq.core.util; +package cn.iocoder.yudao.framework.mq.core; -import cn.iocoder.yudao.framework.mq.core.pubsub.ChannelMessage; -import cn.iocoder.yudao.framework.mq.core.stream.StreamMessage; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; +import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage; +import lombok.AllArgsConstructor; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.core.RedisTemplate; /** - * Redis 消息工具类 + * Redis MQ 操作模板类 * * @author 芋道源码 */ -public class RedisMessageUtils { +@AllArgsConstructor +public class RedisMQTemplate { + + private final RedisTemplate redisTemplate; /** * 发送 Redis 消息,基于 Redis pub/sub 实现 * - * @param redisTemplate Redis 操作模板 * @param message 消息 */ - public static void sendChannelMessage(RedisTemplate redisTemplate, T message) { + public void send(T message) { redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message)); } /** * 发送 Redis 消息,基于 Redis Stream 实现 * - * @param redisTemplate Redis 操作模板 * @param message 消息 * @return 消息记录的编号对象 */ - public static RecordId sendStreamMessage(RedisTemplate redisTemplate, T message) { + public RecordId send(T message) { return redisTemplate.opsForStream().add(StreamRecords.newRecord() .ofObject(JsonUtils.toJsonString(message)) // 设置内容 .withStreamKey(message.getStreamKey())); // 设置 stream key diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java new file mode 100644 index 000000000..70a668154 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java @@ -0,0 +1,26 @@ +package cn.iocoder.yudao.framework.mq.core.message; + +import java.util.HashMap; +import java.util.Map; + +/** + * Redis 消息抽象基类 + * + * @author 芋道源码 + */ +public abstract class AbstractRedisMessage { + + /** + * 头 + */ + private final Map headers = new HashMap<>(); + + public String getHeader(String key) { + return headers.get(key); + } + + public void addHeader(String key, String value) { + headers.put(key, value); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/ChannelMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java similarity index 53% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/ChannelMessage.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java index ff55f8b01..53040c6ec 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/ChannelMessage.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java @@ -1,13 +1,14 @@ package cn.iocoder.yudao.framework.mq.core.pubsub; +import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; import com.fasterxml.jackson.annotation.JsonIgnore; /** - * Redis Channel Message 接口 + * Redis Channel Message 抽象类 * * @author 芋道源码 */ -public interface ChannelMessage { +public abstract class AbstractChannelMessage extends AbstractRedisMessage { /** * 获得 Redis Channel @@ -15,6 +16,6 @@ public interface ChannelMessage { * @return Channel */ @JsonIgnore // 避免序列化 - String getChannel(); + public abstract String getChannel(); } diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java index 9905a08ed..ff0725580 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java @@ -15,7 +15,7 @@ import java.lang.reflect.Type; * * @author 芋道源码 */ -public abstract class AbstractChannelMessageListener implements MessageListener { +public abstract class AbstractChannelMessageListener implements MessageListener { /** * 消息类型 diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/StreamMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java similarity index 67% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/StreamMessage.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java index 30b38c62d..bafa685b3 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/StreamMessage.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java @@ -3,11 +3,11 @@ package cn.iocoder.yudao.framework.mq.core.stream; import com.fasterxml.jackson.annotation.JsonIgnore; /** - * Redis Stream Message 接口 + * Redis Stream Message 抽象类 * * @author 芋道源码 */ -public interface StreamMessage { +public abstract class AbstractStreamMessage { /** * 获得 Redis Stream Key @@ -15,6 +15,6 @@ public interface StreamMessage { * @return Channel */ @JsonIgnore // 避免序列化 - String getStreamKey(); + public abstract String getStreamKey(); } diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java index 612b5a029..8ccac0ce5 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessageListener.java @@ -19,7 +19,7 @@ import java.lang.reflect.Type; * * @author 芋道源码 */ -public abstract class AbstractStreamMessageListener +public abstract class AbstractStreamMessageListener implements StreamListener> { /** @@ -39,7 +39,7 @@ public abstract class AbstractStreamMessageListener @Getter private String group; /** - * + * RedisTemplate */ @Setter private RedisTemplate redisTemplate; diff --git a/yudao-framework/yudao-spring-boot-starter-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/package-info.java b/yudao-framework/yudao-spring-boot-starter-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/package-info.java index 54a0e1ae6..b2be77ae1 100644 --- a/yudao-framework/yudao-spring-boot-starter-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/package-info.java +++ b/yudao-framework/yudao-spring-boot-starter-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/package-info.java @@ -4,5 +4,11 @@ * 2. Web:请求 HTTP API 时,Header 带上 tenant-id 租户编号。 * 3. Job:在 JobHandler 执行任务时,会按照每个租户,都独立并行执行一次。 * 4. MQ:TODO + * 5. Async:异步需要保证 ThreadLocal 的传递性,通过使用阿里开源的 TransmittableThreadLocal 实现。相关的改造点,可见: + * 1)Spring Async: + * {@link cn.iocoder.yudao.framework.quartz.config.YudaoAsyncAutoConfiguration#threadPoolTaskExecutorBeanPostProcessor()} + * 2)Spring Security: + * TransmittableThreadLocalSecurityContextHolderStrategy + * 和 YudaoSecurityAutoConfiguration#securityContextHolderMethodInvokingFactoryBean() 方法 */ package cn.iocoder.yudao.framework.tenant;