SpringBoot和RocketMQ的实例RocketMQTemplate

@RocketMQMessageListener

@RocketMQMessageListener 标记消息消费者,可以简化代码。

consumerGroup:消费组,全局唯一

topic:主题

selectorExpression : 标签,默认" *" 全部

consumeMode :

  1. ConsumeMode.CONCURRENTLY 多线程并发竞争接受消费,不能保证消息的有效性,
  2. ConsumeMode.ORDERLY 一个队列一个线程,有序的接受消息

messageModel : 消息模式: MessageModel.CLUSTERING 点对点(默认)MessageModel.BROADCASTING 广播

RocketMQ事务

1、Producer 发送事务消息

rocketMQTemplate.sendMessageInTransaction

Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。

2、Producer 实现接口 RocketMQLocalTransactionListener

2.1、重写方法 RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg)

Producer 端执行业务代码逻辑,通过本地数据库事务控制。

若Producer 本地事务执行成功则自动向MQServer发送COMMIT消息,此时MQ订阅方才可以正常消费消息;

若Producer 本地事务执行失败则自动向MQServer发送ROLLBACK消息,MQ Server接收到ROLLBACK消息后 将删除此消息 。

2.2、重写方法 RocketMQLocalTransactionState checkLocalTransaction(Message message)

如果执行Producer端本地事务过程中,执行端挂掉,网络异常,或者超时,导致消息的状态一直是Prepared(预备状态),MQ Server有一定时器会不停的询问 Producer的checkLocalTransaction方法来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。

3,消费者消费

MQ订阅消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。

1,引用jar包

build.gradle文件添加jar包引用

compile group: 'org.apache.rocketmq', name: 'rocketmq-spring-boot-starter', version: '2.1.1' 

2,配置

application.properties 配置文件

###rocketmq### rocketmq.name-server=192.168.1.3:9876 rocketmq.producer.group=app-demp rocketmq.producer.timeout=4000 

3,生产者

MsgSender 消息发送接口

public interface MsgSender {     /**      * 发送消息      *      * @param data  消息信息      * @param topic 主题      */     void sendMessage(String topic, Object data);      /**      * 发送消息      *      * @param data  消息信息      * @param topic 主题      * @param tags  主题的标签      */     void sendMessage(String topic, String tags, Object data);          /**      * 发送消息(支持分布式事务)      *      * @param data  消息信息      * @param topic 主题      */     void sendMessageInTransaction(String topic, Object data);      /**      * 发送消息(支持分布式事务)      *      * @param data  消息信息      * @param topic 主题      * @param tags  主题的标签      */     void sendMessageInTransaction(String topic, String tags, Object data); } 

MsgSenderTemplateService 消息发送实现

 import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component;  import javax.annotation.Resource;  @Slf4j @Component public class MsgSenderTemplateService implements MsgSender {     @Resource     private RocketMQTemplate rocketMQTemplate;      @Override     public void sendMessage(String topic, Object data) {         rocketMQTemplate.convertAndSend(topic, data);         log.info("发送MQ成功:message={}", JSON.toJSONString(data));     }      @Override     public void sendMessage(String topic, String tags, Object data) {         rocketMQTemplate.convertAndSend(String.format("%s:%s", topic, tags), data);         log.info("发送MQ成功:message={}", JSON.toJSONString(data));     }      @Override     public void sendMessageInTransaction(String topic, Object data) {         Message<?> message = MessageBuilder.withPayload(data).build();         rocketMQTemplate.sendMessageInTransaction(topic, message, null);         log.info("发送MQ成功:message={}", JSON.toJSONString(data));     }      @Override     public void sendMessageInTransaction(String topic, String tags, Object data) {         MessageBuilder<Object> messageBuilder = MessageBuilder.withPayload(data);         messageBuilder.setHeader("msg", JSON.toJSONString(data));          rocketMQTemplate.sendMessageInTransaction(String.format("%s:%s", topic, tags), messageBuilder.build(), null);         log.info("发送MQ成功:message={}", JSON.toJSONString(data));     } }  

OrderProducer 订单发送普通消息

import org.springframework.stereotype.Service;  import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import java.util.UUID;   /**  * @author qizenan  * @date 2020-9-9  **/ @Service public class OrderProducer {     @Resource     private MsgSender msgSender;      public void createOrder() {         Map<String, Object> orderInfo = new HashMap<>();         orderInfo.put("orderId", UUID.randomUUID().toString());         orderInfo.put("price", 10000);         orderInfo.put("description", "我是注册订单,请尽快处理");          msgSender.sendMessage("TEMP", "order", orderInfo);     }  } 

4,消费者

OrderConsumer 消费订单消息

