From f2da6fccbe24509dda237ce496066988509c266f Mon Sep 17 00:00:00 2001 From: wuxicheng <1441859745@qq.com> Date: Wed, 17 May 2023 14:20:41 +0800 Subject: [PATCH] =?UTF-8?q?mq=E7=BB=9F=E4=B8=80=E6=94=B9=E4=B8=BA=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E5=8F=91=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vip/VipOrderPayNotifyConsumer.java | 38 ++++++++++---- .../service/impl/VipOrderServiceImpl.java | 26 +++++++--- .../pay/service/impl/PayInfoServiceImpl.java | 51 +++++++++++-------- 3 files changed, 77 insertions(+), 38 deletions(-) diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java index 9292742..c2fb898 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java @@ -4,18 +4,15 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.bnyer.common.core.domain.VipOrder; -import com.bnyer.common.core.enums.ResponseEnum; -import com.bnyer.common.core.exception.ServiceException; -import com.bnyer.common.core.utils.SpringUtils; import com.bnyer.common.rocketmq.config.RocketMqConstant; import com.bnyer.order.mapper.VipOrderMapper; import com.bnyer.order.service.VipOrderService; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.springframework.messaging.support.GenericMessage; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -55,11 +52,32 @@ public class VipOrderPayNotifyConsumer implements RocketMQListener { vipOrderService.updateByToPaySuccess(vipOrder); //发消息,添加用户会员记录 String msg = buildVipRecordMsg(vipOrder); - SendStatus sendStatus = vipRecordMqTemplate.syncSend(RocketMqConstant.VIP_RECORD_CREATE_TOPIC, new GenericMessage<>(msg)).getSendStatus(); - if (!Objects.equals(sendStatus, SendStatus.SEND_OK)) { - // 消息发不出去就抛异常 - throw new ServiceException(ResponseEnum.SERVER_ERROR); - } + sendMsg(RocketMqConstant.VIP_RECORD_CREATE_TOPIC, msg); +// SendStatus sendStatus = vipRecordMqTemplate.syncSend(RocketMqConstant.VIP_RECORD_CREATE_TOPIC, new GenericMessage<>(msg)).getSendStatus(); +// if (!Objects.equals(sendStatus, SendStatus.SEND_OK)) { +// // 消息发不出去就抛异常 +// throw new ServiceException(ResponseEnum.SERVER_ERROR); +// } + // TODO: 2023/05/17 发消息添加其他奖励 + } + + /** + * 发送消息 + * @param topic + * @param msg + * @param + */ + public void sendMsg(String topic, T msg) { + vipRecordMqTemplate.asyncSend(topic, msg,new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("topic:{}消息发送成功,result:{}",topic,JSON.toJSONString(sendResult)); + } + @Override + public void onException(Throwable e) { + log.info("topic:{}消息发送失败,error:{}",topic,e.getMessage()); + } + }); } /** diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java index 05c08cf..2ee83da 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java @@ -27,6 +27,8 @@ import com.bnyer.common.core.enums.EnumVipOrderStatus; import com.bnyer.order.mapper.VipOrderMapper; import com.bnyer.order.service.VipOrderService; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; @@ -105,13 +107,23 @@ public class VipOrderServiceImpl extends ServiceImpl i vipOrderMapper.insert(vipOrder); String orderNo = vipOrder.getOrderNo(); //发送消息,如果三十分钟后没有支付,则取消订单 - SendStatus sendStatus = orderCancelMqTemplate.syncSend(RocketMqConstant.VIP_ORDER_CANCEL_TOPIC, new GenericMessage<>(orderNo), RocketMqConstant.TIMEOUT, RocketMqConstant.CANCEL_ORDER_DELAY_LEVEL).getSendStatus(); - if (!Objects.equals(sendStatus,SendStatus.SEND_OK)) { - // 消息发不出去就抛异常,发的出去无所谓 - throw new ServiceException(ResponseEnum.SERVER_ERROR); - }else { - log.info("消息发送成功,topic:{}",RocketMqConstant.VIP_ORDER_CANCEL_TOPIC); - } + orderCancelMqTemplate.asyncSend(RocketMqConstant.VIP_ORDER_CANCEL_TOPIC, orderNo,new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("topic:{}消息发送成功,result:{}",RocketMqConstant.VIP_ORDER_CANCEL_TOPIC,JSON.toJSONString(sendResult)); + } + @Override + public void onException(Throwable e) { + log.info("topic:{}消息发送失败,error:{}",RocketMqConstant.VIP_ORDER_CANCEL_TOPIC,e.getMessage()); + } + },RocketMqConstant.TIMEOUT); +// SendStatus sendStatus = orderCancelMqTemplate.syncSend(RocketMqConstant.VIP_ORDER_CANCEL_TOPIC, new GenericMessage<>(orderNo), RocketMqConstant.TIMEOUT, RocketMqConstant.CANCEL_ORDER_DELAY_LEVEL).getSendStatus(); +// if (!Objects.equals(sendStatus,SendStatus.SEND_OK)) { +// // 消息发不出去就抛异常,发的出去无所谓 +// throw new ServiceException(ResponseEnum.SERVER_ERROR); +// }else { +// log.info("消息发送成功,topic:{}",RocketMqConstant.VIP_ORDER_CANCEL_TOPIC); +// } return orderNo; } diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java index 1e30e3d..9f768dc 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java @@ -5,7 +5,6 @@ import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.bnyer.common.core.domain.PayInfo; -import com.bnyer.common.core.enums.EnumPayStatus; import com.bnyer.common.core.enums.EnumSceneCode; import com.bnyer.common.core.enums.ResponseEnum; import com.bnyer.common.core.exception.ServiceException; @@ -14,14 +13,14 @@ import com.bnyer.common.rocketmq.config.RocketMqConstant; import com.bnyer.pay.bean.dto.AddPayInfoDto; import com.bnyer.pay.bean.dto.EditPayInfoNotifyDto; import com.bnyer.pay.bean.dto.EditPayInfoSingleDto; +import com.bnyer.pay.bean.vo.PayInfoDetailsVo; import com.bnyer.pay.mapper.PayInfoMapper; import com.bnyer.pay.service.PayInfoService; -import com.bnyer.pay.bean.vo.PayInfoDetailsVo; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.messaging.support.GenericMessage; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -86,26 +85,36 @@ public class PayInfoServiceImpl extends ServiceImpl impl vipRechargeMsgObj.put("orderNo",orderNo); String vipRechargeMsgStr = JSON.toJSONString(vipRechargeMsgObj); // TODO: 2023/04/23可优化为:添加一张消息日志表,字段:topicName,消息内容,消息状态(发送中、成功、失败),错误信息 -// vipOrderPayNotifyMqTemplate.asyncSend(RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC,vipRechargeMsgStr,new SendCallback() { -// @Override -// public void onSuccess(SendResult sendResult) { -// log.info("topic:{}消息发送成功",RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC); -// } -// @Override -// public void onException(Throwable e) { -// log.info("topic:{}消息发送失败,error:{}",RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC,e.getMessage()); -// } -// }); - SendStatus sendStatus = vipOrderPayNotifyMqTemplate.syncSend(RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC, - new GenericMessage<>(vipRechargeMsgStr)).getSendStatus(); - if (!Objects.equals(sendStatus, SendStatus.SEND_OK)) { - // 消息发不出去就抛异常,因为订单回调会有多次,几乎不可能每次都无法发送出去,发的出去无所谓 - throw new ServiceException(ResponseEnum.SERVER_ERROR); - } - break; + sendMsg(RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC,vipRechargeMsgStr); +// SendStatus sendStatus = vipOrderPayNotifyMqTemplate.syncSend(RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC, +// new GenericMessage<>(vipRechargeMsgStr)).getSendStatus(); +// if (!Objects.equals(sendStatus, SendStatus.SEND_OK)) { +// // 消息发不出去就抛异常,因为订单回调会有多次,几乎不可能每次都无法发送出去,发的出去无所谓 +// throw new ServiceException(ResponseEnum.SERVER_ERROR); +// } +// break; } } + /** + * 发送消息 + * @param topic + * @param msg + * @param + */ + public void sendMsg(String topic, T msg) { + vipOrderPayNotifyMqTemplate.asyncSend(topic, msg,new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("topic:{}消息发送成功,result:{}",topic,JSON.toJSONString(sendResult)); + } + @Override + public void onException(Throwable e) { + log.info("topic:{}消息发送失败,error:{}",topic,e.getMessage()); + } + }); + } + /** * 构建修改支付单信息实体 * @param editPayInfoNotifyDto