From cb72a8b3bb40ec79761ee2050cdf161adc845259 Mon Sep 17 00:00:00 2001 From: wuxicheng <1441859745@qq.com> Date: Wed, 17 May 2023 23:33:33 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0mq=E9=85=8D=E7=BD=AE=E5=A2=9E?= =?UTF-8?q?=E5=BC=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/constant/ServiceNameConstants.java | 5 + .../config/EnvironmentIsolationConfig.java | 47 ++++ .../config/RocketEnhanceProperties.java | 24 ++ .../RocketMQEnhanceAutoConfiguration.java | 66 +++++ .../rocketmq/config/RocketMqAdapter.java | 38 --- .../rocketmq/config/RocketMqConstant.java | 45 ---- .../rocketmq/constant/RocketMqConstant.java | 25 ++ .../rocketmq/constant/RocketMqTopic.java | 25 ++ .../common/rocketmq/domain/BaseMessage.java | 31 +++ .../domain/img/AddUserVipRecordMessage.java | 50 ++++ .../domain/order/VipOrderCancelMessage.java | 21 ++ .../order/VipOrderPayNotifyMessage.java | 21 ++ .../handle/EnhanceMessageHandler.java | 151 +++++++++++ .../template/RocketMQEnhanceTemplate.java | 238 ++++++++++++++++++ .../img/listener/VipRecordCreateConsumer.java | 44 +++- .../bnyer/order/config/RocketMqConfig.java | 70 +++--- .../listener/vip/VipOrderCancelConsumer.java | 39 ++- .../vip/VipOrderPayNotifyConsumer.java | 105 ++++---- .../service/impl/VipOrderServiceImpl.java | 32 +-- .../com/bnyer/pay/config/RocketMqConfig.java | 56 ++--- .../pay/service/impl/PayInfoServiceImpl.java | 48 +--- 21 files changed, 918 insertions(+), 263 deletions(-) create mode 100644 bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/EnvironmentIsolationConfig.java create mode 100644 bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketEnhanceProperties.java create mode 100644 bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketMQEnhanceAutoConfiguration.java delete mode 100644 bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketMqAdapter.java delete mode 100644 bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketMqConstant.java create mode 100644 bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java create mode 100644 bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java create mode 100644 bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java create mode 100644 bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java create mode 100644 bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderCancelMessage.java create mode 100644 bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderPayNotifyMessage.java create mode 100644 bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java create mode 100644 bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java diff --git a/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/constant/ServiceNameConstants.java b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/constant/ServiceNameConstants.java index 275279b..e33eaf7 100644 --- a/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/constant/ServiceNameConstants.java +++ b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/constant/ServiceNameConstants.java @@ -36,4 +36,9 @@ public class ServiceNameConstants * 文件服务的serviceid */ public static final String FILE_SERVICE = "bnyer-file"; + + /** + * pay服务的serviceid + */ + public static final String PAY_SERVICE = "bnyer-pay"; } diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/EnvironmentIsolationConfig.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/EnvironmentIsolationConfig.java new file mode 100644 index 0000000..f473d5c --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/EnvironmentIsolationConfig.java @@ -0,0 +1,47 @@ +package com.bnyer.common.rocketmq.config; + +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; +import org.jetbrains.annotations.NotNull; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.context.annotation.Configuration; + +/** + * @author :WXC + * @Date :2023/05/17 + * @description :环境隔离配置:避免测试环境和生产环境可能共用一个RocketMQ环境。如果没有进行处理, + * 在测试环境发送的消息就可能被生产环境的消费者消费,生产环境发送的消息也可能被测试环境的消费者消费,从而导致数据混乱的问题。 + */ +@Configuration +@RequiredArgsConstructor(onConstructor = @__(@Autowired)) +public class EnvironmentIsolationConfig implements BeanPostProcessor { + + private final RocketEnhanceProperties rocketEnhanceProperties; + + /** + * 在装载Bean之前实现参数修改 + */ + @Override + public Object postProcessBeforeInitialization(@NotNull Object bean, @NotNull String beanName) throws BeansException { + if(bean instanceof DefaultRocketMQListenerContainer){ + DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean; + if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.isNotBlank(rocketEnhanceProperties.getEnvironment())){ + String topic = container.getTopic(); + String consumerGroup = container.getConsumerGroup(); + //拼接Topic + if (!topic.endsWith(rocketEnhanceProperties.getEnvironment())){ + container.setTopic(String.join("-", container.getTopic(),rocketEnhanceProperties.getEnvironment())); + } + //拼接ConsumerGroup + if (!consumerGroup.endsWith(rocketEnhanceProperties.getEnvironment())){ + container.setConsumerGroup(String.join("-",container.getConsumerGroup(),rocketEnhanceProperties.getEnvironment())); + } + } + return container; + } + return bean; + } +} \ No newline at end of file diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketEnhanceProperties.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketEnhanceProperties.java new file mode 100644 index 0000000..ae68581 --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketEnhanceProperties.java @@ -0,0 +1,24 @@ +package com.bnyer.common.rocketmq.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * @author :WXC + * @Date :2023/05/17 + * @description :增强配置类 + */ +@ConfigurationProperties(prefix = "rocketmq.enhance") +@Data +public class RocketEnhanceProperties { + + /** + * 启动隔离,启动后会自动在topic上拼接激活的配置文件,达到自动隔离的效果 + */ + private boolean enabledIsolation; + + /** + * 隔离环境名称,拼接到topic后,如:dev,拼接后:topic-dev,默认空字符串 + */ + private String environment; +} \ No newline at end of file diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketMQEnhanceAutoConfiguration.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketMQEnhanceAutoConfiguration.java new file mode 100644 index 0000000..57bb659 --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketMQEnhanceAutoConfiguration.java @@ -0,0 +1,66 @@ +package com.bnyer.common.rocketmq.config; + +import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQMessageConverter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MappingJackson2MessageConverter; +import org.springframework.messaging.converter.MessageConverter; + +import java.util.List; + +/** + * @author :WXC + * @Date :2023/05/17 + * @description : + */ +@Configuration +@EnableConfigurationProperties(RocketEnhanceProperties.class) +public class RocketMQEnhanceAutoConfiguration { + + /** + * 注入增强的RocketMQEnhanceTemplate + */ + @Bean + public RocketMQEnhanceTemplate rocketMQEnhanceTemplate(RocketMQTemplate rocketMQTemplate){ + return new RocketMQEnhanceTemplate(rocketMQTemplate); + } + + /** + * 解决RocketMQ Jackson不支持Java时间类型配置 + * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration} + */ + @Bean + @Primary + public RocketMQMessageConverter enhanceRocketMQMessageConverter(){ + RocketMQMessageConverter converter = new RocketMQMessageConverter(); + CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter(); + List messageConverterList = compositeMessageConverter.getConverters(); + for (MessageConverter messageConverter : messageConverterList) { + if(messageConverter instanceof MappingJackson2MessageConverter){ + MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter; + ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper(); + objectMapper.registerModules(new JavaTimeModule()); + } + } + return converter; + } + + + /** + * 环境隔离配置 + */ + @Bean + @ConditionalOnProperty(name="rocketmq.enhance.enabledIsolation", havingValue="true") + public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){ + return new EnvironmentIsolationConfig(rocketEnhanceProperties); + } + +} \ No newline at end of file diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketMqAdapter.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketMqAdapter.java deleted file mode 100644 index 6b03f74..0000000 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketMqAdapter.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.bnyer.common.rocketmq.config; - -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.apache.rocketmq.spring.support.RocketMQMessageConverter; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.cloud.context.config.annotation.RefreshScope; -import org.springframework.context.annotation.Configuration; - -/** - * @author :WXC - * @Date :2023/03/24 - * @description : - */ -@RefreshScope -@Configuration -public class RocketMqAdapter { - - @Autowired - private RocketMQMessageConverter rocketMqMessageConverter; - - @Value("${rocketmq.name-server:}") - private String nameServer; - - public RocketMQTemplate getTemplateByTopicName(String topic){ - RocketMQTemplate mqTemplate = new RocketMQTemplate(); - DefaultMQProducer producer = new DefaultMQProducer(topic); - producer.setNamesrvAddr(nameServer); - producer.setRetryTimesWhenSendFailed(RocketMqConstant.SYNC_RETRY_FAILED_COUNT); - producer.setRetryTimesWhenSendAsyncFailed(RocketMqConstant.ASYNC_RETRY_FAILED_COUNT); - producer.setSendMsgTimeout((int) RocketMqConstant.TIMEOUT); - mqTemplate.setProducer(producer); - mqTemplate.setMessageConverter(rocketMqMessageConverter.getMessageConverter()); - return mqTemplate; - } - -} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketMqConstant.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketMqConstant.java deleted file mode 100644 index 03a3c9f..0000000 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RocketMqConstant.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.bnyer.common.rocketmq.config; - -/** - * @author :WXC - * @Date :2023/03/24 - * @description : - */ -public class RocketMqConstant { - /** - * 默认发送消息超时时间 - */ - public static final long TIMEOUT = 3000; - - /** - * 延迟队列取消订单时间,实际上30分钟 - * 按顺序匹配:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (1-18) - */ - public static final int CANCEL_ORDER_DELAY_LEVEL = 16; - - /** - * 发送同步消息失败重试次数,默认2 - */ - public static final int SYNC_RETRY_FAILED_COUNT = 2; - - /** - * 发送异步消息失败重试次数,默认2 - */ - public static final int ASYNC_RETRY_FAILED_COUNT = 2; - - /** - * vip订单取消 - */ - public static final String VIP_ORDER_CANCEL_TOPIC = "vip-order-cancel-topic"; - - /** - * vip订单支付成功 - */ - public static final String VIP_ORDER_PAY_NOTIFY_TOPIC = "vip-order-pay-notify-topic"; - - /** - * vip记录创建 - */ - public static final String VIP_RECORD_CREATE_TOPIC = "vip-record-create-topic"; - -} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java new file mode 100644 index 0000000..a124c4b --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java @@ -0,0 +1,25 @@ +package com.bnyer.common.rocketmq.constant; + +/** + * @author :WXC + * @Date :2023/03/24 + * @description : mq常量池 + */ +public class RocketMqConstant { + + /** + * 数据来源前缀 + */ + public static final String RETRY_PREFIX = "bnyer-"; + + /** + * 按顺序匹配:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (1-18) + */ + public static final int FIVE_SECOND = 2; + + /** + * 延迟队列取消订单时间,实际上30分钟 + * 按顺序匹配:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (1-18) + */ + public static final int THIRTY_MINUTES = 16; +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java new file mode 100644 index 0000000..6370849 --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java @@ -0,0 +1,25 @@ +package com.bnyer.common.rocketmq.constant; + +/** + * @author :WXC + * @Date :2023/05/17 + * @description : 消息主题 + */ +public class RocketMqTopic { + + /** + * vip订单取消 + */ + public static final String VIP_ORDER_CANCEL_TOPIC = "vip-order-cancel-topic"; + + /** + * vip订单支付成功 + */ + public static final String VIP_ORDER_PAY_NOTIFY_TOPIC = "vip-order-pay-notify-topic"; + + /** + * vip记录创建 + */ + public static final String VIP_RECORD_CREATE_TOPIC = "vip-record-create-topic"; + +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java new file mode 100644 index 0000000..1171c40 --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java @@ -0,0 +1,31 @@ +package com.bnyer.common.rocketmq.domain; + +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * @author :WXC + * @Date :2023/05/17 + * @description :消息实体,所有消息都需要继承此类 + */ +@Data +public class BaseMessage { + /** + * 业务键,用于RocketMQ控制台查看消费情况 + */ + protected String key; + /** + * 发送消息来源,用于排查问题 + */ + protected String source = ""; + /** + * 发送时间 + */ + protected LocalDateTime sendTime = LocalDateTime.now(); + /** + * 重试次数,用于判断重试次数,超过重试次数发送异常警告 + */ + protected Integer retryTimes = 0; + +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java new file mode 100644 index 0000000..3661bab --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java @@ -0,0 +1,50 @@ +package com.bnyer.common.rocketmq.domain.img; + +import com.bnyer.common.rocketmq.domain.BaseMessage; +import com.fasterxml.jackson.annotation.JsonFormat; +import io.swagger.annotations.ApiModelProperty; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.util.Date; + +/** + * @author :WXC + * @Date :2023/05/17 + * @description : 添加用户会员记录 + */ +@Getter +@Setter +@NoArgsConstructor +public class AddUserVipRecordMessage extends BaseMessage { + + @ApiModelProperty(value="订单号") + private String orderNo; + + @ApiModelProperty(value="用户id") + private Long userId; + + @ApiModelProperty(value="用户手机号") + private String phone; + + @ApiModelProperty(value="vip表id") + private Long vipId; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @ApiModelProperty(value="开始时间") + private Date startTime; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @ApiModelProperty(value="到期时间") + private Date endTime; + + @ApiModelProperty(value="vip名称") + private String vipName; + + @ApiModelProperty(value = "vip类型名称") + private String vipTypeName; + + @ApiModelProperty(value = "用户客户端类型:10用户-抖音 20用户-快手 30用户-微信 40艺术家-微信") + private Integer userClientType; +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderCancelMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderCancelMessage.java new file mode 100644 index 0000000..98e55ab --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderCancelMessage.java @@ -0,0 +1,21 @@ +package com.bnyer.common.rocketmq.domain.order; + +import com.bnyer.common.rocketmq.domain.BaseMessage; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * @author :WXC + * @Date :2023/05/17 + * @description : 会员订单取消 + */ +@Getter +@Setter +@NoArgsConstructor +public class VipOrderCancelMessage extends BaseMessage { + /** + * 订单号 + */ + private String orderNo; +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderPayNotifyMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderPayNotifyMessage.java new file mode 100644 index 0000000..bcfb90d --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderPayNotifyMessage.java @@ -0,0 +1,21 @@ +package com.bnyer.common.rocketmq.domain.order; + +import com.bnyer.common.rocketmq.domain.BaseMessage; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * @author :WXC + * @Date :2023/05/17 + * @description : 会员订单支付回调 + */ +@Getter +@Setter +@NoArgsConstructor +public class VipOrderPayNotifyMessage extends BaseMessage { + /** + * 订单号 + */ + private String orderNo; +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java new file mode 100644 index 0000000..c51382e --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java @@ -0,0 +1,151 @@ +package com.bnyer.common.rocketmq.handle; + +import com.alibaba.fastjson.JSONObject; +import com.bnyer.common.rocketmq.constant.RocketMqConstant; +import com.bnyer.common.rocketmq.domain.BaseMessage; +import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; + +import javax.annotation.Resource; + +/** + * @author :WXC + * @Date :2023/05/17 + * @description : 消息模板抽象类,父类提供公共模板方法 + */ +@Slf4j +public abstract class EnhanceMessageHandler { + /** + * 默认重试次数 + */ + private static final int MAX_RETRY_TIMES = 3; + + /** + * 延时等级 + */ + private static final int DELAY_LEVEL = RocketMqConstant.FIVE_SECOND; + + @Resource + private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; + + /** + * 消息处理 + * + * @param message 待处理消息 + * @throws Exception 消费异常 + */ + protected abstract void handleMessage(T message) throws Exception; + + /** + * 超过重试次数消息,需要启用isRetry + * + * @param message 待处理消息 + */ + protected abstract void handleMaxRetriesExceeded(T message); + + + /** + * 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理 + * @param message 待处理消息 + * @return true: 本次消息被过滤,false:不过滤 + */ + protected abstract boolean filter(T message); + + /** + * 是否异常时重复发送 + * + * @return true: 消息重试,false:不重试 + */ + protected abstract boolean isRetry(); + + /** + * 消费异常时是否抛出异常 + * 返回true,则由rocketmq机制自动重试 + * false:消费异常(如果没有开启重试则消息会被自动ack) + */ + protected abstract boolean throwException(); + + /** + * 最大重试次数 + * + * @return 最大重试次数,默认3次 + */ + protected int getMaxRetryTimes() { + return MAX_RETRY_TIMES; + } + + /** + * isRetry开启时,重新入队延迟时间 + * @return -1:立即入队重试 + */ + protected int getDelayLevel() { + return DELAY_LEVEL; + } + + /** + * 使用模板模式构建消息消费框架,可自由扩展或删减 + */ + public void dispatchMessage(T message) { + // 基础日志记录被父类处理了 + log.info("消费者收到消息[{}]", JSONObject.toJSON(message)); + if (filter(message)) { + log.info("消息id{}不满足消费条件,已过滤。",message.getKey()); + return; + } + //超过最大重试次数时调用子类方法处理 + if (message.getRetryTimes() > getMaxRetryTimes()) { + handleMaxRetriesExceeded(message); + return; + } + try { + long now = System.currentTimeMillis(); + handleMessage(message); + long costTime = System.currentTimeMillis() - now; + log.info("消息{}消费成功,耗时[{}ms]", message.getKey(),costTime); + } catch (Exception e) { + log.error("消息{}消费异常", message.getKey(),e); + // 是捕获异常还是抛出,由子类决定 + if (throwException()) { + //抛出异常,由DefaultMessageListenerConcurrently类处理 + throw new RuntimeException(e); + } + //此时如果不开启重试机制,则默认ACK了 + if (isRetry()) { + handleRetry(message); + } + } + } + + protected void handleRetry(T message) { + // 获取子类RocketMQMessageListener注解拿到topic和tag + RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class); + if (annotation == null) { + return; + } + //重新构建消息体 + String messageSource = message.getSource(); + if(!messageSource.startsWith(RocketMqConstant.RETRY_PREFIX)){ + message.setSource(RocketMqConstant.RETRY_PREFIX + messageSource); + } + message.setRetryTimes(message.getRetryTimes() + 1); + + SendResult sendResult; + + try { + // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息) + sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel()); + } catch (Exception ex) { + // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息 + //由生产者直接发送 + throw new RuntimeException(ex); + } + // 发送失败的处理就是不进行ACK,由RocketMQ重试 + if (sendResult.getSendStatus() != SendStatus.SEND_OK) { + throw new RuntimeException("重试消息发送失败"); + } + + } +} \ No newline at end of file diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java new file mode 100644 index 0000000..eb5b057 --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java @@ -0,0 +1,238 @@ +package com.bnyer.common.rocketmq.template; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.bnyer.common.rocketmq.config.RocketEnhanceProperties; +import com.bnyer.common.rocketmq.constant.RocketMqConstant; +import com.bnyer.common.rocketmq.domain.BaseMessage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import javax.annotation.Resource; + +/** + * @author :WXC + * @Date :2023/05/17 + * @description : RocketMq增强请求模板类 + */ +@Slf4j +@RequiredArgsConstructor(onConstructor = @__(@Autowired)) +public class RocketMQEnhanceTemplate { + + /** + * 延时等级 + */ + private static final int DELAY_LEVEL = RocketMqConstant.FIVE_SECOND; + + /** + * 默认重试次数 + */ + private static final int MAX_RETRY_TIMES = 3; + + private final RocketMQTemplate template; + + @Resource + private RocketEnhanceProperties rocketEnhanceProperties; + + public RocketMQTemplate getTemplate() { + return template; + } + + /** + * 根据系统上下文自动构建隔离后的topic + * 构建目的地 + */ + public String buildDestination(String topic, String tag) { + topic = reBuildTopic(topic); + return StringUtils.isNotBlank(tag)?topic + ":" + tag:topic; + } + + /** + * 根据环境重新隔离topic + * @param topic 原始topic + */ + private String reBuildTopic(String topic) { + if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.isNoneBlank(rocketEnhanceProperties.getEnvironment())){ + return topic +"-" + rocketEnhanceProperties.getEnvironment(); + } + return topic; + } + + /** + * 发送同步消息 + * @param topic + * @param tag + * @param message + * @return + * @param + */ + public SendResult send(String topic, String tag, T message) { + // 注意分隔符 + return send(buildDestination(topic,tag), message); + } + + + /** + * 发送同步消息 + * @param destination + * @param message + * @return + * @param + */ + public SendResult send(String destination, T message) { + // 设置业务键,此处根据公共的参数进行处理 + // 更多的其它基础业务处理... + Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); + SendResult sendResult = template.syncSend(destination, sendMessage); + // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 + log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + return sendResult; + } + + /** + * 发送同步延时消息 + * @param topic + * @param tag + * @param message + * @param delayLevel + * @return + * @param + */ + public SendResult send(String topic, String tag, T message, int delayLevel) { + return send(buildDestination(topic,tag), message, delayLevel); + } + + /** + * 发送同步延时消息 + * @param destination + * @param message + * @param delayLevel + * @return + * @param + */ + public SendResult send(String destination, T message, int delayLevel) { + Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); + SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); + log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + return sendResult; + } + + /** + * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) + * (适合对响应时间敏感的业务场景) + */ + public void sendAsyncMsg(String topic, String tag, T message) { + sendAsyncMsg(buildDestination(topic,tag),message); + } + + /** + * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) + * (适合对响应时间敏感的业务场景) + */ + public void sendAsyncMsg(String destination, T message) { + template.asyncSend(destination, MessageBuilder.withPayload(message).build(), new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + // 处理消息发送成功逻辑 + log.info("消息发送成功,destination:{},result:{}",destination, JSON.toJSONString(sendResult)); + } + @Override + public void onException(Throwable throwable) { + // 处理消息发送失败逻辑 + log.error("消息发送失败,destination:{},error:{}",destination,throwable.getMessage()); + } + }); + } + + /** + * 发送异步可重试消息(通过线程池执行发送到broker的消息任务,执行完后回调:如果发送失败进行重试) + * + */ + public void sendAsyncRetryMsg(String topic, String tag, T message) { + sendAsyncMsg(buildDestination(topic,tag),message,getDelayLevel()); + } + + /** + * 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) + * + */ + public void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { + sendAsyncMsg(buildDestination(topic,tag),message,delayLevel); + } + + /** + * 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) + * (适合对响应时间敏感的业务场景) + */ + public void sendAsyncMsg(String destination, T message, int delayLevel) { + template.asyncSend(destination, MessageBuilder.withPayload(message).build(), new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + // 处理消息发送成功逻辑 + log.info("消息发送成功,destination:{},result:{}",destination,JSON.toJSONString(sendResult)); + } + @Override + public void onException(Throwable throwable) { + log.error("消息发送失败,destination:{},error:{}",destination,throwable.getMessage()); + // 超过最大重试次数时调用子类方法处理 + if (message.getRetryTimes() > getMaxRetryTimes()) { + handleMaxRetriesExceeded(message); + return; + } + //发送失败,重试发送 + handleRetry(destination,message); + } + /** + * 重试发送异步消息 + * @param destination + * @param message + * @param + */ + private void handleRetry(String destination,T message) { + //重新构建消息体 + String messageSource = message.getSource(); + if(!messageSource.startsWith(RocketMqConstant.RETRY_PREFIX)){ + message.setSource(RocketMqConstant.RETRY_PREFIX + messageSource); + } + message.setRetryTimes(message.getRetryTimes() + 1); + // 如果消息发送不成功,则再次重新发送 + sendAsyncMsg(destination, message,getDelayLevel()); + } + + /** + * 超过最大重试次数调用该方法处理 + * @param message + * @param + */ + private void handleMaxRetriesExceeded(T message) { + log.error("发送消息超过最大重试次数,message:{}",message); + } + },delayLevel); + } + + /** + * isRetry开启时,重新入队延迟时间 + * @return -1:立即入队重试 + */ + protected int getDelayLevel() { + return DELAY_LEVEL; + } + + /** + * 最大重试次数 + * + * @return 最大重试次数,默认3次 + */ + protected int getMaxRetryTimes() { + return MAX_RETRY_TIMES; + } + +} diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java index b365133..608a209 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java @@ -1,10 +1,10 @@ package com.bnyer.img.listener; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.bnyer.common.core.domain.UserVipRecord; import com.bnyer.common.core.dto.AddUserVipRecordDto; -import com.bnyer.common.rocketmq.config.RocketMqConstant; +import com.bnyer.common.core.utils.bean.EntityConvertUtil; +import com.bnyer.common.rocketmq.constant.RocketMqTopic; +import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage; +import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.img.service.UserVipRecordService; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; @@ -19,18 +19,42 @@ import org.springframework.stereotype.Component; */ @Slf4j @Component -@RocketMQMessageListener(topic = RocketMqConstant.VIP_RECORD_CREATE_TOPIC,consumerGroup = RocketMqConstant.VIP_RECORD_CREATE_TOPIC) -public class VipRecordCreateConsumer implements RocketMQListener { +@RocketMQMessageListener(topic = RocketMqTopic.VIP_RECORD_CREATE_TOPIC,consumerGroup = RocketMqTopic.VIP_RECORD_CREATE_TOPIC) +public class VipRecordCreateConsumer extends EnhanceMessageHandler implements RocketMQListener { @Autowired private UserVipRecordService userVipRecordService; @Override - public void onMessage(String message) { - log.info("收到消息:{}", message); - JSONObject jsonObject = JSON.parseObject(message); - AddUserVipRecordDto addUserVipRecordDto = JSON.toJavaObject(jsonObject, AddUserVipRecordDto.class); + public void onMessage(AddUserVipRecordMessage message) { + super.dispatchMessage(message); + AddUserVipRecordDto addUserVipRecordDto = EntityConvertUtil.copy(message, AddUserVipRecordDto.class); //添加用户会员记录 userVipRecordService.addUserVipRecord(addUserVipRecordDto); } + + @Override + protected void handleMessage(AddUserVipRecordMessage message) throws Exception { + + } + + @Override + protected void handleMaxRetriesExceeded(AddUserVipRecordMessage message) { + + } + + @Override + protected boolean filter(AddUserVipRecordMessage message) { + return false; + } + + @Override + protected boolean isRetry() { + return true; + } + + @Override + protected boolean throwException() { + return false; + } } diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/config/RocketMqConfig.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/config/RocketMqConfig.java index ad5d314..752a7f6 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/config/RocketMqConfig.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/config/RocketMqConfig.java @@ -1,35 +1,35 @@ -package com.bnyer.order.config; - -import com.bnyer.common.rocketmq.config.RocketMqAdapter; -import com.bnyer.common.rocketmq.config.RocketMqConstant; -import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.context.config.annotation.RefreshScope; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Lazy; - -/** - * @author :WXC - * @Date :2023/03/24 - * @description : - */ -@RefreshScope -@Configuration -public class RocketMqConfig { - @Autowired - private RocketMqAdapter rocketMqAdapter; - - @Lazy - @Bean(destroyMethod = "destroy") - public RocketMQTemplate orderCancelMqTemplate() { - return rocketMqAdapter.getTemplateByTopicName(RocketMqConstant.VIP_ORDER_CANCEL_TOPIC); - } - - @Lazy - @Bean(destroyMethod = "destroy") - public RocketMQTemplate vipRecordMqTemplate() { - return rocketMqAdapter.getTemplateByTopicName(RocketMqConstant.VIP_RECORD_CREATE_TOPIC); - } - -} +//package com.bnyer.order.config; +// +//import com.bnyer.common.rocketmq.adapter.RocketMqAdapter; +//import com.bnyer.common.rocketmq.constant.RocketMqTopic; +//import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.cloud.context.config.annotation.RefreshScope; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.context.annotation.Lazy; +// +///** +// * @author :WXC +// * @Date :2023/03/24 +// * @description : +// */ +//@RefreshScope +//@Configuration +//public class RocketMqConfig { +// @Autowired +// private RocketMqAdapter rocketMqAdapter; +// +// @Lazy +// @Bean(destroyMethod = "destroy") +// public RocketMQEnhanceTemplate vipOrderCancelMqTemplate() { +// return rocketMqAdapter.getTemplateByTopicName(RocketMqTopic.VIP_ORDER_CANCEL_TOPIC); +// } +// +// @Lazy +// @Bean(destroyMethod = "destroy") +// public RocketMQEnhanceTemplate vipRecordCreateMqTemplate() { +// return rocketMqAdapter.getTemplateByTopicName(RocketMqTopic.VIP_RECORD_CREATE_TOPIC); +// } +// +//} diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java index c9b7800..0812690 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java @@ -1,6 +1,8 @@ package com.bnyer.order.listener.vip; -import com.bnyer.common.rocketmq.config.RocketMqConstant; +import com.bnyer.common.rocketmq.constant.RocketMqTopic; +import com.bnyer.common.rocketmq.domain.order.VipOrderCancelMessage; +import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.order.bean.dto.CancelVipOrderDto; import com.bnyer.order.service.VipOrderService; import com.google.common.collect.Lists; @@ -17,19 +19,44 @@ import org.springframework.stereotype.Component; */ @Slf4j @Component -@RocketMQMessageListener(topic = RocketMqConstant.VIP_ORDER_CANCEL_TOPIC,consumerGroup = RocketMqConstant.VIP_ORDER_CANCEL_TOPIC) -public class VipOrderCancelConsumer implements RocketMQListener { +@RocketMQMessageListener(topic = RocketMqTopic.VIP_ORDER_CANCEL_TOPIC,consumerGroup = RocketMqTopic.VIP_ORDER_CANCEL_TOPIC) +public class VipOrderCancelConsumer extends EnhanceMessageHandler implements RocketMQListener { @Autowired private VipOrderService vipOrderService; @Override - public void onMessage(String orderNo) { - log.info("收到消息:{}", orderNo); + public void onMessage(VipOrderCancelMessage message) { + super.dispatchMessage(message); // 如果订单未支付的话,将订单设为取消状态 CancelVipOrderDto cancelVipOrderDto = new CancelVipOrderDto(); cancelVipOrderDto.setOrderStatus(3); - cancelVipOrderDto.setOrderNos(Lists.newArrayList(orderNo)); + cancelVipOrderDto.setOrderNos(Lists.newArrayList(message.getOrderNo())); vipOrderService.cancelVipOrder(cancelVipOrderDto); } + + @Override + protected void handleMessage(VipOrderCancelMessage message) throws Exception { + + } + + @Override + protected void handleMaxRetriesExceeded(VipOrderCancelMessage message) { + log.error("消息消费失败,可扩展执行后续处理"); + } + + @Override + protected boolean filter(VipOrderCancelMessage message) { + return false; + } + + @Override + protected boolean isRetry() { + return true; + } + + @Override + protected boolean throwException() { + return false; + } } diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java index c2fb898..32cde75 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java @@ -1,18 +1,19 @@ package com.bnyer.order.listener.vip; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.bnyer.common.core.constant.ServiceNameConstants; import com.bnyer.common.core.domain.VipOrder; -import com.bnyer.common.rocketmq.config.RocketMqConstant; +import com.bnyer.common.core.utils.uuid.IdUtils; +import com.bnyer.common.rocketmq.constant.RocketMqTopic; +import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage; +import com.bnyer.common.rocketmq.domain.order.VipOrderPayNotifyMessage; +import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; +import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.order.mapper.VipOrderMapper; import com.bnyer.order.service.VipOrderService; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; -import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -25,8 +26,8 @@ import java.util.Objects; */ @Slf4j @Component -@RocketMQMessageListener(topic = RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC,consumerGroup = RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC) -public class VipOrderPayNotifyConsumer implements RocketMQListener { +@RocketMQMessageListener(topic = RocketMqTopic.VIP_ORDER_PAY_NOTIFY_TOPIC,consumerGroup = RocketMqTopic.VIP_ORDER_PAY_NOTIFY_TOPIC) +public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler implements RocketMQListener { @Resource private VipOrderMapper vipOrderMapper; @@ -35,14 +36,13 @@ public class VipOrderPayNotifyConsumer implements RocketMQListener { private VipOrderService vipOrderService; @Resource - private RocketMQTemplate vipRecordMqTemplate; + private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; @Override - public void onMessage(String message) { - log.info("收到消息:{}", message); + public void onMessage(VipOrderPayNotifyMessage message) { + super.dispatchMessage(message); //修改订单并添加会员记录 - JSONObject orderObj = JSON.parseObject(message); - String orderNo = orderObj.getString("orderNo"); + String orderNo = message.getOrderNo(); VipOrder vipOrder = vipOrderMapper.selectOne(new LambdaQueryWrapper().eq(VipOrder::getOrderNo, orderNo)); if (Objects.isNull(vipOrder)){ log.error("订单不存在,订单号:{}",orderNo); @@ -51,33 +51,12 @@ public class VipOrderPayNotifyConsumer implements RocketMQListener { //修改订单表状态为已支付 vipOrderService.updateByToPaySuccess(vipOrder); //发消息,添加用户会员记录 - String msg = buildVipRecordMsg(vipOrder); - sendMsg(RocketMqConstant.VIP_RECORD_CREATE_TOPIC, msg); -// SendStatus sendStatus = vipRecordMqTemplate.syncSend(RocketMqConstant.VIP_RECORD_CREATE_TOPIC, new GenericMessage<>(msg)).getSendStatus(); -// if (!Objects.equals(sendStatus, SendStatus.SEND_OK)) { -// // 消息发不出去就抛异常 -// throw new ServiceException(ResponseEnum.SERVER_ERROR); -// } + AddUserVipRecordMessage addUserVipRecordMessage = buildVipRecordMsg(vipOrder); + rocketMQEnhanceTemplate.sendAsyncRetryMsg(RocketMqTopic.VIP_RECORD_CREATE_TOPIC,null, addUserVipRecordMessage); + // TODO: 2023/05/17 发消息添加其他奖励 - } - /** - * 发送消息 - * @param topic - * @param msg - * @param - */ - public void sendMsg(String topic, T msg) { - vipRecordMqTemplate.asyncSend(topic, msg,new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("topic:{}消息发送成功,result:{}",topic,JSON.toJSONString(sendResult)); - } - @Override - public void onException(Throwable e) { - log.info("topic:{}消息发送失败,error:{}",topic,e.getMessage()); - } - }); + } /** @@ -85,18 +64,44 @@ public class VipOrderPayNotifyConsumer implements RocketMQListener { * @param vipOrder * @return */ - private String buildVipRecordMsg(VipOrder vipOrder) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("orderNo",vipOrder.getOrderNo()); - jsonObject.put("startTime",vipOrder.getStartTime()); - jsonObject.put("endTime",vipOrder.getEndTime()); - jsonObject.put("vipId",vipOrder.getVipId()); - jsonObject.put("vipName",vipOrder.getVipName()); - jsonObject.put("vipTypeName",vipOrder.getVipTypeName()); - jsonObject.put("userClientType",vipOrder.getUserClientType()); - jsonObject.put("phone",vipOrder.getPhone()); - jsonObject.put("userId",vipOrder.getUserId()); - return JSON.toJSONString(jsonObject); + private AddUserVipRecordMessage buildVipRecordMsg(VipOrder vipOrder) { + AddUserVipRecordMessage message = new AddUserVipRecordMessage(); + message.setKey(IdUtils.randomUUID()); + message.setSource(ServiceNameConstants.ORDER_SERVICE); + message.setOrderNo(vipOrder.getOrderNo()); + message.setStartTime(vipOrder.getStartTime()); + message.setEndTime(vipOrder.getEndTime()); + message.setVipId(vipOrder.getVipId()); + message.setVipName(vipOrder.getVipName()); + message.setVipTypeName(vipOrder.getVipTypeName()); + message.setUserClientType(vipOrder.getUserClientType()); + message.setPhone(vipOrder.getPhone()); + message.setUserId(vipOrder.getUserId()); + return message; } + @Override + protected void handleMessage(VipOrderPayNotifyMessage message) throws Exception { + + } + + @Override + protected void handleMaxRetriesExceeded(VipOrderPayNotifyMessage message) { + + } + + @Override + protected boolean filter(VipOrderPayNotifyMessage message) { + return true; + } + + @Override + protected boolean isRetry() { + return true; + } + + @Override + protected boolean throwException() { + return false; + } } diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java index 2ee83da..74015cb 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java @@ -5,6 +5,7 @@ import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.bnyer.common.core.constant.ServiceNameConstants; import com.bnyer.common.core.domain.R; import com.bnyer.common.core.domain.VipOrder; import com.bnyer.common.core.enums.*; @@ -13,8 +14,12 @@ import com.bnyer.common.core.utils.DateUtils; import com.bnyer.common.core.utils.OrderUtil; import com.bnyer.common.core.utils.StringUtils; import com.bnyer.common.core.utils.bean.EntityConvertUtil; +import com.bnyer.common.core.utils.uuid.IdUtils; import com.bnyer.common.core.vo.UserInfoVo; -import com.bnyer.common.rocketmq.config.RocketMqConstant; +import com.bnyer.common.rocketmq.constant.RocketMqConstant; +import com.bnyer.common.rocketmq.constant.RocketMqTopic; +import com.bnyer.common.rocketmq.domain.order.VipOrderCancelMessage; +import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.common.security.utils.SecurityUtils; import com.bnyer.img.api.remote.RemoteUserVipService; import com.bnyer.img.api.vo.UserVipVo; @@ -27,8 +32,6 @@ import com.bnyer.common.core.enums.EnumVipOrderStatus; import com.bnyer.order.mapper.VipOrderMapper; import com.bnyer.order.service.VipOrderService; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; @@ -50,8 +53,12 @@ import java.util.stream.Collectors; @Service public class VipOrderServiceImpl extends ServiceImpl implements VipOrderService { +// @Autowired +// private RocketMQTemplate orderCancelMqTemplate; + + //注入增强后的模板,可以自动实现环境隔离,日志记录 @Autowired - private RocketMQTemplate orderCancelMqTemplate; + private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; @Autowired private VipOrderMapper vipOrderMapper; @@ -107,22 +114,17 @@ public class VipOrderServiceImpl extends ServiceImpl i vipOrderMapper.insert(vipOrder); String orderNo = vipOrder.getOrderNo(); //发送消息,如果三十分钟后没有支付,则取消订单 - orderCancelMqTemplate.asyncSend(RocketMqConstant.VIP_ORDER_CANCEL_TOPIC, orderNo,new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("topic:{}消息发送成功,result:{}",RocketMqConstant.VIP_ORDER_CANCEL_TOPIC,JSON.toJSONString(sendResult)); - } - @Override - public void onException(Throwable e) { - log.info("topic:{}消息发送失败,error:{}",RocketMqConstant.VIP_ORDER_CANCEL_TOPIC,e.getMessage()); - } - },RocketMqConstant.TIMEOUT); + VipOrderCancelMessage vipOrderCancelMessage = new VipOrderCancelMessage(); + vipOrderCancelMessage.setKey(IdUtils.randomUUID()); + vipOrderCancelMessage.setSource(ServiceNameConstants.ORDER_SERVICE); + vipOrderCancelMessage.setOrderNo(orderNo); + rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.VIP_ORDER_CANCEL_TOPIC,null,vipOrderCancelMessage,RocketMqConstant.THIRTY_MINUTES); // SendStatus sendStatus = orderCancelMqTemplate.syncSend(RocketMqConstant.VIP_ORDER_CANCEL_TOPIC, new GenericMessage<>(orderNo), RocketMqConstant.TIMEOUT, RocketMqConstant.CANCEL_ORDER_DELAY_LEVEL).getSendStatus(); // if (!Objects.equals(sendStatus,SendStatus.SEND_OK)) { // // 消息发不出去就抛异常,发的出去无所谓 // throw new ServiceException(ResponseEnum.SERVER_ERROR); // }else { -// log.info("消息发送成功,topic:{}",RocketMqConstant.VIP_ORDER_CANCEL_TOPIC); +// log.info("消息发送成功,topic:{}",RocketMqTopic.VIP_ORDER_CANCEL_TOPIC); // } return orderNo; } diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/config/RocketMqConfig.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/config/RocketMqConfig.java index 3b4cc5f..33eb41b 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/config/RocketMqConfig.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/config/RocketMqConfig.java @@ -1,28 +1,28 @@ -package com.bnyer.pay.config; - -import com.bnyer.common.rocketmq.config.RocketMqAdapter; -import com.bnyer.common.rocketmq.config.RocketMqConstant; -import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.context.config.annotation.RefreshScope; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Lazy; - -/** - * @author :WXC - * @Date :2023/03/24 - * @description : - */ -@RefreshScope -@Configuration -public class RocketMqConfig { - @Autowired - private RocketMqAdapter rocketMqAdapter; - - @Lazy - @Bean(destroyMethod = "destroy") - public RocketMQTemplate vipOrderPayNotifyMqTemplate() { - return rocketMqAdapter.getTemplateByTopicName(RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC); - } -} +//package com.bnyer.pay.config; +// +//import com.bnyer.common.rocketmq.adapter.RocketMqAdapter; +//import com.bnyer.common.rocketmq.constant.RocketMqTopic; +//import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.cloud.context.config.annotation.RefreshScope; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.context.annotation.Lazy; +// +///** +// * @author :WXC +// * @Date :2023/03/24 +// * @description : +// */ +//@RefreshScope +//@Configuration +//public class RocketMqConfig { +// @Autowired +// private RocketMqAdapter rocketMqAdapter; +// +// @Lazy +// @Bean(destroyMethod = "destroy") +// public RocketMQEnhanceTemplate vipOrderPayNotifyMqTemplate() { +// return rocketMqAdapter.getTemplateByTopicName(RocketMqTopic.VIP_ORDER_PAY_NOTIFY_TOPIC); +// } +//} diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java index 9f768dc..a07b235 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java @@ -1,15 +1,18 @@ package com.bnyer.pay.service.impl; import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.bnyer.common.core.constant.ServiceNameConstants; import com.bnyer.common.core.domain.PayInfo; import com.bnyer.common.core.enums.EnumSceneCode; import com.bnyer.common.core.enums.ResponseEnum; import com.bnyer.common.core.exception.ServiceException; import com.bnyer.common.core.utils.bean.EntityConvertUtil; -import com.bnyer.common.rocketmq.config.RocketMqConstant; +import com.bnyer.common.core.utils.uuid.IdUtils; +import com.bnyer.common.rocketmq.constant.RocketMqTopic; +import com.bnyer.common.rocketmq.domain.order.VipOrderPayNotifyMessage; +import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.pay.bean.dto.AddPayInfoDto; import com.bnyer.pay.bean.dto.EditPayInfoNotifyDto; import com.bnyer.pay.bean.dto.EditPayInfoSingleDto; @@ -17,9 +20,6 @@ import com.bnyer.pay.bean.vo.PayInfoDetailsVo; import com.bnyer.pay.mapper.PayInfoMapper; import com.bnyer.pay.service.PayInfoService; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -38,7 +38,7 @@ public class PayInfoServiceImpl extends ServiceImpl impl private PayInfoMapper payInfoMapper; @Autowired - private RocketMQTemplate vipOrderPayNotifyMqTemplate; + private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; /** * 添加支付订单 @@ -81,40 +81,16 @@ public class PayInfoServiceImpl extends ServiceImpl impl //会员充值场景 case VIP_RECHARGE: // 发送消息,订单支付成功 - JSONObject vipRechargeMsgObj = new JSONObject(); - vipRechargeMsgObj.put("orderNo",orderNo); - String vipRechargeMsgStr = JSON.toJSONString(vipRechargeMsgObj); // TODO: 2023/04/23可优化为:添加一张消息日志表,字段:topicName,消息内容,消息状态(发送中、成功、失败),错误信息 - sendMsg(RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC,vipRechargeMsgStr); -// SendStatus sendStatus = vipOrderPayNotifyMqTemplate.syncSend(RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC, -// new GenericMessage<>(vipRechargeMsgStr)).getSendStatus(); -// if (!Objects.equals(sendStatus, SendStatus.SEND_OK)) { -// // 消息发不出去就抛异常,因为订单回调会有多次,几乎不可能每次都无法发送出去,发的出去无所谓 -// throw new ServiceException(ResponseEnum.SERVER_ERROR); -// } -// break; + VipOrderPayNotifyMessage vipOrderPayNotifyMessage = new VipOrderPayNotifyMessage(); + vipOrderPayNotifyMessage.setKey(IdUtils.randomUUID()); + vipOrderPayNotifyMessage.setSource(ServiceNameConstants.PAY_SERVICE); + vipOrderPayNotifyMessage.setOrderNo(orderNo); + rocketMQEnhanceTemplate.sendAsyncRetryMsg(RocketMqTopic.VIP_ORDER_PAY_NOTIFY_TOPIC,null, vipOrderPayNotifyMessage); + break; } } - /** - * 发送消息 - * @param topic - * @param msg - * @param - */ - public void sendMsg(String topic, T msg) { - vipOrderPayNotifyMqTemplate.asyncSend(topic, msg,new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - log.info("topic:{}消息发送成功,result:{}",topic,JSON.toJSONString(sendResult)); - } - @Override - public void onException(Throwable e) { - log.info("topic:{}消息发送失败,error:{}",topic,e.getMessage()); - } - }); - } - /** * 构建修改支付单信息实体 * @param editPayInfoNotifyDto