Browse Source

实现本地消息表消息补偿,消息去重

feature-1.1
wuxicheng 3 years ago
parent
commit
b8b05db310
  1. 6
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java

6
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java

@ -128,7 +128,7 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
IPersist persist = repeatConsumerConfig.getPersist(); IPersist persist = repeatConsumerConfig.getPersist();
RepeatElement repeatElement = new RepeatElement(repeatConsumerConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic() RepeatElement repeatElement = new RepeatElement(repeatConsumerConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic()
, message.getTag()==null ? "" : message.getTag() , message.getTag()==null ? "" : message.getTag()
, this.deDupMessageKey(message)); , this.repeatMessageKey(message));
//消费消息,末尾消费失败会删除消费记录,消费成功则更新消费状态 //消费消息,末尾消费失败会删除消费记录,消费成功则更新消费状态
try { try {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
@ -194,7 +194,7 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
*/ */
protected Boolean handleMsgRepeat(final T message) { protected Boolean handleMsgRepeat(final T message) {
RepeatConsumerStrategy strategy = new NormalRepeatStrategy(); RepeatConsumerStrategy strategy = new NormalRepeatStrategy();
Function<BaseMessage, String> repeatKeyFunction = baseMessage -> deDupMessageKey(message); Function<BaseMessage, String> repeatKeyFunction = baseMessage -> repeatMessageKey(message);
if (repeatConsumerConfig.getRepeatStrategy() == RepeatConsumerConfig.REPEAT_STRATEGY_CONSUME_LATER) { if (repeatConsumerConfig.getRepeatStrategy() == RepeatConsumerConfig.REPEAT_STRATEGY_CONSUME_LATER) {
strategy = new RedisRepeatStrategy(repeatConsumerConfig, repeatKeyFunction); strategy = new RedisRepeatStrategy(repeatConsumerConfig, repeatKeyFunction);
} }
@ -205,7 +205,7 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
/** /**
* 默认拿消息key 作为去重的标识子类可复写该方法自定义去重标识 * 默认拿消息key 作为去重的标识子类可复写该方法自定义去重标识
*/ */
protected <T extends BaseMessage> String deDupMessageKey(T message) { protected String repeatMessageKey(T message) {
return message.getMessageKey(); return message.getMessageKey();
} }

Loading…
Cancel
Save