Browse Source

优化调整

feature-1.1
wuxicheng 3 years ago
parent
commit
f907768ceb
  1. 15
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java
  2. 4
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/ImgMqMessageRecordService.java
  3. 62
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/ImgMqMessageRecordServiceImpl.java
  4. 4
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java
  5. 61
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java
  6. 4
      bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java
  7. 63
      bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java

15
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java

@ -216,19 +216,18 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
/** /**
* 构建返回队列通知消息 * 构建返回队列通知消息
* @param oldMessage * @param message
* @param status * @param status
* @return * @return
*/ */
protected MqRecordMessage buildReturnMessage(MqRecordMessage oldMessage,EnumMessageStatus status){ protected MqRecordMessage buildReturnMessage(MqRecordMessage message,EnumMessageStatus status){
MqRecordMessage mqRecordMessage = new MqRecordMessage(); MqRecordMessage mqRecordMessage = new MqRecordMessage();
String orderReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(oldMessage.getReturnTopic());
mqRecordMessage.setMessageKey(IdUtil.randomUUID()); mqRecordMessage.setMessageKey(IdUtil.randomUUID());
mqRecordMessage.setSource(applicationName); mqRecordMessage.setSource(applicationName);
mqRecordMessage.setTopic(orderReturnTopic); mqRecordMessage.setTopic(message.getReturnTopic());
mqRecordMessage.setConsumerGroupName(orderReturnTopic); mqRecordMessage.setConsumerGroupName(message.getReturnTopic());
mqRecordMessage.setStatus(status); mqRecordMessage.setStatus(status);
mqRecordMessage.setId(oldMessage.getId()); mqRecordMessage.setId(message.getId());
return mqRecordMessage; return mqRecordMessage;
} }
@ -238,7 +237,7 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
*/ */
protected void sendReturnSuccessMessage(MqRecordMessage message){ protected void sendReturnSuccessMessage(MqRecordMessage message){
MqRecordMessage returnMessage = buildReturnMessage(message, EnumMessageStatus.SUCCESS); MqRecordMessage returnMessage = buildReturnMessage(message, EnumMessageStatus.SUCCESS);
rocketMQEnhanceTemplate.sendAsyncMsg(returnMessage.getReturnTopic(),null,returnMessage); rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,returnMessage);
} }
/** /**
@ -247,7 +246,7 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
*/ */
protected void sendReturnFailsMessage(MqRecordMessage message){ protected void sendReturnFailsMessage(MqRecordMessage message){
MqRecordMessage returnMessage = buildReturnMessage(message, EnumMessageStatus.FAILS); MqRecordMessage returnMessage = buildReturnMessage(message, EnumMessageStatus.FAILS);
rocketMQEnhanceTemplate.sendAsyncMsg(returnMessage.getReturnTopic(),null,returnMessage); rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,returnMessage);
} }
} }

4
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/ImgMqMessageRecordService.java

