21 changed files with 918 additions and 263 deletions
@ -0,0 +1,47 @@ |
|||||
|
package com.bnyer.common.rocketmq.config; |
||||
|
|
||||
|
import lombok.RequiredArgsConstructor; |
||||
|
import org.apache.commons.lang3.StringUtils; |
||||
|
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; |
||||
|
import org.jetbrains.annotations.NotNull; |
||||
|
import org.springframework.beans.BeansException; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.beans.factory.config.BeanPostProcessor; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
/** |
||||
|
* @author :WXC |
||||
|
* @Date :2023/05/17 |
||||
|
* @description :环境隔离配置:避免测试环境和生产环境可能共用一个RocketMQ环境。如果没有进行处理, |
||||
|
* 在测试环境发送的消息就可能被生产环境的消费者消费,生产环境发送的消息也可能被测试环境的消费者消费,从而导致数据混乱的问题。 |
||||
|
*/ |
||||
|
@Configuration |
||||
|
@RequiredArgsConstructor(onConstructor = @__(@Autowired)) |
||||
|
public class EnvironmentIsolationConfig implements BeanPostProcessor { |
||||
|
|
||||
|
private final RocketEnhanceProperties rocketEnhanceProperties; |
||||
|
|
||||
|
/** |
||||
|
* 在装载Bean之前实现参数修改 |
||||
|
*/ |
||||
|
@Override |
||||
|
public Object postProcessBeforeInitialization(@NotNull Object bean, @NotNull String beanName) throws BeansException { |
||||
|
if(bean instanceof DefaultRocketMQListenerContainer){ |
||||
|
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean; |
||||
|
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.isNotBlank(rocketEnhanceProperties.getEnvironment())){ |
||||
|
String topic = container.getTopic(); |
||||
|
String consumerGroup = container.getConsumerGroup(); |
||||
|
//拼接Topic
|
||||
|
if (!topic.endsWith(rocketEnhanceProperties.getEnvironment())){ |
||||
|
container.setTopic(String.join("-", container.getTopic(),rocketEnhanceProperties.getEnvironment())); |
||||
|
} |
||||
|
//拼接ConsumerGroup
|
||||
|
if (!consumerGroup.endsWith(rocketEnhanceProperties.getEnvironment())){ |
||||
|
container.setConsumerGroup(String.join("-",container.getConsumerGroup(),rocketEnhanceProperties.getEnvironment())); |
||||
|
} |
||||
|
} |
||||
|
return container; |
||||
|
} |
||||
|
return bean; |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,24 @@ |
|||||
|
package com.bnyer.common.rocketmq.config; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
import org.springframework.boot.context.properties.ConfigurationProperties; |
||||
|
|
||||
|
/** |
||||
|
* @author :WXC |
||||
|
* @Date :2023/05/17 |
||||
|
* @description :增强配置类 |
||||
|
*/ |
||||
|
@ConfigurationProperties(prefix = "rocketmq.enhance") |
||||
|
@Data |
||||
|
public class RocketEnhanceProperties { |
||||
|
|
||||
|
/** |
||||
|
* 启动隔离,启动后会自动在topic上拼接激活的配置文件,达到自动隔离的效果 |
||||
|
*/ |
||||
|
private boolean enabledIsolation; |
||||
|
|
||||
|
/** |
||||
|
* 隔离环境名称,拼接到topic后,如:dev,拼接后:topic-dev,默认空字符串 |
||||
|
*/ |
||||
|
private String environment; |
||||
|
} |
||||
@ -0,0 +1,66 @@ |
|||||
|
package com.bnyer.common.rocketmq.config; |
||||
|
|
||||
|
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; |
||||
|
import com.fasterxml.jackson.databind.ObjectMapper; |
||||
|
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; |
||||
|
import org.apache.rocketmq.spring.core.RocketMQTemplate; |
||||
|
import org.apache.rocketmq.spring.support.RocketMQMessageConverter; |
||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
||||
|
import org.springframework.boot.context.properties.EnableConfigurationProperties; |
||||
|
import org.springframework.context.annotation.Bean; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
import org.springframework.context.annotation.Primary; |
||||
|
import org.springframework.messaging.converter.CompositeMessageConverter; |
||||
|
import org.springframework.messaging.converter.MappingJackson2MessageConverter; |
||||
|
import org.springframework.messaging.converter.MessageConverter; |
||||
|
|
||||
|
import java.util.List; |
||||
|
|
||||
|
/** |
||||
|
* @author :WXC |
||||
|
* @Date :2023/05/17 |
||||
|
* @description : |
||||
|
*/ |
||||
|
@Configuration |
||||
|
@EnableConfigurationProperties(RocketEnhanceProperties.class) |
||||
|
public class RocketMQEnhanceAutoConfiguration { |
||||
|
|
||||
|
/** |
||||
|
* 注入增强的RocketMQEnhanceTemplate |
||||
|
*/ |
||||
|
@Bean |
||||
|
public RocketMQEnhanceTemplate rocketMQEnhanceTemplate(RocketMQTemplate rocketMQTemplate){ |
||||
|
return new RocketMQEnhanceTemplate(rocketMQTemplate); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 解决RocketMQ Jackson不支持Java时间类型配置 |
||||
|
* 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration} |
||||
|
*/ |
||||
|
@Bean |
||||
|
@Primary |
||||
|
public RocketMQMessageConverter enhanceRocketMQMessageConverter(){ |
||||
|
RocketMQMessageConverter converter = new RocketMQMessageConverter(); |
||||
|
CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter(); |
||||
|
List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters(); |
||||
|
for (MessageConverter messageConverter : messageConverterList) { |
||||
|
if(messageConverter instanceof MappingJackson2MessageConverter){ |
||||
|
MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter; |
||||
|
ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper(); |
||||
|
objectMapper.registerModules(new JavaTimeModule()); |
||||
|
} |
||||
|
} |
||||
|
return converter; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 环境隔离配置 |
||||
|
*/ |
||||
|
@Bean |
||||
|
@ConditionalOnProperty(name="rocketmq.enhance.enabledIsolation", havingValue="true") |
||||
|
public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){ |
||||
|
return new EnvironmentIsolationConfig(rocketEnhanceProperties); |
||||
|
} |
||||
|
|
||||
|
} |
||||
@ -1,38 +0,0 @@ |
|||||
package com.bnyer.common.rocketmq.config; |
|
||||
|
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer; |
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate; |
|
||||
import org.apache.rocketmq.spring.support.RocketMQMessageConverter; |
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||
import org.springframework.beans.factory.annotation.Value; |
|
||||
import org.springframework.cloud.context.config.annotation.RefreshScope; |
|
||||
import org.springframework.context.annotation.Configuration; |
|
||||
|
|
||||
/** |
|
||||
* @author :WXC |
|
||||
* @Date :2023/03/24 |
|
||||
* @description : |
|
||||
*/ |
|
||||
@RefreshScope |
|
||||
@Configuration |
|
||||
public class RocketMqAdapter { |
|
||||
|
|
||||
@Autowired |
|
||||
private RocketMQMessageConverter rocketMqMessageConverter; |
|
||||
|
|
||||
@Value("${rocketmq.name-server:}") |
|
||||
private String nameServer; |
|
||||
|
|
||||
public RocketMQTemplate getTemplateByTopicName(String topic){ |
|
||||
RocketMQTemplate mqTemplate = new RocketMQTemplate(); |
|
||||
DefaultMQProducer producer = new DefaultMQProducer(topic); |
|
||||
producer.setNamesrvAddr(nameServer); |
|
||||
producer.setRetryTimesWhenSendFailed(RocketMqConstant.SYNC_RETRY_FAILED_COUNT); |
|
||||
producer.setRetryTimesWhenSendAsyncFailed(RocketMqConstant.ASYNC_RETRY_FAILED_COUNT); |
|
||||
producer.setSendMsgTimeout((int) RocketMqConstant.TIMEOUT); |
|
||||
mqTemplate.setProducer(producer); |
|
||||
mqTemplate.setMessageConverter(rocketMqMessageConverter.getMessageConverter()); |
|
||||
return mqTemplate; |
|
||||
} |
|
||||
|
|
||||
} |
|
||||
@ -1,45 +0,0 @@ |
|||||
package com.bnyer.common.rocketmq.config; |
|
||||
|
|
||||
/** |
|
||||
* @author :WXC |
|
||||
* @Date :2023/03/24 |
|
||||
* @description : |
|
||||
*/ |
|
||||
public class RocketMqConstant { |
|
||||
/** |
|
||||
* 默认发送消息超时时间 |
|
||||
*/ |
|
||||
public static final long TIMEOUT = 3000; |
|
||||
|
|
||||
/** |
|
||||
* 延迟队列取消订单时间,实际上30分钟 |
|
||||
* 按顺序匹配:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (1-18) |
|
||||
*/ |
|
||||
public static final int CANCEL_ORDER_DELAY_LEVEL = 16; |
|
||||
|
|
||||
/** |
|
||||
* 发送同步消息失败重试次数,默认2 |
|
||||
*/ |
|
||||
public static final int SYNC_RETRY_FAILED_COUNT = 2; |
|
||||
|
|
||||
/** |
|
||||
* 发送异步消息失败重试次数,默认2 |
|
||||
*/ |
|
||||
public static final int ASYNC_RETRY_FAILED_COUNT = 2; |
|
||||
|
|
||||
/** |
|
||||
* vip订单取消 |
|
||||
*/ |
|
||||
public static final String VIP_ORDER_CANCEL_TOPIC = "vip-order-cancel-topic"; |
|
||||
|
|
||||
/** |
|
||||
* vip订单支付成功 |
|
||||
*/ |
|
||||
public static final String VIP_ORDER_PAY_NOTIFY_TOPIC = "vip-order-pay-notify-topic"; |
|
||||
|
|
||||
/** |
|
||||
* vip记录创建 |
|
||||
*/ |
|
||||
public static final String VIP_RECORD_CREATE_TOPIC = "vip-record-create-topic"; |
|
||||
|
|
||||
} |
|
||||
@ -0,0 +1,25 @@ |
|||||
|
package com.bnyer.common.rocketmq.constant; |
||||
|
|
||||
|
/** |
||||
|
* @author :WXC |
||||
|
* @Date :2023/03/24 |
||||
|
* @description : mq常量池 |
||||
|
*/ |
||||
|
public class RocketMqConstant { |
||||
|
|
||||
|
/** |
||||
|
* 数据来源前缀 |
||||
|
*/ |
||||
|
public static final String RETRY_PREFIX = "bnyer-"; |
||||
|
|
||||
|
/** |
||||
|
* 按顺序匹配:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (1-18) |
||||
|
*/ |
||||
|
public static final int FIVE_SECOND = 2; |
||||
|
|
||||
|
/** |
||||
|
* 延迟队列取消订单时间,实际上30分钟 |
||||
|
* 按顺序匹配:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (1-18) |
||||
|
*/ |
||||
|
public static final int THIRTY_MINUTES = 16; |
||||
|
} |
||||
@ -0,0 +1,25 @@ |
|||||
|
package com.bnyer.common.rocketmq.constant; |
||||
|
|
||||
|
/** |
||||
|
* @author :WXC |
||||
|
* @Date :2023/05/17 |
||||
|
* @description : 消息主题 |
||||
|
*/ |
||||
|
public class RocketMqTopic { |
||||
|
|
||||
|
/** |
||||
|
* vip订单取消 |
||||
|
*/ |
||||
|
public static final String VIP_ORDER_CANCEL_TOPIC = "vip-order-cancel-topic"; |
||||
|
|
||||
|
/** |
||||
|
* vip订单支付成功 |
||||
|
*/ |
||||
|
public static final String VIP_ORDER_PAY_NOTIFY_TOPIC = "vip-order-pay-notify-topic"; |
||||
|
|
||||
|
/** |
||||
|
* vip记录创建 |
||||
|
*/ |
||||
|
public static final String VIP_RECORD_CREATE_TOPIC = "vip-record-create-topic"; |
||||
|
|
||||
|
} |
||||
@ -0,0 +1,31 @@ |
|||||
|
package com.bnyer.common.rocketmq.domain; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
|
||||
|
import java.time.LocalDateTime; |
||||
|
|
||||
|
/** |
||||
|
* @author :WXC |
||||
|
* @Date :2023/05/17 |
||||
|
* @description :消息实体,所有消息都需要继承此类 |
||||
|
*/ |
||||
|
@Data |
||||
|
public class BaseMessage { |
||||
|
/** |
||||
|
* 业务键,用于RocketMQ控制台查看消费情况 |
||||
|
*/ |
||||
|
protected String key; |
||||
|
/** |
||||
|
* 发送消息来源,用于排查问题 |
||||
|
*/ |
||||
|
protected String source = ""; |
||||
|
/** |
||||
|
* 发送时间 |
||||
|
*/ |
||||
|
protected LocalDateTime sendTime = LocalDateTime.now(); |
||||
|
/** |
||||
|
* 重试次数,用于判断重试次数,超过重试次数发送异常警告 |
||||
|
*/ |
||||
|
protected Integer retryTimes = 0; |
||||
|
|
||||
|
} |
||||
@ -0,0 +1,50 @@ |
|||||
|
package com.bnyer.common.rocketmq.domain.img; |
||||
|
|
||||
|
import com.bnyer.common.rocketmq.domain.BaseMessage; |
||||
|
import com.fasterxml.jackson.annotation.JsonFormat; |
||||
|
import io.swagger.annotations.ApiModelProperty; |
||||
|
import lombok.Getter; |
||||
|
import lombok.NoArgsConstructor; |
||||
|
import lombok.Setter; |
||||
|
|
||||
|
import java.util.Date; |
||||
|
|
||||
|
/** |
||||
|
* @author :WXC |
||||
|
* @Date :2023/05/17 |
||||
|
* @description : 添加用户会员记录 |
||||
|
*/ |
||||
|
@Getter |
||||
|
@Setter |
||||
|
@NoArgsConstructor |
||||
|
public class AddUserVipRecordMessage extends BaseMessage { |
||||
|
|
||||
|
@ApiModelProperty(value="订单号") |
||||
|
private String orderNo; |
||||
|
|
||||
|
@ApiModelProperty(value="用户id") |
||||
|
private Long userId; |
||||
|
|
||||
|
@ApiModelProperty(value="用户手机号") |
||||
|
private String phone; |
||||
|
|
||||
|
@ApiModelProperty(value="vip表id") |
||||
|
private Long vipId; |
||||
|
|
||||
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
||||
|
@ApiModelProperty(value="开始时间") |
||||
|
private Date startTime; |
||||
|
|
||||
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
||||
|
@ApiModelProperty(value="到期时间") |
||||
|
private Date endTime; |
||||
|
|
||||
|
@ApiModelProperty(value="vip名称") |
||||
|
private String vipName; |
||||
|
|
||||
|
@ApiModelProperty(value = "vip类型名称") |
||||
|
private String vipTypeName; |
||||
|
|
||||
|
@ApiModelProperty(value = "用户客户端类型:10用户-抖音 20用户-快手 30用户-微信 40艺术家-微信") |
||||
|
private Integer userClientType; |
||||
|
} |
||||
@ -0,0 +1,21 @@ |
|||||
|
package com.bnyer.common.rocketmq.domain.order; |
||||
|
|
||||
|
import com.bnyer.common.rocketmq.domain.BaseMessage; |
||||
|
import lombok.Getter; |
||||
|
import lombok.NoArgsConstructor; |
||||
|
import lombok.Setter; |
||||
|
|
||||
|
/** |
||||
|
* @author :WXC |
||||
|
* @Date :2023/05/17 |
||||
|
* @description : 会员订单取消 |
||||
|
*/ |
||||
|
@Getter |
||||
|
@Setter |
||||
|
@NoArgsConstructor |
||||
|
public class VipOrderCancelMessage extends BaseMessage { |
||||
|
/** |
||||
|
* 订单号 |
||||
|
*/ |
||||
|
private String orderNo; |
||||
|
} |
||||
@ -0,0 +1,21 @@ |
|||||
|
package com.bnyer.common.rocketmq.domain.order; |
||||
|
|
||||
|
import com.bnyer.common.rocketmq.domain.BaseMessage; |
||||
|
import lombok.Getter; |
||||
|
import lombok.NoArgsConstructor; |
||||
|
import lombok.Setter; |
||||
|
|
||||
|
/** |
||||
|
* @author :WXC |
||||
|
* @Date :2023/05/17 |
||||
|
* @description : 会员订单支付回调 |
||||
|
*/ |
||||
|
@Getter |
||||
|
@Setter |
||||
|
@NoArgsConstructor |
||||
|
public class VipOrderPayNotifyMessage extends BaseMessage { |
||||
|
/** |
||||
|
* 订单号 |
||||
|
*/ |
||||
|
private String orderNo; |
||||
|
} |
||||
@ -0,0 +1,151 @@ |
|||||
|
package com.bnyer.common.rocketmq.handle; |
||||
|
|
||||
|
import com.alibaba.fastjson.JSONObject; |
||||
|
import com.bnyer.common.rocketmq.constant.RocketMqConstant; |
||||
|
import com.bnyer.common.rocketmq.domain.BaseMessage; |
||||
|
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.rocketmq.client.producer.SendResult; |
||||
|
import org.apache.rocketmq.client.producer.SendStatus; |
||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; |
||||
|
|
||||
|
import javax.annotation.Resource; |
||||
|
|
||||
|
/** |
||||
|
* @author :WXC |
||||
|
* @Date :2023/05/17 |
||||
|
* @description : 消息模板抽象类,父类提供公共模板方法 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
public abstract class EnhanceMessageHandler<T extends BaseMessage> { |
||||
|
/** |
||||
|
* 默认重试次数 |
||||
|
*/ |
||||
|
private static final int MAX_RETRY_TIMES = 3; |
||||
|
|
||||
|
/** |
||||
|
* 延时等级 |
||||
|
*/ |
||||
|
private static final int DELAY_LEVEL = RocketMqConstant.FIVE_SECOND; |
||||
|
|
||||
|
@Resource |
||||
|
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; |
||||
|
|
||||
|
/** |
||||
|
* 消息处理 |
||||
|
* |
||||
|
* @param message 待处理消息 |
||||
|
* @throws Exception 消费异常 |
||||
|
*/ |
||||
|
protected abstract void handleMessage(T message) throws Exception; |
||||
|
|
||||
|
/** |
||||
|
* 超过重试次数消息,需要启用isRetry |
||||
|
* |
||||
|
* @param message 待处理消息 |
||||
|
*/ |
||||
|
protected abstract void handleMaxRetriesExceeded(T message); |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理 |
||||
|
* @param message 待处理消息 |
||||
|
* @return true: 本次消息被过滤,false:不过滤 |
||||
|
*/ |
||||
|
protected abstract boolean filter(T message); |
||||
|
|
||||
|
/** |
||||
|
* 是否异常时重复发送 |
||||
|
* |
||||
|
* @return true: 消息重试,false:不重试 |
||||
|
*/ |
||||
|
protected abstract boolean isRetry(); |
||||
|
|
||||
|
/** |
||||
|
* 消费异常时是否抛出异常 |
||||
|
* 返回true,则由rocketmq机制自动重试 |
||||
|
* false:消费异常(如果没有开启重试则消息会被自动ack) |
||||
|
*/ |
||||
|
protected abstract boolean throwException(); |
||||
|
|
||||
|
/** |
||||
|
* 最大重试次数 |
||||
|
* |
||||
|
* @return 最大重试次数,默认3次 |
||||
|
*/ |
||||
|
protected int getMaxRetryTimes() { |
||||
|
return MAX_RETRY_TIMES; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* isRetry开启时,重新入队延迟时间 |
||||
|
* @return -1:立即入队重试 |
||||
|
*/ |
||||
|
protected int getDelayLevel() { |
||||
|
return DELAY_LEVEL; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 使用模板模式构建消息消费框架,可自由扩展或删减 |
||||
|
*/ |
||||
|
public void dispatchMessage(T message) { |
||||
|
// 基础日志记录被父类处理了
|
||||
|
log.info("消费者收到消息[{}]", JSONObject.toJSON(message)); |
||||
|
if (filter(message)) { |
||||
|
log.info("消息id{}不满足消费条件,已过滤。",message.getKey()); |
||||
|
return; |
||||
|
} |
||||
|
//超过最大重试次数时调用子类方法处理
|
||||
|
if (message.getRetryTimes() > getMaxRetryTimes()) { |
||||
|
handleMaxRetriesExceeded(message); |
||||
|
return; |
||||
|
} |
||||
|
try { |
||||
|
long now = System.currentTimeMillis(); |
||||
|
handleMessage(message); |
||||
|
long costTime = System.currentTimeMillis() - now; |
||||
|
log.info("消息{}消费成功,耗时[{}ms]", message.getKey(),costTime); |
||||
|
} catch (Exception e) { |
||||
|
log.error("消息{}消费异常", message.getKey(),e); |
||||
|
// 是捕获异常还是抛出,由子类决定
|
||||
|
if (throwException()) { |
||||
|
//抛出异常,由DefaultMessageListenerConcurrently类处理
|
||||
|
throw new RuntimeException(e); |
||||
|
} |
||||
|
//此时如果不开启重试机制,则默认ACK了
|
||||
|
if (isRetry()) { |
||||
|
handleRetry(message); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
protected void handleRetry(T message) { |
||||
|
// 获取子类RocketMQMessageListener注解拿到topic和tag
|
||||
|
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class); |
||||
|
if (annotation == null) { |
||||
|
return; |
||||
|
} |
||||
|
//重新构建消息体
|
||||
|
String messageSource = message.getSource(); |
||||
|
if(!messageSource.startsWith(RocketMqConstant.RETRY_PREFIX)){ |
||||
|
message.setSource(RocketMqConstant.RETRY_PREFIX + messageSource); |
||||
|
} |
||||
|
message.setRetryTimes(message.getRetryTimes() + 1); |
||||
|
|
||||
|
SendResult sendResult; |
||||
|
|
||||
|
try { |
||||
|
// 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
|
||||
|
sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel()); |
||||
|
} catch (Exception ex) { |
||||
|
// 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
|
||||
|
//由生产者直接发送
|
||||
|
throw new RuntimeException(ex); |
||||
|
} |
||||
|
// 发送失败的处理就是不进行ACK,由RocketMQ重试
|
||||
|
if (sendResult.getSendStatus() != SendStatus.SEND_OK) { |
||||
|
throw new RuntimeException("重试消息发送失败"); |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,238 @@ |
|||||
|
package com.bnyer.common.rocketmq.template; |
||||
|
|
||||
|
import com.alibaba.fastjson.JSON; |
||||
|
import com.alibaba.fastjson.JSONObject; |
||||
|
import com.bnyer.common.rocketmq.config.RocketEnhanceProperties; |
||||
|
import com.bnyer.common.rocketmq.constant.RocketMqConstant; |
||||
|
import com.bnyer.common.rocketmq.domain.BaseMessage; |
||||
|
import lombok.RequiredArgsConstructor; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.commons.lang3.StringUtils; |
||||
|
import org.apache.rocketmq.client.producer.SendCallback; |
||||
|
import org.apache.rocketmq.client.producer.SendResult; |
||||
|
import org.apache.rocketmq.spring.core.RocketMQTemplate; |
||||
|
import org.apache.rocketmq.spring.support.RocketMQHeaders; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.messaging.Message; |
||||
|
import org.springframework.messaging.support.MessageBuilder; |
||||
|
|
||||
|
import javax.annotation.Resource; |
||||
|
|
||||
|
/** |
||||
|
* @author :WXC |
||||
|
* @Date :2023/05/17 |
||||
|
* @description : RocketMq增强请求模板类 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
@RequiredArgsConstructor(onConstructor = @__(@Autowired)) |
||||
|
public class RocketMQEnhanceTemplate { |
||||
|
|
||||
|
/** |
||||
|
* 延时等级 |
||||
|
*/ |
||||
|
private static final int DELAY_LEVEL = RocketMqConstant.FIVE_SECOND; |
||||
|
|
||||
|
/** |
||||
|
* 默认重试次数 |
||||
|
*/ |
||||
|
private static final int MAX_RETRY_TIMES = 3; |
||||
|
|
||||
|
private final RocketMQTemplate template; |
||||
|
|
||||
|
@Resource |
||||
|
private RocketEnhanceProperties rocketEnhanceProperties; |
||||
|
|
||||
|
public RocketMQTemplate getTemplate() { |
||||
|
return template; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 根据系统上下文自动构建隔离后的topic |
||||
|
* 构建目的地 |
||||
|
*/ |
||||
|
public String buildDestination(String topic, String tag) { |
||||
|
topic = reBuildTopic(topic); |
||||
|
return StringUtils.isNotBlank(tag)?topic + ":" + tag:topic; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 根据环境重新隔离topic |
||||
|
* @param topic 原始topic |
||||
|
*/ |
||||
|
private String reBuildTopic(String topic) { |
||||
|
if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.isNoneBlank(rocketEnhanceProperties.getEnvironment())){ |
||||
|
return topic +"-" + rocketEnhanceProperties.getEnvironment(); |
||||
|
} |
||||
|
return topic; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送同步消息 |
||||
|
* @param topic |
||||
|
* @param tag |
||||
|
* @param message |
||||
|
* @return |
||||
|
* @param <T> |
||||
|
*/ |
||||
|
public <T extends BaseMessage> SendResult send(String topic, String tag, T message) { |
||||
|
// 注意分隔符
|
||||
|
return send(buildDestination(topic,tag), message); |
||||
|
} |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 发送同步消息 |
||||
|
* @param destination |
||||
|
* @param message |
||||
|
* @return |
||||
|
* @param <T> |
||||
|
*/ |
||||
|
public <T extends BaseMessage> SendResult send(String destination, T message) { |
||||
|
// 设置业务键,此处根据公共的参数进行处理
|
||||
|
// 更多的其它基础业务处理...
|
||||
|
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); |
||||
|
SendResult sendResult = template.syncSend(destination, sendMessage); |
||||
|
// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
|
||||
|
log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
||||
|
return sendResult; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送同步延时消息 |
||||
|
* @param topic |
||||
|
* @param tag |
||||
|
* @param message |
||||
|
* @param delayLevel |
||||
|
* @return |
||||
|
* @param <T> |
||||
|
*/ |
||||
|
public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) { |
||||
|
return send(buildDestination(topic,tag), message, delayLevel); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送同步延时消息 |
||||
|
* @param destination |
||||
|
* @param message |
||||
|
* @param delayLevel |
||||
|
* @return |
||||
|
* @param <T> |
||||
|
*/ |
||||
|
public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) { |
||||
|
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); |
||||
|
SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); |
||||
|
log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
||||
|
return sendResult; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
||||
|
* (适合对响应时间敏感的业务场景) |
||||
|
*/ |
||||
|
public <T extends BaseMessage> void sendAsyncMsg(String topic, String tag, T message) { |
||||
|
sendAsyncMsg(buildDestination(topic,tag),message); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
||||
|
* (适合对响应时间敏感的业务场景) |
||||
|
*/ |
||||
|
public <T extends BaseMessage> void sendAsyncMsg(String destination, T message) { |
||||
|
template.asyncSend(destination, MessageBuilder.withPayload(message).build(), new SendCallback() { |
||||
|
@Override |
||||
|
public void onSuccess(SendResult sendResult) { |
||||
|
// 处理消息发送成功逻辑
|
||||
|
log.info("消息发送成功,destination:{},result:{}",destination, JSON.toJSONString(sendResult)); |
||||
|
} |
||||
|
@Override |
||||
|
public void onException(Throwable throwable) { |
||||
|
// 处理消息发送失败逻辑
|
||||
|
log.error("消息发送失败,destination:{},error:{}",destination,throwable.getMessage()); |
||||
|
} |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送异步可重试消息(通过线程池执行发送到broker的消息任务,执行完后回调:如果发送失败进行重试) |
||||
|
* |
||||
|
*/ |
||||
|
public <T extends BaseMessage> void sendAsyncRetryMsg(String topic, String tag, T message) { |
||||
|
sendAsyncMsg(buildDestination(topic,tag),message,getDelayLevel()); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
||||
|
* |
||||
|
*/ |
||||
|
public <T extends BaseMessage> void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { |
||||
|
sendAsyncMsg(buildDestination(topic,tag),message,delayLevel); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
||||
|
* (适合对响应时间敏感的业务场景) |
||||
|
*/ |
||||
|
public <T extends BaseMessage> void sendAsyncMsg(String destination, T message, int delayLevel) { |
||||
|
template.asyncSend(destination, MessageBuilder.withPayload(message).build(), new SendCallback() { |
||||
|
@Override |
||||
|
public void onSuccess(SendResult sendResult) { |
||||
|
// 处理消息发送成功逻辑
|
||||
|
log.info("消息发送成功,destination:{},result:{}",destination,JSON.toJSONString(sendResult)); |
||||
|
} |
||||
|
@Override |
||||
|
public void onException(Throwable throwable) { |
||||
|
log.error("消息发送失败,destination:{},error:{}",destination,throwable.getMessage()); |
||||
|
// 超过最大重试次数时调用子类方法处理
|
||||
|
if (message.getRetryTimes() > getMaxRetryTimes()) { |
||||
|
handleMaxRetriesExceeded(message); |
||||
|
return; |
||||
|
} |
||||
|
//发送失败,重试发送
|
||||
|
handleRetry(destination,message); |
||||
|
} |
||||
|
/** |
||||
|
* 重试发送异步消息 |
||||
|
* @param destination |
||||
|
* @param message |
||||
|
* @param <T> |
||||
|
*/ |
||||
|
private <T extends BaseMessage> void handleRetry(String destination,T message) { |
||||
|
//重新构建消息体
|
||||
|
String messageSource = message.getSource(); |
||||
|
if(!messageSource.startsWith(RocketMqConstant.RETRY_PREFIX)){ |
||||
|
message.setSource(RocketMqConstant.RETRY_PREFIX + messageSource); |
||||
|
} |
||||
|
message.setRetryTimes(message.getRetryTimes() + 1); |
||||
|
// 如果消息发送不成功,则再次重新发送
|
||||
|
sendAsyncMsg(destination, message,getDelayLevel()); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 超过最大重试次数调用该方法处理 |
||||
|
* @param message |
||||
|
* @param <T> |
||||
|
*/ |
||||
|
private <T extends BaseMessage> void handleMaxRetriesExceeded(T message) { |
||||
|
log.error("发送消息超过最大重试次数,message:{}",message); |
||||
|
} |
||||
|
},delayLevel); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* isRetry开启时,重新入队延迟时间 |
||||
|
* @return -1:立即入队重试 |
||||
|
*/ |
||||
|
protected int getDelayLevel() { |
||||
|
return DELAY_LEVEL; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 最大重试次数 |
||||
|
* |
||||
|
* @return 最大重试次数,默认3次 |
||||
|
*/ |
||||
|
protected int getMaxRetryTimes() { |
||||
|
return MAX_RETRY_TIMES; |
||||
|
} |
||||
|
|
||||
|
} |
||||
@ -1,35 +1,35 @@ |
|||||
package com.bnyer.order.config; |
//package com.bnyer.order.config;
|
||||
|
//
|
||||
import com.bnyer.common.rocketmq.config.RocketMqAdapter; |
//import com.bnyer.common.rocketmq.adapter.RocketMqAdapter;
|
||||
import com.bnyer.common.rocketmq.config.RocketMqConstant; |
//import com.bnyer.common.rocketmq.constant.RocketMqTopic;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate; |
//import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
//import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.context.config.annotation.RefreshScope; |
//import org.springframework.cloud.context.config.annotation.RefreshScope;
|
||||
import org.springframework.context.annotation.Bean; |
//import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration; |
//import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Lazy; |
//import org.springframework.context.annotation.Lazy;
|
||||
|
//
|
||||
/** |
///**
|
||||
* @author :WXC |
// * @author :WXC
|
||||
* @Date :2023/03/24 |
// * @Date :2023/03/24
|
||||
* @description : |
// * @description :
|
||||
*/ |
// */
|
||||
@RefreshScope |
//@RefreshScope
|
||||
@Configuration |
//@Configuration
|
||||
public class RocketMqConfig { |
//public class RocketMqConfig {
|
||||
@Autowired |
// @Autowired
|
||||
private RocketMqAdapter rocketMqAdapter; |
// private RocketMqAdapter rocketMqAdapter;
|
||||
|
//
|
||||
@Lazy |
// @Lazy
|
||||
@Bean(destroyMethod = "destroy") |
// @Bean(destroyMethod = "destroy")
|
||||
public RocketMQTemplate orderCancelMqTemplate() { |
// public RocketMQEnhanceTemplate vipOrderCancelMqTemplate() {
|
||||
return rocketMqAdapter.getTemplateByTopicName(RocketMqConstant.VIP_ORDER_CANCEL_TOPIC); |
// return rocketMqAdapter.getTemplateByTopicName(RocketMqTopic.VIP_ORDER_CANCEL_TOPIC);
|
||||
} |
// }
|
||||
|
//
|
||||
@Lazy |
// @Lazy
|
||||
@Bean(destroyMethod = "destroy") |
// @Bean(destroyMethod = "destroy")
|
||||
public RocketMQTemplate vipRecordMqTemplate() { |
// public RocketMQEnhanceTemplate vipRecordCreateMqTemplate() {
|
||||
return rocketMqAdapter.getTemplateByTopicName(RocketMqConstant.VIP_RECORD_CREATE_TOPIC); |
// return rocketMqAdapter.getTemplateByTopicName(RocketMqTopic.VIP_RECORD_CREATE_TOPIC);
|
||||
} |
// }
|
||||
|
//
|
||||
} |
//}
|
||||
|
|||||
@ -1,28 +1,28 @@ |
|||||
package com.bnyer.pay.config; |
//package com.bnyer.pay.config;
|
||||
|
//
|
||||
import com.bnyer.common.rocketmq.config.RocketMqAdapter; |
//import com.bnyer.common.rocketmq.adapter.RocketMqAdapter;
|
||||
import com.bnyer.common.rocketmq.config.RocketMqConstant; |
//import com.bnyer.common.rocketmq.constant.RocketMqTopic;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate; |
//import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
//import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.context.config.annotation.RefreshScope; |
//import org.springframework.cloud.context.config.annotation.RefreshScope;
|
||||
import org.springframework.context.annotation.Bean; |
//import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration; |
//import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Lazy; |
//import org.springframework.context.annotation.Lazy;
|
||||
|
//
|
||||
/** |
///**
|
||||
* @author :WXC |
// * @author :WXC
|
||||
* @Date :2023/03/24 |
// * @Date :2023/03/24
|
||||
* @description : |
// * @description :
|
||||
*/ |
// */
|
||||
@RefreshScope |
//@RefreshScope
|
||||
@Configuration |
//@Configuration
|
||||
public class RocketMqConfig { |
//public class RocketMqConfig {
|
||||
@Autowired |
// @Autowired
|
||||
private RocketMqAdapter rocketMqAdapter; |
// private RocketMqAdapter rocketMqAdapter;
|
||||
|
//
|
||||
@Lazy |
// @Lazy
|
||||
@Bean(destroyMethod = "destroy") |
// @Bean(destroyMethod = "destroy")
|
||||
public RocketMQTemplate vipOrderPayNotifyMqTemplate() { |
// public RocketMQEnhanceTemplate vipOrderPayNotifyMqTemplate() {
|
||||
return rocketMqAdapter.getTemplateByTopicName(RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC); |
// return rocketMqAdapter.getTemplateByTopicName(RocketMqTopic.VIP_ORDER_PAY_NOTIFY_TOPIC);
|
||||
} |
// }
|
||||
} |
//}
|
||||
|
|||||
Loading…
Reference in new issue