Browse Source

优化

feature-1.1
wuxicheng 3 years ago
parent
commit
2e1eeb2852
  1. 2
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java
  2. 5
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java
  3. 15
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/IPersist.java
  4. 11
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java
  5. 25
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java
  6. 5
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java

2
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java

@ -15,7 +15,7 @@ public class RocketMqConstant {
/**
* 数据来源前缀
*/
public static final String RETRY_PREFIX = "bnyer-";
public static final String MESSAGE_SOURCE_PREFIX = "bnyer";
/**
* 按顺序匹配1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (1-18)

5
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java

@ -1,6 +1,5 @@
package com.bnyer.common.rocketmq.handle;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSONObject;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.config.RepeatConsumerConfig;
@ -170,8 +169,8 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
}
//重新构建消息体
String messageSource = message.getSource();
if(!messageSource.startsWith(RocketMqConstant.RETRY_PREFIX)){
message.setSource(RocketMqConstant.RETRY_PREFIX + messageSource);
if(!messageSource.startsWith(RocketMqConstant.MESSAGE_SOURCE_PREFIX)){
message.setSource(RocketMqConstant.MESSAGE_SOURCE_PREFIX + "-" + messageSource);
}
message.setRetryTimes(message.getRetryTimes() + 1);

15
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/IPersist.java

@ -1,7 +1,6 @@
package com.bnyer.common.rocketmq.persist;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.bnyer.common.rocketmq.domain.RepeatElement;
/**
@ -16,34 +15,30 @@ public interface IPersist {
* @param repeatElement
* @param processingExpireMilliSeconds
* @return
* @param <T>
*/
<T extends BaseMessage> boolean setConsumingIfNX(RepeatElement repeatElement, long processingExpireMilliSeconds);
boolean setConsumingIfNX(RepeatElement repeatElement, long processingExpireMilliSeconds);
/**
* 删除存储中的消息
* @param repeatElement
* @param <T>
*/
<T extends BaseMessage> void delete(RepeatElement repeatElement);
void delete(RepeatElement repeatElement);
/**
* 修改存储中的消息
* @param repeatElement
* @param recordReserveMinutes
* @param <T>
*/
<T extends BaseMessage> void markConsumed(RepeatElement repeatElement, long recordReserveMinutes);
void markConsumed(RepeatElement repeatElement, long recordReserveMinutes);
/**
* 获取存储中的消息
* @param repeatElement
* @return
* @param <T>
*/
<T extends BaseMessage> String get(RepeatElement repeatElement);
String get(RepeatElement repeatElement);
default <T extends BaseMessage> String toPrintInfo(RepeatElement repeatElement) {
default String toPrintInfo(RepeatElement repeatElement) {
return repeatElement.toString();
}
}

11
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java

@ -2,7 +2,6 @@ package com.bnyer.common.rocketmq.persist;
import com.bnyer.common.rocketmq.constant.RocketMqRepeatConstant;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.bnyer.common.rocketmq.domain.RepeatElement;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.RedisStringCommands;
@ -29,7 +28,7 @@ public class RedisPersist implements IPersist {
}
@Override
public <T extends BaseMessage> boolean setConsumingIfNX(RepeatElement repeatElement, long processingExpireMilliSeconds) {
public boolean setConsumingIfNX(RepeatElement repeatElement, long processingExpireMilliSeconds) {
String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey());
//setnx, 成功就可以消费
Boolean execute = redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> redisConnection.set(repeatKey.getBytes(),
@ -41,25 +40,25 @@ public class RedisPersist implements IPersist {
}
@Override
public <T extends BaseMessage> void delete(RepeatElement repeatElement) {
public void delete(RepeatElement repeatElement) {
String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey());
redisTemplate.delete(repeatKey);
}
@Override
public <T extends BaseMessage> void markConsumed(RepeatElement repeatElement, long recordReserveMinutes) {
public void markConsumed(RepeatElement repeatElement, long recordReserveMinutes) {
String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey());
redisTemplate.opsForValue().set(repeatKey, RocketMqRepeatConstant.CONSUME_STATUS_SUCCESS, recordReserveMinutes, TimeUnit.MINUTES);
}
@Override
public <T extends BaseMessage> String get(RepeatElement repeatElement) {
public String get(RepeatElement repeatElement) {
String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey());
return redisTemplate.opsForValue().get(repeatKey);
}
@Override
public <T extends BaseMessage> String toPrintInfo(RepeatElement repeatElement) {
public String toPrintInfo(RepeatElement repeatElement) {
return buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey());
}

25
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java

@ -14,21 +14,26 @@ import java.util.function.Function;
/**
* @author :WXC
* @Date :2023/05/20
* @description :如果已经消费过则直接消费幂等掉
* @description :如果已经消费过则直接幂等掉
*/
@Slf4j
public class RedisRepeatStrategy implements RepeatConsumerStrategy {
private final RepeatConsumerConfig filterConfig;
private final RepeatConsumerConfig repeatConfig;
/**
* 获取去重键的函数
*/
private final Function<BaseMessage, String> repeatMessageKeyFunction;
public RedisRepeatStrategy(RepeatConsumerConfig dedupConfig, Function<BaseMessage, String> dedupMessageKeyFunction) {
this.filterConfig = dedupConfig;
this.repeatMessageKeyFunction = dedupMessageKeyFunction;
/**
* 重复消息
*/
private RepeatElement repeatElement;
public RedisRepeatStrategy(RepeatConsumerConfig repeatConfig, Function<BaseMessage, String> repeatMessageKeyFunction) {
this.repeatConfig = repeatConfig;
this.repeatMessageKeyFunction = repeatMessageKeyFunction;
}
@ -38,13 +43,13 @@ public class RedisRepeatStrategy implements RepeatConsumerStrategy {
}
private <T extends BaseMessage> boolean doInvoke(T message) {
IPersist persist = filterConfig.getPersist();
RepeatElement repeatElement = new RepeatElement(filterConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic()
, message.getTag()==null ? "" : message.getTag()
, repeatMessageKeyFunction.apply(message));
IPersist persist = repeatConfig.getPersist();
//RepeatElement repeatElement = new RepeatElement(repeatConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic()
// , message.getTag()==null ? "" : message.getTag()
// , repeatMessageKeyFunction.apply(message));
boolean shouldConsume = true;
if (StringUtils.isNotBlank(repeatElement.getMessageKey())) {
shouldConsume = persist.setConsumingIfNX(repeatElement, filterConfig.getProcessingExpireMilliSeconds());
shouldConsume = persist.setConsumingIfNX(repeatElement, repeatConfig.getProcessingExpireMilliSeconds());
}
//设置成功,证明没有消费过
if (shouldConsume) {

5
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java

@ -4,6 +4,7 @@ import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bnyer.common.rocketmq.config.RocketEnhanceProperties;
import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -107,7 +108,7 @@ public class RocketMQEnhanceTemplate {
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getMessageKey()).build();
SendResult sendResult;
try {
sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, 10000, delayLevel);
sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, RocketMqConstant.TIME_OUT, delayLevel);
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -151,7 +152,7 @@ public class RocketMQEnhanceTemplate {
public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage());
}
},10000,delayLevel);
}, RocketMqConstant.TIME_OUT,delayLevel);
}
/**

Loading…
Cancel
Save