Browse Source

实现本地消息表消息补偿,消息去重

feature-1.1
wuxicheng 3 years ago
parent
commit
7a973f8e2d
  1. 74
      bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/BaseMqMessage.java
  2. 19
      bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/OrderMqMessageRecord.java
  3. 22
      bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/enums/EnumMessageStatus.java
  4. 91
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RepeatConsumerConfig.java
  5. 23
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqRepeatConstant.java
  6. 5
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java
  7. 22
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java
  8. 84
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MessageLog.java
  9. 37
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/RepeatElement.java
  10. 6
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java
  11. 39
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java
  12. 28
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/enums/EnumMessageStatus.java
  13. 131
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java
  14. 49
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/IPersist.java
  15. 81
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java
  16. 19
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/NormalRepeatStrategy.java
  17. 57
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java
  18. 16
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RepeatConsumerStrategy.java
  19. 51
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java
  20. 2
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java
  21. 39
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java
  22. 2
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/CreatorProfitServiceImpl.java
  23. 61
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java
  24. 4
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java
  25. 2
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java
  26. 14
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/mapper/OrderMqMessageRecordMapper.java
  27. 48
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java
  28. 132
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java
  29. 29
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java
  30. 29
      bnyer-services/bnyer-order/src/main/resources/com/bnyer/order/mapper/OrderMqMessageRecordMapper.xml
  31. 4
      bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java

74
bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/BaseMqMessage.java

@ -0,0 +1,74 @@
package com.bnyer.common.core.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.util.Date;
/**
* @author :WXC
* @Date :2023/05/18
* @description :消息记录公共实体
*/
@Data
public class BaseMqMessage {
@TableId(value = "id", type = IdType.ASSIGN_ID)
private Long id;
/**
* 消息key作为redis的key消费的时候判断是否存在
* 存在就判断状态是否是已消费如果不是就进行消费如果是就过滤该消息
*/
@TableField(value = "message_key")
private String messageKey;
/**
* 消息主题
*/
@TableField(value = "topic")
private String topic;
/**
* 消息tag
*/
@TableField(value = "tag")
private String tag;
/**
* 消费组名称
*/
@TableField(value = "consumer_group_name")
private String consumerGroupName;
/**
* 消息状态
*/
@TableField(value = "status")
private EnumMessageStatus status;
/**
* 消息内容
*/
@TableField(value = "content")
private String content;
/**
* 错误信息
*/
@TableField(value = "err_msg")
private String errMsg;
/**
* 创建时间
*/
@TableField(value = "create_time")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
}

19
bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/OrderMqMessageRecord.java

@ -0,0 +1,19 @@
package com.bnyer.common.core.domain;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
* @author :WXC
* @Date :2023/05/18
* @description :订单服务本地消息表
*/
@Getter
@Setter
@NoArgsConstructor
@TableName(value = "order_mq_message_record")
public class OrderMqMessageRecord extends BaseMqMessage{
}

22
bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/enums/EnumMessageStatus.java

@ -0,0 +1,22 @@
package com.bnyer.common.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author :WXC
* @Date :2023/05/18
* @description :消息状态
*/
@Getter
@AllArgsConstructor
public enum EnumMessageStatus {
//处理中
PROCESS,
//成功
SUCCESS,
//失败,
FAILS,
//作废
INVALID,
}

91
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/config/RepeatConsumerConfig.java

@ -0,0 +1,91 @@
package com.bnyer.common.rocketmq.config;
import com.bnyer.common.rocketmq.persist.IPersist;
import com.bnyer.common.rocketmq.persist.RedisPersist;
import lombok.Getter;
import lombok.ToString;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
* @author :WXC
* @Date :2023/05/19
* @description :重复消费配置
*/
@Getter
@ToString
public class RepeatConsumerConfig {
/**
* 不启用去重
*/
public static final int REPEAT_STRATEGY_DISABLE = 0;
/**
* 开启去重发现有处理中的消息后面再重试
*/
public static final int REPEAT_STRATEGY_CONSUME_LATER = 1;
/**
* 用以标记去重的时候是哪个应用消费的同一个应用才需要去重
*/
private final String applicationName;
private IPersist persist;
/**
* 去重策略默认不去重
*/
private int repeatStrategy = REPEAT_STRATEGY_DISABLE;
/**
* 对于消费中的消息多少毫秒内认为重复默认一分钟即一分钟内的重复消息都会串行处理等待前一个消息消费成功/失败超过这个时间如果消息还在消费就不认为重复了为了防止消息丢失
*/
private long processingExpireMilliSeconds = 60 * 1000;
/**
* 消息消费成功后记录保留多少分钟默认一天即一天内的消息不会重复
*/
private long recordReserveMinutes = 60 * 24;
private RepeatConsumerConfig(String applicationName, int repeatStrategy, StringRedisTemplate redisTemplate) {
if (redisTemplate !=null) {
this.persist = new RedisPersist(redisTemplate);
}
this.repeatStrategy = repeatStrategy;
this.applicationName = applicationName;
}
private RepeatConsumerConfig(String applicationName) {
this.repeatStrategy = REPEAT_STRATEGY_DISABLE;
this.applicationName = applicationName;
}
/**
* 利用redis去重
* @param applicationName
* @param redisTemplate
* @return
*/
public static RepeatConsumerConfig enableRedisConfig(String applicationName, StringRedisTemplate redisTemplate) {
return new RepeatConsumerConfig(applicationName, REPEAT_STRATEGY_CONSUME_LATER, redisTemplate);
}
public static RepeatConsumerConfig disableConfig(String applicationName) {
return new RepeatConsumerConfig(applicationName);
}
public void setProcessingExpireMilliSeconds(long processingExpireMilliSeconds) {
this.processingExpireMilliSeconds = processingExpireMilliSeconds;
}
public void setRecordReserveMinutes(long recordReserveMinutes) {
this.recordReserveMinutes = recordReserveMinutes;
}
}

23
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqRepeatConstant.java

@ -0,0 +1,23 @@
package com.bnyer.common.rocketmq.constant;
/**
* @author :WXC
* @Date :2023/05/20
* @description :
*/
public class RocketMqRepeatConstant {
/**
* 消费中
*/
public final static String CONSUME_STATUS_CONSUMING = "CONSUME_ING";
/**
* 消费成功
*/
public final static String CONSUME_STATUS_SUCCESS = "CONSUME_SUCCESS";
/**
* 重复消息redis前缀
*/
public final static String REPEAT_REDIS_KEY_PREFIX = "RepeatMessage:";
}

