Kafka
2018-11-24 22:00:48 阿炯

Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。采用Scala、Java语言开发并在Apache许可证2.0协议下授权。

Kafka is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.


其最初是由领英开发,于2011年初开源,并于2012年10月23日由Apache Incubator孵化出站。Jay Kreps似乎已经将它以作家弗朗茨·卡夫卡命名。Kreps选择将该系统以一个作家命名是因为,它是“一个用于优化写作的系统”,而且他很喜欢卡夫卡的作品。作为时下最流行的开源消息系统,被广泛地应用在数据缓冲、异步通信、汇集日志、系统解耦等方面。相比较于RocketMQ等其他常见消息系统,Kafka在保障了大部分功能特性的同时,还提供了超一流的读写性能。

Kafka存储的消息来自任意多被称为“生产者”(Producer)的进程。数据从而可以被分配到不同的“分区”(Partition)、不同的“Topic”下。在一个分区内,这些消息被索引并连同时间戳存储在一起。其它被称为“消费者”(Consumer)的进程可以从分区查询消息。Kafka运行在一个由一台或多台服务器组成的集群上,并且分区可以跨集群结点分布。

Kafka高效地处理实时流式数据,可以实现与Storm、HBase和Spark的集成。作为群集部署到多台服务器上,Kafka处理它所有的发布和订阅消息系统使用了四个API,即生产者API、消费者API、Stream API和Connector API。它能够传递大规模流式消息,自带容错功能,已经取代了一些传统消息系统,如JMS、AMQP等。使用ZooKeeper来协调各个消费者。

Kafka架构的主要术语包括Topic、Record和Broker。Topic由Record组成,Record持有不同的信息,而Broker则负责复制消息。Kafka有四个主要API:
生产者API:支持应用程序发布Record流。
消费者API:支持应用程序订阅Topic和处理Record流。
Stream API:将输入流转换为输出流,并产生结果。
Connector API:执行可重用的生产者和消费者API,可将Topic链接到现有应用程序。


Kafka的架构图

相关术语

Topic 用来对消息进行分类,每个进入到Kafka的信息都会被放到一个Topic下
Broker 用来实现数据存储的主机服务器
Partition 每个Topic中的消息会被分为若干个Partition,以提高消息的处理效率
Producer 消息的生产者
Consumer 消息的消费者
Consumer Group 消息的消费群组

接上介绍一下Kafka的架构和涉及到的名词:
Topic:用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上。

Partition:是Kafka中横向扩展和一切并行化的基础,每个Topic都至少被切分为1个Partition。

Offset:消息在Partition中的编号,编号顺序不跨Partition。

Consumer:用于从Broker中取出/消费Message。

Producer:用于往Broker中发送/生产Message。

Replication:Kafka支持以Partition为单位对Message进行冗余备份,每个Partition都可以配置至少1个Replication(当仅1个Replication时即仅该Partition本身)。

Leader:每个Replication集合中的Partition都会选出一个唯一的Leader,所有的读写请求都由Leader处理。其他Replicas从Leader处把数据更新同步到本地,过程类似大家熟悉的MySQL中的Binlog同步。

Broker:Kafka中使用Broker来接受Producer和Consumer的请求,并把Message持久化到本地磁盘。每个Cluster当中会选举出一个Broker来担任Controller,负责处理Partition的Leader选举,协调Partition迁移等工作。

ISR(In-Sync Replica):是Replicas的一个子集,表示目前Alive且与Leader能够“Catch-up”的Replicas集合。由于读写都是首先落到Leader上,所以一般来说通过同步机制从Leader上拉取数据的Replica都会和Leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该Replica踢出ISR。每个Partition都有它自己独立的ISR。


Broker

不同于Redis和MemcacheQ等内存消息队列,Kafka的设计是把所有的Message都要写入速度低容量大的硬盘,以此来换取更强的存储能力。实际上,Kafka使用硬盘并没有带来过多的性能损失,“规规矩矩”的抄了一条“近道”。

首先,说“规规矩矩”是因为Kafka在磁盘上只做Sequence I/O,由于消息系统读写的特殊性,这并不存在什么问题。关于磁盘I/O的性能,引用一组Kafka官方给出的测试数据(Raid-5,7200rpm):
Sequence I/O: 600MB/s
Random I/O: 100KB/s

所以通过只做Sequence I/O的限制,规避了磁盘访问速度低下对性能可能造成的影响。接下来我们再聊一聊Kafka是如何“抄近道的”。

首先,Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache,同时标记Page属性为Dirty。当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。同时如果有其他进程申请内存,回收PageCache的代价又很小,所以现代的OS都支持PageCache。

使用PageCache功能同时可以避免在JVM内部缓存数据,JVM为我们提供了强大的GC能力,同时也引入了一些问题不适用与Kafka的设计。
• 如果在Heap内管理缓存,JVM的GC线程会频繁扫描Heap空间,带来不必要的开销。如果Heap过大,执行一次Full GC对系统的可用性来说将是极大的挑战。

• 所有在在JVM内的对象都不免带有一个Object Overhead(千万不可小视),内存的有效空间利用率会因此降低。

• 所有的In-Process Cache在OS中都有一份同样的PageCache。所以通过将缓存只放在PageCache,可以至少让可用缓存空间翻倍。

• 如果Kafka重启,所有的In-Process Cache都会失效,而OS管理的PageCache依然可以继续使用。

PageCache还只是第一步,Kafka为了进一步的优化性能还采用了Sendfile技术。在解释Sendfile之前,首先介绍一下传统的网络I/O操作流程,大体上分为以下4步:
1)OS 从硬盘把数据读到内核区的PageCache。

2)用户进程把数据从内核区Copy到用户区。

3)然后用户进程再把数据写入到Socket,数据流入内核区的Socket Buffer上。

4)OS 再把数据从Buffer中Copy到网卡的Buffer上,这样完成一次发送。



