diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java index dff2dd9..26dbf73 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java @@ -15,7 +15,7 @@ public class RocketMqConstant { /** * 数据来源前缀 */ - public static final String RETRY_PREFIX = "bnyer-"; + public static final String MESSAGE_SOURCE_PREFIX = "bnyer"; /** * 按顺序匹配:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (1-18) diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java index 863193e..7908243 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java @@ -1,6 +1,5 @@ package com.bnyer.common.rocketmq.handle; -import cn.hutool.core.util.IdUtil; import com.alibaba.fastjson.JSONObject; import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.rocketmq.config.RepeatConsumerConfig; @@ -170,8 +169,8 @@ public abstract class EnhanceMessageHandler { } //重新构建消息体 String messageSource = message.getSource(); - if(!messageSource.startsWith(RocketMqConstant.RETRY_PREFIX)){ - message.setSource(RocketMqConstant.RETRY_PREFIX + messageSource); + if(!messageSource.startsWith(RocketMqConstant.MESSAGE_SOURCE_PREFIX)){ + message.setSource(RocketMqConstant.MESSAGE_SOURCE_PREFIX + "-" + messageSource); } message.setRetryTimes(message.getRetryTimes() + 1); @@ -236,4 +235,4 @@ public abstract class EnhanceMessageHandler { rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,returnMessage); } -} \ No newline at end of file +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/IPersist.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/IPersist.java index f8becff..9207c15 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/IPersist.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/IPersist.java @@ -1,7 +1,6 @@ package com.bnyer.common.rocketmq.persist; -import com.bnyer.common.rocketmq.domain.BaseMessage; import com.bnyer.common.rocketmq.domain.RepeatElement; /** @@ -16,34 +15,30 @@ public interface IPersist { * @param repeatElement * @param processingExpireMilliSeconds * @return - * @param */ - boolean setConsumingIfNX(RepeatElement repeatElement, long processingExpireMilliSeconds); + boolean setConsumingIfNX(RepeatElement repeatElement, long processingExpireMilliSeconds); /** * 删除存储中的消息 * @param repeatElement - * @param */ - void delete(RepeatElement repeatElement); + void delete(RepeatElement repeatElement); /** * 修改存储中的消息 * @param repeatElement * @param recordReserveMinutes - * @param */ - void markConsumed(RepeatElement repeatElement, long recordReserveMinutes); + void markConsumed(RepeatElement repeatElement, long recordReserveMinutes); /** * 获取存储中的消息 * @param repeatElement * @return - * @param */ - String get(RepeatElement repeatElement); + String get(RepeatElement repeatElement); - default String toPrintInfo(RepeatElement repeatElement) { + default String toPrintInfo(RepeatElement repeatElement) { return repeatElement.toString(); } } diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java index d1b96a0..5525c3b 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java @@ -2,7 +2,6 @@ package com.bnyer.common.rocketmq.persist; import com.bnyer.common.rocketmq.constant.RocketMqRepeatConstant; -import com.bnyer.common.rocketmq.domain.BaseMessage; import com.bnyer.common.rocketmq.domain.RepeatElement; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.connection.RedisStringCommands; @@ -29,7 +28,7 @@ public class RedisPersist implements IPersist { } @Override - public boolean setConsumingIfNX(RepeatElement repeatElement, long processingExpireMilliSeconds) { + public boolean setConsumingIfNX(RepeatElement repeatElement, long processingExpireMilliSeconds) { String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); //setnx, 成功就可以消费 Boolean execute = redisTemplate.execute((RedisCallback) redisConnection -> redisConnection.set(repeatKey.getBytes(), @@ -41,25 +40,25 @@ public class RedisPersist implements IPersist { } @Override - public void delete(RepeatElement repeatElement) { + public void delete(RepeatElement repeatElement) { String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); redisTemplate.delete(repeatKey); } @Override - public void markConsumed(RepeatElement repeatElement, long recordReserveMinutes) { + public void markConsumed(RepeatElement repeatElement, long recordReserveMinutes) { String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); redisTemplate.opsForValue().set(repeatKey, RocketMqRepeatConstant.CONSUME_STATUS_SUCCESS, recordReserveMinutes, TimeUnit.MINUTES); } @Override - public String get(RepeatElement repeatElement) { + public String get(RepeatElement repeatElement) { String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); return redisTemplate.opsForValue().get(repeatKey); } @Override - public String toPrintInfo(RepeatElement repeatElement) { + public String toPrintInfo(RepeatElement repeatElement) { return buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); } diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java index aad34ee..1317de6 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java @@ -14,21 +14,26 @@ import java.util.function.Function; /** * @author :WXC * @Date :2023/05/20 - * @description :如果已经消费过,则直接消费幂等掉 + * @description :如果已经消费过,则直接幂等掉 */ @Slf4j public class RedisRepeatStrategy implements RepeatConsumerStrategy { - private final RepeatConsumerConfig filterConfig; + private final RepeatConsumerConfig repeatConfig; /** * 获取去重键的函数 */ private final Function repeatMessageKeyFunction; - public RedisRepeatStrategy(RepeatConsumerConfig dedupConfig, Function dedupMessageKeyFunction) { - this.filterConfig = dedupConfig; - this.repeatMessageKeyFunction = dedupMessageKeyFunction; + /** + * 重复消息 + */ + private RepeatElement repeatElement; + + public RedisRepeatStrategy(RepeatConsumerConfig repeatConfig, Function repeatMessageKeyFunction) { + this.repeatConfig = repeatConfig; + this.repeatMessageKeyFunction = repeatMessageKeyFunction; } @@ -38,13 +43,13 @@ public class RedisRepeatStrategy implements RepeatConsumerStrategy { } private boolean doInvoke(T message) { - IPersist persist = filterConfig.getPersist(); - RepeatElement repeatElement = new RepeatElement(filterConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic() - , message.getTag()==null ? "" : message.getTag() - , repeatMessageKeyFunction.apply(message)); + IPersist persist = repeatConfig.getPersist(); + //RepeatElement repeatElement = new RepeatElement(repeatConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic() + // , message.getTag()==null ? "" : message.getTag() + // , repeatMessageKeyFunction.apply(message)); boolean shouldConsume = true; if (StringUtils.isNotBlank(repeatElement.getMessageKey())) { - shouldConsume = persist.setConsumingIfNX(repeatElement, filterConfig.getProcessingExpireMilliSeconds()); + shouldConsume = persist.setConsumingIfNX(repeatElement, repeatConfig.getProcessingExpireMilliSeconds()); } //设置成功,证明没有消费过 if (shouldConsume) { @@ -54,4 +59,4 @@ public class RedisRepeatStrategy implements RepeatConsumerStrategy { } } -} \ No newline at end of file +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java index f6c836a..8d088de 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java @@ -4,6 +4,7 @@ import cn.hutool.core.util.IdUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.bnyer.common.rocketmq.config.RocketEnhanceProperties; +import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.domain.BaseMessage; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -107,7 +108,7 @@ public class RocketMQEnhanceTemplate { Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getMessageKey()).build(); SendResult sendResult; try { - sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, 10000, delayLevel); + sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, RocketMqConstant.TIME_OUT, delayLevel); } catch (Exception e) { throw new RuntimeException(e); } @@ -151,7 +152,7 @@ public class RocketMQEnhanceTemplate { public void onException(Throwable throwable) { log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); } - },10000,delayLevel); + }, RocketMqConstant.TIME_OUT,delayLevel); } /**