消息样例
顺序消息
kafka的顺序消息可以指定paritationKey实现,相同paritationKey的消息会被发给同一个paritation。
RocketMQ可以通过实现 MessageQueueSelector
的 select
方法自定义实现消息所发给 MessageQueue的逻辑。
java
@SneakyThrows
@Test
public void orderSend() {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 10; i++) {
int orderId = i;
for (int j = 0; j <= 5; j++) {
Message msg =
//相同tag
new Message("OrderTopicTest", "order_" + orderId, "KEY" + orderId,
("order_" + orderId + " step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
//重写select
//根据orderid指定messageQueue
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
广播消息
广播消息能把消息发给订阅了topic的每一个消费者。
即使消费者在同一个消费者组里面。
默认的同一消费者组,同一条消息只有一个消费者会消费。
延时消息
RocketMQ的延时消息是按照等级划分的。
1到18分别对应
messageDelayLevel= 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m
9m 10m 20m 30m 1h 2h
java
@SneakyThrows
@Test
public void orderSend() {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 10; i++) {
int orderId = i;
for (int j = 0; j <= 5; j++) {
Message msg =
//相同tag
new Message("OrderTopicTest", "order_" + orderId, "KEY" + orderId,
("order_" + orderId + " step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
//重写select
//根据orderid指定messageQueue
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
事务消息
RocketMQ的事务消息,保证的是本地事务和写MQ消息的原子性。
支持的是最终一致性。
事务消息流程
在 RocketMQ 中,事务消息的流程如下:
- 半消息(Half Message)的发送: 当你调用事务消息的发送方法时,RocketMQ 会首先将消息作为半消息发送到 Broker。此时,Broker 并不知道这个消息最终应该是提交还是回滚。
- 本地事务的执行: 发送半消息后,你可以执行任何必要的本地事务操作,如更新数据库、修改文件等。这一步完全由你的应用程序控制。
- 确认事务状态: 本地事务完成后,你需要向 RocketMQ 提交一个确认,指示事务的最终状态(提交或回滚)。这通常通过回调函数或异步确认机制完成。RocketMQ 的事务监听器(
TransactionListener
)提供了这种机制,允许你在本地事务完成后确认消息的状态。 - 消息状态的确认: 根据本地事务的结果,你通过事务监听器回调函数返回
CHECKSTATUS_COMMIT
或CHECKSTATUS_ROLLBACK
来指示消息应该被提交还是回滚。 - Broker 的处理: 根据你的确认,Broker 会将半消息转换为提交状态(可见消息)或直接丢弃(回滚状态)。如果消息被提交,它将被发送给相应的消费者。
java
/**
* 事务监听器,用来处理本地事务
* @author yangjunwei
* @date 2024/7/4
*/
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
//在提交完事务消息执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// int value = transactionIndex.getAndIncrement();
// int status = value % 3;
// localTrans.put(msg.getTransactionId(), status);
// return LocalTransactionState.UNKNOW;
String tags = msg.getTags();
if(StringUtils.contains(tags,"TagA")){
return LocalTransactionState.COMMIT_MESSAGE;
}else if(StringUtils.contains(tags,"TagB")){
return LocalTransactionState.ROLLBACK_MESSAGE;
}else{
return LocalTransactionState.UNKNOW;
}
}
/**
* 检查本地事务
* @param msg Check message
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// Integer status = localTrans.get(msg.getTransactionId());
// if (null != status) {
// switch (status) {
// case 0:
// return LocalTransactionState.UNKNOW;
// case 1:
// return LocalTransactionState.COMMIT_MESSAGE;
// case 2:
// return LocalTransactionState.ROLLBACK_MESSAGE;
// default:
// return LocalTransactionState.COMMIT_MESSAGE;
// }
// }
// return LocalTransactionState.COMMIT_MESSAGE;
String tags = msg.getTags();
if(StringUtils.contains(tags,"TagC")){
return LocalTransactionState.COMMIT_MESSAGE;
}else if(StringUtils.contains(tags,"TagD")){
return LocalTransactionState.ROLLBACK_MESSAGE;
}else{
return LocalTransactionState.UNKNOW;
}
}
}