整个过程共经历两次Context Switch,四次System Call。同一份数据在内核Buffer与用户Buffer之间重复拷贝,效率低下。其中2、3两步没有必要,完全可以直接在内核区完成数据拷贝。这也正是Sendfile所解决的问题,经过Sendfile优化后,整个I/O过程就变成了下面这个样子。



通过以上的介绍不难看出,Kafka的设计初衷是尽一切努力在内存中完成数据交换,无论是对外作为一整个消息系统,或是内部同底层操作系统的交互。如果Producer和Consumer之间生产和消费进度上配合得当,完全可以实现数据交换零I/O。这也就是我为什么说Kafka使用“硬盘”并没有带来过多性能损失的原因。

提示:

Kafka官方并不建议通过Broker端的log.flush.interval.messages和log.flush.interval.ms来强制写盘,认为数据的可靠性应该通过Replica来保证,而强制Flush数据到磁盘会对整体性能产生影响。

可以通过调整/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio来调优性能。

脏页率超过第一个指标会启动pdflush开始Flush Dirty PageCache。

脏页率超过第二个指标会阻塞所有的写操作来进行Flush。

根据不同的业务需求可以适当的降低dirty_background_ratio和提高dirty_ratio。


Partition

Partition是Kafka可以很好的横向扩展和提供高并发处理以及实现Replication的基础。

扩展性方面。首先,Kafka允许Partition在集群内的Broker之间任意移动,以此来均衡可能存在的数据倾斜问题。其次,Partition支持自定义的分区算法,例如可以将同一个Key的所有消息都路由到同一个Partition上去。同时Leader也可以在In-Sync的Replica中迁移。由于针对某一个Partition的所有读写请求都是只由Leader来处理,所以Kafka会尽量把Leader均匀的分散到集群的各个节点上,以免造成网络流量过于集中。

并发方面。任意Partition在某一个时刻只能被一个Consumer Group内的一个Consumer消费(反过来一个Consumer则可以同时消费多个Partition),Kafka非常简洁的Offset机制最小化了Broker和Consumer之间的交互,这使Kafka并不会像同类其他消息队列一样,随着下游Consumer数目的增加而成比例的降低性能。此外,如果多个Consumer恰巧都是消费时间序上很相近的数据,可以达到很高的PageCache命中率,因而Kafka可以非常高效的支持高并发读操作,实践中基本可以达到单机网卡上限。

不过Partition的数量并不是越多越好,Partition的数量越多,平均到每一个Broker上的数量也就越多。考虑到Broker宕机(Network Failure, Full GC)的情况下,需要由Controller来为所有宕机的Broker上的所有Partition重新选举Leader,假设每个Partition的选举消耗10ms,如果Broker上有500个Partition,那么在进行选举的5s的时间里,对上述Partition的读写操作都会触发LeaderNotAvailableException。

再进一步,如果挂掉的Broker是整个集群的Controller,那么首先要进行的是重新任命一个Broker作为Controller。新任命的Controller要从Zookeeper上获取所有Partition的Meta信息,获取每个信息大概3-5ms,那么如果有10000个Partition这个时间就会达到30s-50s。而且不要忘记这只是重新启动一个Controller花费的时间,在这基础上还要再加上前面说的选举Leader的时间!

此外,在Broker端,对Producer和Consumer都使用了Buffer机制。其中Buffer的大小是统一配置的,数量则与Partition个数相同。如果Partition个数过多,会导致Producer和Consumer的Buffer内存占用过大。

提示:

Partition的数量尽量提前预分配,虽然可以在后期动态增加Partition,但是会冒着可能破坏Message Key和Partition之间对应关系的风险。

Replica的数量不要过多,如果条件允许尽量把Replica集合内的Partition分别调整到不同的Rack。

尽一切努力保证每次停Broker时都可以Clean Shutdown,否则问题就不仅仅是恢复服务所需时间长,还可能出现数据损坏或其他很诡异的问题。


Producer

Kafka的研发团队表示在0.8版本里用Java重写了整个Producer,据说性能有了很大提升。我还没有亲自对比试用过,这里就不做数据对比了。本文结尾的扩展阅读里提到了一套我认为比较好的对照组,有兴趣的同学可以尝试一下。

其实在Producer端的优化大部分消息系统采取的方式都比较单一,无非也就化零为整、同步变异步这么几种。

Kafka系统默认支持MessageSet,把多条Message自动地打成一个Group后发送出去,均摊后拉低了每次通信的RTT。而且在组织MessageSet的同时,还可以把数据重新排序,从爆发流式的随机写入优化成较为平稳的线性写入。

此外,还要着重介绍的一点是,Producer支持End-to-End的压缩。数据在本地压缩后放到网络上传输,在Broker一般不解压(除非指定要Deep-Iteration),直至消息被Consume之后在客户端解压。

当然用户也可以选择自己在应用层上做压缩和解压的工作(毕竟Kafka目前支持的压缩算法有限,只有GZIP和Snappy),不过这样做反而会意外的降低效率!Kafka的End-to-End压缩与MessageSet配合在一起工作效果最佳,上面的做法直接割裂了两者间联系。至于道理其实很简单,压缩算法中一条基本的原理“重复的数据量越多,压缩比越高”。无关于消息体的内容,无关于消息体的数量,大多数情况下输入数据量大一些会取得更好的压缩比。

不过Kafka采用MessageSet也导致在可用性上一定程度的妥协。每次发送数据时,Producer都是send()之后就认为已经发送出去了,但其实大多数情况下消息还在内存的MessageSet当中,尚未发送到网络,这时候如果Producer挂掉,那就会出现丢数据的情况。

