kafka采用Consumer消费者Pull主动拉取数据的方式,当Broker无数据时,消费者空转 。Kafka并不删除已消费的消息,各自独立的消费者可消费同一个Broker分区数据 。
消费流程1、消费者发起网络消费请求
- # 每批次最小抓取设置(推荐1字节)
- fetch.min.bytes
- # 每批次最大抓取大小设置(推荐500ms)
- fetch.max.bytes
- # 未达到大小的超时设置(推荐50M)
- fetch.max.wait.ms
- # 单次拉取最大消息条数设置(推荐500条)
- max.poll.records
2.2、拦截器处理(如:汇总统计记录)
3、数据的后续处理保存等的消费端动作 。
offset当一个消费者挂掉或重启后,是否还记得消费到的位置了?offset解决了此问题 。
对于每一个topic,都会维持一个分区日志 , 分区中的每一个记录都会分配一个Id来表示顺序,称之为offset,offset用来唯一的标识分区中每条记录 , 并将每次的消费位置提交到topic中 。消费者恢复启动后接着按序消费数据 。
自动提交
- # 开启自动提交
- enable.auto.commit = true
- # 每次提交间隔(推荐5秒)
- auto.commit.interval.ms = 5000
- # 同步提交,等提交完成才可下一次再消费
- .CommitSync
- # 异步提交,可直接进行下一个消费,也有可能提交失败
- .CommitAync
- # 按指定时间得出offset值
- .offsetsForTimes
- # 按指定offset值继续消费
- .seek
- # earliest: 最早消费;无offset时,从头开始消费 。
- # latest: 最新消费;无offset时,从最新的数据开始消费 。
- # none: 无offset时 , 引发异常 。
- auto.offset.reset = earliest | latest | none
漏消费:offset提交成功,消费者端后续的数据处理未完成(建议下游步骤事务处理) 。
消费者组为了实现横向扩展 , 应用程序需要创建一个消费者群组,然后往群组里添加消费者来提高处理效率,群组里的每个消费者只处理一部分消息 。
推荐阅读
- 我的父亲作文600字初中作文 关于我的父亲写人作文600字
- 我的老父亲优秀学生作文5篇
- 我的父亲作文600字初中优秀作文 我的父亲初二优秀作文600字范文
- 我的父亲600字优秀作文初中 描写我的父亲初中600字作文
- 我的世界手机版紫珀台阶怎么获得
- 关于朋友圈的句子短句子 发朋友圈的句子短句子
- 主播感谢守护的句子 感谢守护我的句子
- qq查看特别关心我的人数 qq查看特别关心我的人
- 我的心情为什么总是很忧郁 为什么我的心情总是忧郁?
- 抖音流行的短句子关于友情 抖音流行的短句子