From 8c23ebeae7894b6f9c1171e4cc7465c0bb7258ae Mon Sep 17 00:00:00 2001 From: wuxicheng <1441859745@qq.com> Date: Thu, 25 May 2023 15:15:31 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rocketmq/domain/MqRecordMessage.java | 10 ++++ .../domain/img/AddUserVipRecordMessage.java | 29 ++++++--- .../domain/order/VipOrderCancelMessage.java | 3 +- .../strategy/RedisRepeatStrategy.java | 4 -- .../template/RocketMQEnhanceTemplate.java | 60 ++++++++++--------- .../img/listener/GoldRewardConsumer.java | 6 +- .../img/listener/VipRecordCreateConsumer.java | 11 +--- .../listener/vip/VipOrderCancelConsumer.java | 14 +++-- .../vip/VipOrderPayNotifyConsumer.java | 3 +- 9 files changed, 76 insertions(+), 64 deletions(-) diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MqRecordMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MqRecordMessage.java index e76dddb..44821ab 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MqRecordMessage.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MqRecordMessage.java @@ -1,5 +1,6 @@ package com.bnyer.common.rocketmq.domain; +import com.alibaba.fastjson.JSON; import com.bnyer.common.core.enums.EnumMessageStatus; import lombok.Getter; import lombok.NoArgsConstructor; @@ -29,4 +30,13 @@ public class MqRecordMessage extends BaseMessage { */ private String content; + + public T getObject(Class clazz) { + if (this.content == null) { + return null; + } else { + return JSON.parseObject(content, clazz); + } + } + } diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java index 7c1de89..23c2fdd 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/AddUserVipRecordMessage.java @@ -1,6 +1,5 @@ package com.bnyer.common.rocketmq.domain.img; -import io.swagger.annotations.ApiModelProperty; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; @@ -15,24 +14,38 @@ import lombok.Setter; @NoArgsConstructor public class AddUserVipRecordMessage { - @ApiModelProperty(value="用户id") + /** + * 用户id + */ private Long userId; - @ApiModelProperty(value="用户手机号") + /** + * 用户手机号 + */ private String phone; - @ApiModelProperty(value="vip表id") + /** + * vip表id + */ private Long vipId; - @ApiModelProperty(value="本次开通计算后的天数") + /** + * 本次开通计算后的天数 + */ private Integer days; - @ApiModelProperty(value="vip名称") + /** + * vip名称 + */ private String vipName; - @ApiModelProperty(value = "vip类型名称") + /** + * vip类型名称 + */ private String vipTypeName; - @ApiModelProperty(value = "用户客户端类型:10用户-抖音 20用户-快手 30用户-微信 40艺术家-微信") + /** + * 用户客户端类型:10用户-抖音 20用户-快手 30用户-微信 40艺术家-微信 + */ private Integer userClientType; } diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderCancelMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderCancelMessage.java index 98e55ab..db969ae 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderCancelMessage.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderCancelMessage.java @@ -1,6 +1,5 @@ package com.bnyer.common.rocketmq.domain.order; -import com.bnyer.common.rocketmq.domain.BaseMessage; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; @@ -13,7 +12,7 @@ import lombok.Setter; @Getter @Setter @NoArgsConstructor -public class VipOrderCancelMessage extends BaseMessage { +public class VipOrderCancelMessage { /** * 订单号 */ diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java index 35b2b23..4cdd394 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java @@ -26,10 +26,6 @@ public class RedisRepeatStrategy implements RepeatConsumerStrategy { */ private final Function repeatMessageKeyFunction; - /** - * 重复消息 - */ - private RepeatElement repeatElement; public RedisRepeatStrategy(RepeatConsumerConfig repeatConfig, Function repeatMessageKeyFunction) { this.repeatConfig = repeatConfig; diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java index 8d088de..4c07283 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java @@ -5,11 +5,14 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.bnyer.common.rocketmq.config.RocketEnhanceProperties; import com.bnyer.common.rocketmq.constant.RocketMqConstant; -import com.bnyer.common.rocketmq.domain.BaseMessage; +import com.bnyer.common.rocketmq.domain.MqRecordMessage; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.client.producer.*; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; @@ -78,11 +81,11 @@ public class RocketMQEnhanceTemplate { * @return * @param */ - public SendResult send(String topic, String tag, T message) { + public SendResult send(String topic, String tag, T message) { // 设置业务键,此处根据公共的参数进行处理 // 更多的其它基础业务处理... - buildBaseMessage(topic,tag,message); - Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getMessageKey()).build(); + MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); + Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); SendResult sendResult; try { sendResult = template.syncSend(buildDestination(topic,tag), sendMessage); @@ -90,7 +93,7 @@ public class RocketMQEnhanceTemplate { throw new RuntimeException(e); } // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); return sendResult; } @@ -103,16 +106,16 @@ public class RocketMQEnhanceTemplate { * @return * @param */ - public SendResult send(String topic, String tag, T message, int delayLevel) { - buildBaseMessage(topic,tag,message); - Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getMessageKey()).build(); + public SendResult send(String topic, String tag, T message, int delayLevel) { + MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); + Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); SendResult sendResult; try { sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, RocketMqConstant.TIME_OUT, delayLevel); } catch (Exception e) { throw new RuntimeException(e); } - log.info("topic:[{}]tag:[{}]延迟等级[{}]消息[{}]发送结果[{}]", topic,tag, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + log.info("topic:[{}]tag:[{}]延迟等级[{}]消息[{}]发送结果[{}]", topic,tag, delayLevel, JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); return sendResult; } @@ -120,18 +123,18 @@ public class RocketMQEnhanceTemplate { * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) * (适合对响应时间敏感的业务场景) */ - public void sendAsyncMsg(String topic, String tag, T message) { - buildBaseMessage(topic,tag,message); + public void sendAsyncMsg(String topic, String tag, T message) { + MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 处理消息发送成功逻辑 - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { // 处理消息发送失败逻辑 - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); } }); } @@ -140,17 +143,17 @@ public class RocketMQEnhanceTemplate { * 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) * (适合对响应时间敏感的业务场景) */ - public void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { - buildBaseMessage(topic,tag,message); + public void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { + MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 处理消息发送成功逻辑 - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); } }, RocketMqConstant.TIME_OUT,delayLevel); } @@ -163,10 +166,10 @@ public class RocketMQEnhanceTemplate { * @return 发送结果 * @param */ - public boolean sendTransactionalMsg(String topic, String tag,String arg, T message) { - buildBaseMessage(topic,tag,message); + public boolean sendTransactionalMsg(String topic, String tag,String arg, T message) { + MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); String destination = buildDestination(topic, tag); - TransactionSendResult sendResult = template.sendMessageInTransaction(destination, MessageBuilder.withPayload(message).build(), arg); + TransactionSendResult sendResult = template.sendMessageInTransaction(destination, MessageBuilder.withPayload(mqRecordMessage).build(), arg); log.info("Send transaction msg result: " + sendResult); return sendResult.getSendStatus() == SendStatus.SEND_OK; } @@ -179,13 +182,16 @@ public class RocketMQEnhanceTemplate { * @param message * @param */ - private void buildBaseMessage(String topic,String tag,T message){ + private MqRecordMessage buildBaseMessage(String topic,String tag,T message){ + MqRecordMessage mqRecordMessage = new MqRecordMessage(); String buildTopic = reBuildTopic(topic); - message.setSource(applicationName); - message.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); - message.setTopic(buildTopic); - message.setTag(tag); - message.setConsumerGroupName(buildTopic); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); + mqRecordMessage.setTopic(buildTopic); + mqRecordMessage.setTag(tag); + mqRecordMessage.setConsumerGroupName(buildTopic); + mqRecordMessage.setContent(JSON.toJSONString(message)); + return mqRecordMessage; } } diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java index 15833ca..497fabd 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java @@ -47,14 +47,10 @@ public class GoldRewardConsumer extends EnhanceMessageHandler i @Autowired private GoldLogService goldLogService; - @Resource - private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; - @Override public void onMessage(MqRecordMessage message) { super.dispatchMessage(message); - String content = message.getContent(); - GoldRewardMessage goldReward = JSON.parseObject(content, GoldRewardMessage.class); + GoldRewardMessage goldReward = message.getObject(GoldRewardMessage.class); if(StringUtils.isNotNull(goldReward.getPlatform())){ switch (goldReward.getPlatform()){ case "0": diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java index 86e304b..9673f03 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java @@ -1,14 +1,11 @@ package com.bnyer.img.listener; -import com.alibaba.fastjson.JSON; import com.bnyer.common.core.dto.AddUserVipRecordDto; -import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.utils.bean.EntityConvertUtil; import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; -import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.img.service.UserVipRecordService; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; @@ -16,8 +13,6 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.annotation.Resource; - /** * @author :WXC * @Date :2023/03/24 @@ -28,17 +23,13 @@ import javax.annotation.Resource; @RocketMQMessageListener(topic = RocketMqTopic.VIP_RECORD_CREATE_TOPIC,consumerGroup = RocketMqTopic.VIP_RECORD_CREATE_TOPIC) public class VipRecordCreateConsumer extends EnhanceMessageHandler implements RocketMQListener { - @Resource - private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; - @Autowired private UserVipRecordService userVipRecordService; @Override public void onMessage(MqRecordMessage message) { super.dispatchMessage(message); - String content = message.getContent(); - AddUserVipRecordMessage addUserVipRecordMessage = JSON.parseObject(content, AddUserVipRecordMessage.class); + AddUserVipRecordMessage addUserVipRecordMessage = message.getObject(AddUserVipRecordMessage.class); AddUserVipRecordDto addUserVipRecordDto = EntityConvertUtil.copy(addUserVipRecordMessage, AddUserVipRecordDto.class); //保存用户会员记录 userVipRecordService.saveUserVipRecord(addUserVipRecordDto); diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java index e9e3ea0..9c708c2 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java @@ -2,6 +2,7 @@ package com.bnyer.order.listener.vip; import com.bnyer.common.rocketmq.constant.RocketMqTag; import com.bnyer.common.rocketmq.constant.RocketMqTopic; +import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.order.VipOrderCancelMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.order.bean.dto.CancelVipOrderDto; @@ -21,33 +22,34 @@ import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQMessageListener(topic = RocketMqTopic.ORDER_CANCEL_TOPIC,selectorExpression = RocketMqTag.ORDER_VIP_TAG,consumerGroup = RocketMqTopic.ORDER_CANCEL_TOPIC) -public class VipOrderCancelConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class VipOrderCancelConsumer extends EnhanceMessageHandler implements RocketMQListener { @Autowired private VipOrderService vipOrderService; @Override - public void onMessage(VipOrderCancelMessage message) { + public void onMessage(MqRecordMessage message) { super.dispatchMessage(message); + VipOrderCancelMessage orderCancelMessage = message.getObject(VipOrderCancelMessage.class); // 如果订单未支付的话,将订单设为取消状态 CancelVipOrderDto cancelVipOrderDto = new CancelVipOrderDto(); cancelVipOrderDto.setOrderStatus(3); - cancelVipOrderDto.setOrderNos(Lists.newArrayList(message.getOrderNo())); + cancelVipOrderDto.setOrderNos(Lists.newArrayList(orderCancelMessage.getOrderNo())); vipOrderService.cancelVipOrder(cancelVipOrderDto); } @Override - protected void handleMessage(VipOrderCancelMessage message) throws Exception { + protected void handleMessage(MqRecordMessage message) throws Exception { } @Override - protected void handleMaxRetriesExceeded(VipOrderCancelMessage message) { + protected void handleMaxRetriesExceeded(MqRecordMessage message) { log.error("消息消费失败,可扩展执行后续处理"); } @Override - protected boolean filter(VipOrderCancelMessage message) { + protected boolean filter(MqRecordMessage message) { return super.handleMsgRepeat(message); } diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java index 40aa36d..1e24ef2 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java @@ -44,8 +44,7 @@ public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler().eq(VipOrder::getOrderNo, orderNo)); if (Objects.isNull(vipOrder)){