为了解决这个问题,Kafka在0.8版本的设计借鉴了网络当中的ack机制。如果对性能要求较高,又能在一定程度上允许Message的丢失,那就可以设置request.required.acks=0 来关闭ack,以全速发送。如果需要对发送的消息进行确认,就需要设置request.required.acks为1或-1,那么1和-1又有什么区别呢?这里又要提到前面聊的有关Replica数量问题。如果配置为1,表示消息只需要被Leader接收并确认即可,其他的Replica可以进行异步拉取无需立即进行确认,在保证可靠性的同时又不会把效率拉得很低。如果设置为-1,表示消息要Commit到该Partition的ISR集合中的所有Replica后,才可以返回ack,消息的发送会更安全,而整个过程的延迟会随着Replica的数量正比增长,这里就需要根据不同的需求做相应的优化。

提示:Producer的线程不要配置过多,尤其是在Mirror或者Migration中使用的时候,会加剧目标集群Partition消息乱序的情况(如果你的应用场景对消息顺序很敏感的话)。0.8版本的request.required.acks默认是0(同0.7)。


Consumer

Consumer端的设计大体上还算是比较常规的。

• 通过Consumer Group,可以支持生产者消费者和队列访问两种模式。
• Consumer API分为High level和Low level两种。前一种重度依赖Zookeeper,所以性能差一些且不自由,但是超省心。第二种不依赖Zookeeper服务,无论从自由度和性能上都有更好的表现,但是所有的异常(Leader迁移、Offset越界、Broker宕机等)和Offset的维护都需要自行处理。
• 发布的0.9 Release。开发人员又用Java重写了一套Consumer。把两套API合并在一起,同时去掉了对Zookeeper的依赖。据说性能有大幅度提升。

提示:强烈推荐使用Low level API,虽然繁琐一些,但是目前只有这个API可以对Error数据进行自定义处理,尤其是处理Broker异常或由于Unclean Shutdown导致的Corrupted Data时,否则无法Skip只能等着“坏消息”在Broker上被Rotate掉,在此期间该Replica将会一直处于不可用状态。

对kafka的快速理解

它是一个流式数据处理平台,具有消息系统的能力,也有实时流式数据处理分析能力,只是更多的偏向于把他当做消息队列系统来使用。如果说按照容易理解来分层的话,大致可以分为3层:

第一层是Zookeeper,相当于注册中心,他负责kafka集群元数据的管理,以及集群的协调工作,在每个kafka服务器启动的时候去连接到Zookeeper,把自己注册到Zookeeper当中

第二层里是kafka的核心层,这里就会包含很多kafka的基本概念在内:
record:代表消息
topic:主题,消息都会由一个主题方式来组织,可以理解为对于消息的一个分类
producer:生产者,负责发送消息
consumer:消费者,负责消费消息
broker:kafka服务器
partition:分区,主题会由多个分区组成,通常每个分区的消息都是按照顺序读取的,不同的分区无法保证顺序性,分区也就是我们常说的数据分片sharding机制,主要目的就是为了提高系统的伸缩能力,通过分区,消息的读写可以负载均衡到多个不同的节点上
Leader/Follower:分区的副本。为了保证高可用,分区都会有一些副本,每个分区都会有一个Leader主副本负责读写数据,Follower从副本只负责和Leader副本保持数据同步,不对外提供任何服务
offset:偏移量,分区中的每一条消息都会根据时间先后顺序有一个递增的序号,这个序号就是offset偏移量
Consumer group:消费者组,由多个消费者组成,一个组内只会由一个消费者去消费一个分区的消息
Coordinator:协调者,主要是为消费者组分配分区以及重平衡Rebalance操作
Controller:控制器,其实就是一个broker而已,用于协调和管理整个Kafka集群,他会负责分区Leader选举、主题管理等工作,在Zookeeper第一个创建临时节点/controller的就会成为控制器。

第三层则是存储层,用来保存kafka的核心数据,它们都会以日志的形式最终写入磁盘中。



服务器:Kafka 作为一个或多个服务器集群运行,可以跨越多个数据中心或云区域。其中一些服务器形成存储层,称为代理。其他服务器运行 Kafka Connect 以事件流的形式持续导入和导出数据,以将 Kafka 与您现有的系统(例如关系数据库以及其他 Kafka 集群)集成。为了让您实现关键任务用例,Kafka 集群具有高度可扩展性和容错性:如果其中任何一台服务器发生故障,其他服务器将接管它们的工作以确保连续运行而不会丢失任何数据。

客户端:它们允许您编写分布式应用程序和微服务,即使在出现网络问题或机器故障的情况下,也能以容错的方式并行、大规模地读取、写入和处理事件流。Kafka 附带了一些这样的客户端,这些客户端由 Kafka 社区提供的 数十个客户端进行了扩充:客户端可用于 Java 和 Scala,包括更高级别的 Kafka Streams 库,用于许多其他编程语言以及 REST API。

架构


与前面两个 MQ 类似有生产阶段、存储阶段、消费阶段,相比 RocketMQ 这里的注册中心是用的 Zookeeper,Kafka 的诸多事件都依赖于 ZK,元数据管理、各个角色的注册、心跳、选举、状态维护,这里的角色包括 Boker、Topic、Partition、消费者组等。

所以这里也会带来 ZK Watch 事件压力过大的问题,大量的 ZK 节点事件阻塞在队列中,导致自旋锁,导致 CPU 上升,由于大量数量事件对象导致占用了大量的内存。

图中的 Controller 是 Kakfa 服务端 Broker 的概念,Broker 集群有多台,但只有一台 Broker 可以扮演控制器的角色;某台 Broker 一旦成为 Controller,它用于以下权力:完成对集群成员管理、主题维护和分区的管理,如集群 Broker 信息、Topic 维护、Partition 维护、分区选举 ISR、同步元信息给其他 Broker 等。

存储


Topic 是逻辑上的概念,而 Partition 是物理上的概念,即一个 Topic 划分为多个 Partition,每个 Partition 对应一个 Log 文件。


