Browse Source

bug修复

feature-1.1
wuxicheng 3 years ago
parent
commit
aa9369d03c
  1. 18
      bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/enums/EnumMessageType.java
  2. 3
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java
  3. 8
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java
  4. 17
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java
  5. 35
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java
  6. 9
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java

18
bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/enums/EnumMessageType.java

@ -0,0 +1,18 @@
package com.bnyer.common.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author :WXC
* @Date :2023/05/18
* @description :消息类型
*/
@Getter
@AllArgsConstructor
public enum EnumMessageType {
//普通
GENERAL,
//延时
DELAY
}

3
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java

@ -217,12 +217,13 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
* @param success
* @param message
*/
protected void buildRerunMessage(EnumMessageStatus success, OrderMqLocalRecordMessage message) {
protected OrderMqLocalRecordMessage buildRerunMessage(EnumMessageStatus success, OrderMqLocalRecordMessage message) {
OrderMqLocalRecordMessage orderMqLocalRecordMessage = new OrderMqLocalRecordMessage();
orderMqLocalRecordMessage.setStatus(success);
orderMqLocalRecordMessage.setId(message.getId());
orderMqLocalRecordMessage.setSource(ServiceNameConstants.IMG_SERVICE);
orderMqLocalRecordMessage.setMessageKey(IdUtil.randomUUID());
return orderMqLocalRecordMessage;
}
}

8
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java

@ -52,8 +52,8 @@ public class VipRecordCreateConsumer extends EnhanceMessageHandler<OrderMqLocalR
//发送返回队列,告知已经处理成功,完成最终一致性
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
super.buildRerunMessage(EnumMessageStatus.SUCCESS, message);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,message);
OrderMqLocalRecordMessage returnMessage = super.buildRerunMessage(EnumMessageStatus.SUCCESS, message);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,returnMessage);
}
@Override
@ -61,8 +61,8 @@ public class VipRecordCreateConsumer extends EnhanceMessageHandler<OrderMqLocalR
//发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
super.buildRerunMessage(EnumMessageStatus.FAILS, message);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,message);
OrderMqLocalRecordMessage returnMessage = super.buildRerunMessage(EnumMessageStatus.FAILS, message);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,returnMessage);
}
@Override

17
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java

@ -2,7 +2,6 @@ package com.bnyer.order.service;
import com.bnyer.common.core.domain.OrderMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.domain.BaseMessage;
/**
* @author :WXC
@ -11,23 +10,35 @@ import com.bnyer.common.rocketmq.domain.BaseMessage;
*/
public interface OrderMqMessageRecordService {
/**
* 发送同步步消息
* @param topic
* @param tag
* @param message
* @param <T>
*/
<T> void send(String topic, String tag, T message);
/**
* 发送异步消息
* @param topic
* @param tag
* @param addUserVipRecordMessage
* @param message
* @param <T>
*/
<T> void sendAsyncMsg(String topic, String tag, T addUserVipRecordMessage);
<T> void sendAsyncMsg(String topic, String tag, T message);
/**
* 发送异步延时消息
* 废弃延时消息不走消息表因为延时消息最少支持秒级延时定时任务不可能对消息表进行秒级扫描那样性能损耗太大了
* 所以确保消息一定发送成功走同步发送如果发送失败直接抛异常确保本地事物回滚
* @param topic
* @param tag
* @param message
* @param delayLevel
* @param <T>
*/
@Deprecated
<T> void sendAsyncMsg(String topic, String tag, T message, int delayLevel);
/**

35
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java

@ -1,6 +1,7 @@
package com.bnyer.order.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bnyer.common.core.constant.ServiceNameConstants;
import com.bnyer.common.core.domain.OrderMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus;
@ -14,6 +15,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -41,6 +44,38 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
@Resource
private OrderMqMessageRecordMapper orderMqMessageRecordMapper;
/**
* 发送同步步消息
* @param topic
* @param tag
* @param message
* @param <T>
*/
@Transactional
@Override
public <T> void send(String topic, String tag, T message) {
// 设置业务键,此处根据公共的参数进行处理
// 更多的其它基础业务处理...
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录
log.info("消息发送中,开始入库本地消息记录表");
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message);
//发消息
OrderMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(orderMqMessageRecord, OrderMqLocalRecordMessage.class);
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE);
Message<OrderMqLocalRecordMessage> sendMessage = MessageBuilder.withPayload(mqLocalMessage).setHeader(RocketMQHeaders.KEYS, orderMqMessageRecord.getMessageKey()).build();
SendResult sendResult;
try {
sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage);
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
} catch (Exception e) {
editMessageRecordStatus(orderMqMessageRecord.getId(), orderMqMessageRecord.getStatus(),e.getMessage());
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
// throw new RuntimeException(e);
}
}
/**
* 发送异步消息通过线程池执行发送到broker的消息任务执行完后回调在SendCallback中可处理相关成功失败时的逻辑
* 适合对响应时间敏感的业务场景

9
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java

@ -34,6 +34,7 @@ import com.bnyer.order.mapper.VipOrderMapper;
import com.bnyer.order.service.OrderMqMessageRecordService;
import com.bnyer.order.service.VipOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -118,8 +119,12 @@ public class VipOrderServiceImpl implements VipOrderService {
//发送消息,如果三十分钟后没有支付,则取消订单
VipOrderCancelMessage vipOrderCancelMessage = new VipOrderCancelMessage();
vipOrderCancelMessage.setOrderNo(orderNo);
// rocketMQEnhanceTemplate.send(RocketMqTopic.ORDER_CANCEL_TOPIC, RocketMqTag.ORDER_VIP_TAG,vipOrderCancelMessage,RocketMqConstant.THIRTY_MINUTES);
orderMqMessageRecordService.sendAsyncMsg(RocketMqTopic.ORDER_CANCEL_TOPIC, RocketMqTag.ORDER_VIP_TAG,vipOrderCancelMessage,RocketMqConstant.THIRTY_MINUTES);
//延时消息不走消息表,因为延时消息最少支持秒级延时,定时任务不可能对消息表进行秒级扫描,那样性能损耗太大了
//所以确保消息一定发送成功,走同步发送,如果发送失败直接抛异常,确保本地事物回滚
SendStatus sendStatus = rocketMQEnhanceTemplate.send(RocketMqTopic.ORDER_CANCEL_TOPIC, RocketMqTag.ORDER_VIP_TAG, vipOrderCancelMessage, RocketMqConstant.THIRTY_MINUTES).getSendStatus();
if (SendStatus.SEND_OK != sendStatus){
throw new ServiceException("下单失败,请重试");
}
return orderNo;
}

Loading…
Cancel
Save