diff --git a/bnyer-common/bnyer-common-rocketmq/pom.xml b/bnyer-common/bnyer-common-rocketmq/pom.xml index 2196bce..388bdeb 100644 --- a/bnyer-common/bnyer-common-rocketmq/pom.xml +++ b/bnyer-common/bnyer-common-rocketmq/pom.xml @@ -27,6 +27,10 @@ org.apache.rocketmq rocketmq-spring-boot-starter + + com.dimensionalnode + bnyer-common-redis + diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTag.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTag.java new file mode 100644 index 0000000..07c2c4f --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTag.java @@ -0,0 +1,16 @@ +package com.bnyer.common.rocketmq.constant; + +/** + * @author :WXC + * @Date :2023/05/17 + * @description : 消息标签 + */ +public class RocketMqTag { + + /** + * 会员订单 + */ + public static final String ORDER_VIP_TAG = "vip"; + + +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java index 6910bf5..b3a1bcb 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java @@ -8,14 +8,14 @@ package com.bnyer.common.rocketmq.constant; public class RocketMqTopic { /** - * vip订单取消 + * 订单取消 */ - public static final String VIP_ORDER_CANCEL_TOPIC = "vip-order-cancel-topic"; + public static final String ORDER_CANCEL_TOPIC = "order-cancel-topic"; /** - * vip订单支付成功 + * 订单支付成功 */ - public static final String VIP_ORDER_PAY_NOTIFY_TOPIC = "vip-order-pay-notify-topic"; + public static final String ORDER_PAY_NOTIFY_TOPIC = "order-pay-notify-topic"; /** * vip记录创建 diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MessageLog.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MessageLog.java new file mode 100644 index 0000000..65b5ecf --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MessageLog.java @@ -0,0 +1,84 @@ +package com.bnyer.common.rocketmq.domain; + +import com.alibaba.fastjson.JSON; +import com.bnyer.common.rocketmq.enums.EnumMessageStatus; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; + +import java.time.LocalDateTime; +import java.util.Objects; + +/** + * @author :WXC + * @Date :2023/05/18 + * @description :消息日志 + */ +@Getter +@Setter +@Builder +public class MessageLog { + + /** + * 消息key,作为redis的key,消费的时候判断是否存在, + * 存在就判断状态是否是已消费,如果不是就进行消费,如果是就过滤该消息 + */ + private String messageKey; + + /** + * 消息主题 + */ + private String topic; + + /** + * 消息tag + */ + private String tag; + + /** + * 消费组名称 + */ + private String consumerGroupName; + + /** + * 消息状态 + */ + private EnumMessageStatus status; + + /** + * 错误信息 + */ + private String errMsg; + + /** + * 消费成功时间 + */ + private LocalDateTime consumerTime; + + /** + * 消息内容 + */ + private String message; + + /** + * 重试次数,用于判断重试次数,超过最大重试次数就人工补偿 + */ + protected Integer retryTimes = 0; + + public MessageLog(T message, SendResult sendResult) { + this.messageKey = message.getKey(); + this.message = JSON.toJSONString(message); + if (Objects.isNull(sendResult)){ + this.status = EnumMessageStatus.WALT_SEND; + }else { + if (SendStatus.SEND_OK == sendResult.getSendStatus()){ + this.status = EnumMessageStatus.SEND_OK; + }else { + this.status = EnumMessageStatus.SEND_FAILS; + } + } + } + +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/enums/EnumMessageStatus.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/enums/EnumMessageStatus.java new file mode 100644 index 0000000..acfe26f --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/enums/EnumMessageStatus.java @@ -0,0 +1,28 @@ +package com.bnyer.common.rocketmq.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @author :WXC + * @Date :2023/05/18 + * @description :消息状态 + */ +@Getter +@AllArgsConstructor +public enum EnumMessageStatus { + //待发送 + WALT_SEND, + //发送成功 + SEND_OK, + //发送失败 + SEND_FAILS, + //待消费 + WALT_CONSUMER, + //消费中 + CONSUMER_PROCESS, + //消费成功 + CONSUMER_OK, + //消费失败 + CONSUMER_FAILS +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java index c51382e..65ade3e 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java @@ -1,15 +1,25 @@ package com.bnyer.common.rocketmq.handle; +import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.TypeReference; +import com.bnyer.common.redis.service.RedisService; +import com.bnyer.common.redis.service.RedissonService; import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.domain.BaseMessage; +import com.bnyer.common.rocketmq.domain.MessageLog; +import com.bnyer.common.rocketmq.enums.EnumMessageStatus; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.redisson.api.RLock; import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.util.Objects; +import java.util.concurrent.TimeUnit; /** * @author :WXC @@ -31,6 +41,12 @@ public abstract class EnhanceMessageHandler { @Resource private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; + @Resource + private RedisService redisService; + + @Resource + private RedissonService redissonService; + /** * 消息处理 * @@ -148,4 +164,82 @@ public abstract class EnhanceMessageHandler { } } + + /** + * 模板方法,过滤不符合条件的消息 + * @param message + * @return + */ + protected boolean handleFilter(T message){ + return false; +// RLock rLock = redissonService.getRLock(message.getKey()); +// try { +// //如果redis存在表示之前消费过,直接过滤 +// boolean tryLock = rLock.tryLock( 5,0, TimeUnit.MINUTES); +// if (!tryLock){ +// return true; +// } +// }catch (Exception e){ +// //需要重试消费 +// log.error("消息过滤执行异常,error:{}",e.getMessage()); +// throw new RuntimeException("消息过滤异常"); +// }finally { +// rLock.unlock(); +// } +// return true; + } + + /** + * 模板方法。保存消息日志 + * @param message + * @param status + */ + protected void handleSaveMessageLog(T message,EnumMessageStatus status){ + //超过最大重试次数以后,判定消费失败,记录日志,后续定时补偿 + MessageLog messageLog = getMessageLog(message); + if (Objects.isNull(messageLog)){ + messageLog = buildMessageLog(message); + } + if (EnumMessageStatus.CONSUMER_OK == status){ + messageLog.setConsumerTime(LocalDateTime.now()); + } + messageLog.setStatus(status); + saveMessageLog(messageLog); + } + + /** + * 获取消息日志 + * @param message + */ + protected MessageLog getMessageLog(T message){ + Object cacheObject = redisService.getCacheObject(message.getKey()); + if (Objects.nonNull(cacheObject)){ + MessageLog messageLog = JSON.parseObject(cacheObject.toString(),new TypeReference>(){}); + return messageLog; + } + return null; + } + + /** + * 构建消息日志 + * @param message + * @return + */ + protected MessageLog buildMessageLog(T message){ + RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class); + return MessageLog.builder().messageKey(message.getKey()).message(JSON.toJSONString(message)) + .tag(annotation.selectorExpression()) + .topic(annotation.topic()) + .consumerGroupName(annotation.consumerGroup()).build(); + } + + /** + * 保存消息日志 + * @param messageLog + * @param + */ + protected void saveMessageLog(MessageLog messageLog){ + + } + } \ No newline at end of file diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/listenter/AbstractTransactionRocketMQListener.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/listenter/AbstractTransactionRocketMQListener.java new file mode 100644 index 0000000..d050abc --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/listenter/AbstractTransactionRocketMQListener.java @@ -0,0 +1,91 @@ +package com.bnyer.common.rocketmq.listenter; + +import com.bnyer.common.rocketmq.domain.BaseMessage; +import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; +import org.springframework.messaging.Message; + +import javax.annotation.Resource; + +/** + * @author :WXC + * @Date :2023/05/19 + * @description :事务消息监听,支持多事务消息 + */ +@Slf4j +@RocketMQTransactionListener +public abstract class AbstractTransactionRocketMQListener implements RocketMQLocalTransactionListener { + + @Resource + private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; + + /** + * 本地使用事务执行 + * @param message + * @return + */ + protected abstract boolean process(String message); + + /** + * 查询本地事务结果 + * @param message + * @return + */ + protected abstract boolean checkSuccess(String message); + + /** + * 执行本地事务,并获取执行结果 + * @param msg Half(prepare) message + * @param arg Custom business parameter + * @return + */ + @Override + public RocketMQLocalTransactionState executeLocalTransaction(final Message msg, final Object arg) { + String message = new String((byte[]) msg.getPayload()); + log.info("------------消息执行本地事务--------------"); + try { + if (process(message)){ + log.info("------------消息执行提交--------------"); + return RocketMQLocalTransactionState.COMMIT; + } + log.info("------------消息执行回滚--------------"); + return RocketMQLocalTransactionState.ROLLBACK; + } catch (Exception e) { + e.printStackTrace(); + } + log.info("------------消息执行异常--------------"); + return RocketMQLocalTransactionState.UNKNOWN; + } + + /** + * 检查本地事务执行结果 + * @param msg Check message + * @return + */ + @Override + public RocketMQLocalTransactionState checkLocalTransaction(final Message msg) { + log.info("------------消息查询结果--------------"); + if (checkSuccess(new String((byte[]) msg.getPayload()))) { + log.info("------------查询消息提交--------------"); + return RocketMQLocalTransactionState.COMMIT; + } + log.info("------------查询消息回滚--------------"); + return RocketMQLocalTransactionState.ROLLBACK; + } + + /** + * 发送事务消息 + * @param topic + * @param tag + * @param message + * @return + * @param + */ + protected boolean sendTransactionalMsg(String topic, String tag, String arg, T message) { + return rocketMQEnhanceTemplate.sendTransactionalMsg(topic,tag,arg,message); + } + +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java index fdd4aa0..769549a 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java @@ -2,14 +2,15 @@ package com.bnyer.common.rocketmq.template; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.bnyer.common.redis.service.RedisService; import com.bnyer.common.rocketmq.config.RocketEnhanceProperties; -import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.domain.BaseMessage; +import com.bnyer.common.rocketmq.domain.MessageLog; +import com.bnyer.common.rocketmq.enums.EnumMessageStatus; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.*; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; @@ -29,6 +30,8 @@ public class RocketMQEnhanceTemplate { private final RocketMQTemplate template; + private final RedisService redisService; + @Resource private RocketEnhanceProperties rocketEnhanceProperties; @@ -65,25 +68,20 @@ public class RocketMQEnhanceTemplate { * @param */ public SendResult send(String topic, String tag, T message) { - // 注意分隔符 - return send(buildDestination(topic,tag), message); - } - - - /** - * 发送同步消息 - * @param destination - * @param message - * @return - * @param - */ - public SendResult send(String destination, T message) { // 设置业务键,此处根据公共的参数进行处理 // 更多的其它基础业务处理... + MessageLog messageLog = saveMessageLog(message, null); Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); - SendResult sendResult = template.syncSend(destination, sendMessage); + SendResult sendResult; + try { + sendResult = template.syncSend(buildDestination(topic,tag), sendMessage); + delMessageLog(messageLog); + } catch (Exception e) { + editMessageLog(messageLog); + throw new RuntimeException(e); + } // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 - log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; } @@ -97,21 +95,17 @@ public class RocketMQEnhanceTemplate { * @param */ public SendResult send(String topic, String tag, T message, int delayLevel) { - return send(buildDestination(topic,tag), message, delayLevel); - } - - /** - * 发送同步延时消息 - * @param destination - * @param message - * @param delayLevel - * @return - * @param - */ - public SendResult send(String destination, T message, int delayLevel) { + MessageLog messageLog = saveMessageLog(message, null); Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); - SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); - log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + SendResult sendResult; + try { + sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, 10000, delayLevel); + delMessageLog(messageLog); + } catch (Exception e) { + editMessageLog(messageLog); + throw new RuntimeException(e); + } + log.info("topic:[{}]tag:[{}]延迟等级[{}]消息[{}]发送结果[{}]", topic,tag, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; } @@ -120,52 +114,89 @@ public class RocketMQEnhanceTemplate { * (适合对响应时间敏感的业务场景) */ public void sendAsyncMsg(String topic, String tag, T message) { - sendAsyncMsg(buildDestination(topic,tag),message); - } - - /** - * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) - * (适合对响应时间敏感的业务场景) - */ - public void sendAsyncMsg(String destination, T message) { - template.asyncSend(destination, MessageBuilder.withPayload(message).build(), new SendCallback() { + MessageLog messageLog = saveMessageLog(message, null); + template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 处理消息发送成功逻辑 - log.info("消息发送成功,destination:{},result:{}",destination, JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag, JSON.toJSONString(sendResult)); + delMessageLog(messageLog); } @Override public void onException(Throwable throwable) { // 处理消息发送失败逻辑 - log.error("消息发送失败,destination:{},error:{}",destination,throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage()); + editMessageLog(messageLog); } }); } - /** - * 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) - * - */ - public void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { - sendAsyncMsg(buildDestination(topic,tag),message,delayLevel); - } - /** * 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) * (适合对响应时间敏感的业务场景) */ - public void sendAsyncMsg(String destination, T message, int delayLevel) { - template.asyncSend(destination, MessageBuilder.withPayload(message).build(), new SendCallback() { + public void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { + MessageLog messageLog = saveMessageLog(message, null); + template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 处理消息发送成功逻辑 - log.info("消息发送成功,destination:{},result:{}",destination,JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},result:{}",topic,tag,JSON.toJSONString(sendResult)); + delMessageLog(messageLog); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,destination:{},error:{}",destination,throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage()); + editMessageLog(messageLog); } },10000,delayLevel); } + /** + * 发送事务消息 + * @param topic 主题 + * @param tag 标签 + * @param message 消息 + * @return 发送结果 + * @param + */ + public boolean sendTransactionalMsg(String topic, String tag,String arg, T message) { + String destination = buildDestination(topic, tag); + TransactionSendResult sendResult = template.sendMessageInTransaction(destination, MessageBuilder.withPayload(message).build(), arg); + log.info("Send transaction msg result: " + sendResult); + return sendResult.getSendStatus() == SendStatus.SEND_OK; + } + + /** + * 添加消息日志 + * @param message + * @param sendResult + * @param + */ + private MessageLog saveMessageLog(T message,SendResult sendResult){ + log.info("消息发送前,入库消息日志表,如果发送失败,可进行消息补偿"); + MessageLog messageLog = new MessageLog<>(message, sendResult); + //保存 + return messageLog; + } + + /** + * 修改消息日志 + * @param messageLog + * @param + */ + private void editMessageLog(MessageLog messageLog){ + log.info("消息发送失败,修改消息日志状态,可进行消息补偿"); + //修改 + messageLog.setStatus(EnumMessageStatus.SEND_FAILS); + } + + /** + * 删除消息日志 + * @param messageLog + */ + private void delMessageLog(MessageLog messageLog){ + log.info("消息发送成功,删除消息日志"); + } + } diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java index 4d96f1c..8705e01 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java @@ -137,12 +137,13 @@ public class GoldRewardConsumer extends EnhanceMessageHandler @Override protected boolean filter(GoldRewardMessage message) { - return false; + return super.handleFilter(message); } + @Override protected boolean isRetry() { - return false; + return true; } @Override diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java index d1187d1..e7dd556 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java @@ -45,7 +45,7 @@ public class VipRecordCreateConsumer extends EnhanceMessageHandler implements RocketMQListener { @Autowired @@ -47,9 +48,10 @@ public class VipOrderCancelConsumer extends EnhanceMessageHandler implements RocketMQListener { @Resource @@ -38,9 +33,6 @@ public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler { +public interface VipOrderService { /** * 生成会员订单,返回订单号 diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java index 38799c5..c596abb 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java @@ -4,7 +4,6 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.bnyer.common.core.constant.ServiceNameConstants; import com.bnyer.common.core.domain.R; import com.bnyer.common.core.domain.VipOrder; @@ -17,7 +16,10 @@ import com.bnyer.common.core.utils.bean.EntityConvertUtil; import com.bnyer.common.core.utils.uuid.IdUtils; import com.bnyer.common.core.vo.UserInfoVo; import com.bnyer.common.rocketmq.constant.RocketMqConstant; +import com.bnyer.common.rocketmq.constant.RocketMqTag; import com.bnyer.common.rocketmq.constant.RocketMqTopic; +import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage; +import com.bnyer.common.rocketmq.domain.img.GoldRewardMessage; import com.bnyer.common.rocketmq.domain.order.VipOrderCancelMessage; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.common.security.utils.SecurityUtils; @@ -25,17 +27,13 @@ import com.bnyer.img.api.remote.RemoteUserVipService; import com.bnyer.img.api.vo.UserVipVo; import com.bnyer.order.bean.dto.AddVipOrderDto; import com.bnyer.order.bean.dto.CancelVipOrderDto; -import com.bnyer.order.bean.query.VipOrderQuery; import com.bnyer.order.bean.query.VipOrderExtQuery; +import com.bnyer.order.bean.query.VipOrderQuery; import com.bnyer.order.bean.vo.VipOrderVo; -import com.bnyer.common.core.enums.EnumVipOrderStatus; import com.bnyer.order.mapper.VipOrderMapper; import com.bnyer.order.service.VipOrderService; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.messaging.support.GenericMessage; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -51,7 +49,7 @@ import java.util.stream.Collectors; */ @Slf4j @Service -public class VipOrderServiceImpl extends ServiceImpl implements VipOrderService { +public class VipOrderServiceImpl implements VipOrderService { // @Autowired // private RocketMQTemplate orderCancelMqTemplate; @@ -118,7 +116,7 @@ public class VipOrderServiceImpl extends ServiceImpl i vipOrderCancelMessage.setKey(IdUtils.randomUUID()); vipOrderCancelMessage.setSource(ServiceNameConstants.ORDER_SERVICE); vipOrderCancelMessage.setOrderNo(orderNo); - rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.VIP_ORDER_CANCEL_TOPIC,null,vipOrderCancelMessage,RocketMqConstant.THIRTY_MINUTES); + rocketMQEnhanceTemplate.send(RocketMqTopic.ORDER_CANCEL_TOPIC, RocketMqTag.ORDER_VIP_TAG,vipOrderCancelMessage,RocketMqConstant.THIRTY_MINUTES) // SendStatus sendStatus = orderCancelMqTemplate.syncSend(RocketMqConstant.VIP_ORDER_CANCEL_TOPIC, new GenericMessage<>(orderNo), RocketMqConstant.TIMEOUT, RocketMqConstant.CANCEL_ORDER_DELAY_LEVEL).getSendStatus(); // if (!Objects.equals(sendStatus,SendStatus.SEND_OK)) { // // 消息发不出去就抛异常,发的出去无所谓 @@ -223,6 +221,7 @@ public class VipOrderServiceImpl extends ServiceImpl i * 更新订单信息为已支付 * @param vipOrder */ + @Transactional @Override public void updateByToPaySuccess(VipOrder vipOrder) { vipOrder.setOrderStatus(EnumVipOrderStatus.SUCCESS.getStatus()); @@ -230,6 +229,58 @@ public class VipOrderServiceImpl extends ServiceImpl i vipOrder.setPayTime(new Date()); //更新订单 vipOrderMapper.updateById(vipOrder); + + //发消息,添加用户会员记录 + AddUserVipRecordMessage addUserVipRecordMessage = buildVipRecordMsg(vipOrder); + rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.VIP_RECORD_CREATE_TOPIC,null, addUserVipRecordMessage); + + //发送开会员画意值奖励并写入记录消息 + GoldRewardMessage goldMsg = buildGoldRewardMsg(vipOrder.getUserId(),300, GoldEnum.BUY_VIP.getCode(),null,vipOrder.getUserClientType()); + rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.GOLD_REWARD_TOPIC,null,goldMsg); + } + + + /** + * 构建vip记录消息体 + * @param vipOrder + * @return + */ + private AddUserVipRecordMessage buildVipRecordMsg(VipOrder vipOrder) { + AddUserVipRecordMessage message = new AddUserVipRecordMessage(); + message.setKey(IdUtils.randomUUID()); + message.setSource(ServiceNameConstants.ORDER_SERVICE); + message.setDays(vipOrder.getDays()); + message.setVipId(vipOrder.getVipId()); + message.setVipName(vipOrder.getVipName()); + message.setVipTypeName(vipOrder.getVipTypeName()); + message.setUserClientType(vipOrder.getUserClientType()); + message.setPhone(vipOrder.getPhone()); + message.setUserId(vipOrder.getUserId()); + return message; } + /** + * 构建画意值奖励消息体 + * @param userId 用户id + * @param goldNum 奖励/消耗画意值 + * @param platform 平台 + * @param userClientType 用户客户端类型 + * @param goldCode 画意值枚举类编码 + * @return - + */ + private GoldRewardMessage buildGoldRewardMsg(Long userId, int goldNum, String goldCode, String platform, Integer userClientType) { + GoldRewardMessage message = new GoldRewardMessage(); + message.setKey(IdUtils.randomUUID()); + message.setSource(ServiceNameConstants.ORDER_SERVICE); + message.setUserId(userId); + message.setGoldNum(goldNum); + message.setGoldCode(goldCode); + if(StringUtils.isNotNull(platform)){ + message.setPlatform(platform); + } + if(StringUtils.isNotNull(userClientType)){ + message.setUserClientType(userClientType); + } + return message; + } } diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayInfoService.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayInfoService.java index 1e0c074..482f927 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayInfoService.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayInfoService.java @@ -1,7 +1,5 @@ package com.bnyer.pay.service; -import com.bnyer.common.core.domain.PayInfo; -import com.baomidou.mybatisplus.extension.service.IService; import com.bnyer.pay.bean.dto.AddPayInfoDto; import com.bnyer.pay.bean.dto.EditPayInfoNotifyDto; import com.bnyer.pay.bean.vo.PayInfoDetailsVo; @@ -10,7 +8,7 @@ import com.bnyer.pay.bean.vo.PayInfoDetailsVo; * @author :WXC * @description : */ -public interface PayInfoService extends IService{ +public interface PayInfoService { /** * 添加支付订单 * @param dto diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java index 3e30806..57cf08f 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java @@ -2,7 +2,6 @@ package com.bnyer.pay.service.impl; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.bnyer.common.core.constant.ServiceNameConstants; import com.bnyer.common.core.domain.PayInfo; import com.bnyer.common.core.enums.EnumSceneCode; @@ -10,6 +9,7 @@ import com.bnyer.common.core.enums.ResponseEnum; import com.bnyer.common.core.exception.ServiceException; import com.bnyer.common.core.utils.bean.EntityConvertUtil; import com.bnyer.common.core.utils.uuid.IdUtils; +import com.bnyer.common.rocketmq.constant.RocketMqTag; import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.domain.order.VipOrderPayNotifyMessage; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; @@ -32,7 +32,7 @@ import java.util.Objects; */ @Slf4j @Service -public class PayInfoServiceImpl extends ServiceImpl implements PayInfoService { +public class PayInfoServiceImpl implements PayInfoService { @Autowired private PayInfoMapper payInfoMapper; @@ -86,7 +86,7 @@ public class PayInfoServiceImpl extends ServiceImpl impl vipOrderPayNotifyMessage.setKey(IdUtils.randomUUID()); vipOrderPayNotifyMessage.setSource(ServiceNameConstants.PAY_SERVICE); vipOrderPayNotifyMessage.setOrderNo(orderNo); - rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.VIP_ORDER_PAY_NOTIFY_TOPIC,null, vipOrderPayNotifyMessage); + rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC, RocketMqTag.ORDER_VIP_TAG, vipOrderPayNotifyMessage); break; } }