我的 Kafka 旅程 - 性能调优

Producer于 config/producer.properties 配置文件中的项
# 序列化数据压缩方式 [none/gzip/snappy/lz4/zstd]compression.type = snappy# default=none# 内存队列缓冲区总大小buffer.memory = 67108864# default=32M# 数据块/批次 单个大小batch.size = 32768# default=16K# 数据块/批次 过期毫秒linger.ms = 5# default=0# Broker 分区的应答机制acks = 1# default=all# 发送请求允许最大的积压数max.in.flight.requests.per.connection = 5 # default=5# 发送失败的重试次数retries = 2147483647# default=0# 发送失败重试间隔毫秒retry.backoff.ms = 100# default=100ms# 幂等性(生产者编号 + Broker分区编号 + 消息编号)enable.idempotence = true# default=trueBroker【我的 Kafka 旅程 - 性能调优】于 config/server.properties 配置文件中的项
# 数据写磁盘线程数(占总核心数60%)num.io.threads = 8# default=8# 副本主动拉取线程数(占总核心数10%)num.replica.fetchers = 1# default=1# 数据网络传输线程数(占总核心数30%)num.network.threads = 3# default=3# 不存在的Topic自动创建auto.create.topics.enable = true# default=true# 副本通信超时replica.lag.time.max.ms = 30000# default=30000# Broker leader partition 分区再平衡auto.leader.rebalance.enable = true# default=true# 再平衡警戒值(%)leader.imbalance.per.broker.percentage = 1 # default=10# 再平衡检测间隔秒数leader.imbalance.check.interval.seconds = 300 # default=300# 数据分片单文件大小log.segment.bytes = 1073741824# default=1GB# 数据每索引范围大小log.index.interval.bytes = 4096# default=4KB# 数据保留时长log.retention.hours = 168# default=168 (7天)# 数据保留分钟log.retention.minutes# default=null# 数据保留毫秒log.retention.ms# default=null# 数据保留检测间隔log.retention.check.interval.ms = 300000 # default=300000# 数据保留总大小log.retention.bytes = -1# default=-1 (无穷大)# 数据删除策略 [compact,delete]log.cleanup.policy = delete# default=deleteConsumer于 config/consumer.properties 配置文件中的项
# 自动提交消费偏移量enable.auto.commit = true# default=true# 提交消费偏移量频率间隔auto.commit.interval.ms = 5000# default=5000# 缺少偏移量的处理 [latest,earliest,none]auto.offset.reset = latest# default=latest# 分区数offsets.topic.num.partitions = 50 # default=50# 与Broker间的心跳间隔heartbeat.interval.ms = 5000# default=3000# 与Broker间的超时session.timeout.ms = 45000# default=45000# 消息处理最大时长max.poll.interval.ms = 300000# default=300000# 单次拉取数据大小fetch.max.bytes = 57671680# default=50M# 单次拉取数据最大条数max.poll.records = 500# default=500# 再平衡策略# default= Range + CooperativeStickypartition.assignment.strategy = class...RangeAssignor,class...CooperativeStickyAssignor整体吞吐量生产者

  • buffer.memory:增加内存缓冲区
  • batch.size:增加单数据块/批次容量
  • linger.ms:消息发送延迟5毫秒
  • compression.type:开启压缩
Broker
  • 增加分区数(按分类分区)并行处理
消费者
  • fetch.max.bytes:每次消费数据最大容量
  • max.poll.recodes:每次消费数据最大条数
数据精确一次生产者:acks = all,幂等性 + 事务Broker:分区副本至少大于2,防丢失消费者:手动提交offset + 事务

    推荐阅读