5
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java

@ -7,6 +7,11 @@ package com.bnyer.common.rocketmq.constant;
*/ */
public class RocketMqTopic { public class RocketMqTopic {
/**
* 订单服务本地消息表记录返回队列
*/
public static final String ORDER_RETURN_MSG_TOPIC = "order-rerun-msg-topic";
/** /**
* 订单取消 * 订单取消
*/ */

22
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java

@ -12,13 +12,29 @@ import java.time.LocalDateTime;
@Data @Data
public class BaseMessage { public class BaseMessage {
/** /**
* 业务键用于RocketMQ控制台查看消费情况 * 消息key消息唯一标识
*/ */
protected String key; protected String messageKey;
/** /**
* 发送消息来源用于排查问题 * 发送消息来源用于排查问题
*/ */
protected String source = ""; protected String source = "";
/**
* 消息主题
*/
protected String topic;
/**
* 消息tag
*/
protected String tag;
/**
* 消费组名称
*/
protected String consumerGroupName;
/** /**
* 发送时间 * 发送时间
*/ */
@ -26,6 +42,6 @@ public class BaseMessage {
/** /**
* 重试次数用于判断重试次数超过重试次数发送异常警告 * 重试次数用于判断重试次数超过重试次数发送异常警告
*/ */
protected Integer retryTimes = 0; protected Integer retryTimes;
} }

84
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MessageLog.java

@ -1,84 +0,0 @@
package com.bnyer.common.rocketmq.domain;
import com.alibaba.fastjson.JSON;
import com.bnyer.common.rocketmq.enums.EnumMessageStatus;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* @author :WXC
* @Date :2023/05/18
* @description :消息日志
*/
@Getter
@Setter
@Builder
public class MessageLog <T extends BaseMessage>{
/**
* 消息key作为redis的key消费的时候判断是否存在
* 存在就判断状态是否是已消费如果不是就进行消费如果是就过滤该消息
*/
private String messageKey;
/**
* 消息主题
*/
private String topic;
/**
* 消息tag
*/
private String tag;
/**
* 消费组名称
*/
private String consumerGroupName;
/**
* 消息状态
*/
private EnumMessageStatus status;
/**
* 错误信息
*/
private String errMsg;
/**
* 消费成功时间
*/
private LocalDateTime consumerTime;
/**
* 消息内容
*/
private String message;
/**
* 重试次数用于判断重试次数超过最大重试次数就人工补偿
*/
protected Integer retryTimes = 0;
public MessageLog(T message, SendResult sendResult) {
this.messageKey = message.getKey();
this.message = JSON.toJSONString(message);
if (Objects.isNull(sendResult)){
this.status = EnumMessageStatus.WALT_SEND;
}else {
if (SendStatus.SEND_OK == sendResult.getSendStatus()){
this.status = EnumMessageStatus.SEND_OK;
}else {
this.status = EnumMessageStatus.SEND_FAILS;
}
}
}
}

37
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/RepeatElement.java

@ -0,0 +1,37 @@
package com.bnyer.common.rocketmq.domain;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
/**
* @author :WXC
* @Date :2023/05/20
* @description : 重复消息元素
*/
@AllArgsConstructor
@Getter
@ToString
public class RepeatElement {
/**
* 消费的应用
*/
private String applicationName;
/**
* 唯一key
*/
private String messageKey;
/**
* 消费组
*/
private String consumerGroupName;
/**
* 主题
*/
private String topic;
/**
* 标签
*/
private String tag;
}

6
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java

@ -1,14 +1,10 @@
package com.bnyer.common.rocketmq.domain.img; package com.bnyer.common.rocketmq.domain.img;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.Setter; import lombok.Setter;
import java.util.Date;
/** /**
* @author :WXC * @author :WXC
* @Date :2023/05/17 * @Date :2023/05/17
@ -17,7 +13,7 @@ import java.util.Date;
@Getter @Getter
@Setter @Setter
@NoArgsConstructor @NoArgsConstructor
public class AddUserVipRecordMessage extends BaseMessage { public class AddUserVipRecordMessage {
@ApiModelProperty(value="用户id") @ApiModelProperty(value="用户id")
private Long userId; private Long userId;

39
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java

@ -0,0 +1,39 @@
package com.bnyer.common.rocketmq.domain.order;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.util.Date;
/**
* @author :WXC
* @Date :2023/05/18
* @description :订单服务本地消息表记录
*/
@Getter
@Setter
@NoArgsConstructor
public class OrderMqLocalRecordMessage extends BaseMessage {
private Long id;
/**
* 消息状态
*/
private EnumMessageStatus status;
/**
* 消息内容
*/
private String content;
/**
* 创建时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
}

28
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/enums/EnumMessageStatus.java

@ -1,28 +0,0 @@
package com.bnyer.common.rocketmq.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author :WXC
* @Date :2023/05/18
* @description :消息状态
*/
@Getter
@AllArgsConstructor
public enum EnumMessageStatus {
//待发送
WALT_SEND,
//发送成功
SEND_OK,
//发送失败
SEND_FAILS,
//待消费
WALT_CONSUMER,
//消费中
CONSUMER_PROCESS,
//消费成功
CONSUMER_OK,
//消费失败
CONSUMER_FAILS
}

131
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java

@ -1,25 +1,25 @@
package com.bnyer.common.rocketmq.handle; package com.bnyer.common.rocketmq.handle;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference; import com.bnyer.common.rocketmq.config.RepeatConsumerConfig;
import com.bnyer.common.redis.service.RedisService;
import com.bnyer.common.redis.service.RedissonService;
import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.domain.BaseMessage; import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.bnyer.common.rocketmq.domain.MessageLog; import com.bnyer.common.rocketmq.domain.RepeatElement;
import com.bnyer.common.rocketmq.enums.EnumMessageStatus; import com.bnyer.common.rocketmq.persist.IPersist;
import com.bnyer.common.rocketmq.strategy.RepeatConsumerStrategy;
import com.bnyer.common.rocketmq.strategy.NormalRepeatStrategy;
import com.bnyer.common.rocketmq.strategy.RedisRepeatStrategy;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.redisson.api.RLock; import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.StringRedisTemplate;
import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.LocalDateTime; import java.util.function.Function;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/** /**
* @author :WXC * @author :WXC
@ -39,13 +39,21 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
private static final int DELAY_LEVEL = RocketMqConstant.FIVE_SECOND; private static final int DELAY_LEVEL = RocketMqConstant.FIVE_SECOND;
@Resource @Resource
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; private Environment env ;
@Resource
private RepeatConsumerConfig repeatConsumerConfig;
@Resource @Resource
private RedisService redisService; private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
@Resource @Resource
private RedissonService redissonService; private StringRedisTemplate stringRedisTemplate;
@PostConstruct
public void init(){
this.repeatConsumerConfig = RepeatConsumerConfig.enableRedisConfig(env.getProperty("spring.application.name"),stringRedisTemplate);
}
/** /**
* 消息处理 * 消息处理
@ -101,6 +109,7 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
return DELAY_LEVEL; return DELAY_LEVEL;
} }
/** /**
* 使用模板模式构建消息消费框架可自由扩展或删减 * 使用模板模式构建消息消费框架可自由扩展或删减
*/ */
@ -108,7 +117,7 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
// 基础日志记录被父类处理了 // 基础日志记录被父类处理了
log.info("消费者收到消息[{}]", JSONObject.toJSON(message)); log.info("消费者收到消息[{}]", JSONObject.toJSON(message));
if (filter(message)) { if (filter(message)) {
log.info("消息id{}不满足消费条件,已过滤。",message.getKey()); log.info("消息id{}不满足消费条件,已过滤。",message.getMessageKey());
return; return;
} }
//超过最大重试次数时调用子类方法处理 //超过最大重试次数时调用子类方法处理
@ -116,13 +125,22 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
handleMaxRetriesExceeded(message); handleMaxRetriesExceeded(message);
return; return;
} }
IPersist persist = repeatConsumerConfig.getPersist();
RepeatElement repeatElement = new RepeatElement(repeatConsumerConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic()
, message.getTag()==null ? "" : message.getTag()
, this.deDupMessageKey(message));
//消费消息,末尾消费失败会删除消费记录,消费成功则更新消费状态
try { try {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
handleMessage(message); handleMessage(message);
long costTime = System.currentTimeMillis() - now; long costTime = System.currentTimeMillis() - now;
log.info("消息{}消费成功,耗时[{}ms]", message.getKey(),costTime); log.info("消息{}消费成功,耗时[{}ms],修改缓存状态为已消费:{}", message.getMessageKey(),costTime, repeatElement);
persist.markConsumed(repeatElement, repeatConsumerConfig.getRecordReserveMinutes());
} catch (Exception e) { } catch (Exception e) {
log.error("消息{}消费异常", message.getKey(),e); log.error("消息{}消费异常", message.getMessageKey(),e);
log.info("消费失败,删除缓存中的记录 {} , {}", repeatElement, persist);
//消费失败,删除这个key
persist.delete(repeatElement);
// 是捕获异常还是抛出,由子类决定 // 是捕获异常还是抛出,由子类决定
if (throwException()) { if (throwException()) {
//抛出异常,由DefaultMessageListenerConcurrently类处理 //抛出异常,由DefaultMessageListenerConcurrently类处理
@ -135,6 +153,10 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
} }
} }
/**
* 消息重试
* @param message
*/
protected void handleRetry(T message) { protected void handleRetry(T message) {
// 获取子类RocketMQMessageListener注解拿到topic和tag // 获取子类RocketMQMessageListener注解拿到topic和tag
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class); RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
@ -166,80 +188,25 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
} }
/** /**
* 模板方法过滤不符合条件的消息 * 消息去重
* @param message * @param message
* @return * @return
*/ */
protected boolean handleFilter(T message){ protected Boolean handleMsgRepeat(final T message) {
return false; RepeatConsumerStrategy strategy = new NormalRepeatStrategy();
// RLock rLock = redissonService.getRLock(message.getKey()); Function<BaseMessage, String> repeatKeyFunction = baseMessage -> deDupMessageKey(message);
// try { if (repeatConsumerConfig.getRepeatStrategy() == RepeatConsumerConfig.REPEAT_STRATEGY_CONSUME_LATER) {
// //如果redis存在表示之前消费过,直接过滤 strategy = new RedisRepeatStrategy(repeatConsumerConfig, repeatKeyFunction);
// boolean tryLock = rLock.tryLock( 5,0, TimeUnit.MINUTES);
// if (!tryLock){
// return true;
// }
// }catch (Exception e){
// //需要重试消费
// log.error("消息过滤执行异常,error:{}",e.getMessage());
// throw new RuntimeException("消息过滤异常");
// }finally {
// rLock.unlock();
// }
// return true;
}
/**
* 模板方法保存消息日志
* @param message
* @param status
*/
protected void handleSaveMessageLog(T message,EnumMessageStatus status){
//超过最大重试次数以后,判定消费失败,记录日志,后续定时补偿
MessageLog<BaseMessage> messageLog = getMessageLog(message);
if (Objects.isNull(messageLog)){
messageLog = buildMessageLog(message);
} }
if (EnumMessageStatus.CONSUMER_OK == status){ //调用对应的去重策略
messageLog.setConsumerTime(LocalDateTime.now()); return strategy.invoke(message);
}
messageLog.setStatus(status);
saveMessageLog(messageLog);
} }
/** /**
* 获取消息日志 * 默认拿消息key 作为去重的标识子类可复写该方法自定义去重标识
* @param message
*/ */
protected MessageLog<BaseMessage> getMessageLog(T message){ protected <T extends BaseMessage> String deDupMessageKey(T message) {
Object cacheObject = redisService.getCacheObject(message.getKey()); return message.getMessageKey();
if (Objects.nonNull(cacheObject)){
MessageLog<BaseMessage> messageLog = JSON.parseObject(cacheObject.toString(),new TypeReference<MessageLog<BaseMessage>>(){});
return messageLog;
}
return null;
}
/**
* 构建消息日志
* @param message
* @return
*/
protected MessageLog<BaseMessage> buildMessageLog(T message){
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
return MessageLog.builder().messageKey(message.getKey()).message(JSON.toJSONString(message))
.tag(annotation.selectorExpression())
.topic(annotation.topic())
.consumerGroupName(annotation.consumerGroup()).build();
}
/**
* 保存消息日志
* @param messageLog
* @param <T>
*/
protected <T extends BaseMessage> void saveMessageLog(MessageLog<BaseMessage> messageLog){
} }
} }