.log 文件:存储消息数据的文件。
.index 文件:索引文件,记录一条消息在 log 文件中的位置。
.snapshot 文件:记载着生产者最新的 offset。
.timeindex 时间索引文件:当前日志分段文件中建立索引的消息的时间戳,是在 0.10.0 版本后增加的,用于根据时间戳快速查找特定消息的位移值,优化 Kafka 读取历史消息缓慢的问题。为了保证时间戳的单调递增,可以将 log.message.timestamp.type 设置成 logApendTime,而 CreateTime 不能保证是消息写入时间。


上图是三个 Broker、两个 Topic、两个 Partition 的 Broker  的存储情况,可以延伸想象一下百万级 Topic 的存储情况会很复杂。

Rebalnce 问题

为了解决强依赖 Zookeeper 进行 Rebalance 带来的问题,Kafka 引入了 Coordinator 机制。首先触发 Rebalance(再均衡)操作的场景目前分为以下几种:消费者组内消费者数量发生变化,包括:
有新消费者加入与节点扩容;
有消费者宕机下线,包括真正宕机,或者长时间 GC、网络延迟导致消费者未在超时时间内向 GroupCoordinator 发送心跳,也会被认为下线;
有消费者主动退出消费者组(发送 LeaveGroupRequest 请求) 比如客户端调用了 unsubscrible() 方法取消对某些主题的订阅;
消费者组对应的 GroupCoordinator 节点发生了变化;
消费者组订阅的主题发生变化(增减)或者主题分区数量发生了变化。

Kafka使用入门

Kafka中级学习笔记

1、Kafka基本概念

Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker。

Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。

Message
    消息是Kafka通讯的基本单位,有一个固定长度的消息头和一个可变长度的消息体(payload)构成。在Java客户端中又称之为记录(Record)。
    消息结构各部分说明如下:
            CRC32: CRC32校验和,4个字节。
            magic: Kafka服务程序协议版本号,用于做兼容。1个字节。
            attributes: 该字段占1字节,其中低两位用来表示压缩方式,第三位表示时间戳类型(0表示LogCreateTime,1表示LogAppendTime),高四位为预留位置,暂无实际意义。
            timestamp: 消息时间戳,当 magic > 0 时消息头必须包含该字段。8个字节。
            key-length: 消息key长度,4个字节。
            key: 消息key实际数据。
            payload-length: 消息实际数据长度,4个字节。
            payload: 消息实际数据
    在实际存储一条消息还包括12字节的额外开销(LogOverhead):
            消息的偏移量: 8字节,类似于消息的Id。
            消息的总长度: 4字节

Partition:
    Partition(分区)是物理上的概念,每个Topic包含一个或多个Partition。
    每个分区由一系列有序的不可变的消息组成,是一个有序队列。
    每个分区在物理上对应为一个文件夹,分区的命名规则为${topicName}-{partitionId},如__consumer_offsets-0。
    分区目录下存储的是该分区的日志段,包括日志数据文件和两个索引文件。
    每条消息被追加到相应的分区中,是顺序写磁盘,因此效率非常高,这也是Kafka高吞吐率的一个重要保证。
    kafka只能保证一个分区内的消息的有序性,并不能保证跨分区消息的有序性。

LogSegment:
日志文件按照大小或者时间滚动切分成一个或者多个日志段(LogSegment),其中日志段大小由配置项log.segment.bytes指定,默认是1GB。时间长度则是根据log.roll.ms或者log.roll.hours配置项设置;当前活跃的日志段称之为活跃段(activeSegment)。
    不同于普通的日志文件,Kafka的日志段除了有一个具体的日志文件之外,还有两个辅助的索引文件:
    数据文件
        数据文件是以 .log 为文件后缀名的消息集文件(FileMessageSet),用于保存消息实际数据
        命名规则为:由数据文件的第一条消息偏移量,也称之为基准偏移量(BaseOffset),左补0构成20位数字字符组成
        每个数据文件的基准偏移量就是上一个数据文件的LEO+1(第一个数据文件为0)
    偏移量索引文件
        文件名与数据文件相同,但是以.index为后缀名。它的目的是为了快速根据偏移量定位到消息所在的位置。
        首先Kafka将每个日志段以BaseOffset为key保存到一个ConcurrentSkipListMap跳跃表中,这样在查找指定偏移量的消息时,用二分查找法就能快速定位到消息所在的数据文件和索引文件
        然后在索引文件中通过二分查找,查找值小于等于指定偏移量的最大偏移量,最后从查找出的最大偏移量处开始顺序扫描数据文件,直到在数据文件中查询到偏移量与指定偏移量相等的消息
        需要注意的是并不是每条消息都对应有索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引,我们可以通过index.interval.bytes设置索引跨度。
    时间戳索引文件
        Kafka从0.10.1.1版本开始引入了一个基于时间戳的索引文件,文件名与数据文件相同,但是以.timeindex作为后缀。它的作用则是为了解决根据时间戳快速定位消息所在位置。
        Kafka API提供了一个 offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,该方法会返回时间戳大于等于待查询时间的第一条消息对应的偏移量和时间戳。这个功能其实挺好用的,假设我们希望从某个时间段开始消费,就可以用offsetsForTimes()方法定位到离这个时间最近的第一条消息的偏移量,然后调用seek(TopicPartition, long offset)方法将消费者偏移量移动过去,然后调用poll()方法长轮询拉取消息。

