diff --git a/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/BaseMqMessage.java b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/BaseMqMessage.java new file mode 100644 index 0000000..da17c87 --- /dev/null +++ b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/BaseMqMessage.java @@ -0,0 +1,74 @@ +package com.bnyer.common.core.domain; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.bnyer.common.core.enums.EnumMessageStatus; +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; + +import java.util.Date; + +/** + * @author :WXC + * @Date :2023/05/18 + * @description :消息记录公共实体 + */ +@Data +public class BaseMqMessage { + + @TableId(value = "id", type = IdType.ASSIGN_ID) + private Long id; + + /** + * 消息key,作为redis的key,消费的时候判断是否存在, + * 存在就判断状态是否是已消费,如果不是就进行消费,如果是就过滤该消息 + */ + @TableField(value = "message_key") + private String messageKey; + + /** + * 消息主题 + */ + @TableField(value = "topic") + private String topic; + + /** + * 消息tag + */ + @TableField(value = "tag") + private String tag; + + /** + * 消费组名称 + */ + @TableField(value = "consumer_group_name") + private String consumerGroupName; + + /** + * 消息状态 + */ + @TableField(value = "status") + private EnumMessageStatus status; + + /** + * 消息内容 + */ + @TableField(value = "content") + private String content; + + /** + * 错误信息 + */ + @TableField(value = "err_msg") + private String errMsg; + + /** + * 创建时间 + */ + @TableField(value = "create_time") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + + +} diff --git a/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/OrderMqMessageRecord.java b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/OrderMqMessageRecord.java new file mode 100644 index 0000000..4f183bc --- /dev/null +++ b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/OrderMqMessageRecord.java @@ -0,0 +1,19 @@ +package com.bnyer.common.core.domain; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * @author :WXC + * @Date :2023/05/18 + * @description :订单服务本地消息表 + */ +@Getter +@Setter +@NoArgsConstructor +@TableName(value = "order_mq_message_record") +public class OrderMqMessageRecord extends BaseMqMessage{ + +} diff --git a/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/enums/EnumMessageStatus.java b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/enums/EnumMessageStatus.java new file mode 100644 index 0000000..d06f4af --- /dev/null +++ b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/enums/EnumMessageStatus.java @@ -0,0 +1,22 @@ +package com.bnyer.common.core.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @author :WXC + * @Date :2023/05/18 + * @description :消息状态 + */ +@Getter +@AllArgsConstructor +public enum EnumMessageStatus { + //处理中 + PROCESS, + //成功 + SUCCESS, + //失败, + FAILS, + //作废 + INVALID, +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RepeatConsumerConfig.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RepeatConsumerConfig.java new file mode 100644 index 0000000..8926742 --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RepeatConsumerConfig.java @@ -0,0 +1,91 @@ +package com.bnyer.common.rocketmq.config; + + +import com.bnyer.common.rocketmq.persist.IPersist; +import com.bnyer.common.rocketmq.persist.RedisPersist; +import lombok.Getter; +import lombok.ToString; +import org.springframework.data.redis.core.StringRedisTemplate; + +/** + * @author :WXC + * @Date :2023/05/19 + * @description :重复消费配置 + */ +@Getter +@ToString +public class RepeatConsumerConfig { + + /** + * 不启用去重 + */ + public static final int REPEAT_STRATEGY_DISABLE = 0; + /** + * 开启去重,发现有处理中的消息,后面再重试 + */ + public static final int REPEAT_STRATEGY_CONSUME_LATER = 1; + /** + * 用以标记去重的时候是哪个应用消费的,同一个应用才需要去重 + */ + private final String applicationName; + + private IPersist persist; + + + /** + * 去重策略,默认不去重 + */ + private int repeatStrategy = REPEAT_STRATEGY_DISABLE; + + + /** + * 对于消费中的消息,多少毫秒内认为重复,默认一分钟,即一分钟内的重复消息都会串行处理(等待前一个消息消费成功/失败),超过这个时间如果消息还在消费就不认为重复了(为了防止消息丢失) + */ + private long processingExpireMilliSeconds = 60 * 1000; + + /** + * 消息消费成功后,记录保留多少分钟,默认一天,即一天内的消息不会重复 + */ + private long recordReserveMinutes = 60 * 24; + + private RepeatConsumerConfig(String applicationName, int repeatStrategy, StringRedisTemplate redisTemplate) { + if (redisTemplate !=null) { + this.persist = new RedisPersist(redisTemplate); + } + this.repeatStrategy = repeatStrategy; + this.applicationName = applicationName; + } + + private RepeatConsumerConfig(String applicationName) { + this.repeatStrategy = REPEAT_STRATEGY_DISABLE; + this.applicationName = applicationName; + } + + + /** + * 利用redis去重 + * @param applicationName + * @param redisTemplate + * @return + */ + public static RepeatConsumerConfig enableRedisConfig(String applicationName, StringRedisTemplate redisTemplate) { + return new RepeatConsumerConfig(applicationName, REPEAT_STRATEGY_CONSUME_LATER, redisTemplate); + } + + + public static RepeatConsumerConfig disableConfig(String applicationName) { + return new RepeatConsumerConfig(applicationName); + } + + + + public void setProcessingExpireMilliSeconds(long processingExpireMilliSeconds) { + this.processingExpireMilliSeconds = processingExpireMilliSeconds; + } + + public void setRecordReserveMinutes(long recordReserveMinutes) { + this.recordReserveMinutes = recordReserveMinutes; + } + + +} \ No newline at end of file diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqRepeatConstant.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqRepeatConstant.java new file mode 100644 index 0000000..ec24b49 --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqRepeatConstant.java @@ -0,0 +1,23 @@ +package com.bnyer.common.rocketmq.constant; + +/** + * @author :WXC + * @Date :2023/05/20 + * @description : + */ +public class RocketMqRepeatConstant { + /** + * 消费中 + */ + public final static String CONSUME_STATUS_CONSUMING = "CONSUME_ING"; + /** + * 消费成功 + */ + public final static String CONSUME_STATUS_SUCCESS = "CONSUME_SUCCESS"; + + /** + * 重复消息redis前缀 + */ + public final static String REPEAT_REDIS_KEY_PREFIX = "RepeatMessage:"; + +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java index b3a1bcb..28c4146 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java @@ -7,6 +7,11 @@ package com.bnyer.common.rocketmq.constant; */ public class RocketMqTopic { + /** + * 订单服务本地消息表记录返回队列 + */ + public static final String ORDER_RETURN_MSG_TOPIC = "order-rerun-msg-topic"; + /** * 订单取消 */ diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java index 1171c40..d70ab58 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java @@ -12,13 +12,29 @@ import java.time.LocalDateTime; @Data public class BaseMessage { /** - * 业务键,用于RocketMQ控制台查看消费情况 + * 消息key,消息唯一标识 */ - protected String key; + protected String messageKey; + /** * 发送消息来源,用于排查问题 */ protected String source = ""; + + /** + * 消息主题 + */ + protected String topic; + + /** + * 消息tag + */ + protected String tag; + + /** + * 消费组名称 + */ + protected String consumerGroupName; /** * 发送时间 */ @@ -26,6 +42,6 @@ public class BaseMessage { /** * 重试次数,用于判断重试次数,超过重试次数发送异常警告 */ - protected Integer retryTimes = 0; + protected Integer retryTimes; } diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MessageLog.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MessageLog.java deleted file mode 100644 index 65b5ecf..0000000 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MessageLog.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.bnyer.common.rocketmq.domain; - -import com.alibaba.fastjson.JSON; -import com.bnyer.common.rocketmq.enums.EnumMessageStatus; -import lombok.Builder; -import lombok.Getter; -import lombok.Setter; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.SendStatus; - -import java.time.LocalDateTime; -import java.util.Objects; - -/** - * @author :WXC - * @Date :2023/05/18 - * @description :消息日志 - */ -@Getter -@Setter -@Builder -public class MessageLog { - - /** - * 消息key,作为redis的key,消费的时候判断是否存在, - * 存在就判断状态是否是已消费,如果不是就进行消费,如果是就过滤该消息 - */ - private String messageKey; - - /** - * 消息主题 - */ - private String topic; - - /** - * 消息tag - */ - private String tag; - - /** - * 消费组名称 - */ - private String consumerGroupName; - - /** - * 消息状态 - */ - private EnumMessageStatus status; - - /** - * 错误信息 - */ - private String errMsg; - - /** - * 消费成功时间 - */ - private LocalDateTime consumerTime; - - /** - * 消息内容 - */ - private String message; - - /** - * 重试次数,用于判断重试次数,超过最大重试次数就人工补偿 - */ - protected Integer retryTimes = 0; - - public MessageLog(T message, SendResult sendResult) { - this.messageKey = message.getKey(); - this.message = JSON.toJSONString(message); - if (Objects.isNull(sendResult)){ - this.status = EnumMessageStatus.WALT_SEND; - }else { - if (SendStatus.SEND_OK == sendResult.getSendStatus()){ - this.status = EnumMessageStatus.SEND_OK; - }else { - this.status = EnumMessageStatus.SEND_FAILS; - } - } - } - -} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/RepeatElement.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/RepeatElement.java new file mode 100644 index 0000000..7149bd4 --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/RepeatElement.java @@ -0,0 +1,37 @@ +package com.bnyer.common.rocketmq.domain; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; + +/** + * @author :WXC + * @Date :2023/05/20 + * @description : 重复消息元素 + */ +@AllArgsConstructor +@Getter +@ToString +public class RepeatElement { + /** + * 消费的应用 + */ + private String applicationName; + /** + * 唯一key + */ + private String messageKey; + /** + * 消费组 + */ + private String consumerGroupName; + /** + * 主题 + */ + private String topic; + /** + * 标签 + */ + private String tag; + +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java index 4a1908b..7c1de89 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java @@ -1,14 +1,10 @@ package com.bnyer.common.rocketmq.domain.img; -import com.bnyer.common.rocketmq.domain.BaseMessage; -import com.fasterxml.jackson.annotation.JsonFormat; import io.swagger.annotations.ApiModelProperty; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; -import java.util.Date; - /** * @author :WXC * @Date :2023/05/17 @@ -17,7 +13,7 @@ import java.util.Date; @Getter @Setter @NoArgsConstructor -public class AddUserVipRecordMessage extends BaseMessage { +public class AddUserVipRecordMessage { @ApiModelProperty(value="用户id") private Long userId; diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java new file mode 100644 index 0000000..47a993c --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java @@ -0,0 +1,39 @@ +package com.bnyer.common.rocketmq.domain.order; + +import com.bnyer.common.core.enums.EnumMessageStatus; +import com.bnyer.common.rocketmq.domain.BaseMessage; +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.util.Date; + +/** + * @author :WXC + * @Date :2023/05/18 + * @description :订单服务本地消息表记录 + */ +@Getter +@Setter +@NoArgsConstructor +public class OrderMqLocalRecordMessage extends BaseMessage { + private Long id; + + /** + * 消息状态 + */ + private EnumMessageStatus status; + + /** + * 消息内容 + */ + private String content; + + /** + * 创建时间 + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/enums/EnumMessageStatus.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/enums/EnumMessageStatus.java deleted file mode 100644 index acfe26f..0000000 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/enums/EnumMessageStatus.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.bnyer.common.rocketmq.enums; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -/** - * @author :WXC - * @Date :2023/05/18 - * @description :消息状态 - */ -@Getter -@AllArgsConstructor -public enum EnumMessageStatus { - //待发送 - WALT_SEND, - //发送成功 - SEND_OK, - //发送失败 - SEND_FAILS, - //待消费 - WALT_CONSUMER, - //消费中 - CONSUMER_PROCESS, - //消费成功 - CONSUMER_OK, - //消费失败 - CONSUMER_FAILS -} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java index 65ade3e..e249df5 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java @@ -1,25 +1,25 @@ package com.bnyer.common.rocketmq.handle; -import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.TypeReference; -import com.bnyer.common.redis.service.RedisService; -import com.bnyer.common.redis.service.RedissonService; +import com.bnyer.common.rocketmq.config.RepeatConsumerConfig; import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.domain.BaseMessage; -import com.bnyer.common.rocketmq.domain.MessageLog; -import com.bnyer.common.rocketmq.enums.EnumMessageStatus; +import com.bnyer.common.rocketmq.domain.RepeatElement; +import com.bnyer.common.rocketmq.persist.IPersist; +import com.bnyer.common.rocketmq.strategy.RepeatConsumerStrategy; +import com.bnyer.common.rocketmq.strategy.NormalRepeatStrategy; +import com.bnyer.common.rocketmq.strategy.RedisRepeatStrategy; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.redisson.api.RLock; +import org.springframework.core.env.Environment; +import org.springframework.data.redis.core.StringRedisTemplate; +import javax.annotation.PostConstruct; import javax.annotation.Resource; -import java.time.LocalDateTime; -import java.util.Objects; -import java.util.concurrent.TimeUnit; +import java.util.function.Function; /** * @author :WXC @@ -39,13 +39,21 @@ public abstract class EnhanceMessageHandler { private static final int DELAY_LEVEL = RocketMqConstant.FIVE_SECOND; @Resource - private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; + private Environment env ; + + @Resource + private RepeatConsumerConfig repeatConsumerConfig; @Resource - private RedisService redisService; + private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; @Resource - private RedissonService redissonService; + private StringRedisTemplate stringRedisTemplate; + + @PostConstruct + public void init(){ + this.repeatConsumerConfig = RepeatConsumerConfig.enableRedisConfig(env.getProperty("spring.application.name"),stringRedisTemplate); + } /** * 消息处理 @@ -101,6 +109,7 @@ public abstract class EnhanceMessageHandler { return DELAY_LEVEL; } + /** * 使用模板模式构建消息消费框架,可自由扩展或删减 */ @@ -108,7 +117,7 @@ public abstract class EnhanceMessageHandler { // 基础日志记录被父类处理了 log.info("消费者收到消息[{}]", JSONObject.toJSON(message)); if (filter(message)) { - log.info("消息id{}不满足消费条件,已过滤。",message.getKey()); + log.info("消息id{}不满足消费条件,已过滤。",message.getMessageKey()); return; } //超过最大重试次数时调用子类方法处理 @@ -116,13 +125,22 @@ public abstract class EnhanceMessageHandler { handleMaxRetriesExceeded(message); return; } + IPersist persist = repeatConsumerConfig.getPersist(); + RepeatElement repeatElement = new RepeatElement(repeatConsumerConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic() + , message.getTag()==null ? "" : message.getTag() + , this.deDupMessageKey(message)); + //消费消息,末尾消费失败会删除消费记录,消费成功则更新消费状态 try { long now = System.currentTimeMillis(); handleMessage(message); long costTime = System.currentTimeMillis() - now; - log.info("消息{}消费成功,耗时[{}ms]", message.getKey(),costTime); + log.info("消息{}消费成功,耗时[{}ms],修改缓存状态为已消费:{}", message.getMessageKey(),costTime, repeatElement); + persist.markConsumed(repeatElement, repeatConsumerConfig.getRecordReserveMinutes()); } catch (Exception e) { - log.error("消息{}消费异常", message.getKey(),e); + log.error("消息{}消费异常", message.getMessageKey(),e); + log.info("消费失败,删除缓存中的记录 {} , {}", repeatElement, persist); + //消费失败,删除这个key + persist.delete(repeatElement); // 是捕获异常还是抛出,由子类决定 if (throwException()) { //抛出异常,由DefaultMessageListenerConcurrently类处理 @@ -135,6 +153,10 @@ public abstract class EnhanceMessageHandler { } } + /** + * 消息重试 + * @param message + */ protected void handleRetry(T message) { // 获取子类RocketMQMessageListener注解拿到topic和tag RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class); @@ -166,80 +188,25 @@ public abstract class EnhanceMessageHandler { } /** - * 模板方法,过滤不符合条件的消息 + * 消息去重 * @param message * @return */ - protected boolean handleFilter(T message){ - return false; -// RLock rLock = redissonService.getRLock(message.getKey()); -// try { -// //如果redis存在表示之前消费过,直接过滤 -// boolean tryLock = rLock.tryLock( 5,0, TimeUnit.MINUTES); -// if (!tryLock){ -// return true; -// } -// }catch (Exception e){ -// //需要重试消费 -// log.error("消息过滤执行异常,error:{}",e.getMessage()); -// throw new RuntimeException("消息过滤异常"); -// }finally { -// rLock.unlock(); -// } -// return true; - } - - /** - * 模板方法。保存消息日志 - * @param message - * @param status - */ - protected void handleSaveMessageLog(T message,EnumMessageStatus status){ - //超过最大重试次数以后,判定消费失败,记录日志,后续定时补偿 - MessageLog messageLog = getMessageLog(message); - if (Objects.isNull(messageLog)){ - messageLog = buildMessageLog(message); + protected Boolean handleMsgRepeat(final T message) { + RepeatConsumerStrategy strategy = new NormalRepeatStrategy(); + Function repeatKeyFunction = baseMessage -> deDupMessageKey(message); + if (repeatConsumerConfig.getRepeatStrategy() == RepeatConsumerConfig.REPEAT_STRATEGY_CONSUME_LATER) { + strategy = new RedisRepeatStrategy(repeatConsumerConfig, repeatKeyFunction); } - if (EnumMessageStatus.CONSUMER_OK == status){ - messageLog.setConsumerTime(LocalDateTime.now()); - } - messageLog.setStatus(status); - saveMessageLog(messageLog); + //调用对应的去重策略 + return strategy.invoke(message); } /** - * 获取消息日志 - * @param message + * 默认拿消息key 作为去重的标识,子类可复写该方法自定义去重标识 */ - protected MessageLog getMessageLog(T message){ - Object cacheObject = redisService.getCacheObject(message.getKey()); - if (Objects.nonNull(cacheObject)){ - MessageLog messageLog = JSON.parseObject(cacheObject.toString(),new TypeReference>(){}); - return messageLog; - } - return null; - } - - /** - * 构建消息日志 - * @param message - * @return - */ - protected MessageLog buildMessageLog(T message){ - RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class); - return MessageLog.builder().messageKey(message.getKey()).message(JSON.toJSONString(message)) - .tag(annotation.selectorExpression()) - .topic(annotation.topic()) - .consumerGroupName(annotation.consumerGroup()).build(); - } - - /** - * 保存消息日志 - * @param messageLog - * @param - */ - protected void saveMessageLog(MessageLog messageLog){ - + protected String deDupMessageKey(T message) { + return message.getMessageKey(); } } \ No newline at end of file diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/IPersist.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/IPersist.java new file mode 100644 index 0000000..f8becff --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/IPersist.java @@ -0,0 +1,49 @@ +package com.bnyer.common.rocketmq.persist; + + +import com.bnyer.common.rocketmq.domain.BaseMessage; +import com.bnyer.common.rocketmq.domain.RepeatElement; + +/** + * @author :WXC + * @Date :2023/05/20 + * @description : 持久层处理重复消息,目前支持redis,可扩展mysql + */ +public interface IPersist { + + /** + * 设置正在消费的消息到存储中 + * @param repeatElement + * @param processingExpireMilliSeconds + * @return + * @param + */ + boolean setConsumingIfNX(RepeatElement repeatElement, long processingExpireMilliSeconds); + + /** + * 删除存储中的消息 + * @param repeatElement + * @param + */ + void delete(RepeatElement repeatElement); + + /** + * 修改存储中的消息 + * @param repeatElement + * @param recordReserveMinutes + * @param + */ + void markConsumed(RepeatElement repeatElement, long recordReserveMinutes); + + /** + * 获取存储中的消息 + * @param repeatElement + * @return + * @param + */ + String get(RepeatElement repeatElement); + + default String toPrintInfo(RepeatElement repeatElement) { + return repeatElement.toString(); + } +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java new file mode 100644 index 0000000..7dbd944 --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java @@ -0,0 +1,81 @@ +package com.bnyer.common.rocketmq.persist; + + +import com.bnyer.common.rocketmq.domain.BaseMessage; +import com.bnyer.common.rocketmq.domain.RepeatElement; +import org.apache.commons.lang3.StringUtils; +import org.springframework.data.redis.connection.RedisStringCommands; +import org.springframework.data.redis.core.RedisCallback; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.types.Expiration; + +import java.util.concurrent.TimeUnit; + +import static com.bnyer.common.rocketmq.constant.RocketMqRepeatConstant.*; + +/** + * @author :WXC + * @Date :2023/05/20 + * @description : redis持久层 + */ +public class RedisPersist implements IPersist { + + private final StringRedisTemplate redisTemplate; + + public RedisPersist(StringRedisTemplate redisTemplate) { + if (redisTemplate == null) { + throw new NullPointerException("redis template is null"); + } + this.redisTemplate = redisTemplate; + } + + @Override + public boolean setConsumingIfNX(RepeatElement repeatElement, long dedupProcessingExpireMilliSeconds) { + String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); + //setnx, 成功就可以消费 + Boolean execute = redisTemplate.execute((RedisCallback) redisConnection -> redisConnection.set(repeatKey.getBytes(), + (CONSUME_STATUS_CONSUMING).getBytes(), Expiration.milliseconds(dedupProcessingExpireMilliSeconds), RedisStringCommands.SetOption.SET_IF_ABSENT)); + if (execute == null) { + return false; + } + return execute; + } + + @Override + public void delete(RepeatElement repeatElement) { + String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); + redisTemplate.delete(repeatKey); + } + + @Override + public void markConsumed(RepeatElement repeatElement, long dedupRecordReserveMinutes) { + String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); + redisTemplate.opsForValue().set(repeatKey, CONSUME_STATUS_SUCCESS, dedupRecordReserveMinutes, TimeUnit.MINUTES); + } + + @Override + public String get(RepeatElement repeatElement) { + String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); + return redisTemplate.opsForValue().get(repeatKey); + } + + @Override + public String toPrintInfo(RepeatElement repeatElement) { + return buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); + } + + /** + * 构建重复消息rediskey + * @param applicationName + * @param topic + * @param tag + * @param messageKey + * @return + */ + private String buildRepeatMessageRedisKey(String applicationName, String topic, String tag, String messageKey) { + String prefix = REPEAT_REDIS_KEY_PREFIX + applicationName + ":" + topic + (StringUtils.isNotEmpty(tag) ? ":"+ tag :""); + return prefix + ":" + messageKey; + } + + +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/NormalRepeatStrategy.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/NormalRepeatStrategy.java new file mode 100644 index 0000000..9cd49cc --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/NormalRepeatStrategy.java @@ -0,0 +1,19 @@ +package com.bnyer.common.rocketmq.strategy; + + +import com.bnyer.common.rocketmq.domain.BaseMessage; +import lombok.extern.slf4j.Slf4j; +/** + * @author :WXC + * @Date :2023/05/20 + * @description : + */ +@Slf4j +public class NormalRepeatStrategy implements RepeatConsumerStrategy { + + @Override + public boolean invoke(T message) { + return false; + } + +} \ No newline at end of file diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java new file mode 100644 index 0000000..aad34ee --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java @@ -0,0 +1,57 @@ +package com.bnyer.common.rocketmq.strategy; + + +import com.bnyer.common.rocketmq.config.RepeatConsumerConfig; +import com.bnyer.common.rocketmq.domain.BaseMessage; +import com.bnyer.common.rocketmq.domain.RepeatElement; +import com.bnyer.common.rocketmq.persist.IPersist; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.util.function.Function; + + +/** + * @author :WXC + * @Date :2023/05/20 + * @description :如果已经消费过,则直接消费幂等掉 + */ +@Slf4j +public class RedisRepeatStrategy implements RepeatConsumerStrategy { + + private final RepeatConsumerConfig filterConfig; + + /** + * 获取去重键的函数 + */ + private final Function repeatMessageKeyFunction; + + public RedisRepeatStrategy(RepeatConsumerConfig dedupConfig, Function dedupMessageKeyFunction) { + this.filterConfig = dedupConfig; + this.repeatMessageKeyFunction = dedupMessageKeyFunction; + } + + + @Override + public boolean invoke(T message) { + return doInvoke(message); + } + + private boolean doInvoke(T message) { + IPersist persist = filterConfig.getPersist(); + RepeatElement repeatElement = new RepeatElement(filterConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic() + , message.getTag()==null ? "" : message.getTag() + , repeatMessageKeyFunction.apply(message)); + boolean shouldConsume = true; + if (StringUtils.isNotBlank(repeatElement.getMessageKey())) { + shouldConsume = persist.setConsumingIfNX(repeatElement, filterConfig.getProcessingExpireMilliSeconds()); + } + //设置成功,证明没有消费过 + if (shouldConsume) { + return false; + } else { + return true; + } + } + +} \ No newline at end of file diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RepeatConsumerStrategy.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RepeatConsumerStrategy.java new file mode 100644 index 0000000..39506b3 --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RepeatConsumerStrategy.java @@ -0,0 +1,16 @@ +package com.bnyer.common.rocketmq.strategy; + + +import com.bnyer.common.rocketmq.domain.BaseMessage; + +/** + * @author :WXC + * @Date :2023/05/20 + * @description : 重复消费策略,目前支持redis,可扩展mysql方式 + */ +public interface RepeatConsumerStrategy { + + boolean invoke(T message); + +} + diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java index 769549a..13b77f1 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java @@ -2,11 +2,8 @@ package com.bnyer.common.rocketmq.template; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.bnyer.common.redis.service.RedisService; import com.bnyer.common.rocketmq.config.RocketEnhanceProperties; import com.bnyer.common.rocketmq.domain.BaseMessage; -import com.bnyer.common.rocketmq.domain.MessageLog; -import com.bnyer.common.rocketmq.enums.EnumMessageStatus; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -30,8 +27,6 @@ public class RocketMQEnhanceTemplate { private final RocketMQTemplate template; - private final RedisService redisService; - @Resource private RocketEnhanceProperties rocketEnhanceProperties; @@ -70,14 +65,11 @@ public class RocketMQEnhanceTemplate { public SendResult send(String topic, String tag, T message) { // 设置业务键,此处根据公共的参数进行处理 // 更多的其它基础业务处理... - MessageLog messageLog = saveMessageLog(message, null); - Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); + Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getMessageKey()).build(); SendResult sendResult; try { sendResult = template.syncSend(buildDestination(topic,tag), sendMessage); - delMessageLog(messageLog); } catch (Exception e) { - editMessageLog(messageLog); throw new RuntimeException(e); } // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 @@ -95,14 +87,11 @@ public class RocketMQEnhanceTemplate { * @param */ public SendResult send(String topic, String tag, T message, int delayLevel) { - MessageLog messageLog = saveMessageLog(message, null); - Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); + Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getMessageKey()).build(); SendResult sendResult; try { sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, 10000, delayLevel); - delMessageLog(messageLog); } catch (Exception e) { - editMessageLog(messageLog); throw new RuntimeException(e); } log.info("topic:[{}]tag:[{}]延迟等级[{}]消息[{}]发送结果[{}]", topic,tag, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); @@ -114,19 +103,16 @@ public class RocketMQEnhanceTemplate { * (适合对响应时间敏感的业务场景) */ public void sendAsyncMsg(String topic, String tag, T message) { - MessageLog messageLog = saveMessageLog(message, null); template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 处理消息发送成功逻辑 log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag, JSON.toJSONString(sendResult)); - delMessageLog(messageLog); } @Override public void onException(Throwable throwable) { // 处理消息发送失败逻辑 log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage()); - editMessageLog(messageLog); } }); } @@ -136,18 +122,15 @@ public class RocketMQEnhanceTemplate { * (适合对响应时间敏感的业务场景) */ public void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { - MessageLog messageLog = saveMessageLog(message, null); template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 处理消息发送成功逻辑 log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag,JSON.toJSONString(sendResult)); - delMessageLog(messageLog); } @Override public void onException(Throwable throwable) { log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage()); - editMessageLog(messageLog); } },10000,delayLevel); } @@ -167,36 +150,6 @@ public class RocketMQEnhanceTemplate { return sendResult.getSendStatus() == SendStatus.SEND_OK; } - /** - * 添加消息日志 - * @param message - * @param sendResult - * @param - */ - private MessageLog saveMessageLog(T message,SendResult sendResult){ - log.info("消息发送前,入库消息日志表,如果发送失败,可进行消息补偿"); - MessageLog messageLog = new MessageLog<>(message, sendResult); - //保存 - return messageLog; - } - - /** - * 修改消息日志 - * @param messageLog - * @param - */ - private void editMessageLog(MessageLog messageLog){ - log.info("消息发送失败,修改消息日志状态,可进行消息补偿"); - //修改 - messageLog.setStatus(EnumMessageStatus.SEND_FAILS); - } - /** - * 删除消息日志 - * @param messageLog - */ - private void delMessageLog(MessageLog messageLog){ - log.info("消息发送成功,删除消息日志"); - } } diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java index 8705e01..227021c 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java @@ -137,7 +137,7 @@ public class GoldRewardConsumer extends EnhanceMessageHandler @Override protected boolean filter(GoldRewardMessage message) { - return super.handleFilter(message); + return super.handleMsgRepeat(message); } diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java index e7dd556..960680b 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java @@ -1,10 +1,14 @@ package com.bnyer.img.listener; +import com.alibaba.fastjson.JSON; import com.bnyer.common.core.dto.AddUserVipRecordDto; +import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.utils.bean.EntityConvertUtil; import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage; +import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; +import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.img.service.UserVipRecordService; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; @@ -12,40 +16,55 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import javax.annotation.Resource; + /** * @author :WXC * @Date :2023/03/24 - * @description :取消订单mq消费监听 + * @description :保存会员记录mq消费监听 */ @Slf4j @Component @RocketMQMessageListener(topic = RocketMqTopic.VIP_RECORD_CREATE_TOPIC,consumerGroup = RocketMqTopic.VIP_RECORD_CREATE_TOPIC) -public class VipRecordCreateConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class VipRecordCreateConsumer extends EnhanceMessageHandler implements RocketMQListener { + + @Resource + private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; @Autowired private UserVipRecordService userVipRecordService; @Override - public void onMessage(AddUserVipRecordMessage message) { + public void onMessage(OrderMqLocalRecordMessage message) { super.dispatchMessage(message); - AddUserVipRecordDto addUserVipRecordDto = EntityConvertUtil.copy(message, AddUserVipRecordDto.class); + String content = message.getContent(); + AddUserVipRecordMessage addUserVipRecordMessage = JSON.parseObject(content, AddUserVipRecordMessage.class); + AddUserVipRecordDto addUserVipRecordDto = EntityConvertUtil.copy(addUserVipRecordMessage, AddUserVipRecordDto.class); //保存用户会员记录 userVipRecordService.saveUserVipRecord(addUserVipRecordDto); } @Override - protected void handleMessage(AddUserVipRecordMessage message) throws Exception { - + protected void handleMessage(OrderMqLocalRecordMessage message) throws Exception { + //发送返回队列,告知已经处理成功,完成最终一致性 + //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免 + // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的 + message.setStatus(EnumMessageStatus.SUCCESS); + rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,message); } @Override - protected void handleMaxRetriesExceeded(AddUserVipRecordMessage message) { - + protected void handleMaxRetriesExceeded(OrderMqLocalRecordMessage message) { + //发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录 + //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免 + // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的 + message.setStatus(EnumMessageStatus.FAILS); + rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,message); } @Override - protected boolean filter(AddUserVipRecordMessage message) { - return super.handleFilter(message); + protected boolean filter(OrderMqLocalRecordMessage message) { + return super.handleMsgRepeat(message); } @Override diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/CreatorProfitServiceImpl.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/CreatorProfitServiceImpl.java index 0cef3cd..290d105 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/CreatorProfitServiceImpl.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/CreatorProfitServiceImpl.java @@ -150,7 +150,7 @@ public class CreatorProfitServiceImpl implements CreatorProfitService { */ private GoldRewardMessage buildGoldRewardMsg(Long userId, int goldNum, String goldCode, String platform, Integer userClientType) { GoldRewardMessage message = new GoldRewardMessage(); - message.setKey(IdUtils.randomUUID()); + message.setMessageKey(IdUtils.randomUUID()); message.setSource(ServiceNameConstants.IMG_SERVICE); message.setUserId(userId); message.setGoldNum(goldNum); diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java new file mode 100644 index 0000000..50047a0 --- /dev/null +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java @@ -0,0 +1,61 @@ +package com.bnyer.order.listener; + +import com.bnyer.common.core.domain.OrderMqMessageRecord; +import com.bnyer.common.core.utils.bean.EntityConvertUtil; +import com.bnyer.common.rocketmq.constant.RocketMqTopic; +import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage; +import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; +import com.bnyer.order.service.OrderMqMessageRecordService; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * @author :WXC + * @Date :2023/05/20 + * @description : + */ +@Slf4j +@Component +@RocketMQMessageListener(topic = RocketMqTopic.ORDER_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.ORDER_RETURN_MSG_TOPIC) +public class OrderReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener { + + @Resource + private OrderMqMessageRecordService orderMqMessageRecordService; + + @Override + public void onMessage(OrderMqLocalRecordMessage message) { + super.dispatchMessage(message); + OrderMqMessageRecord orderMqMessageRecord = EntityConvertUtil.copy(message, OrderMqMessageRecord.class); + orderMqMessageRecordService.editMessageRecord(orderMqMessageRecord); + } + + @Override + protected void handleMessage(OrderMqLocalRecordMessage message) throws Exception { + + } + + @Override + protected void handleMaxRetriesExceeded(OrderMqLocalRecordMessage message) { + + } + + @Override + protected boolean filter(OrderMqLocalRecordMessage message) { + return super.handleMsgRepeat(message); + } + + @Override + protected boolean isRetry() { + return true; + } + + @Override + protected boolean throwException() { + return false; + } + +} diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java index d60b2a6..e9e3ea0 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java @@ -20,7 +20,7 @@ import org.springframework.stereotype.Component; */ @Slf4j @Component -@RocketMQMessageListener(topic = RocketMqTopic.ORDER_CANCEL_TOPIC,consumerGroup = RocketMqTag.ORDER_VIP_TAG) +@RocketMQMessageListener(topic = RocketMqTopic.ORDER_CANCEL_TOPIC,selectorExpression = RocketMqTag.ORDER_VIP_TAG,consumerGroup = RocketMqTopic.ORDER_CANCEL_TOPIC) public class VipOrderCancelConsumer extends EnhanceMessageHandler implements RocketMQListener { @Autowired @@ -48,7 +48,7 @@ public class VipOrderCancelConsumer extends EnhanceMessageHandler { + +} diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java new file mode 100644 index 0000000..4e0aaf6 --- /dev/null +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java @@ -0,0 +1,48 @@ +package com.bnyer.order.service; + +import com.bnyer.common.core.domain.OrderMqMessageRecord; +import com.bnyer.common.rocketmq.domain.BaseMessage; + +/** + * @author :WXC + * @Date :2023/05/20 + * @description :订单服务本地消息service层 + */ +public interface OrderMqMessageRecordService { + + /** + * 发送异步消息 + * @param topic + * @param tag + * @param addUserVipRecordMessage + * @param + */ + void sendAsyncMsg(String topic, String tag, T addUserVipRecordMessage); + + /** + * 发送异步延时消息 + * @param topic + * @param tag + * @param message + * @param delayLevel + * @param + */ + void sendAsyncMsg(String topic, String tag, T message, int delayLevel); + + /** + * 保存消息记录 + * @param topic + * @param tag + * @param message + * @return + * @param + */ + OrderMqMessageRecord saveMessageRecord(String topic, String tag, T message); + + /** + * 修改消息记录 + * @param orderMqMessageRecord + */ + void editMessageRecord(OrderMqMessageRecord orderMqMessageRecord); + +} diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java new file mode 100644 index 0000000..811f58e --- /dev/null +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java @@ -0,0 +1,132 @@ +package com.bnyer.order.service.impl; + +import com.alibaba.fastjson.JSON; +import com.bnyer.common.core.constant.ServiceNameConstants; +import com.bnyer.common.core.domain.OrderMqMessageRecord; +import com.bnyer.common.core.enums.EnumMessageStatus; +import com.bnyer.common.core.utils.bean.EntityConvertUtil; +import com.bnyer.common.core.utils.uuid.IdUtils; +import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage; +import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; +import com.bnyer.order.mapper.OrderMqMessageRecordMapper; +import com.bnyer.order.service.OrderMqMessageRecordService; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import javax.annotation.Resource; +import java.util.Date; + +/** + * @author :WXC + * @Date :2023/05/19 + * @description : + */ +@Slf4j +@Service +public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordService { + + /** + * 发送超时时间 + */ + private final static int TIME_OUT = 10000; + + @Resource + private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; + + @Resource + private OrderMqMessageRecordMapper orderMqMessageRecordMapper; + + /** + * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) + * (适合对响应时间敏感的业务场景) + */ + @Transactional + @Override + public void sendAsyncMsg(String topic, String tag, T message) { + RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); + //保存消息记录 + log.info("消息发送中,开始入库本地消息记录表"); + OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message); + //发消息 + OrderMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(orderMqMessageRecord, OrderMqLocalRecordMessage.class); + mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag, JSON.toJSONString(sendResult)); + } + @Override + public void onException(Throwable throwable) { + log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage()); + log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); + orderMqMessageRecord.setErrMsg(throwable.getMessage()); + editMessageRecord(orderMqMessageRecord); + } + }); + } + + /** + * 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) + * (适合对响应时间敏感的业务场景) + */ + @Transactional + @Override + public void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { + RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); + //保存消息记录 + log.info("消息发送中,开始入库本地消息记录表"); + OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message); + //发消息 + OrderMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(orderMqMessageRecord, OrderMqLocalRecordMessage.class); + mqLocalMessage.setRetryTimes(0); + mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag,JSON.toJSONString(sendResult)); + } + @Override + public void onException(Throwable throwable) { + log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage()); + log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); + orderMqMessageRecord.setErrMsg(throwable.getMessage()); + editMessageRecord(orderMqMessageRecord); + } + },TIME_OUT,delayLevel); + } + + /** + * 添加消息记录 + * @param message + * @param + */ + @Transactional + public OrderMqMessageRecord saveMessageRecord(String topic, String tag, T message){ + RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); + String consumerGroup = rocketMQTemplate.getConsumer().getConsumerGroup(); + OrderMqMessageRecord orderMqMessageRecord = new OrderMqMessageRecord(); + orderMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); + orderMqMessageRecord.setConsumerGroupName(consumerGroup); + orderMqMessageRecord.setTopic(topic); + orderMqMessageRecord.setTag(tag); + orderMqMessageRecord.setCreateTime(new Date()); + orderMqMessageRecord.setMessageKey(IdUtils.randomUUID()); + orderMqMessageRecordMapper.insert(orderMqMessageRecord); + return orderMqMessageRecord; + } + + /** + * 修改消息记录 + * @param orderMqMessageRecord + */ + @Transactional + public void editMessageRecord(OrderMqMessageRecord orderMqMessageRecord){ + orderMqMessageRecordMapper.updateById(orderMqMessageRecord); + } + +} diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java index c596abb..fcf58ab 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java @@ -31,6 +31,7 @@ import com.bnyer.order.bean.query.VipOrderExtQuery; import com.bnyer.order.bean.query.VipOrderQuery; import com.bnyer.order.bean.vo.VipOrderVo; import com.bnyer.order.mapper.VipOrderMapper; +import com.bnyer.order.service.OrderMqMessageRecordService; import com.bnyer.order.service.VipOrderService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -58,6 +59,9 @@ public class VipOrderServiceImpl implements VipOrderService { @Autowired private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; + @Autowired + private OrderMqMessageRecordService orderMqMessageRecordService; + @Autowired private VipOrderMapper vipOrderMapper; @@ -113,17 +117,9 @@ public class VipOrderServiceImpl implements VipOrderService { String orderNo = vipOrder.getOrderNo(); //发送消息,如果三十分钟后没有支付,则取消订单 VipOrderCancelMessage vipOrderCancelMessage = new VipOrderCancelMessage(); - vipOrderCancelMessage.setKey(IdUtils.randomUUID()); - vipOrderCancelMessage.setSource(ServiceNameConstants.ORDER_SERVICE); vipOrderCancelMessage.setOrderNo(orderNo); - rocketMQEnhanceTemplate.send(RocketMqTopic.ORDER_CANCEL_TOPIC, RocketMqTag.ORDER_VIP_TAG,vipOrderCancelMessage,RocketMqConstant.THIRTY_MINUTES) -// SendStatus sendStatus = orderCancelMqTemplate.syncSend(RocketMqConstant.VIP_ORDER_CANCEL_TOPIC, new GenericMessage<>(orderNo), RocketMqConstant.TIMEOUT, RocketMqConstant.CANCEL_ORDER_DELAY_LEVEL).getSendStatus(); -// if (!Objects.equals(sendStatus,SendStatus.SEND_OK)) { -// // 消息发不出去就抛异常,发的出去无所谓 -// throw new ServiceException(ResponseEnum.SERVER_ERROR); -// }else { -// log.info("消息发送成功,topic:{}",RocketMqTopic.VIP_ORDER_CANCEL_TOPIC); -// } +// rocketMQEnhanceTemplate.send(RocketMqTopic.ORDER_CANCEL_TOPIC, RocketMqTag.ORDER_VIP_TAG,vipOrderCancelMessage,RocketMqConstant.THIRTY_MINUTES); + orderMqMessageRecordService.sendAsyncMsg(RocketMqTopic.ORDER_CANCEL_TOPIC, RocketMqTag.ORDER_VIP_TAG,vipOrderCancelMessage,RocketMqConstant.THIRTY_MINUTES); return orderNo; } @@ -232,11 +228,14 @@ public class VipOrderServiceImpl implements VipOrderService { //发消息,添加用户会员记录 AddUserVipRecordMessage addUserVipRecordMessage = buildVipRecordMsg(vipOrder); - rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.VIP_RECORD_CREATE_TOPIC,null, addUserVipRecordMessage); +// rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.VIP_RECORD_CREATE_TOPIC,null, addUserVipRecordMessage); + + orderMqMessageRecordService.sendAsyncMsg(RocketMqTopic.VIP_RECORD_CREATE_TOPIC,null, addUserVipRecordMessage); //发送开会员画意值奖励并写入记录消息 GoldRewardMessage goldMsg = buildGoldRewardMsg(vipOrder.getUserId(),300, GoldEnum.BUY_VIP.getCode(),null,vipOrder.getUserClientType()); - rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.GOLD_REWARD_TOPIC,null,goldMsg); +// rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.GOLD_REWARD_TOPIC,null,goldMsg); + orderMqMessageRecordService.sendAsyncMsg(RocketMqTopic.GOLD_REWARD_TOPIC,null,goldMsg); } @@ -247,8 +246,6 @@ public class VipOrderServiceImpl implements VipOrderService { */ private AddUserVipRecordMessage buildVipRecordMsg(VipOrder vipOrder) { AddUserVipRecordMessage message = new AddUserVipRecordMessage(); - message.setKey(IdUtils.randomUUID()); - message.setSource(ServiceNameConstants.ORDER_SERVICE); message.setDays(vipOrder.getDays()); message.setVipId(vipOrder.getVipId()); message.setVipName(vipOrder.getVipName()); @@ -270,8 +267,8 @@ public class VipOrderServiceImpl implements VipOrderService { */ private GoldRewardMessage buildGoldRewardMsg(Long userId, int goldNum, String goldCode, String platform, Integer userClientType) { GoldRewardMessage message = new GoldRewardMessage(); - message.setKey(IdUtils.randomUUID()); - message.setSource(ServiceNameConstants.ORDER_SERVICE); +// message.setMessageKey(IdUtils.randomUUID()); +// message.setSource(ServiceNameConstants.ORDER_SERVICE); message.setUserId(userId); message.setGoldNum(goldNum); message.setGoldCode(goldCode); diff --git a/bnyer-services/bnyer-order/src/main/resources/com/bnyer/order/mapper/OrderMqMessageRecordMapper.xml b/bnyer-services/bnyer-order/src/main/resources/com/bnyer/order/mapper/OrderMqMessageRecordMapper.xml new file mode 100644 index 0000000..70718a3 --- /dev/null +++ b/bnyer-services/bnyer-order/src/main/resources/com/bnyer/order/mapper/OrderMqMessageRecordMapper.xml @@ -0,0 +1,29 @@ + + + + + + + + + + + + + + + + + + t.id, + t.message_key, + t.topic, + t.tag, + t.consumer_group_name, + t.status, + t.err_msg, + t.content, + t.create_time, + + + diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java index 57cf08f..b1e0ec6 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java @@ -81,9 +81,9 @@ public class PayInfoServiceImpl implements PayInfoService { //会员充值场景 case VIP_RECHARGE: // 发送消息,订单支付成功 - // TODO: 2023/04/23可优化为:添加一张消息日志表,字段:topicName,消息内容,消息状态(发送中、成功、失败),错误信息 + // TODO: 2023/05/20 暂时使用普通消息发送,待修改成本地消息表模式 VipOrderPayNotifyMessage vipOrderPayNotifyMessage = new VipOrderPayNotifyMessage(); - vipOrderPayNotifyMessage.setKey(IdUtils.randomUUID()); + vipOrderPayNotifyMessage.setMessageKey(IdUtils.randomUUID()); vipOrderPayNotifyMessage.setSource(ServiceNameConstants.PAY_SERVICE); vipOrderPayNotifyMessage.setOrderNo(orderNo); rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC, RocketMqTag.ORDER_VIP_TAG, vipOrderPayNotifyMessage);