49
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/IPersist.java

@ -0,0 +1,49 @@
package com.bnyer.common.rocketmq.persist;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.bnyer.common.rocketmq.domain.RepeatElement;
/**
* @author :WXC
* @Date :2023/05/20
* @description : 持久层处理重复消息目前支持redis可扩展mysql
*/
public interface IPersist {
/**
* 设置正在消费的消息到存储中
* @param repeatElement
* @param processingExpireMilliSeconds
* @return
* @param <T>
*/
<T extends BaseMessage> boolean setConsumingIfNX(RepeatElement repeatElement, long processingExpireMilliSeconds);
/**
* 删除存储中的消息
* @param repeatElement
* @param <T>
*/
<T extends BaseMessage> void delete(RepeatElement repeatElement);
/**
* 修改存储中的消息
* @param repeatElement
* @param recordReserveMinutes
* @param <T>
*/
<T extends BaseMessage> void markConsumed(RepeatElement repeatElement, long recordReserveMinutes);
/**
* 获取存储中的消息
* @param repeatElement
* @return
* @param <T>
*/
<T extends BaseMessage> String get(RepeatElement repeatElement);
default <T extends BaseMessage> String toPrintInfo(RepeatElement repeatElement) {
return repeatElement.toString();
}
}

81
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java

@ -0,0 +1,81 @@
package com.bnyer.common.rocketmq.persist;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.bnyer.common.rocketmq.domain.RepeatElement;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
import java.util.concurrent.TimeUnit;
import static com.bnyer.common.rocketmq.constant.RocketMqRepeatConstant.*;
/**
* @author :WXC
* @Date :2023/05/20
* @description : redis持久层
*/
public class RedisPersist implements IPersist {
private final StringRedisTemplate redisTemplate;
public RedisPersist(StringRedisTemplate redisTemplate) {
if (redisTemplate == null) {
throw new NullPointerException("redis template is null");
}
this.redisTemplate = redisTemplate;
}
@Override
public <T extends BaseMessage> boolean setConsumingIfNX(RepeatElement repeatElement, long dedupProcessingExpireMilliSeconds) {
String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey());
//setnx, 成功就可以消费
Boolean execute = redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> redisConnection.set(repeatKey.getBytes(),
(CONSUME_STATUS_CONSUMING).getBytes(), Expiration.milliseconds(dedupProcessingExpireMilliSeconds), RedisStringCommands.SetOption.SET_IF_ABSENT));
if (execute == null) {
return false;
}
return execute;
}
@Override
public <T extends BaseMessage> void delete(RepeatElement repeatElement) {
String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey());
redisTemplate.delete(repeatKey);
}
@Override
public <T extends BaseMessage> void markConsumed(RepeatElement repeatElement, long dedupRecordReserveMinutes) {
String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey());
redisTemplate.opsForValue().set(repeatKey, CONSUME_STATUS_SUCCESS, dedupRecordReserveMinutes, TimeUnit.MINUTES);
}
@Override
public <T extends BaseMessage> String get(RepeatElement repeatElement) {
String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey());
return redisTemplate.opsForValue().get(repeatKey);
}
@Override
public <T extends BaseMessage> String toPrintInfo(RepeatElement repeatElement) {
return buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey());
}
/**
* 构建重复消息rediskey
* @param applicationName
* @param topic
* @param tag
* @param messageKey
* @return
*/
private String buildRepeatMessageRedisKey(String applicationName, String topic, String tag, String messageKey) {
String prefix = REPEAT_REDIS_KEY_PREFIX + applicationName + ":" + topic + (StringUtils.isNotEmpty(tag) ? ":"+ tag :"");
return prefix + ":" + messageKey;
}
}