Producer:
        负责发布消息到Kafka broker。
        生产者的一些重要的配置项:
            request.required.acks: Kafka为生产者提供了三种消息确认机制(ACK),用于配置broker接到消息后向生产者发送确认信息,以便生产者根据ACK进行相应的处理,该机制通过属性request.required.acks设置,取值可以为0, -1, 1,默认是1。
                acks=0: 生产者不需要等待broker返回确认消息,而连续发送消息。
                acks=1: 生产者需要等待Leader副本已经成功将消息写入日志文件中。这种方式在一定程度上降低了数据丢失的可能性,但仍无法保证数据一定不会丢失。因为没有等待follower副本同步完成。
                acks=-1: Leader副本和所有的ISR列表中的副本都完成数据存储时才会向生产者发送确认消息。为了保证数据不丢失,需要保证同步的副本至少大于1,通过参数min.insync.replicas设置,当同步副本数不足次配置项时,生产者会抛出异常。但是这种方式同时也影响了生产者发送消息的速度以及吞吐率。
            message.send.max.retries: 生产者在放弃该消息前进行重试的次数,默认是3次。
            retry.backoff.ms: 每次重试之前等待的时间,单位是ms,默认是100。
            queue.buffering.max.ms: 在异步模式下,消息被缓存的最长时间,当到达该时间后消息被开始批量发送;若在异步模式下同时配置了缓存数据的最大值batch.num.messages,则达到这两个阈值的任何一个就会触发消息批量发送。默认是1000ms。
            queue.buffering.max.messages: 在异步模式下,可以被缓存到队列中的未发送的最大消息条数。默认是10000。
            queue.enqueue.timeout.ms:
                =0: 表示当队列没满时直接入队,满了则立即丢弃
                <0: 表示无条件阻塞且不丢弃
                >0: 表示阻塞达到该值时长抛出QueueFullException异常
            batch.num.messages: Kafka支持批量消息(Batch)向broker的特定分区发送消息,批量大小由属性batch.num.messages设置,表示每次批量发送消息的最大消息数,当生产者采用同步模式发送时改配置项将失效。默认是200。
            request.timeout.ms: 在需要acks时,生产者等待broker应答的超时时间。默认是1500ms。
            send.buffer.bytes: Socket发送缓冲区大小。默认是100kb。
            topic.metadata.refresh.interval.ms: 生产者定时请求更新主题元数据的时间间隔。若设置为0,则在每个消息发送后都会去请求更新数据。默认是5min。
            client.id: 生产者id,主要方便业务用来追踪调用定位问题。默认是console-producer。

Consumer & Consumer Group & Group Coordinator:
    Consumer: 消息消费者,向Kafka broker读取消息的客户端。Kafka0.9版本发布了基于Java重新写的新的消费者,它不再依赖scala运行时环境和zookeeper。
    Consumer Group: 每个消费者都属于一个特定的Consumer Group,可通过group.id配置项指定,若不指定group name则默认为test-consumer-group。
    Group Coordinator: 对于每个Consumer group,会选择一个brokers作为消费组的协调者。
    每个消费者也有一个全局唯一的id,可通过配置项client.id指定,如果不指定,Kafka会自动为该消费者生成一个格式为${groupId}-${hostName}-${timestamp}-${UUID前8个字符}的全局唯一id。
    Kafka提供了两种提交consumer_offset的方式:Kafka自动提交 或者 客户端调用KafkaConsumer相应API手动提交。
    自动提交: 并不是定时周期性提交,而是在一些特定事件发生时才检测与上一次提交的时间间隔是否超过auto.commit.interval.ms。
        enable.auto.commit=true
        auto.commit.interval.ms
    手动提交
        enable.auto.commit=false
        commitSync(): 同步提交
        commitAsync(): 异步提交
    消费者的一些重要的配置项:
        group.id: A unique string that identifies the consumer group this consumer belongs to.
        client.id: The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
        bootstrap.servers: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
        key.deserializer: Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface.
        value.deserializer: Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface.
        fetch.min.bytes: The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.
        fetch.max.bytes: The maximum amount of data the server should return for a fetch request.
        max.partition.fetch.bytes: The maximum amount of data per-partition the server will return.
        max.poll.records: The maximum number of records returned in a single call to poll().
        heartbeat.interval.ms: The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
        session.timeout.ms: The timeout used to detect consumer failures when using Kafka’s group management facility.
        enable.auto.commit: If true the consumer’s offset will be periodically committed in the background.

ISR: Kafka在ZK中动态维护了一个ISR(In-Sync Replica),即保持同步的副本列表,该列表中保存的是与leader副本保持消息同步的所有副本对应的brokerId。如果一个副本宕机或者落后太多,则该follower副本将从ISR列表中移除。

Zookeeper:
Kafka利用ZK保存相应的元数据信息,包括:broker信息,Kafka集群信息,旧版消费者信息以及消费偏移量信息,主题信息,分区状态信息,分区副本分片方案信息,动态配置信息,等等。Kafka在zk中注册节点说明:
    /consumers: 旧版消费者启动后会在ZK的该节点下创建一个消费者的节点
    /brokers/seqid: 辅助生成的brokerId,当用户没有配置broker.id时,ZK会自动生成一个全局唯一的id。
    /brokers/topics: 每创建一个主题就会在该目录下创建一个与该主题同名的节点。
    /borkers/ids: 当Kafka每启动一个KafkaServer时就会在该目录下创建一个名为{broker.id}的子节点
    /config/topics: 存储动态修改主题级别的配置信息
    /config/clients: 存储动态修改客户端级别的配置信息
    /config/changes: 动态修改配置时存储相应的信息
    /admin/delete_topics: 在对主题进行删除操作时保存待删除主题的信息
    /cluster/id: 保存集群id信息
    /controller: 保存控制器对应的brokerId信息等
    /isr_change_notification: 保存Kafka副本ISR列表发生变化时通知的相应路径
Kafka在启动或者运行过程中会在ZK上创建相应的节点来保存元数据信息,通过监听机制在这些节点注册相应的监听器来监听节点元数据的变化。

TIPS:如果跟ES对应,Broker相当于Node,Topic相当于Index,Message相对于Document,而Partition相当于shard。LogSegment相对于ES的Segment。

1.1、如何查看消息内容(Dump Log Segments)

