@RocketMQMessageListener 标记消息消费者,可以简化代码。
consumerGroup:消费组,全局唯一
topic:主题
selectorExpression : 标签,默认" *" 全部
consumeMode :
messageModel : 消息模式: MessageModel.CLUSTERING 点对点(默认)MessageModel.BROADCASTING 广播
1、Producer 发送事务消息
rocketMQTemplate.sendMessageInTransaction
Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。
2、Producer 实现接口 RocketMQLocalTransactionListener
2.1、重写方法 RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg)
Producer 端执行业务代码逻辑,通过本地数据库事务控制。
若Producer 本地事务执行成功则自动向MQServer发送COMMIT消息,此时MQ订阅方才可以正常消费消息;
若Producer 本地事务执行失败则自动向MQServer发送ROLLBACK消息,MQ Server接收到ROLLBACK消息后 将删除此消息 。
2.2、重写方法 RocketMQLocalTransactionState checkLocalTransaction(Message message)
如果执行Producer端本地事务过程中,执行端挂掉,网络异常,或者超时,导致消息的状态一直是Prepared(预备状态),MQ Server有一定时器会不停的询问 Producer的checkLocalTransaction方法来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。
3,消费者消费
MQ订阅消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。
build.gradle文件添加jar包引用
compile group: 'org.apache.rocketmq', name: 'rocketmq-spring-boot-starter', version: '2.1.1'
application.properties 配置文件
###rocketmq### rocketmq.name-server=192.168.1.3:9876 rocketmq.producer.group=app-demp rocketmq.producer.timeout=4000
MsgSender 消息发送接口
public interface MsgSender { /** * 发送消息 * * @param data 消息信息 * @param topic 主题 */ void sendMessage(String topic, Object data); /** * 发送消息 * * @param data 消息信息 * @param topic 主题 * @param tags 主题的标签 */ void sendMessage(String topic, String tags, Object data); /** * 发送消息(支持分布式事务) * * @param data 消息信息 * @param topic 主题 */ void sendMessageInTransaction(String topic, Object data); /** * 发送消息(支持分布式事务) * * @param data 消息信息 * @param topic 主题 * @param tags 主题的标签 */ void sendMessageInTransaction(String topic, String tags, Object data); }
MsgSenderTemplateService 消息发送实现
import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Slf4j @Component public class MsgSenderTemplateService implements MsgSender { @Resource private RocketMQTemplate rocketMQTemplate; @Override public void sendMessage(String topic, Object data) { rocketMQTemplate.convertAndSend(topic, data); log.info("发送MQ成功:message={}", JSON.toJSONString(data)); } @Override public void sendMessage(String topic, String tags, Object data) { rocketMQTemplate.convertAndSend(String.format("%s:%s", topic, tags), data); log.info("发送MQ成功:message={}", JSON.toJSONString(data)); } @Override public void sendMessageInTransaction(String topic, Object data) { Message<?> message = MessageBuilder.withPayload(data).build(); rocketMQTemplate.sendMessageInTransaction(topic, message, null); log.info("发送MQ成功:message={}", JSON.toJSONString(data)); } @Override public void sendMessageInTransaction(String topic, String tags, Object data) { MessageBuilder<Object> messageBuilder = MessageBuilder.withPayload(data); messageBuilder.setHeader("msg", JSON.toJSONString(data)); rocketMQTemplate.sendMessageInTransaction(String.format("%s:%s", topic, tags), messageBuilder.build(), null); log.info("发送MQ成功:message={}", JSON.toJSONString(data)); } }
OrderProducer 订单发送普通消息
import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * @author qizenan * @date 2020-9-9 **/ @Service public class OrderProducer { @Resource private MsgSender msgSender; public void createOrder() { Map<String, Object> orderInfo = new HashMap<>(); orderInfo.put("orderId", UUID.randomUUID().toString()); orderInfo.put("price", 10000); orderInfo.put("description", "我是注册订单,请尽快处理"); msgSender.sendMessage("TEMP", "order", orderInfo); } }
OrderConsumer 消费订单消息
import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @author qizenan * @date 2020-9-9 **/ @Slf4j @Component @RocketMQMessageListener(consumerGroup = "TEMP-GROUP", topic = "TEMP",selectorExpression ="order" ) public class OrderConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { String message = new String(messageExt.getBody()); log.info("收到消息,topic:{}, tag:{}, msgId:{}, body:{}", messageExt.getTopic(), messageExt.getTags(), messageExt.getMsgId(), message); Map<Integer, Object> orderInfo = JSON.parseObject(messageExt.getBody(), Map.class); log.info("订单信息 orderInfo = {} ", orderInfo.toString()); } }
发生者运行结果
消费者运行结果
OrderProducer 订单发送事件消息
import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * @author qizenan * @date 2020-9-9 **/ @Service public class OrderProducer { @Resource(name = "msgSenderTemplateService") private MsgSender msgSender; public void createOrder() { Map<String, Object> orderInfo = new HashMap<>(); orderInfo.put("orderId", UUID.randomUUID().toString()); orderInfo.put("price", 10000); orderInfo.put("description", "我是注册订单,请尽快处理"); msgSender.sendMessageInTransaction("TEMP", "order", orderInfo); } }
实现RocketMQLocalTransactionListener
import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Component; import java.util.Map; /** * @author qizenan * @date 2020-9-9 **/ @Slf4j @Component @RocketMQTransactionListener() public class ProducerListener implements RocketMQLocalTransactionListener { /** * RocketMQ的Producer本地事务:先执行本地的业务代码(使用Spring的事件管理),判断是否成功。 * 成功返回: RocketMQLocalTransactionState.COMMIT,失败返回:RocketMQLocalTransactionState.ROLLBACK */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { Map orderInfo = checkDestination(message, "TEMP", "order", Map.class); if (orderInfo != null) { try { log.info("执行本地订单业务逻辑 orderInfo={} ", orderInfo.toString()); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } } return RocketMQLocalTransactionState.COMMIT; } /** * 根据 topic,tag 获取发送者的信息 * * @param message RockeRtMQ的消息 * @param topic 主题 * @param tag 标签 * @param tClass 生产者发生的消息的class * @return 生产者发生的消息的 */ private <T> T checkDestination(Message message, String topic, String tag, Class<T> tClass) { String destination = topic; if (StringUtils.isNotBlank(tag)) { destination += ":" + tag; } MessageHeaders headers = message.getHeaders(); String msgDestination = headers.get("rocketmq_TOPIC", String.class); String msgTag = headers.get("rocketmq_TAGS", String.class); if (StringUtils.isNotBlank(msgTag)) { msgDestination += ":" + msgTag; } if (!destination.equals(msgDestination)) { return null; } Object msg = headers.get("msg"); if (msg == null) { return null; } try { return JSON.parseObject((String) msg, tClass); } catch (Exception e) { log.error("msgDestination = {} 转化 {} 异常", msgDestination, tClass.getName(), e); } return null; } /** * 因为网络异常或其他原因时,RocketMQ的消息状态处于UNKNOWN时,调用该方法检查Producer的本地事务是否已经执行成功, * 成功返回: RocketMQLocalTransactionState.COMMIT,失败返回:RocketMQLocalTransactionState.ROLLBACK */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { Map orderInfo = checkDestination(message, "TEMP", "order", Map.class); if (orderInfo != null) { boolean isSuccess = true; log.info("查询本地订单是否已经执行成功 orderInfo={},isSuccess={} ", orderInfo.toString(), isSuccess); if (isSuccess) { return RocketMQLocalTransactionState.COMMIT; } else { return RocketMQLocalTransactionState.ROLLBACK; } } return RocketMQLocalTransactionState.COMMIT; } }
生产者执行结果
消费者执行结果