19
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/NormalRepeatStrategy.java

@ -0,0 +1,19 @@
package com.bnyer.common.rocketmq.strategy;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import lombok.extern.slf4j.Slf4j;
/**
* @author :WXC
* @Date :2023/05/20
* @description :
*/
@Slf4j
public class NormalRepeatStrategy implements RepeatConsumerStrategy {
@Override
public <T extends BaseMessage> boolean invoke(T message) {
return false;
}
}

57
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java

@ -0,0 +1,57 @@
package com.bnyer.common.rocketmq.strategy;
import com.bnyer.common.rocketmq.config.RepeatConsumerConfig;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.bnyer.common.rocketmq.domain.RepeatElement;
import com.bnyer.common.rocketmq.persist.IPersist;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.function.Function;
/**
* @author :WXC
* @Date :2023/05/20
* @description :如果已经消费过则直接消费幂等掉
*/
@Slf4j
public class RedisRepeatStrategy implements RepeatConsumerStrategy {
private final RepeatConsumerConfig filterConfig;
/**
* 获取去重键的函数
*/
private final Function<BaseMessage, String> repeatMessageKeyFunction;
public RedisRepeatStrategy(RepeatConsumerConfig dedupConfig, Function<BaseMessage, String> dedupMessageKeyFunction) {
this.filterConfig = dedupConfig;
this.repeatMessageKeyFunction = dedupMessageKeyFunction;
}
@Override
public <T extends BaseMessage> boolean invoke(T message) {
return doInvoke(message);
}
private <T extends BaseMessage> boolean doInvoke(T message) {
IPersist persist = filterConfig.getPersist();
RepeatElement repeatElement = new RepeatElement(filterConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic()
, message.getTag()==null ? "" : message.getTag()
, repeatMessageKeyFunction.apply(message));
boolean shouldConsume = true;
if (StringUtils.isNotBlank(repeatElement.getMessageKey())) {
shouldConsume = persist.setConsumingIfNX(repeatElement, filterConfig.getProcessingExpireMilliSeconds());
}
//设置成功,证明没有消费过
if (shouldConsume) {
return false;
} else {
return true;
}
}
}

16
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RepeatConsumerStrategy.java

@ -0,0 +1,16 @@
package com.bnyer.common.rocketmq.strategy;
import com.bnyer.common.rocketmq.domain.BaseMessage;
/**
* @author :WXC
* @Date :2023/05/20
* @description : 重复消费策略目前支持redis可扩展mysql方式
*/
public interface RepeatConsumerStrategy {
<T extends BaseMessage> boolean invoke(T message);
}

51
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java