我们在使用kafka的过程中有时候可以需要查看我们生产的消息的各种信息,这些消息是存储在kafka的日志文件中的。由于日志文件的特殊格式,我们是无法直接查看日志文件中的信息内容。Kafka提供了一个命令,可以将二进制分段日志文件转储为字符类型的文件:
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments
Parse a log file and dump its contents to the console,
useful for debugging a seemingly corrupt log segment.
Option                                  Description                           
------                                  -----------                           
--deep-iteration    使用深迭代而不是浅迭代                          
--files <file1, file2, ...>    必填。输入的日志段文件,逗号分隔
--key-decoder-class    自定义key值反序列化器。必须实现`kafka.serializer.Decoder` trait。所在jar包需要放在`kafka/libs`目录下。(默认是`kafka.serializer.StringDecoder`)。
--max-message-size <Integer: size>    消息最大的字节数(默认为5242880)                           
--print-data-log    同时打印出日志消息             
--value-decoder-class    自定义value值反序列化器。必须实现`kafka.serializer.Decoder` trait。所在jar包需要放在`kafka/libs`目录下。(默认是`kafka.serializer.StringDecoder`)。
--verify-index-only    只是验证索引不打印索引内容

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments
--files /tmp/kafka-logs/test-0/00000000000000000000.log
--print-data-log
Dumping /tmp/kafka-logs/test-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1498104812192 isvalid: true
payloadsize: 11 magic: 1 compresscodec: NONE crc: 3271928089
payload: hello world
offset: 1 position: 45 CreateTime: 1498104813269 isvalid: true
payloadsize: 14 magic: 1 compresscodec: NONE crc: 242183772
payload: hello everyone

注意:这里 --print-data-log  是表示查看消息内容的,不加此项只能看到Header,看不到payload。

也可以用来查看index文件:
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments
--files /tmp/kafka-logs/test-0/00000000000000000000.index  
--print-data-log
Dumping /tmp/kafka-logs/test-0/00000000000000000000.index
offset: 0 position: 0

timeindex文件也是OK的:
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments
--files /tmp/kafka-logs/test-0/00000000000000000000.timeindex  
--print-data-log
Dumping /tmp/kafka-logs/test-0/00000000000000000000.timeindex
timestamp: 1498104813269 offset: 1
Found timestamp mismatch in :
/tmp/kafka-logs/test-0/00000000000000000000.timeindex
  Index timestamp: 0, log timestamp: 1498104812192
Found out of order timestamp in :
/tmp/kafka-logs/test-0/00000000000000000000.timeindex
  Index timestamp: 0, Previously indexed timestamp: 1498104813269

1.2、消费者平衡过程

消费者平衡(Consumer Rebalance)是指的是消费者重新加入消费组,并重新分配分区给消费者的过程。在以下情况下会引起消费者平衡操作:
    新的消费者加入消费组
    当前消费者从消费组退出(不管是异常退出还是正常关闭)
    消费者取消对某个主题的订阅
    订阅主题的分区增加(Kafka的分区数可以动态增加但是不能减少)
    broker宕机新的协调器当选
    当消费者在${session.timeout.ms}时间内还没有发送心跳请求,组协调器认为消费者已退出。

消费者自动平衡操作提供了消费者的高可用和高可扩展性,这样当我们增加或者减少消费者或者分区数的时候,不需要关心底层消费者和分区的分配关系。但是需要注意的是,在rebalancing过程中,由于需要给消费者重新分配分区,所以会出现在一个短暂时间内消费者不能拉取消息的状况。

注意:
这里要特别注意最后一种情况,就是所谓的慢消费者(Slow Consumers)。如果没有在session.timeout.ms时间内收到心跳请求,协调者可以将慢消费者从组中移除。通常,如果消息处理比session.timeout.ms慢,就会成为慢消费者。导致两次poll()方法的调用间隔比session.timeout.ms时间长。由于心跳只在 poll()调用时才会发送(在0.10.1.0版本中, 客户端心跳在后台异步发送了),这就会导致协调者标记慢消费者死亡。

如果没有在session.timeout.ms时间内收到心跳请求,协调者标记消费者死亡并且断开和它的连接。同时通过向组内其他消费者的HeartbeatResponse中发送IllegalGeneration错误代码 触发rebalance操作。

在手动commit offset的模式下,要特别注意这个问题,否则会出现commit不上的情况。导致一直在重复消费。

2、Kafka的特点

2.1、消息顺序:保证每个partition内部的顺序,但是不保证跨partition的全局顺序。如果需要全局消息有序,topic只能有一个partition。

2.2、consumer group:consumer group中的consumer并发获取消息,但是为了保证partition消息的顺序性,每个partition只会由一个consumer消费。因此consumer group中的consumer数量需要小于等于topic的partition个数。(如需全局消息有序,只能有一个partition,一个consumer)

2.3、同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。

2.4、Producer Push消息,Client Pull消息模式:一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事实上,push模式和pull模式各有优劣。push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据Consumer的消费能力以适当的速率消费消息。pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm或Spark Streaming这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。

3、kafka的HA


Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务。若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失。而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对Failover要求非常高。因此,Kafka从0.8开始提供High Availability机制。主要表现在Data Replication和Leader Election两方面。

3.1、Data Replication

Kafka从0.8开始提供partition级别的replication,replication的数量可在$KAFKA_HOME/config/server.properties中配置:
default.replication.factor = 1

该 Replication与leader election配合提供了自动的failover机制。replication对Kafka的吞吐率是有一定影响的,但极大的增强了可用性。默认情况下,Kafka的replication数量为1。每个partition都有一个唯一的leader,所有的读写操作都在leader上完成,follower批量从leader上pull数据。一般情况下partition的数量大于等于broker的数量,并且所有partition的leader均匀分布在broker上。follower上的日志和其leader上的完全一样。

