Skip to content

事务消息

image.png

事务消息的流程

  • 先写half消息到RocketMQ

  • 再执行本地事务

    本地事务有两个方法,一个是回调执行本地事务,另一个是检查本地事务。

    java
    /**
     * 事务监听器,用来处理本地事务
     * @author yangjunwei
     * @date 2024/7/4
     */
    public class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);
    
        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    
        //在提交完事务消息执行本地事务
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            String tags = msg.getTags();
            if(StringUtils.contains(tags,"TagA")){
                return LocalTransactionState.COMMIT_MESSAGE;
            }else if(StringUtils.contains(tags,"TagB")){
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }else{
                return LocalTransactionState.UNKNOW;
            }
        }
    
        /**
         * 检查本地事务
         * @param msg Check message
         * @return
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            String tags = msg.getTags();
            if(StringUtils.contains(tags,"TagC")){
                return LocalTransactionState.COMMIT_MESSAGE;
            }else if(StringUtils.contains(tags,"TagD")){
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }else{
                return LocalTransactionState.UNKNOW;
            }
        }
    }

事务消息的问题

  1. half消息的作用?

    half消息不是完整的数据,RocketMQ把会half消息存起来,等待本地事务提交commit状态之后,才会把half消息转为完整的消息供消费者消费。

  2. half消息写入失败怎么办?

    如果half消息写入失败,认为MQ服务不可用。本地可以直接给当前记录(订单)做一个状态标记,可以后续做补偿。

  3. 写DB失败了怎么办?

    如果本地事务执行失败,可以返回给RocketMQ一个UNKNOWN状态,然后把数据缓存起来(redis),等待RocketMQ进行事务回查。

    事务回查的事务再尝试执行写DB,如果成功返回COMMIT给RocketMQ。

  4. half消息写入成功MQ挂了怎么办?

    half消息会做持久化,MQ挂掉之后,不会回调本地事务方法检查本地事务的状态。

    比如订单,还是新建状态在数据库里。MQ不会去操作它。

    只需要等待MQ恢复之后,检查到half消息然后回调本地事务方法,检查订单即可。

  5. 下单后如何利用事务消息等待支付成功(延时关单)?

    除了RocketMQ本身的延时消息可以实现,也可以利用事务消息来实现。

    比如订单需要15分钟过期,可以设置回查次数15次,每次间隔1分钟。然后在第一次执行本地事务的时候返回UNKNOWN状态即可。