线上kafka消息堆积,consumer掉线,怎么办?( 二 )


文章插图
对消费者来说 , 主要采用一个线程池来处理每个kafkaListener,一个listener就是一个独立线程 。
这个线程会同步处理 poll消息,然后动态代理回调用户自定义的消息消费逻辑,也就是我们在@KafkaListener中写的业务 。

线上kafka消息堆积,consumer掉线,怎么办?

文章插图
所以,从这里可以知道两件事情 。
第一点 , 如果业务消费逻辑很慢或者卡住了,会影响poll 。
第二点,这里没有看到直接设置消费超时的参数,其实也不太好做 。
因为这里做了超时中断 , 那么poll也会被中断,是在同一个线程中 。所以要么poll和消费逻辑在两个工作线程 , 要么中断掉当前线程后 , 重新起一个线程poll 。
所以从业务使用角度来说,可能的实现,还是自己设置业务超时 。比较通用的实现,可以是在消费逻辑中,用线程池处理消费逻辑,同时用Future get阻塞超时中断 。
google了一下,发现kafka 0.8 曾经有consumer.timeout.ms这个参数 , 但是现在的版本没有这个参数了,不知道是不是类似的作用 。
4.1.2 RocketMQ有点相关机制然后去看了下RocketMQ是否有相关实现,果然有发现 。
在RocketMQ中,可以对consumer设置consumeTimeout,这个超时就跟我们的设想有一点像了 。
【线上kafka消息堆积,consumer掉线,怎么办?】consumer会启动一个异步线程池对正在消费的消息做定时做 cleanExpiredMsg() 处理 。
线上kafka消息堆积,consumer掉线,怎么办?

文章插图
注意,如果消息类型是顺序消费(orderly),这个机制就不生效 。
如果是并发消费,那么就会进行超时判断,如果超时了,就会将这条消息的信息通过sendMessageBack() 方法发回给broker进行重试 。
线上kafka消息堆积,consumer掉线,怎么办?

文章插图
如果消息重试超过一定次数,就会进入RocketMQ的死信队列 。
spring-kafka其实也有做类似的封装,可以自定义一个死信topic,做异常处理
4.2 有没有办法通过什么手段快速发现死循环?一般来说,死循环的线程会导致CPU飙高、OOM等现象,在本次故障中,并没有相关异常表现 , 所以并没有联系到死循环的问题 。
那通过这次故障后,对kafka相关机制有了更深刻了解,poll间隔超时很有可能就是消费阻塞甚至死循环导致 。
所以,如果下次出现类似问题,消费者停止消费,但是kafkaListener线程还在 , 可以直接通过arthas的 thread id 命令查看对应线程的调用栈,看看是否有异常方法死循环调用 。
5、最佳实践通过此次故障,我们也可以总结几点kafka使用的最佳实践:
  • 使用消息队列进行消费时,一定需要多考虑异常情况,包括幂等、耗时处理(甚至死循环)的情况 。
  • 尽量提高客户端的消费速度,消费逻辑另起线程进行处理,并最好做超时控制 。
  • 减少Group订阅Topic的数量,一个Group订阅的Topic最好不要超过5个,建议一个Group只订阅一个Topic 。
  • 参考以下说明调整参数值:max.poll.records:降低该参数值 , 建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> * <max.poll.interval.ms>的积 。max.poll.interval.ms: 该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值 。
希望能够抛砖引玉,提供一些启发和思考 。如果你有其他补充和建议,欢迎留言讨论 。
都看到最后了,原创不易 , 点个关注,点个赞吧~
文章持续更新,可以微信搜索「阿丸笔记 」第一时间阅读,回复【笔记】获取Canal、MySQL、HBase、JAVA实战笔记,回复【资料】获取一线大厂面试资料 。

推荐阅读