需要注意的是,replication factor并不会影响consumer的吞吐率测试,因为consumer只会从每个partition的leader读数据,而与replicaiton factor无关。同样,consumer吞吐率也与同步复制还是异步复制无关。

3.2、Leader Election

引入Replication之后,同一个Partition可能会有多个副本(Replica),而这时需要在这些副本之间选出一个Leader,Producer和Consumer只与这个Leader副本交互,其它Replica作为Follower从Leader中复制数据。注意,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),并不提供任何读写服务,系统更加简单且高效。

思考为什么follower副本不提供读写,只做冷备?

follwer副本不提供写服务这个比较好理解,因为如果follower也提供写服务的话,那么就需要在所有的副本之间相互同步。n个副本就需要 nxn 条通路来同步数据,如果采用异步同步的话,数据的一致性和有序性是很难保证的;而采用同步方式进行数据同步的话,那么写入延迟其实是放大n倍的,反而适得其反。

那么为什么不让follower副本提供读服务,减少leader副本的读压力呢?这个除了因为同步延迟带来的数据不一致之外,不同于其他的存储服务(如ES,MySQL),Kafka的读取本质上是一个有序的消息消费,消费进度是依赖于一个叫做offset的偏移量,这个偏移量是要保存起来的。如果多个副本进行读负载均衡,那么这个偏移量就不好确定了。

提示:Kafka的leader副本类似于ES的primary shard,follower副本相对于ES的replica。ES也是一个index有多个shard(相对于Kafka一个topic有多个partition),shard又分为primary shard和replicition shard,其中primary shard用于提供读写服务(sharding方式跟MySQL非常类似:shard = hash(routing) % number_of_primary_shards。但是ES引入了协调节点(coordinating node) 的角色,实现对客户端透明),而replication shard只提供读服务(这里跟Kafka一样,ES会等待relication shard返回成功才最终返回给client)。

有传统MySQL分库分表经验的同学一定会觉得这个过程是非常相似的,就是一个sharding + replication的数据架构,只是通过client(SDK)或者coordinator对你透明了而已。

3.3、Propagate消息

Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了 ISR (in-sync replicas) 中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加 HW( High-Watermark) 并且向Producer发送ACK。

为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。但考虑到这种场景非常少见,可以认为这种方式在性能和数据持久化上做了一个比较好的平衡。在将来的版本中,Kafka会考虑提供更高的持久性。

Consumer读消息也是从Leader读取,只有被commit过的消息(offset低于HW的消息)才会暴露给Consumer。

Kafka Replication的数据流如下图所示:


关于这方面的内容比较多而且复杂,这里就不展开了。


3.4、Kafka的几个游标(偏移量/offset)

下面这张图非常简单明了的显示kafka的所有游标:

下面简单的说明一下:

3.4.0、ISR
In-Sync Replicas list,顾名思义,就是跟leader “保存同步” 的Replicas。“保持同步”的含义有些复杂,在0.9版本,broker的参数replica.lag.time.max.ms用来指定ISR的定义,如果leader在这么长时间没收到follower的拉取请求,或者在这么长时间内,follower没有fetch到leader的log end offset,就会被leader从ISR中移除。ISR是个很重要的指标,controller选取partition的leader replica时会使用它,leader需要维护ISR列表,因此leader选取ISR后会把结果记到Zookeeper上。

在需要选举leader的场景下,leader和ISR是由controller决定的。在选出leader以后,ISR是leader决定。如果谁是leader和ISR只存在于ZK上,那么每个broker都需要在Zookeeper上监听它host的每个partition的leader和ISR的变化,这样效率比较低。如果不放在Zookeeper上,那么当controller fail以后,需要从所有broker上重新获得这些信息,考虑到这个过程中可能出现的问题,也不靠谱。所以leader和ISR的信息存在于Zookeeper上,但是在变更leader时,controller会先在Zookeeper上做出变更,然后再发送LeaderAndIsrRequest给相关的broker。这样可以在一个LeaderAndIsrRequest里包括这个broker上有变动的所有partition,即batch一批变更新信息给broker,更有效率。另外,在leader变更ISR时,会先在Zookeeper上做出变更,然后再修改本地内存中的ISR。

3.4.1、Last Commited Offset
Consumer最后提交的位置,这个位置会保存在一个特殊的topic:_consumer_offsets 中。

3.4.2、Current Position
Consumer当前读取的位置,但是还没有提交给broker。提交之后就变成Last Commit Offset。

3.4.3、High Watermark(HW)
这个offset是所有ISR的LEO的最小位置(minimum LEO across all the ISR of this partition),consumer不能读取超过HW的消息,因为这意味着读取到未完全同步(因此没有完全备份)的消息。换句话说就是:HW是所有ISR中的节点都已经复制完的消息.也是消费者所能获取到的消息的最大offset(注意,并不是所有replica都一定有这些消息,而只是ISR里的那些才肯定会有)。

随着follower的拉取进度的即时变化,HW是随时在变化的。follower总是向leader请求自己已有messages的下一个offset开始的数据,因此当follower发出了一个fetch request,要求offset为A以上的数据,leader就知道了这个follower的log end offset至少为A。此时就可以统计下ISR里的所有replica的LEO是否已经大于了HW,如果是的话,就提高HW。同时,leader在fetch本地消息给follower时,也会在返回给follower的reponse里附带自己的HW。这样follower也就知道了leader处的HW(但是在实现中,follower获取的只是读leader本地log时的HW,并不能保证是最新的HW)。但是leader和follower的HW是不同步的,follower处记的HW可能会落后于leader。

Hight Watermark Checkpoint
由于HW是随时变化的,如果即时更新到Zookeeper,会带来效率的问题。而HW是如此重要,因此需要持久化,ReplicaManager就启动了单独的线程定期把所有的partition的HW的值记到文件中,即做highwatermark-checkpoint。