@ -2,11 +2,8 @@ package com.bnyer.common.rocketmq.template;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.bnyer.common.redis.service.RedisService;
import com.bnyer.common.rocketmq.config.RocketEnhanceProperties; import com.bnyer.common.rocketmq.config.RocketEnhanceProperties;
import com.bnyer.common.rocketmq.domain.BaseMessage; import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.bnyer.common.rocketmq.domain.MessageLog;
import com.bnyer.common.rocketmq.enums.EnumMessageStatus;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -30,8 +27,6 @@ public class RocketMQEnhanceTemplate {
private final RocketMQTemplate template; private final RocketMQTemplate template;
private final RedisService redisService;
@Resource @Resource
private RocketEnhanceProperties rocketEnhanceProperties; private RocketEnhanceProperties rocketEnhanceProperties;
@ -70,14 +65,11 @@ public class RocketMQEnhanceTemplate {
public <T extends BaseMessage> SendResult send(String topic, String tag, T message) { public <T extends BaseMessage> SendResult send(String topic, String tag, T message) {
// 设置业务键,此处根据公共的参数进行处理 // 设置业务键,此处根据公共的参数进行处理
// 更多的其它基础业务处理... // 更多的其它基础业务处理...
MessageLog<T> messageLog = saveMessageLog(message, null); Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getMessageKey()).build();
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult; SendResult sendResult;
try { try {
sendResult = template.syncSend(buildDestination(topic,tag), sendMessage); sendResult = template.syncSend(buildDestination(topic,tag), sendMessage);
delMessageLog(messageLog);
} catch (Exception e) { } catch (Exception e) {
editMessageLog(messageLog);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
@ -95,14 +87,11 @@ public class RocketMQEnhanceTemplate {
* @param <T> * @param <T>
*/ */
public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) { public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
MessageLog<T> messageLog = saveMessageLog(message, null); Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getMessageKey()).build();
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult; SendResult sendResult;
try { try {
sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, 10000, delayLevel); sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, 10000, delayLevel);
delMessageLog(messageLog);
} catch (Exception e) { } catch (Exception e) {
editMessageLog(messageLog);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
log.info("topic:[{}]tag:[{}]延迟等级[{}]消息[{}]发送结果[{}]", topic,tag, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); log.info("topic:[{}]tag:[{}]延迟等级[{}]消息[{}]发送结果[{}]", topic,tag, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
@ -114,19 +103,16 @@ public class RocketMQEnhanceTemplate {
* 适合对响应时间敏感的业务场景 * 适合对响应时间敏感的业务场景
*/ */
public <T extends BaseMessage> void sendAsyncMsg(String topic, String tag, T message) { public <T extends BaseMessage> void sendAsyncMsg(String topic, String tag, T message) {
MessageLog<T> messageLog = saveMessageLog(message, null);
template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑 // 处理消息发送成功逻辑
log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag, JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag, JSON.toJSONString(sendResult));
delMessageLog(messageLog);
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
// 处理消息发送失败逻辑 // 处理消息发送失败逻辑
log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage());
editMessageLog(messageLog);
} }
}); });
} }
@ -136,18 +122,15 @@ public class RocketMQEnhanceTemplate {
* 适合对响应时间敏感的业务场景 * 适合对响应时间敏感的业务场景
*/ */
public <T extends BaseMessage> void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { public <T extends BaseMessage> void sendAsyncMsg(String topic, String tag, T message, int delayLevel) {
MessageLog<T> messageLog = saveMessageLog(message, null);
template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑 // 处理消息发送成功逻辑
log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag,JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag,JSON.toJSONString(sendResult));
delMessageLog(messageLog);
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage());
editMessageLog(messageLog);
} }
},10000,delayLevel); },10000,delayLevel);
} }
@ -167,36 +150,6 @@ public class RocketMQEnhanceTemplate {
return sendResult.getSendStatus() == SendStatus.SEND_OK; return sendResult.getSendStatus() == SendStatus.SEND_OK;
} }
/**
* 添加消息日志
* @param message
* @param sendResult
* @param <T>
*/
private <T extends BaseMessage> MessageLog<T> saveMessageLog(T message,SendResult sendResult){
log.info("消息发送前,入库消息日志表,如果发送失败,可进行消息补偿");
MessageLog<T> messageLog = new MessageLog<>(message, sendResult);
//保存
return messageLog;
}
/**
* 修改消息日志
* @param messageLog
* @param <T>
*/
private <T extends BaseMessage> void editMessageLog(MessageLog<T> messageLog){
log.info("消息发送失败,修改消息日志状态,可进行消息补偿");
//修改
messageLog.setStatus(EnumMessageStatus.SEND_FAILS);
}
/**
* 删除消息日志
* @param messageLog
*/
private <T extends BaseMessage> void delMessageLog(MessageLog<T> messageLog){
log.info("消息发送成功,删除消息日志");
}
} }

2
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java

@ -137,7 +137,7 @@ public class GoldRewardConsumer extends EnhanceMessageHandler<GoldRewardMessage>
@Override @Override
protected boolean filter(GoldRewardMessage message) { protected boolean filter(GoldRewardMessage message) {
return super.handleFilter(message); return super.handleMsgRepeat(message);
} }

39
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java

@ -1,10 +1,14 @@
package com.bnyer.img.listener; package com.bnyer.img.listener;
import com.alibaba.fastjson.JSON;
import com.bnyer.common.core.dto.AddUserVipRecordDto; import com.bnyer.common.core.dto.AddUserVipRecordDto;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.bean.EntityConvertUtil; import com.bnyer.common.core.utils.bean.EntityConvertUtil;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage; import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage;
import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.bnyer.img.service.UserVipRecordService; import com.bnyer.img.service.UserVipRecordService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@ -12,40 +16,55 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/** /**
* @author :WXC * @author :WXC
* @Date :2023/03/24 * @Date :2023/03/24
* @description :取消订单mq消费监听 * @description :保存会员记录mq消费监听
*/ */
@Slf4j @Slf4j
@Component @Component
@RocketMQMessageListener(topic = RocketMqTopic.VIP_RECORD_CREATE_TOPIC,consumerGroup = RocketMqTopic.VIP_RECORD_CREATE_TOPIC) @RocketMQMessageListener(topic = RocketMqTopic.VIP_RECORD_CREATE_TOPIC,consumerGroup = RocketMqTopic.VIP_RECORD_CREATE_TOPIC)
public class VipRecordCreateConsumer extends EnhanceMessageHandler<AddUserVipRecordMessage> implements RocketMQListener<AddUserVipRecordMessage> { public class VipRecordCreateConsumer extends EnhanceMessageHandler<OrderMqLocalRecordMessage> implements RocketMQListener<OrderMqLocalRecordMessage> {
@Resource
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
@Autowired @Autowired
private UserVipRecordService userVipRecordService; private UserVipRecordService userVipRecordService;
@Override @Override
public void onMessage(AddUserVipRecordMessage message) { public void onMessage(OrderMqLocalRecordMessage message) {
super.dispatchMessage(message); super.dispatchMessage(message);
AddUserVipRecordDto addUserVipRecordDto = EntityConvertUtil.copy(message, AddUserVipRecordDto.class); String content = message.getContent();
AddUserVipRecordMessage addUserVipRecordMessage = JSON.parseObject(content, AddUserVipRecordMessage.class);
AddUserVipRecordDto addUserVipRecordDto = EntityConvertUtil.copy(addUserVipRecordMessage, AddUserVipRecordDto.class);
//保存用户会员记录 //保存用户会员记录
userVipRecordService.saveUserVipRecord(addUserVipRecordDto); userVipRecordService.saveUserVipRecord(addUserVipRecordDto);
} }
@Override @Override
protected void handleMessage(AddUserVipRecordMessage message) throws Exception { protected void handleMessage(OrderMqLocalRecordMessage message) throws Exception {
//发送返回队列,告知已经处理成功,完成最终一致性
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
message.setStatus(EnumMessageStatus.SUCCESS);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,message);
} }
@Override @Override
protected void handleMaxRetriesExceeded(AddUserVipRecordMessage message) { protected void handleMaxRetriesExceeded(OrderMqLocalRecordMessage message) {
//发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
message.setStatus(EnumMessageStatus.FAILS);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,message);
} }
@Override @Override
protected boolean filter(AddUserVipRecordMessage message) { protected boolean filter(OrderMqLocalRecordMessage message) {
return super.handleFilter(message); return super.handleMsgRepeat(message);
} }
@Override @Override

