|
|
|
@ -2,14 +2,15 @@ package com.bnyer.common.rocketmq.template; |
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON; |
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
import com.bnyer.common.redis.service.RedisService; |
|
|
|
import com.bnyer.common.rocketmq.config.RocketEnhanceProperties; |
|
|
|
import com.bnyer.common.rocketmq.constant.RocketMqConstant; |
|
|
|
import com.bnyer.common.rocketmq.domain.BaseMessage; |
|
|
|
import com.bnyer.common.rocketmq.domain.MessageLog; |
|
|
|
import com.bnyer.common.rocketmq.enums.EnumMessageStatus; |
|
|
|
import lombok.RequiredArgsConstructor; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
import org.apache.rocketmq.client.producer.SendCallback; |
|
|
|
import org.apache.rocketmq.client.producer.SendResult; |
|
|
|
import org.apache.rocketmq.client.producer.*; |
|
|
|
import org.apache.rocketmq.spring.core.RocketMQTemplate; |
|
|
|
import org.apache.rocketmq.spring.support.RocketMQHeaders; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
@ -29,6 +30,8 @@ public class RocketMQEnhanceTemplate { |
|
|
|
|
|
|
|
private final RocketMQTemplate template; |
|
|
|
|
|
|
|
private final RedisService redisService; |
|
|
|
|
|
|
|
@Resource |
|
|
|
private RocketEnhanceProperties rocketEnhanceProperties; |
|
|
|
|
|
|
|
@ -65,25 +68,20 @@ public class RocketMQEnhanceTemplate { |
|
|
|
* @param <T> |
|
|
|
*/ |
|
|
|
public <T extends BaseMessage> SendResult send(String topic, String tag, T message) { |
|
|
|
// 注意分隔符
|
|
|
|
return send(buildDestination(topic,tag), message); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
* 发送同步消息 |
|
|
|
* @param destination |
|
|
|
* @param message |
|
|
|
* @return |
|
|
|
* @param <T> |
|
|
|
*/ |
|
|
|
public <T extends BaseMessage> SendResult send(String destination, T message) { |
|
|
|
// 设置业务键,此处根据公共的参数进行处理
|
|
|
|
// 更多的其它基础业务处理...
|
|
|
|
MessageLog<T> messageLog = saveMessageLog(message, null); |
|
|
|
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); |
|
|
|
SendResult sendResult = template.syncSend(destination, sendMessage); |
|
|
|
SendResult sendResult; |
|
|
|
try { |
|
|
|
sendResult = template.syncSend(buildDestination(topic,tag), sendMessage); |
|
|
|
delMessageLog(messageLog); |
|
|
|
} catch (Exception e) { |
|
|
|
editMessageLog(messageLog); |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
|
|
|
|
log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
|
|
|
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
|
|
|
return sendResult; |
|
|
|
} |
|
|
|
|
|
|
|
@ -97,21 +95,17 @@ public class RocketMQEnhanceTemplate { |
|
|
|
* @param <T> |
|
|
|
*/ |
|
|
|
public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) { |
|
|
|
return send(buildDestination(topic,tag), message, delayLevel); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 发送同步延时消息 |
|
|
|
* @param destination |
|
|
|
* @param message |
|
|
|
* @param delayLevel |
|
|
|
* @return |
|
|
|
* @param <T> |
|
|
|
*/ |
|
|
|
public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) { |
|
|
|
MessageLog<T> messageLog = saveMessageLog(message, null); |
|
|
|
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); |
|
|
|
SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); |
|
|
|
log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
|
|
|
SendResult sendResult; |
|
|
|
try { |
|
|
|
sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, 10000, delayLevel); |
|
|
|
delMessageLog(messageLog); |
|
|
|
} catch (Exception e) { |
|
|
|
editMessageLog(messageLog); |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
log.info("topic:[{}]tag:[{}]延迟等级[{}]消息[{}]发送结果[{}]", topic,tag, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
|
|
|
return sendResult; |
|
|
|
} |
|
|
|
|
|
|
|
@ -120,52 +114,89 @@ public class RocketMQEnhanceTemplate { |
|
|
|
* (适合对响应时间敏感的业务场景) |
|
|
|
*/ |
|
|
|
public <T extends BaseMessage> void sendAsyncMsg(String topic, String tag, T message) { |
|
|
|
sendAsyncMsg(buildDestination(topic,tag),message); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
|
|
|
* (适合对响应时间敏感的业务场景) |
|
|
|
*/ |
|
|
|
public <T extends BaseMessage> void sendAsyncMsg(String destination, T message) { |
|
|
|
template.asyncSend(destination, MessageBuilder.withPayload(message).build(), new SendCallback() { |
|
|
|
MessageLog<T> messageLog = saveMessageLog(message, null); |
|
|
|
template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { |
|
|
|
@Override |
|
|
|
public void onSuccess(SendResult sendResult) { |
|
|
|
// 处理消息发送成功逻辑
|
|
|
|
log.info("消息发送成功,destination:{},result:{}",destination, JSON.toJSONString(sendResult)); |
|
|
|
log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag, JSON.toJSONString(sendResult)); |
|
|
|
delMessageLog(messageLog); |
|
|
|
} |
|
|
|
@Override |
|
|
|
public void onException(Throwable throwable) { |
|
|
|
// 处理消息发送失败逻辑
|
|
|
|
log.error("消息发送失败,destination:{},error:{}",destination,throwable.getMessage()); |
|
|
|
log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage()); |
|
|
|
editMessageLog(messageLog); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
|
|
|
* |
|
|
|
*/ |
|
|
|
public <T extends BaseMessage> void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { |
|
|
|
sendAsyncMsg(buildDestination(topic,tag),message,delayLevel); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
|
|
|
* (适合对响应时间敏感的业务场景) |
|
|
|
*/ |
|
|
|
public <T extends BaseMessage> void sendAsyncMsg(String destination, T message, int delayLevel) { |
|
|
|
template.asyncSend(destination, MessageBuilder.withPayload(message).build(), new SendCallback() { |
|
|
|
public <T extends BaseMessage> void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { |
|
|
|
MessageLog<T> messageLog = saveMessageLog(message, null); |
|
|
|
template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { |
|
|
|
@Override |
|
|
|
public void onSuccess(SendResult sendResult) { |
|
|
|
// 处理消息发送成功逻辑
|
|
|
|
log.info("消息发送成功,destination:{},result:{}",destination,JSON.toJSONString(sendResult)); |
|
|
|
log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag,JSON.toJSONString(sendResult)); |
|
|
|
delMessageLog(messageLog); |
|
|
|
} |
|
|
|
@Override |
|
|
|
public void onException(Throwable throwable) { |
|
|
|
log.error("消息发送失败,destination:{},error:{}",destination,throwable.getMessage()); |
|
|
|
log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage()); |
|
|
|
editMessageLog(messageLog); |
|
|
|
} |
|
|
|
},10000,delayLevel); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 发送事务消息 |
|
|
|
* @param topic 主题 |
|
|
|
* @param tag 标签 |
|
|
|
* @param message 消息 |
|
|
|
* @return 发送结果 |
|
|
|
* @param <T> |
|
|
|
*/ |
|
|
|
public <T extends BaseMessage> boolean sendTransactionalMsg(String topic, String tag,String arg, T message) { |
|
|
|
String destination = buildDestination(topic, tag); |
|
|
|
TransactionSendResult sendResult = template.sendMessageInTransaction(destination, MessageBuilder.withPayload(message).build(), arg); |
|
|
|
log.info("Send transaction msg result: " + sendResult); |
|
|
|
return sendResult.getSendStatus() == SendStatus.SEND_OK; |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 添加消息日志 |
|
|
|
* @param message |
|
|
|
* @param sendResult |
|
|
|
* @param <T> |
|
|
|
*/ |
|
|
|
private <T extends BaseMessage> MessageLog<T> saveMessageLog(T message,SendResult sendResult){ |
|
|
|
log.info("消息发送前,入库消息日志表,如果发送失败,可进行消息补偿"); |
|
|
|
MessageLog<T> messageLog = new MessageLog<>(message, sendResult); |
|
|
|
//保存
|
|
|
|
return messageLog; |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 修改消息日志 |
|
|
|
* @param messageLog |
|
|
|
* @param <T> |
|
|
|
*/ |
|
|
|
private <T extends BaseMessage> void editMessageLog(MessageLog<T> messageLog){ |
|
|
|
log.info("消息发送失败,修改消息日志状态,可进行消息补偿"); |
|
|
|
//修改
|
|
|
|
messageLog.setStatus(EnumMessageStatus.SEND_FAILS); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 删除消息日志 |
|
|
|
* @param messageLog |
|
|
|
*/ |
|
|
|
private <T extends BaseMessage> void delMessageLog(MessageLog<T> messageLog){ |
|
|
|
log.info("消息发送成功,删除消息日志"); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|