import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component;  import java.util.Map;   /**  * @author qizenan  * @date 2020-9-9  **/ @Slf4j @Component @RocketMQMessageListener(consumerGroup = "TEMP-GROUP", topic = "TEMP",selectorExpression ="order" ) public class OrderConsumer implements RocketMQListener<MessageExt> {      @Override     public void onMessage(MessageExt messageExt) {         String message = new String(messageExt.getBody());         log.info("收到消息,topic:{}, tag:{}, msgId:{}, body:{}", messageExt.getTopic(), messageExt.getTags(),                 messageExt.getMsgId(), message);          Map<Integer, Object> orderInfo = JSON.parseObject(messageExt.getBody(), Map.class);         log.info("订单信息 orderInfo = {} ", orderInfo.toString());     }  } 

发生者运行结果
SpringBoot和RocketMQ的实例RocketMQTemplate

消费者运行结果
SpringBoot和RocketMQ的实例RocketMQTemplate

5,RocketMQ的分布式事务

OrderProducer 订单发送事件消息

import org.springframework.stereotype.Service;  import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import java.util.UUID;   /**  * @author qizenan  * @date 2020-9-9  **/ @Service public class OrderProducer {     @Resource(name = "msgSenderTemplateService")     private MsgSender msgSender;      public void createOrder() {         Map<String, Object> orderInfo = new HashMap<>();         orderInfo.put("orderId", UUID.randomUUID().toString());         orderInfo.put("price", 10000);         orderInfo.put("description", "我是注册订单,请尽快处理");          msgSender.sendMessageInTransaction("TEMP", "order", orderInfo);     }  } 

实现RocketMQLocalTransactionListener

 import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Component;  import java.util.Map;   /**  * @author qizenan  * @date 2020-9-9  **/ @Slf4j @Component @RocketMQTransactionListener() public class ProducerListener implements RocketMQLocalTransactionListener {     /**      * RocketMQ的Producer本地事务:先执行本地的业务代码(使用Spring的事件管理),判断是否成功。      * 成功返回: RocketMQLocalTransactionState.COMMIT,失败返回:RocketMQLocalTransactionState.ROLLBACK      */     @Override     public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {         Map orderInfo = checkDestination(message, "TEMP", "order", Map.class);         if (orderInfo != null) {             try {                 log.info("执行本地订单业务逻辑 orderInfo={} ", orderInfo.toString());                 return RocketMQLocalTransactionState.COMMIT;             } catch (Exception e) {                 return RocketMQLocalTransactionState.ROLLBACK;             }         }         return RocketMQLocalTransactionState.COMMIT;     }      /**      * 根据 topic,tag 获取发送者的信息      *      * @param message RockeRtMQ的消息      * @param topic   主题      * @param tag     标签      * @param tClass  生产者发生的消息的class      * @return 生产者发生的消息的      */     private <T> T checkDestination(Message message, String topic, String tag, Class<T> tClass) {         String destination = topic;         if (StringUtils.isNotBlank(tag)) {             destination += ":" + tag;         }         MessageHeaders headers = message.getHeaders();         String msgDestination = headers.get("rocketmq_TOPIC", String.class);         String msgTag = headers.get("rocketmq_TAGS", String.class);         if (StringUtils.isNotBlank(msgTag)) {             msgDestination += ":" + msgTag;         }         if (!destination.equals(msgDestination)) {             return null;         }         Object msg = headers.get("msg");         if (msg == null) {             return null;         }         try {             return JSON.parseObject((String) msg, tClass);         } catch (Exception e) {             log.error("msgDestination = {} 转化 {} 异常", msgDestination, tClass.getName(), e);         }         return null;     }      /**      * 因为网络异常或其他原因时,RocketMQ的消息状态处于UNKNOWN时,调用该方法检查Producer的本地事务是否已经执行成功,      * 成功返回: RocketMQLocalTransactionState.COMMIT,失败返回:RocketMQLocalTransactionState.ROLLBACK      */     @Override     public RocketMQLocalTransactionState checkLocalTransaction(Message message) {         Map orderInfo = checkDestination(message, "TEMP", "order", Map.class);         if (orderInfo != null) {             boolean isSuccess = true;             log.info("查询本地订单是否已经执行成功 orderInfo={},isSuccess={} ", orderInfo.toString(), isSuccess);             if (isSuccess) {                 return RocketMQLocalTransactionState.COMMIT;             } else {                 return RocketMQLocalTransactionState.ROLLBACK;             }         }         return RocketMQLocalTransactionState.COMMIT;     } }  

生产者执行结果
SpringBoot和RocketMQ的实例RocketMQTemplate

消费者执行结果
SpringBoot和RocketMQ的实例RocketMQTemplate