2
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/CreatorProfitServiceImpl.java

@ -150,7 +150,7 @@ public class CreatorProfitServiceImpl implements CreatorProfitService {
*/ */
private GoldRewardMessage buildGoldRewardMsg(Long userId, int goldNum, String goldCode, String platform, Integer userClientType) { private GoldRewardMessage buildGoldRewardMsg(Long userId, int goldNum, String goldCode, String platform, Integer userClientType) {
GoldRewardMessage message = new GoldRewardMessage(); GoldRewardMessage message = new GoldRewardMessage();
message.setKey(IdUtils.randomUUID()); message.setMessageKey(IdUtils.randomUUID());
message.setSource(ServiceNameConstants.IMG_SERVICE); message.setSource(ServiceNameConstants.IMG_SERVICE);
message.setUserId(userId); message.setUserId(userId);
message.setGoldNum(goldNum); message.setGoldNum(goldNum);

61
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java

@ -0,0 +1,61 @@
package com.bnyer.order.listener;
import com.bnyer.common.core.domain.OrderMqMessageRecord;
import com.bnyer.common.core.utils.bean.EntityConvertUtil;
import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.order.service.OrderMqMessageRecordService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author :WXC
* @Date :2023/05/20
* @description :
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMqTopic.ORDER_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.ORDER_RETURN_MSG_TOPIC)
public class OrderReturnMessageConsumer extends EnhanceMessageHandler<OrderMqLocalRecordMessage> implements RocketMQListener<OrderMqLocalRecordMessage> {
@Resource
private OrderMqMessageRecordService orderMqMessageRecordService;
@Override
public void onMessage(OrderMqLocalRecordMessage message) {
super.dispatchMessage(message);
OrderMqMessageRecord orderMqMessageRecord = EntityConvertUtil.copy(message, OrderMqMessageRecord.class);
orderMqMessageRecordService.editMessageRecord(orderMqMessageRecord);
}
@Override
protected void handleMessage(OrderMqLocalRecordMessage message) throws Exception {
}
@Override
protected void handleMaxRetriesExceeded(OrderMqLocalRecordMessage message) {
}
@Override
protected boolean filter(OrderMqLocalRecordMessage message) {
return super.handleMsgRepeat(message);
}
@Override
protected boolean isRetry() {
return true;
}
@Override
protected boolean throwException() {
return false;
}
}

4
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java

@ -20,7 +20,7 @@ import org.springframework.stereotype.Component;
*/ */
@Slf4j @Slf4j
@Component @Component
@RocketMQMessageListener(topic = RocketMqTopic.ORDER_CANCEL_TOPIC,consumerGroup = RocketMqTag.ORDER_VIP_TAG) @RocketMQMessageListener(topic = RocketMqTopic.ORDER_CANCEL_TOPIC,selectorExpression = RocketMqTag.ORDER_VIP_TAG,consumerGroup = RocketMqTopic.ORDER_CANCEL_TOPIC)
public class VipOrderCancelConsumer extends EnhanceMessageHandler<VipOrderCancelMessage> implements RocketMQListener<VipOrderCancelMessage> { public class VipOrderCancelConsumer extends EnhanceMessageHandler<VipOrderCancelMessage> implements RocketMQListener<VipOrderCancelMessage> {
@Autowired @Autowired
@ -48,7 +48,7 @@ public class VipOrderCancelConsumer extends EnhanceMessageHandler<VipOrderCancel
@Override @Override
protected boolean filter(VipOrderCancelMessage message) { protected boolean filter(VipOrderCancelMessage message) {
return super.handleFilter(message); return super.handleMsgRepeat(message);
} }

2
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java

@ -59,7 +59,7 @@ public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler<VipOrderPay
@Override @Override
protected boolean filter(VipOrderPayNotifyMessage message) { protected boolean filter(VipOrderPayNotifyMessage message) {
return super.handleFilter(message); return super.handleMsgRepeat(message);
} }
@Override @Override

14
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/mapper/OrderMqMessageRecordMapper.java

@ -0,0 +1,14 @@
package com.bnyer.order.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bnyer.common.core.domain.OrderMqMessageRecord;
import org.apache.ibatis.annotations.Mapper;
/**
* @author :WXC
* @description :
*/
@Mapper
public interface OrderMqMessageRecordMapper extends BaseMapper<OrderMqMessageRecord> {
}

48
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java

@ -0,0 +1,48 @@
package com.bnyer.order.service;
import com.bnyer.common.core.domain.OrderMqMessageRecord;
import com.bnyer.common.rocketmq.domain.BaseMessage;
/**
* @author :WXC
* @Date :2023/05/20
* @description :订单服务本地消息service层
*/
public interface OrderMqMessageRecordService {
/**
* 发送异步消息
* @param topic
* @param tag
* @param addUserVipRecordMessage
* @param <T>
*/
<T> void sendAsyncMsg(String topic, String tag, T addUserVipRecordMessage);
/**
* 发送异步延时消息
* @param topic
* @param tag
* @param message
* @param delayLevel
* @param <T>
*/
<T> void sendAsyncMsg(String topic, String tag, T message, int delayLevel);
/**
* 保存消息记录
* @param topic
* @param tag
* @param message
* @return
* @param <T>
*/
<T> OrderMqMessageRecord saveMessageRecord(String topic, String tag, T message);
/**
* 修改消息记录
* @param orderMqMessageRecord
*/
void editMessageRecord(OrderMqMessageRecord orderMqMessageRecord);
}

132
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java

@ -0,0 +1,132 @@
package com.bnyer.order.service.impl;
import com.alibaba.fastjson.JSON;
import com.bnyer.common.core.constant.ServiceNameConstants;
import com.bnyer.common.core.domain.OrderMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.bean.EntityConvertUtil;
import com.bnyer.common.core.utils.uuid.IdUtils;
import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.bnyer.order.mapper.OrderMqMessageRecordMapper;
import com.bnyer.order.service.OrderMqMessageRecordService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
/**
* @author :WXC
* @Date :2023/05/19
* @description :
*/
@Slf4j
@Service
public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordService {
/**
* 发送超时时间
*/
private final static int TIME_OUT = 10000;
@Resource
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
@Resource
private OrderMqMessageRecordMapper orderMqMessageRecordMapper;
/**
* 发送异步消息通过线程池执行发送到broker的消息任务执行完后回调在SendCallback中可处理相关成功失败时的逻辑
* 适合对响应时间敏感的业务场景
*/
@Transactional
@Override
public <T> void sendAsyncMsg(String topic, String tag, T message) {
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录
log.info("消息发送中,开始入库本地消息记录表");
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message);
//发消息
OrderMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(orderMqMessageRecord, OrderMqLocalRecordMessage.class);
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag, JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
orderMqMessageRecord.setErrMsg(throwable.getMessage());
editMessageRecord(orderMqMessageRecord);
}
});
}
/**
* 发送异步延时消息通过线程池执行发送到broker的消息任务执行完后回调在SendCallback中可处理相关成功失败时的逻辑
* 适合对响应时间敏感的业务场景
*/
@Transactional
@Override
public <T> void sendAsyncMsg(String topic, String tag, T message, int delayLevel) {
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录
log.info("消息发送中,开始入库本地消息记录表");
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message);
//发消息
OrderMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(orderMqMessageRecord, OrderMqLocalRecordMessage.class);
mqLocalMessage.setRetryTimes(0);
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag,JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
orderMqMessageRecord.setErrMsg(throwable.getMessage());
editMessageRecord(orderMqMessageRecord);
}
},TIME_OUT,delayLevel);
}
/**
* 添加消息记录
* @param message
* @param <T>
*/
@Transactional
public <T> OrderMqMessageRecord saveMessageRecord(String topic, String tag, T message){
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
String consumerGroup = rocketMQTemplate.getConsumer().getConsumerGroup();
OrderMqMessageRecord orderMqMessageRecord = new OrderMqMessageRecord();
orderMqMessageRecord.setStatus(EnumMessageStatus.PROCESS);
orderMqMessageRecord.setConsumerGroupName(consumerGroup);
orderMqMessageRecord.setTopic(topic);
orderMqMessageRecord.setTag(tag);
orderMqMessageRecord.setCreateTime(new Date());
orderMqMessageRecord.setMessageKey(IdUtils.randomUUID());
orderMqMessageRecordMapper.insert(orderMqMessageRecord);
return orderMqMessageRecord;
}
/**
* 修改消息记录
* @param orderMqMessageRecord
*/
@Transactional
public void editMessageRecord(OrderMqMessageRecord orderMqMessageRecord){
orderMqMessageRecordMapper.updateById(orderMqMessageRecord);
}
}

