|
|
|
@ -91,7 +91,7 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService |
|
|
|
//保存消息记录
|
|
|
|
log.info("消息发送中,开始入库本地消息记录表"); |
|
|
|
MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); |
|
|
|
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage); |
|
|
|
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(mqRecordMessage); |
|
|
|
mqRecordMessage.setId(imgMqMessageRecord.getId()); |
|
|
|
//发消息
|
|
|
|
Message<MqRecordMessage> sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); |
|
|
|
@ -118,7 +118,7 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService |
|
|
|
//保存消息记录
|
|
|
|
log.info("消息发送中,开始入库本地消息记录表"); |
|
|
|
MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); |
|
|
|
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage); |
|
|
|
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(mqRecordMessage); |
|
|
|
mqRecordMessage.setId(imgMqMessageRecord.getId()); |
|
|
|
//发消息
|
|
|
|
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { |
|
|
|
@ -146,7 +146,7 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService |
|
|
|
//保存消息记录
|
|
|
|
log.info("消息发送中,开始入库本地消息记录表"); |
|
|
|
MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); |
|
|
|
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage); |
|
|
|
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(mqRecordMessage); |
|
|
|
mqRecordMessage.setId(imgMqMessageRecord.getId()); |
|
|
|
//发消息
|
|
|
|
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { |
|
|
|
@ -168,13 +168,12 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService |
|
|
|
* @param message |
|
|
|
*/ |
|
|
|
@Transactional |
|
|
|
public ImgMqMessageRecord saveMessageRecord(String topic, String tag, MqRecordMessage message){ |
|
|
|
topic = rocketMQEnhanceTemplate.reBuildTopic(topic); |
|
|
|
public ImgMqMessageRecord saveMessageRecord(MqRecordMessage message){ |
|
|
|
ImgMqMessageRecord imgMqMessageRecord = new ImgMqMessageRecord(); |
|
|
|
imgMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); |
|
|
|
imgMqMessageRecord.setConsumerGroupName(topic); |
|
|
|
imgMqMessageRecord.setTopic(topic); |
|
|
|
imgMqMessageRecord.setTag(tag); |
|
|
|
imgMqMessageRecord.setConsumerGroupName(message.getConsumerGroupName()); |
|
|
|
imgMqMessageRecord.setTopic(message.getTopic()); |
|
|
|
imgMqMessageRecord.setTag(message.getTag()); |
|
|
|
imgMqMessageRecord.setCreateTime(new Date()); |
|
|
|
imgMqMessageRecord.setMessageKey(IdUtils.randomUUID()); |
|
|
|
imgMqMessageRecord.setContent(JSON.toJSONString(message)); |
|
|
|
|