Skip to content

消费者组

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。

  • 组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。
  • 每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。

比如 topic 有 6 个分区,消费者组里面的消费者数量最理想状态是 6 个,每个消费者消费一个分区。也可以是 3 个或者两个,这样分区能够平均分配。

但是最好不要超过 6 个消费者,这样的话会有消费者分不到分区。

而 topic 的分区设计时,最好和 broker 的数量成比例。比如 3 个 borker,可以设计为 6 个分区。假如设置为 5 个分区,会有 1 个 broker 分了 1 个分区,导致分区分布不均匀。

位移管理-offset

针对 Consumer Group,Kafka 是怎么管理位移的呢?

  • 老版本

    老版本的 Consumer Group,kafka 将位移记录存到了 zk 里面。这样减少了 broker 的开销,而且方便节点伸缩扩容。

    但是 zk 并不适合频繁的写更新,而 offset 需要 频繁的更新进度。

  • 新版本

    新版本的 Consumer Group 将位移保存在 Broker 端的内部主题 - __consumer_offsets中。

消费者组的-Rebalance

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。

Rebalance的触发条件

  • 组成员数发生变化。

    新的消费者加入消费者组,后者离开组。

  • 订阅的 topic发生变化。

    当 topic 发生变化后,消费者组会进行 Rebalance 来为消费者分配 topic 的分区。

  • topic 的分区数发生变化。

    当分区数增加时,消费者组会进行 Rebalance 来重新为消费者分配分区。

image.png

Rebalance 的目的就是为了将 topic 的分区平均的分给各个消费者。

Rebalance的缺陷

  1. Rebalance 会影响消费者的 TPS。Rebalance 最致命的缺陷就是在 Rebalance 过程中,所有的消费者会停止消费,等待 Rebalance 完成
  2. Rebalance 很慢。 如果 Gonsumer Group下的 Consumer 很多,那样 Rebalance 时间会很长。
  3. Rebalance 效率不高。目前Rebalance 的设计是所有Consumer 实例共同参与,重新分配所有的分区。不会考虑局部分配。

在日常使用过程中,最好是避免 Rebalance 的产生。

如何避免Reblance

在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者组件的帮助下,完成订阅主题分区的分配。

但是整个分配过程,所有消费者都不能消费消息,因此对 Consumer 的 TPS影响非常大。

协调者-Coordinator

专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。

首先所有 broker 都有自己的 Coordinator,在 broker 启动 的时候会创建和开启对应的 Coordinator。

消费者组对应的Coordinator 就位于某个 broker 上面,需要根据策略找到对应的 broker。

  1. 确定由位移主题(__consumer_offsets)的哪个分区来保存该 Group 数据:

    partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。

    这样能找到 GroupId 在 __consumer_offsets 对应的分区(比如分区为 10)。

  2. 找到 __consumer_offsets 对应分区的 Leader。

  3. Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator

消费者会按照策略找到对应的 Coordinator,用户无需关注。

Rebalance的过程

  1. 所有成员都向 Coordinator 发送请求,请求加入 Consumer Group。
  2. 当所有 Consumer 都发送请求之后,Coordinator 会挑选一个 Consumer 作为 Leader,并将组成员和 topic 信息等都发给 Leader。
  3. Leader 开始分配消费方案,指定哪个 Conusmer 消费哪个Topic 的分区。Leader 消费方案确认之后,会将方案发给 Coordinator(协调者)。
  4. Coordinator 会按照消费方案通知对应的 Consumer 开始消费。

避免方案

Rebalance 影响 Consumer 的 TPS和 Rebalance 很慢这两个缺点目前是无法避免的。

所以只能尽量避免引起 Rebalance 的发生,从引起 Rebalance 的时机入手:

  1. 消费者数量发生变化。
  2. 订阅的 topic 发生变化。
  3. topic 的分区发生变化。

在这三个之中,topic 和 topic 的分区发生变化是无法避免的,通常是业务所需。

能尽量避免的只有消费者的数量。


但是计划内的消费者数量变化是无需关注的,比如根据业务TPS需要,扩展或者删除消费者。

我们要关注的是 Coordinator 管理 Consumer 时,错误的将 Consumer 踢出 Consumer Group 导致的 Rebalance。

  • 心跳机制

    Conusmer 会定期的向 Coordinator 发送心跳。如果超过配置时间未收到心跳,则认为 Consumer 挂掉了。

    • session.timeout.ms

      心跳间隔时间,默认为 10s。如果超过 10s 未收到某个 Consumer 的心跳,则会将该 Consumer 踢出 Group,从而触发 Rebalance。

    • heartbeat.interval.ms

      这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance。

      目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中

  • max.poll.interval.ms

    限定了Consumer 拉取数据两次 poll 的最大时间间隔。默认是 5 分钟。

    该参数也会影响到 Consumer 离开 Group,假如 5 分钟内poll 的数据还未被消费完,当前Consumer 就会离开 Group。

    要确认消费的逻辑,如果消费逻辑过长,超过了 poll 的时间间隔,就要按需调整。

    如果 Consumer 出现长时间的 FullGC,也会导致该配置生效。

    当前版本(0.10.2.0)默认300s,0.10.2.1官方修改为MAX_VALUE

推荐数值,你可以“无脑”地应用在你的生产环境中。

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms

总结

一定要避免因为各种参数或逻辑不合理而导致的组成员意外离组或退出的情形,与之相关的主要参数有:

  • session.timeout.ms
  • heartbeat.interval.ms
  • max.poll.interval.ms
  • GC 参数