|
|
|
@ -27,16 +27,6 @@ import javax.annotation.Resource; |
|
|
|
@RequiredArgsConstructor(onConstructor = @__(@Autowired)) |
|
|
|
public class RocketMQEnhanceTemplate { |
|
|
|
|
|
|
|
/** |
|
|
|
* 延时等级 |
|
|
|
*/ |
|
|
|
private static final int DELAY_LEVEL = RocketMqConstant.FIVE_SECOND; |
|
|
|
|
|
|
|
/** |
|
|
|
* 默认重试次数 |
|
|
|
*/ |
|
|
|
private static final int MAX_RETRY_TIMES = 3; |
|
|
|
|
|
|
|
private final RocketMQTemplate template; |
|
|
|
|
|
|
|
@Resource |
|
|
|
@ -152,14 +142,6 @@ public class RocketMQEnhanceTemplate { |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 发送异步可重试消息(通过线程池执行发送到broker的消息任务,执行完后回调:如果发送失败进行重试) |
|
|
|
* |
|
|
|
*/ |
|
|
|
public <T extends BaseMessage> void sendAsyncRetryMsg(String topic, String tag, T message) { |
|
|
|
sendAsyncMsg(buildDestination(topic,tag),message,getDelayLevel()); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
|
|
|
* |
|
|
|
@ -182,57 +164,8 @@ public class RocketMQEnhanceTemplate { |
|
|
|
@Override |
|
|
|
public void onException(Throwable throwable) { |
|
|
|
log.error("消息发送失败,destination:{},error:{}",destination,throwable.getMessage()); |
|
|
|
// 超过最大重试次数时调用子类方法处理
|
|
|
|
if (message.getRetryTimes() > getMaxRetryTimes()) { |
|
|
|
handleMaxRetriesExceeded(message); |
|
|
|
return; |
|
|
|
} |
|
|
|
//发送失败,重试发送
|
|
|
|
handleRetry(destination,message); |
|
|
|
} |
|
|
|
/** |
|
|
|
* 重试发送异步消息 |
|
|
|
* @param destination |
|
|
|
* @param message |
|
|
|
* @param <T> |
|
|
|
*/ |
|
|
|
private <T extends BaseMessage> void handleRetry(String destination,T message) { |
|
|
|
//重新构建消息体
|
|
|
|
String messageSource = message.getSource(); |
|
|
|
if(!messageSource.startsWith(RocketMqConstant.RETRY_PREFIX)){ |
|
|
|
message.setSource(RocketMqConstant.RETRY_PREFIX + messageSource); |
|
|
|
} |
|
|
|
message.setRetryTimes(message.getRetryTimes() + 1); |
|
|
|
// 如果消息发送不成功,则再次重新发送
|
|
|
|
sendAsyncMsg(destination, message,getDelayLevel()); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 超过最大重试次数调用该方法处理 |
|
|
|
* @param message |
|
|
|
* @param <T> |
|
|
|
*/ |
|
|
|
private <T extends BaseMessage> void handleMaxRetriesExceeded(T message) { |
|
|
|
log.error("发送消息超过最大重试次数,message:{}",message); |
|
|
|
} |
|
|
|
},delayLevel); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* isRetry开启时,重新入队延迟时间 |
|
|
|
* @return -1:立即入队重试 |
|
|
|
*/ |
|
|
|
protected int getDelayLevel() { |
|
|
|
return DELAY_LEVEL; |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 最大重试次数 |
|
|
|
* |
|
|
|
* @return 最大重试次数,默认3次 |
|
|
|
*/ |
|
|
|
protected int getMaxRetryTimes() { |
|
|
|
return MAX_RETRY_TIMES; |
|
|
|
},10000,delayLevel); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|