顺序消息
生产者
生产者发送消息到MQ的过程,如果要保证顺序消费。
只能采用单线程去生产消息,因为多线程无法控制消息生产顺序。
还需要保证 sharding key 相同,保证同一类消息发到同一个 ConsumerQueue。
比如同一笔订单可以按照
OrderId % queues.size()
得到 sharding key。
- 单线程生产消息
- 发送到同一个ConsumerQueue
存储
RocketMQ的存储是按照时间顺序 append write 到 commitlog 中的,同时它会被分发到 ConsumeQueue中。
所以只需要生产时候保证消息采用单线程发送到同一个ConsumerQueue,存储时候就能够顺序存储。
消费者
RocketMQ的消费者在消费单个ConsumerQueue时,采用的是顺序消费。
而且在同一个ConsumerGroup 中只有一个 Consumer 能同时消费一个 ConsumerQueue。(第一把分布式锁)
消费者并发消费数据情况,为了保证消息有序性。保证一个线程去消费数据。(第二把本地锁)
线程池消费数据,增加消费能力。
顺序消费的三把锁
第一把锁:分布式锁
RocketMQ 提供了一个 ConsumeMessageOrderlyService 类来保证顺序消费。
这个 service 启动的时候会向 Broker 申请当前消费者负责的队列锁。
将 消费组+消费者+负责的队列 发往 Broker。Broker存储分布式锁,保证同一个消费者组只有一个消费者能消费某个队列。
- 锁续期
- 消费者组纬度保存锁
第二把锁:本地锁Synchronized
在消费者消费的时候,并发消费的情况下。
消费者获取消息后,提交到线程池的情况。可能存在多个线程分别处理消费到的消息,可能会破坏顺序性。
所以需要一把本地锁,保证同时只有一个线程处理消息。
第三把锁:ReentrantLock
消费者的某个线程获取到 Synchronized 锁之后,还需要到 ProcessQueue 获取到 ConsumerLock锁。
获取到该锁之后,标识消费者开始消费数据了。
在重平衡阶段,该锁能锁住某个队列不被重平衡,避免重复消息。
当消费者尝试获取某个ConsumerQueue的锁时。
- 如果获取成功,表示当前队列没有被消费者消费,可以去 Broker 解除分布式锁,让新的消费者接管。
- 如果获取失败,表示当前队列正在被消费者消费消息,当前队列重平衡失败,等待下次重平衡。
RocketMQ的重平衡问题
在消费者挂掉之后,会触发重平衡机制。让其他消费者顶替挂掉的消费者。
如果挂掉的消费者消息没有消费完,还未提交 offset,就会导致新来的消费者重复消费。
顺序消费的问题
三把锁并不能完全保证顺序性和不重复。
比如 Broker 挂了,那么 Broker 对应的队列就全部不可用了。此时其它 Broker 会顶替挂掉的Broker,队列中相关的消息只能被发送到别的队列里,就会被别的消费者消费。这个时候就破坏了单个消费者消费的有序性。
如果想要保证严格的顺序性,可以在创建Topic时候配置 -order = true
。这样消息就不会发给别的队列。
牺牲可用性保证顺序性。