看完这篇文章,我还是不了解Kafka,给榴莲跪下。
Kafka整体数据流程如下:
一般的用法是Producers将消息写入Brokers中的指定Topic,Consumers从Brokers中拉取指定Topic的消息,然后进行业务处理。图中有两个Topic,Topic0有两个Partition,Topic1有1个Partition,备份了三份。
可以看到Consumer Gourp1中的Consumer2并没有分配给Partition处理。这是可能的,这将在下面讨论。 Brokers、Topics、Partitions的一些元信息都存储在ZK中,ZK也用于监控和路由。
———————————— 生产————————————
基本流程是这样的:
创建记录。每条记录必须指定相应的主题和值。键和分区是可选的。
先序列化,然后根据Topic和Partition放入对应的发送队列。 Kafka Produce是批量请求,会累积一批,然后一起发送。网络数据包将立即发送,而不是调用send()。
如果不填写Partition,情况会是这样:
填写Key,根据Key进行Hash,同一个Key到一个Partition。 (如果Partition数量扩大,则无法保证。)
键未填写。循环选择分区。
这些要发送到同一个Partition的请求根据配置保存在一个wave中,然后由一个单独的线程一次性发送给它们。
应用程序编程接口
有一个高级API 可以为我们做很多事情,包括偏移和路由。使用起来非常简单。还有Simple API、Offset等,需要我们自己记录。 (注:消费消息时,首先要知道消费到哪里,即路由,消费完后,必须记录消费顺序的位置,即Offset)
分割
当存在多个副本时,我们会尽力将多个副本分配给不同的Broker。 Kafka会为该Partition选出一个Leader,之后对该Partition的所有请求实际上都会由该Leader进行操作,然后同步到其他Followers。
那么就涉及到两个细节:
如何分配分区
如何选择领导者
关于Partition的分配和Leader的。。,都必须有一个执行者。在Kafka中,这个执行者被称为Controller。 Kafka使用ZK在Broker中。。一个Controller来进行Partition分配和Leader。。。
分区分配:
对所有Broker(假设总共有n 个Broker)和要分配的Partition 进行排序。
将第i 个分区分配给第(i mod n) 个Broker(这是Leader)。
将第i 个分区的第j 个副本分配给第((i + j) 模式n) 个Broker。
领导者灾难恢复
Controller会在ZK的/brokers/ids节点上注册Watch,这样一旦有Broker宕机了它就会知道。当Broker 宕机时,Controller 将为受影响的Partition 。。一个新的Leader。
Controller从ZK的/brokers/topics/[topic]/partitions/[partition]/state中读取该Partition对应的ISR(in-syncreplica同步副本)列表,并选出一个作为leader。选择Leader后,更新ZK,然后向受影响的Brokers发送LeaderAndISRRequest,让他们知道更改。
为什么不使用ZK通知而是直接向Broker发送RPC请求?我的理解可能是ZK存在性能问题。如果ISR列表为空,那么会根据配置随机选择一个Replica作为Leader,或者Partition只是一个休息的机器;如果ISR列表中有一台机器,但它也被关闭,您仍然可以等待ISR机器恢复生机。
多副本同步
这里的策略以及服务器端的处理是,Follower从Leader批量拉取数据进行同步。但具体的可靠性是由制造商决定的。生产者生产消息时,通过request.required.acks 参数设置数据的可靠性。
当Acks=-1时,如果ISR小于min.insync.replicas指定的数量,将返回不可用。
这里ISR列表中的机器将会改变。根据配置replica.lag.time.max.ms,如果长时间不同步,就会从ISR列表中删除。过去,ISR 会根据落后的消息数量而被踢出。 1.0版本后去掉了这个,因为这个值很难获取,高峰期节点很容易不断进入和退出ISR列表。当Leader从ISA中选出后,Follower会删除其日志中之前高水位线之后的记录,然后去Leader那里获取新的数据。
因为新的Leader选出后,Follower上的数据可能会比新的Leader多,所以需要进行拦截。这里高水位的含义,对于Partition和Leader来说,是所有ISR中的最新记录。消费者能读到的最好的结果就是高水位线。
从Leader的角度来看,高水位线更新将推迟一轮。例如,如果写入了一条新消息,ISR中的所有Brokers都已获取它,但ISR中的Brokers只能在下一轮Fetch中告诉Leader。正是因为这个高水位线延迟了一轮,在某些情况下,Kafka会丢失数据,导致主备数据不一致。从0.11开始,将使用Leader Epoch代替高水位线。
思考:当Acks=-1时
Follwers 应该来Fetch 并返回成功,还是应该等待Follwers 的第二轮Fetch?
Leader已经在本地写好了,但是ISR中的一些机器出现了故障。我应该怎么办?
———————————— 消耗————————————
订阅Topic是基于消费者组的。一个消费者组中可以有多个消费者。同一个消费组中的两个消费者不会同时消费一个Partition。换句话说,它是一个Partition,只能被消费者组中的一个消费者消费,但可以同时被多个消费者组消费。
因此,如果消费者组中的消费者数量多于Partition,那么就会有部分消费者一直处于空闲状态。
应用程序编程接口
订阅主题时,可以使用正则表达式。如果有新的Topic 匹配,则会自动订阅。
保存偏移量
当一个消费组消费Partition时,需要保存Offset来记录消费位置。之前是存储在ZK中的。由于ZK的写入性能较差,之前的解决方案是Consumer每分钟上报一次。这里ZK的性能严重影响了消费的速度,很容易出现重复消费的情况。 0.10版本之后,Kafka将offset存储从ZK中分离出来,存储在一个名为consumeroffsets topic的Topic中。
消息中写入的Key由Groupid、Topic、Partition组成,Value为偏移量Offset。 Topic配置的清理策略是Compact。始终保留最新的Key 并删除其余的。正常情况下,每个Key的偏移量都会缓存在内存中。查询时不需要遍历Partition。如果没有缓存,则第一次遍历Partition建立缓存,然后返回查询。
确定Consumer Group位移信息写入consumers_offsets的哪个Partition。具体计算公式为:
__consumers_offsets partition=Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) //groupMetadataTopicPartitionCount 由offsets.topic.num.partitions 指定。默认为50 个分区。
思考:如果运行的服务修改了offsets.topic.num.partitions,Offset的保存会不会乱了?
分配分区—重新平衡
在生产过程中,Broker必须分配Partition,在消费过程中,也必须将Partition分配给消费者。与从Broker中选择Controller类似,消费者也需要从Broker中选择Coordinator来分配Partition。
让我们从上到下逐一解释:
如何选择协调员
交互过程
再平衡过程
Select Coordinator:查看Offset保存在哪个Partition; Partition Leader 所在的Broker 就是选定的Coordinator。
这里我们可以看到Consumer Group的Coordinator和保存Consumer Group Offset的Partition Leader是同一台机器。
交互过程:选择协调员后,就到了分配的时间。整个过程是这样的:
当Consumer启动或者Coordinator宕机时,Consumer会随意请求一个Broker,发送ConsumerMetadataRequest请求。
Broker会按照上面提到的方法选择这个Consumer对应的Coordinator的地址。
消费者向协调者发送心跳请求。如果返回IllegalGeneration,则说明该Consumer的信息是旧的,需要重新加入进行Reblance。
如果返回成功,Consumer将从最后分配的Partition继续执行。
再平衡过程:
消费者向协调者发送JoinGroupRequest 请求。
这时,当其他消费者发送Heartbeat请求时,Coordinator就会告诉他们需要Reblance。
其他消费者发送JoinGroupRequest 请求。
当所有注册的Consumer都发送了JoinGroupRequest请求后,Coordinator会在这里的Consumer中随机选择一个Leader。
然后返回JoinGroupRespone,它会告诉Consumer你是Follower还是Leader。对于Leader来说,Follower的信息也会被带到它那里,以便它可以根据这些信息来分配Partition。
Consumer向Coordinator发送SyncGroupRequest,Leader的SyncGroupRequest将包含分配信息。
协调者返回数据包并告诉消费者(包括领导者)有关分配的信息。
当分区或消费者数量发生变化时,必须进行Reblance。
列出Reblance发生的情况:
添加分区
增加消费者
消费者主动关闭
消费者下降
协调器本身已关闭
———————————— 消息传递语义————————————
Kafka支持3种消息传递语义:
最多一次:最多一次,消息可能会丢失,但不会重复。
至少一次:至少一次,消息不会丢失,并且可以重复。
Exactly Once:仅一次,消息不丢失、不重复,仅消费一次(0.11中实现,仅限下游和Kafka)
在商业中,经常使用至少一次模型。如果需要重入,业务通常会自行实现。
1. 至少一次
先获取数据,再进行业务处理。业务处理成功后,Commit Offset:
生产者异常生产消息。不确定消息是否写入成功。如果重做,可能会写入重复的消息。
消费者处理消息。业务处理成功后,offset更新失败。如果消费者重新启动,则会重复消费。
2. 最多一次
首先获取数据,然后Commit Offset,最后进行业务处理:
如果生产者异常产生消息,它并不关心。如果它产生下一条消息,则该消息将丢失。
消费者处理消息时,先更新Offset,然后再进行业务处理。如果业务处理失败,消费者重启,消息丢失。
3. 恰好一次
思路是这样的:首先保证消息不丢失,然后保证不重复。因此,重点关注“至少一次”的原因。
我首先想到的是:
生产者重做会导致写入重复消息:生产保证幂等性。
消费者重复消费:杜绝重复消费没问题,或者业务接口保证幂等重复消费。
由于Kafka无法保证业务接口是否幂等,因此Kafka提供的Exactly once是有限的,消费者的下游也必须是Kafka。因此,在下面的讨论中,如果没有特别说明,消费者的下游系统都是Kafka(注:使用的是Kafka Conector,适配了部分系统,实现了Exactly Once)。生产者幂等性很容易实现,没有任何问题。
解决重复消费有两种方法:
下游系统保证幂等性,重复消费不会产生多条记录。
将Commit Offset 和业务处理绑定到一个事务中。
本来,第1 点只执行一次就可以了。但在某些使用场景下,我们的数据源可能是多个Topic,经过处理后会输出到多个Topic。在这种情况下,我们希望所有输出都成功,或者全部失败。这需要事务性的实现。既然要做交易,那么不妨从源头上解决重复消费的问题,将Commit Offset 和输出到其他Topic 绑定到一笔交易中。
4.产生幂等性
其思想是为每个Producer分配一个Pid作为Producer的唯一标识符。生产者将为每个维护一个单调递增的Seq。同样,Broker也会记录每条记录的最新Seq。
Broker只会在req_seq==broker_seq+1时接受消息,因为:
当消息的Seq大于Broker的Seq时,说明中间还有数据没有写入,即乱序。
如果消息的Seq不小于Broker的Seq,则说明消息已经被保存。
5. 事务/原子广播
场景是这样的:
首先从多个源主题获取数据。
做业务处理,写入多个下游主题。
更新多个源主题的偏移量。
其中第2点和第3点,作为一笔交易,要么全部成功,要么全部失败。这是因为Offset 实际上是使用特殊的Topic 来保存的。这两点结合起来就是编写多个Topic的事务处理。
基本思想是这样的:
引入Tid(交易ID)。与Pid不同的是,这个ID是由应用程序提供的,用于标识交易。制作人是谁并不重要。
即任何Producer都可以使用这个Tid来做交易,这样中途死亡的交易就可以被另一个Producer恢复。
同时,为了记录事务的状态,与Offset的处理类似,引入了Transaction Coordinator来记录Transaction Log。
集群中会存在多个Transaction Coordinator,每个Tid对应一个唯一的Transaction Coordinator。
注意:事务日志删除策略是紧凑的。已完成的交易将被标记为Null,并且在Compact 后不会保留。
做交易时,首先标记交易并写入数据。如果全部成功,准备提交状态将记录在事务日志中。否则,将写入“准备中止”状态。然后向每个相关的Partition写入Marker(Commit或Abort)消息,将事务的Message标记为可读或已放弃。成功后,Commit/Abort 状态会记录在事务日志中,事务结束。
数据流:
首先使用Tid请求任意Broker(代码中写了负载最小的Broker),找到对应的Transaction Coordinator。
请求Transaction Coordinator获取对应的Pid以及该Pid对应的Epoch。这个Epoch是用来防止僵尸进程复活造成的消息混乱。
当消息的Epoch小于当前维护的Epoch时,将会被拒绝。 Tid和Pid之间是一一对应的,所以对于同一个Tid,会返回相同的Pid。
客户端首先请求事务协调器记录的事务状态。初始状态为开始。如果是最先到达的交易,则同时进行交易计时。
Client向相关Partition输出数据; Client然后请求交易协调器记录Offset的交易状态; Client向相应的Offset Partition发送Offset Commit。
Client发送Commit请求,Transaction Coordinator记录Prepare Commit/Abort,然后将Marker发送到相关Partition。
全部成功后,记录Commit/Abort的状态。最后一条记录不需要等待其他Replica的ACK,因为在不丢失Prepare的情况下可以保证最终的正确性。
这里的Prepare状态主要用于事务恢复。例如,向相关Partition发送控制消息,在发送之前就会崩溃。备用启动后,Producer在发送获取Pid的请求时就会完成未完成的事务。
当Commit Marker被写入Partition时,就可以读取到相关的消息。因此,Kafka事务消息是从Prepare Commit到Commit这段时间逐渐可见的,而不是同时可见的。
6. 消费者事务
我们都从生产的角度来看待事物。还有一些问题需要从消费角度考虑。消费时,Partition中会有一些消息处于未提交状态,即业务方不应该看到的消息。这些消息需要被过滤以防止企业看到它们。 Kafka选择在消费者进程中处理它们,而不是在Broker中过滤它们。主要考虑的是性能。
Kafka高性能的一个关键点就是零拷贝。如果Broker中需要进行过滤,那么消息内容就必须读入内存,零拷贝特性就会丢失。
———————————— 文件组织————————————
Kafka数据实际上以文件的形式存储在文件系统中。 Topic下有Partitions,Partition下有Segments。 Segment是实际的文件,Topic和Partition都是抽象概念。
/partitionid}/目录下,存放的是实际的Log文件(即Segment),以及对应的索引文件。每个Segment文件大小相等。文件名以该Segment中最小的Offset命名。文件扩展名是.log。 Segment对应的索引的文件名是一样的,扩展名为.index。
有两个索引文件:
一种是Offset Index,用于通过Offset来检查Message。
一种是时间索引,用于根据时间来查。事实上,这些都是可以优化和组合的。下面,我们只讲Offset Index。
总体组织如下:
为了减小索引文件的大小,减少空间占用,并且方便直接加载到内存中,这里的索引使用了稀疏矩阵。没有记录每条消息的具体位置。相反,每隔一定数量的字节创建一个索引。
该索引包含两部分:
BaseOffset:表示该索引对应Segment文件中的哪条Message。这有利于使用数值压缩算法来节省空间。例如,Kafka 使用Varint。
位置:Segment 中的绝对位置。
在查找Offset对应的记录时,我们会先用二分法找出对应的Offset在哪个Segment,然后利用索引定位该Offset在Segment中的大致位置,然后遍历查找信息。
———————————— 常用配置项————————————
1. 经纪商配置
2.主题配置
用户评论
哈哈哈哈,说的太对了!我也是看了很多Kafka资料之后才终于明白这东西到底是怎么用的。之前就一脸懵逼,感觉像是在看天书一样...
有16位网友表示赞同!
其实我觉得这个标题写的有点夸张啊,看完这篇博客,我至少对Kafka有一个基本的理解了。当然啦,想要深入了解还得多加练习才行。
有12位网友表示赞同!
我刚开始学编程的时候就遇到过Kafka,当时一头雾水,后来查资料才知道原来是用来异步处理信息的,现在用起来还挺方便的呢!
有12位网友表示赞同!
我也觉得这篇博文讲得还不错,把Kafka的原理解释得很清楚,特别是那部分关于topic和partition的内容,我之前一直搞不明白。
有14位网友表示赞同!
看得我一头雾水啊!我不是很理解博主说的那些技术细节,感觉跟我的认知还是有点距离呢...
有18位网友表示赞同!
这篇博文虽然能科普Kafka的基本概念,但对于一些更深入的应用场景和配置策略就不太详细了。我觉得可以再多补充一些实战经验就更好了。
有16位网友表示赞同!
我已经开始练习用Kafka了,感觉这东西确实很有潜力!尤其是处理海量的实时数据方面的应用,真是强大 banget!
有14位网友表示赞同!
(这个博客) 真的救了我!本来以为 Kafka 是完全无解的,看了这篇文档之后终于有点眉目了...
有6位网友表示赞同!
说实话,我感觉 Kafka 要学的也太多了吧!从消息订阅、 生产者消费者到集群管理,每一部分都是知识量相当庞大
有15位网友表示赞同!
我觉得 Kafka 用来处理一些特定场景下的数据流动还是挺实用的,但是如果只是初学者想快速入门,我建议先从其他的轻量的消息队列工具开始了解一下,等对架构有了基本的理解再学习 Kafka 可能会更容易些。
有6位网友表示赞同!
博主说得对啊,看完这篇博客我就仿佛明白了 Kafka 的精髓! 强烈推荐给大家!
有8位网友表示赞同!
Kafka 也太复杂了,我现在还是一头雾水... 感觉自己距离成为 Kafka 之父还有很长的路要走....
有19位网友表示赞同!
我学习 Kafka 主要是因为工作需求,感觉这个东西确实能提高数据传输和处理效率。这篇博文给我了一些启发,但我还是需要继续深入学习才能更熟练运用它。
有17位网友表示赞同!