Skip to content

消息样例

顺序消息

kafka的顺序消息可以指定paritationKey实现,相同paritationKey的消息会被发给同一个paritation。

RocketMQ可以通过实现 MessageQueueSelectorselect 方法自定义实现消息所发给 MessageQueue的逻辑。

java
    @SneakyThrows
    @Test
    public void orderSend() {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr(nameServer);
            producer.start();

            for (int i = 0; i < 10; i++) {
                int orderId = i;

                for (int j = 0; j <= 5; j++) {
                    Message msg =
                            //相同tag
                            new Message("OrderTopicTest", "order_" + orderId, "KEY" + orderId,
                                    ("order_" + orderId + " step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        //重写select
                        //根据orderid指定messageQueue
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);

                    System.out.printf("%s%n", sendResult);
                }
            }
            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }

广播消息

广播消息能把消息发给订阅了topic的每一个消费者。

即使消费者在同一个消费者组里面。

默认的同一消费者组,同一条消息只有一个消费者会消费。

延时消息

RocketMQ的延时消息是按照等级划分的。

1到18分别对应

messageDelayLevel= 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m

9m 10m 20m 30m 1h 2h
java
    @SneakyThrows
    @Test
    public void orderSend() {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr(nameServer);
            producer.start();

            for (int i = 0; i < 10; i++) {
                int orderId = i;

                for (int j = 0; j <= 5; j++) {
                    Message msg =
                            //相同tag
                            new Message("OrderTopicTest", "order_" + orderId, "KEY" + orderId,
                                    ("order_" + orderId + " step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        //重写select
                        //根据orderid指定messageQueue
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);

                    System.out.printf("%s%n", sendResult);
                }
            }
            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }

事务消息

RocketMQ的事务消息,保证的是本地事务和写MQ消息的原子性。

支持的是最终一致性。

image.png

事务消息流程

在 RocketMQ 中,事务消息的流程如下:

  1. 半消息(Half Message)的发送: 当你调用事务消息的发送方法时,RocketMQ 会首先将消息作为半消息发送到 Broker。此时,Broker 并不知道这个消息最终应该是提交还是回滚。
  2. 本地事务的执行: 发送半消息后,你可以执行任何必要的本地事务操作,如更新数据库、修改文件等。这一步完全由你的应用程序控制。
  3. 确认事务状态: 本地事务完成后,你需要向 RocketMQ 提交一个确认,指示事务的最终状态(提交或回滚)。这通常通过回调函数或异步确认机制完成。RocketMQ 的事务监听器(TransactionListener)提供了这种机制,允许你在本地事务完成后确认消息的状态。
  4. 消息状态的确认: 根据本地事务的结果,你通过事务监听器回调函数返回 CHECKSTATUS_COMMITCHECKSTATUS_ROLLBACK 来指示消息应该被提交还是回滚。
  5. Broker 的处理: 根据你的确认,Broker 会将半消息转换为提交状态(可见消息)或直接丢弃(回滚状态)。如果消息被提交,它将被发送给相应的消费者。
java
/**
 * 事务监听器,用来处理本地事务
 * @author yangjunwei
 * @date 2024/7/4
 */
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    //在提交完事务消息执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
//        int value = transactionIndex.getAndIncrement();
//        int status = value % 3;
//        localTrans.put(msg.getTransactionId(), status);
//        return LocalTransactionState.UNKNOW;
        String tags = msg.getTags();
        if(StringUtils.contains(tags,"TagA")){
            return LocalTransactionState.COMMIT_MESSAGE;
        }else if(StringUtils.contains(tags,"TagB")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }

    /**
     * 检查本地事务
     * @param msg Check message
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//        Integer status = localTrans.get(msg.getTransactionId());
//        if (null != status) {
//            switch (status) {
//                case 0:
//                    return LocalTransactionState.UNKNOW;
//                case 1:
//                    return LocalTransactionState.COMMIT_MESSAGE;
//                case 2:
//                    return LocalTransactionState.ROLLBACK_MESSAGE;
//                default:
//                    return LocalTransactionState.COMMIT_MESSAGE;
//            }
//        }
//        return LocalTransactionState.COMMIT_MESSAGE;
        String tags = msg.getTags();
        if(StringUtils.contains(tags,"TagC")){
            return LocalTransactionState.COMMIT_MESSAGE;
        }else if(StringUtils.contains(tags,"TagD")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
}