事务消息
事务消息的流程
先写half消息到RocketMQ
再执行本地事务
本地事务有两个方法,一个是回调执行本地事务,另一个是检查本地事务。
java/** * 事务监听器,用来处理本地事务 * @author yangjunwei * @date 2024/7/4 */ public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); //在提交完事务消息执行本地事务 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String tags = msg.getTags(); if(StringUtils.contains(tags,"TagA")){ return LocalTransactionState.COMMIT_MESSAGE; }else if(StringUtils.contains(tags,"TagB")){ return LocalTransactionState.ROLLBACK_MESSAGE; }else{ return LocalTransactionState.UNKNOW; } } /** * 检查本地事务 * @param msg Check message * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String tags = msg.getTags(); if(StringUtils.contains(tags,"TagC")){ return LocalTransactionState.COMMIT_MESSAGE; }else if(StringUtils.contains(tags,"TagD")){ return LocalTransactionState.ROLLBACK_MESSAGE; }else{ return LocalTransactionState.UNKNOW; } } }
事务消息的问题
half消息的作用?
half消息不是完整的数据,RocketMQ把会half消息存起来,等待本地事务提交commit状态之后,才会把half消息转为完整的消息供消费者消费。
half消息写入失败怎么办?
如果half消息写入失败,认为MQ服务不可用。本地可以直接给当前记录(订单)做一个状态标记,可以后续做补偿。
写DB失败了怎么办?
如果本地事务执行失败,可以返回给RocketMQ一个UNKNOWN状态,然后把数据缓存起来(redis),等待RocketMQ进行事务回查。
事务回查的事务再尝试执行写DB,如果成功返回COMMIT给RocketMQ。
half消息写入成功MQ挂了怎么办?
half消息会做持久化,MQ挂掉之后,不会回调本地事务方法检查本地事务的状态。
比如订单,还是新建状态在数据库里。MQ不会去操作它。
只需要等待MQ恢复之后,检查到half消息然后回调本地事务方法,检查订单即可。
下单后如何利用事务消息等待支付成功(延时关单)?
除了RocketMQ本身的延时消息可以实现,也可以利用事务消息来实现。
比如订单需要15分钟过期,可以设置回查次数15次,每次间隔1分钟。然后在第一次执行本地事务的时候返回UNKNOWN状态即可。