从源码分析如何优雅的使用 Kafka 生产者

  • 时间:
  • 浏览:2

首先创建有有另一三个小  org.apache.kafka.clients.producer.Producer 的 bean。

接下来也不 路由分区,通常亲戚亲戚朋友使用的 Topic 为了实现扩展性以及高性能也有创建多个分区。

本文作者: crossoverJie 

但另有有另一三个小 就要能 获知发送结果。

同样的要能获取结果,同去发现回调的应用多多线程 并也有上文同步时的主应用多多线程 ,另有有另一三个小 要能证明是异步回调的。

发送流程讲完了再来看看 Producer 中比较重要的有几条参数。

该函数中会调用有有另一三个小  getOrCreateDeque() 写入到有有另一三个小 内部内部结构缓存中 batches。

也不 使用哪有有另一三个小 得视情況而定。

另有有另一三个小 的发送传输速率觉得是比较低下的,因为着每次都要能同步等候消息发送的结果。

在分析完后 先看有有另一三个小 简单的消息发送是怎么样的。

其中的 valueSerializer.serialize(record.topic(), record.value()); 是有有另一三个小 接口,亲戚亲戚朋友要能在初始化完后 指定序列化实现类。

执行完后 的结果:

从这里要能要能看出为哪些地方完后 说发送完成后元数据和异常信息只会出現有有另一三个小 。

调用该构造措施进行初始化时,不止是简单的将基本参数写入 KafkaProducer。比较麻烦的是初始化 Sender 应用多多线程 进行缓冲区消费。

retries 该参数主也不 来做重试使用,当趋于稳定也不 网络抖动也有造成重试。

当 acks = all/-1 时:

在最很久开始了了初始化的 IO 应用多多线程 觉得是有有另一三个小 守护应用多多线程 ,它会经常消费哪些地方地方数据。

至于为哪些地方会要能参数有有另一三个小 有值,在下文的源码分析中会一一解释。

在调用 send() 函数后觉得第一步也不 序列化,毕竟亲戚亲戚朋友的消息要能通过网络要能发送到 Kafka。

通过图中的有几条函数会获取到完后 写入的数据。这块内容能要能难能可贵深究,但其涵盖个 completeBatch 措施却非常关键。

也不 正确的写法应当是:

能要能看得人 Sender 应用多多线程 有要能成员变量,比如:

等,哪些地方地方参数会在后文分析。

但这仅仅也不 做到了消息发送,对消息算不算成功送达完整没管,等于是纯异步的措施。

简单的来说分为以下几步:

当 acks = 1 时:

通常有四种 生活措施。

Callback 是有有另一三个小 回调接口,在消息发送完成完后 能要能回调亲戚亲戚朋友自定义的实现。

这名 参数也也不 限制重试次数。

接下来详解每个步骤。

这名 参数看名称就知道是内部内部结构缓存区的大小限制,对他适当的调大能要能提高吞吐量。

调用该措施完后 肯定因为着是消息发送完毕了,也不 会调用 batch.done() 来完成完后 亲戚亲戚朋友在 send() 措施中定义的回调接口。

亲戚亲戚朋友要能要能自己实现序列化,只要能实现 org.apache.kafka.common.serialization.Serializer 接口即可。

现在只掌握了基本的消息发送,你还可不可以深刻的理解发送中的也不 参数配置还是得源码说了算。

一旦 Leader 挂掉消息就会丢失。但性能和消息安全性都得到了一定的保证。

而亲戚亲戚朋友也只要能自定义有有另一三个小 类实现 org.apache.kafka.clients.producer.Partitioner 接口,同去在创建 KafkaProducer 实例时配置 partitioner.class 参数。

以下代码基于 SpringBoot 构建。

另有有另一三个小 在少量、频繁的消息发送场景中能要能提高发送传输速率减轻单个 producer 的压力。

因为着是有有另一三个小 分区好说,所有消息都往中间写入即可。

但在过期完后 也有处里完剩余的任务。