29
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java

@ -31,6 +31,7 @@ import com.bnyer.order.bean.query.VipOrderExtQuery;
import com.bnyer.order.bean.query.VipOrderQuery; import com.bnyer.order.bean.query.VipOrderQuery;
import com.bnyer.order.bean.vo.VipOrderVo; import com.bnyer.order.bean.vo.VipOrderVo;
import com.bnyer.order.mapper.VipOrderMapper; import com.bnyer.order.mapper.VipOrderMapper;
import com.bnyer.order.service.OrderMqMessageRecordService;
import com.bnyer.order.service.VipOrderService; import com.bnyer.order.service.VipOrderService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -58,6 +59,9 @@ public class VipOrderServiceImpl implements VipOrderService {
@Autowired @Autowired
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
@Autowired
private OrderMqMessageRecordService orderMqMessageRecordService;
@Autowired @Autowired
private VipOrderMapper vipOrderMapper; private VipOrderMapper vipOrderMapper;
@ -113,17 +117,9 @@ public class VipOrderServiceImpl implements VipOrderService {
String orderNo = vipOrder.getOrderNo(); String orderNo = vipOrder.getOrderNo();
//发送消息,如果三十分钟后没有支付,则取消订单 //发送消息,如果三十分钟后没有支付,则取消订单
VipOrderCancelMessage vipOrderCancelMessage = new VipOrderCancelMessage(); VipOrderCancelMessage vipOrderCancelMessage = new VipOrderCancelMessage();
vipOrderCancelMessage.setKey(IdUtils.randomUUID());
vipOrderCancelMessage.setSource(ServiceNameConstants.ORDER_SERVICE);
vipOrderCancelMessage.setOrderNo(orderNo); vipOrderCancelMessage.setOrderNo(orderNo);
rocketMQEnhanceTemplate.send(RocketMqTopic.ORDER_CANCEL_TOPIC, RocketMqTag.ORDER_VIP_TAG,vipOrderCancelMessage,RocketMqConstant.THIRTY_MINUTES) // rocketMQEnhanceTemplate.send(RocketMqTopic.ORDER_CANCEL_TOPIC, RocketMqTag.ORDER_VIP_TAG,vipOrderCancelMessage,RocketMqConstant.THIRTY_MINUTES);
// SendStatus sendStatus = orderCancelMqTemplate.syncSend(RocketMqConstant.VIP_ORDER_CANCEL_TOPIC, new GenericMessage<>(orderNo), RocketMqConstant.TIMEOUT, RocketMqConstant.CANCEL_ORDER_DELAY_LEVEL).getSendStatus(); orderMqMessageRecordService.sendAsyncMsg(RocketMqTopic.ORDER_CANCEL_TOPIC, RocketMqTag.ORDER_VIP_TAG,vipOrderCancelMessage,RocketMqConstant.THIRTY_MINUTES);
// if (!Objects.equals(sendStatus,SendStatus.SEND_OK)) {
// // 消息发不出去就抛异常,发的出去无所谓
// throw new ServiceException(ResponseEnum.SERVER_ERROR);
// }else {
// log.info("消息发送成功,topic:{}",RocketMqTopic.VIP_ORDER_CANCEL_TOPIC);
// }
return orderNo; return orderNo;
} }
@ -232,11 +228,14 @@ public class VipOrderServiceImpl implements VipOrderService {
//发消息,添加用户会员记录 //发消息,添加用户会员记录
AddUserVipRecordMessage addUserVipRecordMessage = buildVipRecordMsg(vipOrder); AddUserVipRecordMessage addUserVipRecordMessage = buildVipRecordMsg(vipOrder);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.VIP_RECORD_CREATE_TOPIC,null, addUserVipRecordMessage); // rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.VIP_RECORD_CREATE_TOPIC,null, addUserVipRecordMessage);
orderMqMessageRecordService.sendAsyncMsg(RocketMqTopic.VIP_RECORD_CREATE_TOPIC,null, addUserVipRecordMessage);
//发送开会员画意值奖励并写入记录消息 //发送开会员画意值奖励并写入记录消息
GoldRewardMessage goldMsg = buildGoldRewardMsg(vipOrder.getUserId(),300, GoldEnum.BUY_VIP.getCode(),null,vipOrder.getUserClientType()); GoldRewardMessage goldMsg = buildGoldRewardMsg(vipOrder.getUserId(),300, GoldEnum.BUY_VIP.getCode(),null,vipOrder.getUserClientType());
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.GOLD_REWARD_TOPIC,null,goldMsg); // rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.GOLD_REWARD_TOPIC,null,goldMsg);
orderMqMessageRecordService.sendAsyncMsg(RocketMqTopic.GOLD_REWARD_TOPIC,null,goldMsg);
} }
@ -247,8 +246,6 @@ public class VipOrderServiceImpl implements VipOrderService {
*/ */
private AddUserVipRecordMessage buildVipRecordMsg(VipOrder vipOrder) { private AddUserVipRecordMessage buildVipRecordMsg(VipOrder vipOrder) {
AddUserVipRecordMessage message = new AddUserVipRecordMessage(); AddUserVipRecordMessage message = new AddUserVipRecordMessage();
message.setKey(IdUtils.randomUUID());
message.setSource(ServiceNameConstants.ORDER_SERVICE);
message.setDays(vipOrder.getDays()); message.setDays(vipOrder.getDays());
message.setVipId(vipOrder.getVipId()); message.setVipId(vipOrder.getVipId());
message.setVipName(vipOrder.getVipName()); message.setVipName(vipOrder.getVipName());
@ -270,8 +267,8 @@ public class VipOrderServiceImpl implements VipOrderService {
*/ */
private GoldRewardMessage buildGoldRewardMsg(Long userId, int goldNum, String goldCode, String platform, Integer userClientType) { private GoldRewardMessage buildGoldRewardMsg(Long userId, int goldNum, String goldCode, String platform, Integer userClientType) {
GoldRewardMessage message = new GoldRewardMessage(); GoldRewardMessage message = new GoldRewardMessage();
message.setKey(IdUtils.randomUUID()); // message.setMessageKey(IdUtils.randomUUID());
message.setSource(ServiceNameConstants.ORDER_SERVICE); // message.setSource(ServiceNameConstants.ORDER_SERVICE);
message.setUserId(userId); message.setUserId(userId);
message.setGoldNum(goldNum); message.setGoldNum(goldNum);
message.setGoldCode(goldCode); message.setGoldCode(goldCode);

29
bnyer-services/bnyer-order/src/main/resources/com/bnyer/order/mapper/OrderMqMessageRecordMapper.xml

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.bnyer.order.mapper.OrderMqMessageRecordMapper">
<resultMap id="BaseResultMap" type="com.bnyer.common.core.domain.OrderMqMessageRecord">
<!--order_mq_message-->
<id column="id" jdbcType="BIGINT" property="id" />
<result column="message_key" jdbcType="VARCHAR" property="messageKey" />
<result column="topic" jdbcType="VARCHAR" property="topic" />
<result column="tag" jdbcType="VARCHAR" property="tag" />
<result column="consumer_group_name" jdbcType="VARCHAR" property="consumerGroupName" />
<result column="status" jdbcType="VARCHAR" property="status" />
<result column="err_msg" jdbcType="VARCHAR" property="errMsg" />
<result column="content" jdbcType="VARCHAR" property="content" />
<result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
</resultMap>
<sql id="Base_Column_List">
t.id,
t.message_key,
t.topic,
t.tag,
t.consumer_group_name,
t.status,
t.err_msg,
t.content,
t.create_time,
</sql>
</mapper>

4
bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java

@ -81,9 +81,9 @@ public class PayInfoServiceImpl implements PayInfoService {
//会员充值场景 //会员充值场景
case VIP_RECHARGE: case VIP_RECHARGE:
// 发送消息,订单支付成功 // 发送消息,订单支付成功
// TODO: 2023/04/23可优化为:添加一张消息日志表,字段:topicName,消息内容,消息状态(发送中、成功、失败),错误信息 // TODO: 2023/05/20 暂时使用普通消息发送,待修改成本地消息表模式
VipOrderPayNotifyMessage vipOrderPayNotifyMessage = new VipOrderPayNotifyMessage(); VipOrderPayNotifyMessage vipOrderPayNotifyMessage = new VipOrderPayNotifyMessage();
vipOrderPayNotifyMessage.setKey(IdUtils.randomUUID()); vipOrderPayNotifyMessage.setMessageKey(IdUtils.randomUUID());
vipOrderPayNotifyMessage.setSource(ServiceNameConstants.PAY_SERVICE); vipOrderPayNotifyMessage.setSource(ServiceNameConstants.PAY_SERVICE);
vipOrderPayNotifyMessage.setOrderNo(orderNo); vipOrderPayNotifyMessage.setOrderNo(orderNo);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC, RocketMqTag.ORDER_VIP_TAG, vipOrderPayNotifyMessage); rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC, RocketMqTag.ORDER_VIP_TAG, vipOrderPayNotifyMessage);

Loading…
Cancel
Save