@ -2,6 +2,7 @@ package com.bnyer.img.service;
import com.bnyer.common.core.domain.ImgMqMessageRecord; import com.bnyer.common.core.domain.ImgMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.domain.MqRecordMessage;
/** /**
* @author :WXC * @author :WXC
@ -47,9 +48,8 @@ public interface ImgMqMessageRecordService {
* @param tag * @param tag
* @param message * @param message
* @return * @return
* @param <T>
*/ */
<T> ImgMqMessageRecord saveMessageRecord(String topic, String tag, T message); ImgMqMessageRecord saveMessageRecord(String topic, String tag, MqRecordMessage message);
/** /**
* 修改消息记录状态 * 修改消息记录状态

62
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/ImgMqMessageRecordServiceImpl.java

@ -4,7 +4,6 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.bnyer.common.core.domain.ImgMqMessageRecord; import com.bnyer.common.core.domain.ImgMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.bean.EntityConvertUtil;
import com.bnyer.common.core.utils.uuid.IdUtils; import com.bnyer.common.core.utils.uuid.IdUtils;
import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
@ -17,6 +16,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.jetbrains.annotations.NotNull;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
@ -53,6 +53,28 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
this.applicationName = env.getProperty("spring.application.name"); this.applicationName = env.getProperty("spring.application.name");
} }
/**
* 构建消息内容
* @param topic
* @param tag
* @param message
* @return
* @param <T>
*/
@NotNull
private <T> MqRecordMessage getMqRecordMessage(String topic, String tag, T message) {
String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic);
MqRecordMessage mqRecordMessage = new MqRecordMessage();
mqRecordMessage.setTopic(buildTopic);
mqRecordMessage.setTag(tag);
mqRecordMessage.setConsumerGroupName(buildTopic);
mqRecordMessage.setMessageKey(IdUtils.randomUUID());
mqRecordMessage.setContent(JSON.toJSONString(message));
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC);
return mqRecordMessage;
}
/** /**
* 发送同步步消息 * 发送同步步消息
* @param topic * @param topic
@ -68,18 +90,17 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message);
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage);
mqRecordMessage.setId(imgMqMessageRecord.getId());
//发消息 //发消息
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(imgMqMessageRecord, MqRecordMessage.class); Message<MqRecordMessage> sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build();
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC);
Message<MqRecordMessage> sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, imgMqMessageRecord.getMessageKey()).build();
SendResult sendResult; SendResult sendResult;
try { try {
sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage);
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult));
} catch (Exception e) { } catch (Exception e) {
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(mqRecordMessage), e.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(imgMqMessageRecord.getId(), imgMqMessageRecord.getStatus(),e.getMessage()); editMessageRecordStatus(imgMqMessageRecord.getId(), imgMqMessageRecord.getStatus(),e.getMessage());
// throw new RuntimeException(e); // throw new RuntimeException(e);
@ -96,19 +117,18 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message);
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage);
mqRecordMessage.setId(imgMqMessageRecord.getId());
//发消息 //发消息
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(imgMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage()); editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage());
} }
@ -125,19 +145,18 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message);
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage);
mqRecordMessage.setId(imgMqMessageRecord.getId());
//发消息 //发消息
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(imgMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage()); editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage());
} }
@ -147,10 +166,9 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
/** /**
* 添加消息记录 * 添加消息记录
* @param message * @param message
* @param <T>
*/ */
@Transactional @Transactional
public <T> ImgMqMessageRecord saveMessageRecord(String topic, String tag, T message){ public ImgMqMessageRecord saveMessageRecord(String topic, String tag, MqRecordMessage message){
topic = rocketMQEnhanceTemplate.reBuildTopic(topic); topic = rocketMQEnhanceTemplate.reBuildTopic(topic);
ImgMqMessageRecord imgMqMessageRecord = new ImgMqMessageRecord(); ImgMqMessageRecord imgMqMessageRecord = new ImgMqMessageRecord();
imgMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); imgMqMessageRecord.setStatus(EnumMessageStatus.PROCESS);

4
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java

@ -2,6 +2,7 @@ package com.bnyer.order.service;
import com.bnyer.common.core.domain.OrderMqMessageRecord; import com.bnyer.common.core.domain.OrderMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.domain.MqRecordMessage;
/** /**
* @author :WXC * @author :WXC
@ -47,9 +48,8 @@ public interface OrderMqMessageRecordService {
* @param tag * @param tag
* @param message * @param message
* @return * @return
* @param <T>
*/ */
<T> OrderMqMessageRecord saveMessageRecord(String topic, String tag, T message); OrderMqMessageRecord saveMessageRecord(String topic, String tag, MqRecordMessage message);
/** /**
* 修改消息记录状态 * 修改消息记录状态

61
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java

@ -4,7 +4,6 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.bnyer.common.core.domain.OrderMqMessageRecord; import com.bnyer.common.core.domain.OrderMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.bean.EntityConvertUtil;
import com.bnyer.common.core.utils.uuid.IdUtils; import com.bnyer.common.core.utils.uuid.IdUtils;
import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
@ -17,6 +16,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.jetbrains.annotations.NotNull;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
@ -52,6 +52,28 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
this.applicationName = env.getProperty("spring.application.name"); this.applicationName = env.getProperty("spring.application.name");
} }
/**
* 构建消息内容
* @param topic
* @param tag
* @param message
* @return
* @param <T>
*/
@NotNull
private <T> MqRecordMessage getMqRecordMessage(String topic, String tag, T message) {
String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic);
MqRecordMessage mqRecordMessage = new MqRecordMessage();
mqRecordMessage.setTopic(buildTopic);
mqRecordMessage.setTag(tag);
mqRecordMessage.setConsumerGroupName(buildTopic);
mqRecordMessage.setMessageKey(IdUtils.randomUUID());
mqRecordMessage.setContent(JSON.toJSONString(message));
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC);
return mqRecordMessage;
}
/** /**
* 发送同步步消息 * 发送同步步消息
* @param topic * @param topic
@ -67,18 +89,16 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message); MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message);
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage);
//发消息 //发消息
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(orderMqMessageRecord, MqRecordMessage.class); Message<MqRecordMessage> sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build();
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC);
Message<MqRecordMessage> sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, orderMqMessageRecord.getMessageKey()).build();
SendResult sendResult; SendResult sendResult;
try { try {
sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage);
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult));
} catch (Exception e) { } catch (Exception e) {
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(mqRecordMessage), e.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(orderMqMessageRecord.getId(), orderMqMessageRecord.getStatus(),e.getMessage()); editMessageRecordStatus(orderMqMessageRecord.getId(), orderMqMessageRecord.getStatus(),e.getMessage());
// throw new RuntimeException(e); // throw new RuntimeException(e);
@ -95,19 +115,18 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message); MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message);
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage);
mqRecordMessage.setId(orderMqMessageRecord.getId());
//发消息 //发消息
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(orderMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage()); editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage());
} }
@ -124,19 +143,18 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message); MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message);
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage);
mqRecordMessage.setId(orderMqMessageRecord.getId());
//发消息 //发消息
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(orderMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage()); editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage());
} }
@ -149,8 +167,7 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
* @param <T> * @param <T>
*/ */
@Transactional @Transactional
public <T> OrderMqMessageRecord saveMessageRecord(String topic, String tag, T message){ public OrderMqMessageRecord saveMessageRecord(String topic, String tag, MqRecordMessage message){
topic = rocketMQEnhanceTemplate.reBuildTopic(topic);
OrderMqMessageRecord orderMqMessageRecord = new OrderMqMessageRecord(); OrderMqMessageRecord orderMqMessageRecord = new OrderMqMessageRecord();
orderMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); orderMqMessageRecord.setStatus(EnumMessageStatus.PROCESS);
orderMqMessageRecord.setConsumerGroupName(topic); orderMqMessageRecord.setConsumerGroupName(topic);

4
bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java

@ -2,6 +2,7 @@ package com.bnyer.pay.service;
import com.bnyer.common.core.domain.PayMqMessageRecord; import com.bnyer.common.core.domain.PayMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.domain.MqRecordMessage;
/** /**
* @author :WXC * @author :WXC
@ -47,9 +48,8 @@ public interface PayMqMessageRecordService {
* @param tag * @param tag
* @param message * @param message
* @return * @return
* @param <T>
*/ */
<T> PayMqMessageRecord saveMessageRecord(String topic, String tag, T message); PayMqMessageRecord saveMessageRecord(String topic, String tag, MqRecordMessage message);
/** /**
* 修改消息记录状态 * 修改消息记录状态

63
bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java

@ -17,6 +17,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.jetbrains.annotations.NotNull;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
@ -53,6 +54,28 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
} }
/**
* 构建消息内容
* @param topic
* @param tag
* @param message
* @return
* @param <T>
*/
@NotNull
private <T> MqRecordMessage getMqRecordMessage(String topic, String tag, T message) {
String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic);
MqRecordMessage mqRecordMessage = new MqRecordMessage();
mqRecordMessage.setTopic(buildTopic);
mqRecordMessage.setTag(tag);
mqRecordMessage.setConsumerGroupName(buildTopic);
mqRecordMessage.setMessageKey(IdUtils.randomUUID());
mqRecordMessage.setContent(JSON.toJSONString(message));
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC);
return mqRecordMessage;
}
/** /**
* 发送同步步消息 * 发送同步步消息
* @param topic * @param topic
@ -68,18 +91,17 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message);
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage);
mqRecordMessage.setId(payMqMessageRecord.getId());
//发消息 //发消息
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(payMqMessageRecord, MqRecordMessage.class); Message<MqRecordMessage> sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build();
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC);
Message<MqRecordMessage> sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, payMqMessageRecord.getMessageKey()).build();
SendResult sendResult; SendResult sendResult;
try { try {
sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage);
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
} catch (Exception e) { } catch (Exception e) {
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(message), e.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(payMqMessageRecord.getId(), payMqMessageRecord.getStatus(),e.getMessage()); editMessageRecordStatus(payMqMessageRecord.getId(), payMqMessageRecord.getStatus(),e.getMessage());
// throw new RuntimeException(e); // throw new RuntimeException(e);
@ -96,19 +118,18 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message);
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage);
mqRecordMessage.setId(payMqMessageRecord.getId());
//发消息 //发消息
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(payMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage()); editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage());
} }
@ -125,19 +146,18 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message);
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage);
mqRecordMessage.setId(payMqMessageRecord.getId());
//发消息 //发消息
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(payMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage()); editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage());
} }
@ -147,11 +167,9 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
/** /**
* 添加消息记录 * 添加消息记录
* @param message * @param message
* @param <T>
*/ */
@Transactional @Transactional
public <T> PayMqMessageRecord saveMessageRecord(String topic, String tag, T message){ public PayMqMessageRecord saveMessageRecord(String topic, String tag, MqRecordMessage message){
topic = rocketMQEnhanceTemplate.reBuildTopic(topic);
PayMqMessageRecord payMqMessageRecord = new PayMqMessageRecord(); PayMqMessageRecord payMqMessageRecord = new PayMqMessageRecord();
payMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); payMqMessageRecord.setStatus(EnumMessageStatus.PROCESS);
payMqMessageRecord.setConsumerGroupName(topic); payMqMessageRecord.setConsumerGroupName(topic);
@ -174,4 +192,5 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
payMqMessageRecordMapper.updateStatusByStatus(id,status,errMsg); payMqMessageRecordMapper.updateStatusByStatus(id,status,errMsg);
} }
} }

Loading…
Cancel
Save