diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java index eb5b057..fdd4aa0 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java @@ -27,16 +27,6 @@ import javax.annotation.Resource; @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class RocketMQEnhanceTemplate { - /** - * 延时等级 - */ - private static final int DELAY_LEVEL = RocketMqConstant.FIVE_SECOND; - - /** - * 默认重试次数 - */ - private static final int MAX_RETRY_TIMES = 3; - private final RocketMQTemplate template; @Resource @@ -152,14 +142,6 @@ public class RocketMQEnhanceTemplate { }); } - /** - * 发送异步可重试消息(通过线程池执行发送到broker的消息任务,执行完后回调:如果发送失败进行重试) - * - */ - public void sendAsyncRetryMsg(String topic, String tag, T message) { - sendAsyncMsg(buildDestination(topic,tag),message,getDelayLevel()); - } - /** * 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) * @@ -182,57 +164,8 @@ public class RocketMQEnhanceTemplate { @Override public void onException(Throwable throwable) { log.error("消息发送失败,destination:{},error:{}",destination,throwable.getMessage()); - // 超过最大重试次数时调用子类方法处理 - if (message.getRetryTimes() > getMaxRetryTimes()) { - handleMaxRetriesExceeded(message); - return; - } - //发送失败,重试发送 - handleRetry(destination,message); - } - /** - * 重试发送异步消息 - * @param destination - * @param message - * @param - */ - private void handleRetry(String destination,T message) { - //重新构建消息体 - String messageSource = message.getSource(); - if(!messageSource.startsWith(RocketMqConstant.RETRY_PREFIX)){ - message.setSource(RocketMqConstant.RETRY_PREFIX + messageSource); - } - message.setRetryTimes(message.getRetryTimes() + 1); - // 如果消息发送不成功,则再次重新发送 - sendAsyncMsg(destination, message,getDelayLevel()); } - - /** - * 超过最大重试次数调用该方法处理 - * @param message - * @param - */ - private void handleMaxRetriesExceeded(T message) { - log.error("发送消息超过最大重试次数,message:{}",message); - } - },delayLevel); - } - - /** - * isRetry开启时,重新入队延迟时间 - * @return -1:立即入队重试 - */ - protected int getDelayLevel() { - return DELAY_LEVEL; - } - - /** - * 最大重试次数 - * - * @return 最大重试次数,默认3次 - */ - protected int getMaxRetryTimes() { - return MAX_RETRY_TIMES; + },10000,delayLevel); } } diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java index 756d7d1..55debfc 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java @@ -55,11 +55,11 @@ public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler impl vipOrderPayNotifyMessage.setKey(IdUtils.randomUUID()); vipOrderPayNotifyMessage.setSource(ServiceNameConstants.PAY_SERVICE); vipOrderPayNotifyMessage.setOrderNo(orderNo); - rocketMQEnhanceTemplate.sendAsyncRetryMsg(RocketMqTopic.VIP_ORDER_PAY_NOTIFY_TOPIC,null, vipOrderPayNotifyMessage); + rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.VIP_ORDER_PAY_NOTIFY_TOPIC,null, vipOrderPayNotifyMessage); break; } }