Kafka生产者

Kafka Producer

生产者消息发送流程

发送原理

在消息发送的过程中, 涉及到了两个线程–main线程Sender线程. 在 main 线程中创建了一个双端队列RecordAccumulator. main 线程将消息发送个 RecordAccumulator, Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker.


发送流程

生产者重要参数

参数名称 描述
bootstrap.servers 生产者连接集群所需的 boker 地址清单. 可以设置一个或多个, 中间用,隔开. 注意: 并非需要所有的 broker 地址, 因为生产者从给定的 broker 里查找其他 broker 信息
key.serializervalue.serializer 指定发送消息的 key 和 value 的序列化类型. 一定要写全类名!
buffer.memory RecordAccumulator 缓冲区总大小, 默认32m.
batch.size 缓冲区一批数据最大值, 默认16k. (适当增加可以提高吞吐量; 但是如果过大的话, 会导致数据传输延迟增加).
linger.ms 如果数据一直未到达 batch.size, sender 等待 linger.ms 之后就会发送数据, 单位是ms. 默认0ms, 表示没有延迟. (生产环境一般 5-100ms 之间)
acks 0: 生产者发送过来的数据, 不需要等数据落盘应答; 1: 生产者发送过来的数据, Leader 收到数据后应答; -1(all): 生产者发送过来的数据, Leader+ 和 isr 队列里面的所有节点收齐数据后应答. 默认是-1, -1和all是等价的.
max.in.flight.request.per.connection 允许最多没有返回 ack 的次数, 默认为5, 开启幂等性要保证该值是 1-5 的数字.
retries 当消息发送出现错误的时候, 系统会重发消息. 该值表示重复次数. 默认为int最大值,2147483647. 如果设置了重试, 还想保证消息的有序性, 需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候, 其他的消息可能发送成功了.
retry.backoff.ms 两次重试之间的时间间隔, 默认为100ms.
enable.idempotence 是否开启幂等性, 默认为true, 开启幂等性.
compression.type 生产者发送的所有数据的压缩方式. 默认为none, 也就是不压缩. 支持压缩类型: none, gzip, snappy, lz4和zstd.

生产者分区

分区的好处

  1. 便于合理使用存储资源, 每个 Partition 在一个 Broker 上存储, 可以吧海量的数据按照分区切割成小块存储在多台 Broker 上. 合理控制分区的任务, 可以实现负载均衡的效果.
  2. 提高并行度, 生产者可以以分区为单位发送数据; 消费者可以以分区为单位消费数据.

数据传递语义

  • 至少一次(At Least Once) = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
  • 最多一次(At Most Once) = ACK级别设置为0.
  • 精确一次(Exactly Once): 对于一些非常重要的信息, 例如和钱相关的数据, 要求数据既不能重复也不能丢失.

也就是说, At Least Once 可以保证数据不丢失, 但是不能保证数据不重复; At Most Once 可以保证数据不重复, 但是不能保证数据不丢失.

幂等性

幂等性原理

幂等性就是指 Producer 不论向 Broker 发送多少次重复数据, Broker 端都只会持久化一条, 保证了不重复.

所以上面的 Exactly Once = 幂等性 + At Least Once(ack=-1 + partitions>=2 + ISR最小副本数>=2)

重复数据的判断标准: 具有<PID, Partition, SeqNumber>相同逐渐的消息提交时, Borker 只会持久化一条. 其中:

  • PID 是 Kafka 每次重启都会分配一个新的;
  • Partition 表示分区号
  • Sequence Number 单调自增

综上, 幂等性只能保证的是在单分区单会话内不重复.

如何使用幂等性

开启参数enable.idempotence.

生产者事务

开启事务, 必须开启幂等性!


Kafka事务原理
0%