也不 查看 send() 的 API 能要能发现还有有有另一三个小 参数。

在 send() 措施拿到分区也有调用有有另一三个小  append() 函数:

当 acks = 0 时:

主要有 [all、-1, 0, 1] 这有几条选项,默认为 1。

因为着要能 指定分区,则会调用 partitioner.partition 接口执行自定义分区策略。

因为着会确保所有的 follower 副本都完成数据的写入才会返回。

因为着消息量真的非常大,同去又要能尽快的将消息发送到 Kafka。有有另一三个小  producer 始终会收到缓存大小等影响。

为了直观的了解发送的流程,简单的画了有几条在发送过程中关键的步骤。

因为着是写入也不 特有的分区,由怪怪的的消费者来进行处里等。

前提是 Topic 配置副本数量 replica > 1。

最后则是 Producer 的关闭,Producer 在使用过程中消耗了不少资源(应用多多线程 、内存、网络等)也不 要能显式的关闭从而回收哪些地方地方资源。

这名 一般在特殊场景下会使用。

也不 这有有另一三个小 参数难能可贵会同去也有数据,要能发送失败才会有异常信息,同去发送元数据为空。

通常要能自定义分区一般是在想尽量的保证消息的顺序性。

本文内容较多,从实例和源码的深度1分析了 Kafka 生产者。

主要关注 bootstrap.servers,它是必填参数。指的是 Kafka 集群中的 broker 地址,相似 127.0.0.1:9094。

但多个分区就不可处里要能知道写入哪个分区。

其余有几条参数暂时不做讨论,后文会有完整介绍。

同去回调的之也有传递有有另一三个小 参数:

最后四种 生活则是默认的路由策略,因为着亲戚亲戚朋友啥都没做就会执行该策略。

另有有另一三个小 在路由也有判断与算不算指定,有就直接使用该分区。

该策略也会使得消息分配的比较均匀。

但也不 能极端,调越多会浪费内存。小了也发挥不了作用,也是有有另一三个小 典型的时间和空间的权衡。

上图是有几条使用的体现。

为此亲戚亲戚朋友应当采取异步的措施发送,觉得 send() 措施默认则是异步的,也不 不手动调用 get() 措施。

那算不算能要能创建多个 producer 来进行发送呢?

要能 他不知道消息到底发送成功要能 该怎么办呢?

默认的 close() 措施和涵盖超时时间的措施也有在一定的时间后强制关闭。

本文来自云栖社区合作措施协议伙伴“开源中国”

acks 是有有另一三个小 影响消息吞吐量的有有另一三个小 关键参数。

producer 不让等候副本的任何响应,另有有另一三个小 最容易丢失消息但同去性能却是最好的!

希望看得人的亲戚亲戚朋友能有收获,同去也欢迎留言讨论。

觉得 Producer 的 API 因为着帮亲戚亲戚朋友考虑到了,发送完后 只要能调用它的 get() 措施即可同步获取发送结果。

另有有另一三个小 能要能保证消息不让丢失!

但同去性能和吞吐量却是最低的。

发送结果:

觉得这也不 很典型的轮询算法,也不 也不 分区数不频繁变动这名 措施也会比较均匀。

来看看它的实现:

首先还是来谈谈消息发送时的整个流程是怎么样的,Kafka 并也有简单的把消息通过网络发送到了 broker中,在 Java 内部内部结构还是经过了也不 优化和设计。

能要能在构建 ProducerRecord 为每条消息指定分区。

但也有也不 也不 问题图片。

接着注入这名 bean 即可调用它的发送函数发送消息。

初始化 IO 应用多多线程 处:

这是四种 生活折中的方案,它会等候副本 Leader 响应,但不让等到 follower 的响应。

从上至下依次是:

原文链接

这里我给某有有另一三个小 Topic 发送了 10W 条数据,运行应用多多线程 消息正常发送。

因为着 Kafka 也有采取的主备模式,也不 采用相似于 Zookeeper 的主备模式。