3.4.4、Log End Offset(LEO)
这个很好理解,就是当前的最新日志写入(或者同步)位置。

4、Kafka客户端

Kafka支持JVM语言(java、scala),同是也提供了高性能的C/C++客户端,和基于librdkafka封装的各种语言客户端。

5、Kafka的offset管理

kafka读取消息其实是基于offset来进行的,如果offset出错,就可能出现重复读取消息或者跳过未读消息。在0.8.2之前,kafka是将offset保存在ZooKeeper中,但是我们知道zk的写操作是很昂贵的,而且不能线性拓展,频繁的写入zk会导致性能瓶颈。所以在0.8.2引入了Offset Management,将这个offset保存在一个 compacted kafka topic(_consumer_offsets),Consumer通过发送OffsetCommitRequest请求到指定broker(偏移量管理者)提交偏移量。这个请求中包含一系列分区以及在这些分区中的消费位置(偏移量)。偏移量管理者会追加键值(key-value)形式的消息到一个指定的topic(__consumer_offsets)。key是由consumerGroup-topic-partition组成的,而value是偏移量。同时为了提供性能,内存中也会维护一份最近的记录,这样在指定key的情况下能快速的给出OffsetFetchRequests而不用扫描全部偏移量topic日志。如果偏移量管理者因某种原因失败,新的broker将会成为偏移量管理者并且通过扫描偏移量topic来重新生成偏移量缓存。

5.1、如何查看消费偏移量

0.9版本之前的Kafka提供了kafka-consumer-offset-checker.sh脚本,可以用来查看某个消费组对一个或者多个topic的消费者消费偏移量情况,该脚本调用的是kafka.tools.Consumer.OffsetChecker。0.9版本之后已不再建议使用该脚本了,而是建议使用kafka-consumer-groups.sh脚本,该脚本调用的是kafka.admin.ConsumerGroupCommand。这个脚本其实是对消费组进行管理,不只是查看消费组的偏移量。这里只介绍最新的kafka-consumer-groups.sh脚本使用。

用ConsumerGroupCommand工具,可以使用list,describe,或delete消费者组。

例如要列出所有主题中的所有消费组信息,使用list参数:
$ bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list
test-consumer-group

要查看某个消费组当前的消费偏移量则使用describe参数:
$ bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group
GROUP                          TOPIC                          
test-consumer-group            test-foo    

PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER                  
0          1               3               2               consumer-1_/127.0.0.1

提示:该脚本只支持删除不包括任何消费组的消费组,而且只能删除消费组为老版本消费者对应的消费组(即分组元数据存储在zookeeper的才有效),因为这个脚本删除操作的本质就是删除ZK中对应消费组的节点及其子节点而已。

5.2、如何管理消费偏移量

上面介绍了通过脚本工具方式查询Kafka消费偏移量。事实上也可以通过API的方式查询消费偏移量。

Kafka消费者API提供了两个方法用于查询消费者消费偏移量的操作:
1)committed(TopicPartition partition):该方法返回一个OffsetAndMetadata对象,通过它可以获取指定分区已提交的偏移量。

2)position(TopicPartition partition):该方法返回下一次拉取位置的position。

除了查看消费偏移量,有些时候我们需要人为的指定offset,比如跳过某些消息,或者redo某些消息。在0.8.2之前,offset是存放在ZK中,只要用ZKCli操作ZK就可以了。但是在0.8.2之后,offset默认是存放在kafka的__consumer_offsets队列中,只能通过API修改了。

Kafka消费者API提供了重置消费偏移量的方法:
1)seek(TopicPartition partition, long offset):该方法用于将消费起始位置重置到指定的偏移量位置。

2)seekToBeginning():从消息起始位置开始消费,对应偏移量重置策略:auto.offset.reset=earliest。

3)seekToEnd():从最新消息对应的位置开始消费,也就是说等待新的消息写入后才开始拉取,对应偏移量重置策略是:auto.offset.reset=latest。

当然前提得知道要重置的offset的位置。一种方式就是根据时间戳获取对应的offset再seek过去。


最新版本:2.8
Apache Kafka 2.8.0 已于2021年4月中旬正式发布,本次更新的显著变化如下:
增加描述群组 API;
支持 SASL_SSL 监听器上的相互 TLS 认证;
JSON 请求/响应调试日志;
限制 broker 连接创建率;
主题标识符;
在 Connect REST API 中公开任务配置;
早期访问用一个自我管理的 Quorum 取代 ZooKeeper;
更新 Streams FSM,以表明错误状态的含义;
扩展 StreamJoined 以允许更多的 store 配置;
更加方便的 TopologyTestDriver construtors;
引入Kafka-Stream特有的未捕获异常 handler;
启动和关闭 Streams 线程的 API;
改进 TimeWindowedDeserializer 和 TimeWindowedSerde,以处理窗口尺寸;
改进 Kafka stream 中的超时和重试;
更多详情可查看此处

最新版本:3.0
Apache Kafka 3.0.0 正式于2021年9月下旬发布,这是一个重要的版本更新,其中包括许多新的功能:
对 Java 8 和 Scala 2.12 的支持已被废弃
Kafka Raft 支持元数据主题的快照,以及 self-managed quorum 方面的其他改进
废弃了消息格式 v0 和 v1
默认情况下为 Kafka Producer 启用更强的交付保证
优化了 OffsetFetch 和 FindCoordinator 请求
更灵活的 MirrorMaker 2 配置和 MirrorMaker 1 的废弃。
能够在 Kafka Connect 的一次调用中重新启动连接器的任务
连接器日志上下文和连接器客户端覆盖现在是默认启用的
增强了 Kafka Streams 中时间戳同步的语义
修改了 Stream 的 TaskId 的公共 API
在 Kafka Streams 中,默认的 serde 变成了 null,还有一些其他的配置变化


项目主页:https://kafka.apache.org/