前言
消息队列的精髓与灵魂从理论、抽象层面泛泛的谈了下关于消息队列的一些思想和理念。
消费模型
- 队列模型,如果有多个生产者往同一个队列里面发送消息,这个队列中可以消费到的消息,就是这些生产者生产的所有消息的合集。消息的顺序就是这些生产者发送消息的自然顺序。如果有多个消费者接收同一个队列的消息,这些消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到。
- 在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。每份订阅中,订阅者都可以接收到主题的所有消息。
最大的区别其实就是,一份消息数据能不能被消费多次的问题。在发布 - 订阅模型中,如果只有一个订阅者,那它和队列模型就基本是一样的了。也就是说,发布 - 订阅模型在功能层面上是可以兼容队列模型的。RabbitMQ 采用的是队列模型,但是它一样可以实现发布 - 订阅的功能。RocketMQ 和 Kafka 采用的是发布 - 订阅模型,并且二者的消息模型是基本一致的,都有topic/consumer group等概念,kafka 的partition=rocketmq的queue。
但业务模型不等于就是实现层面的模型。比如说 MySQL 和 Hbase 同样是支持 SQL 的数据库,它们的业务模型中,存放数据的单元都是“表”,但是在实现层面,没有哪个数据库是以二维表的方式去存储数据的,MySQL 使用 B+ 树来存储数据,而 HBase 使用的是 KV 的结构来存储。同样,像 Kafka 和 RocketMQ 的业务模型基本是一样的,并不是说他们的实现就是一样的,实际上这两个消息队列的实现是完全不同的。
可靠性
几乎所有的消息队列产品都使用一种非常朴素的“请求 - 确认”机制,确保消息不会在传递过程中由于网络或服务器故障丢失。
- 在生产端,生产者先将消息发送给服务端,也就是 Broker,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。如果生产者没有收到服务端的确认或者收到失败的响应,则会重新发送消息;
- 在消费端,消费者在收到消息并完成自己的消费业务逻辑(比如,将数据保存到数据库中)后,也会给服务端发送消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,否则它会给消费者重新发送这条消息,直到收到对应的消费成功确认。在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
- 存储阶段:如果 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。
- 在 RocketMQ 中,复制的基本单位是 Broker,也就是服务端的进程。
- Kafka 中,复制的基本单位是分区。每个分区的几个副本之间,构成一个小的复制集群,Broker 只是这些分区副本的容器,所以 Kafka 的 Broker 是不分主从的。
引入这个机制在消费端带来了一个不小的问题。为了确保消息的有序性,在某一条消息被成功消费之前,下一条消息是不能被消费的。因此在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。因为对于消费者来说,在每个分区上实际上只能支持单线程消费。kafka在消费时,ConsumerGroup 中的每个 Consumer 独占一个或多个 Partition(分区);
RocketMQ在搜狐的创新实践由于某次机器故障,导致 Kafka 集群发生故障转移,而不幸的是,这个集群的分区数过多,导致转移耗时几分钟才完成。进而导致业务线程阻塞,服务进入无响应状态。而之后了解到 RocketMQ 即使某个 broker 宕机,消息会发送到其他 broker,不会产生整个集群阻塞情况
事务消息
订单系统创建订单后,发消息给购物车系统,将已下单的商品从购物车中删除。因为从购物车删除已下单商品这个步骤,并不是用户下单支付这个主要流程中必需的步骤,使用消息队列来异步清理购物车是更加合理的设计。如何用消息队列来实现分布式事务?RocketMQ 中的事务确保执行本地事务和发消息这两个操作,要么都成功,要么都失败。
- 订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。
- 半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。
如果在第四步提交事务消息时失败了怎么办?对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。
- Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。
- RocketMQ增加了事务反查的机制来解决事务消息提交失败的问题。在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态(http或rpc回调?),然后根据反查结果决定提交或者回滚这个事务。
为了解决分布式事务问题,Kafka 引入了事务协调者这个角色,负责在服务端协调整个事务。这个协调者并不是一个独立的进程,而是 Broker 进程的一部分,协调者和分区一样通过选举来保证自身的可用性。和 RocketMQ 类似,Kafka 集群中也有一个特殊的用于记录事务日志的主题,这个事务日志主题的实现和普通的主题是一样的,里面记录的数据就是类似于“开启事务”“提交事务”这样的事务日志。
当我们开启事务的时候,生产者会给协调者发一个请求来开启事务,协调者在事务日志中记录下事务 ID。然后,生产者在发送消息之前,还要给协调者发送请求,告知发送的消息属于哪个主题和分区,这个信息也会被协调者记录在事务日志中。接下来,生产者就可以像发送普通消息一样来发送事务消息,这里和 RocketMQ 不同的是,RocketMQ 选择把未提交的事务消息保存在特殊的队列中,而 Kafka 在处理未提交的事务消息时,和普通消息是一样的,直接发给 Broker,保存在这些消息对应的分区中,Kafka 会在客户端的消费者中,暂时过滤未提交的事务消息。消息发送完成后,生产者给协调者发送提交或回滚事务的请求,由协调者来开始两阶段提交,完成事务。第一阶段,协调者把事务的状态设置为“预提交”,并写入事务日志。到这里,实际上事务已经成功了,无论接下来发生什么情况,事务最终都会被提交。之后便开始第二阶段,协调者在事务相关的所有分区中,都会写一条“事务结束”的特殊消息,当 Kafka 的消费者,也就是客户端,读到这个事务结束的特殊消息之后,它就可以把之前暂时过滤的那些未提交的事务消息,放行给业务代码进行消费了。最后,协调者记录最后一条事务日志,标识这个事务已经结束了。
pulsar
- 早期的消息队列,主要被用来在系统之间异步交换数据,大部分消息队列的存储能力都比较弱,不支持消息持久化,不提倡在消息队列中堆积大量的消息,这个时期的消息队列,本质上是一个数据的管道。
- 现代的消息队列,功能上看似没有太多变化,依然是收发消息,但是用途更加广泛,数据被持久化到磁盘中,大多数消息队列具备了强大的消息堆积能力,本质上已经演变成为分布式的存储系统。
- 现有的流计算平台,包括 Storm、Flink 和 Spark,它们的节点都是无状态的纯计算节点,是没有数据存储能力的。而消息队列正好相反,它很好地保证了数据的可靠性、一致性,但是 Broker 只具备存储能力,没有计算的功能,数据流进去什么样,流出来还是什么样。,一个只能计算不能存储,一个只能存储不能计算,那未来如果出现一个新的系统,既能计算也能存储,如果还能有不错的性能,是不是就会把现在的消息队列和流计算平台都给替代了?这是很有可能的。对于一个“带计算功能的消息队列”来说,采用存储计算分离的设计,计算节点负责流计算,存储节点负责存储消息,这个设计就非常和谐了。
Pulsar 和其他消息队列最大的区别是,它采用了存储计算分离的设计。存储消息的职责从 Broker 中分离出来,交给专门的 BookKeeper 存储集群。这样 Broker 就变成了无状态的节点,在集群调度和故障恢复方面更加简单灵活。Pulsar 的客户端要读写某个主题分区上的数据之前,依然要在元数据中找到分区当前所在的那个 Broker,不一样的地方是,其他的消息队列,分区与 Broker 的对应关系是相对稳定的,只要不发生故障,这个关系是不会变的。而在 Pulsar 中,这个对应关系是动态的,它可以根据 Broker 的负载情况进行动态调整,而且由于 Broker 是无状态的,分区可以调整到集群中任意一个 Broker 上,这个负载均衡策略就可以做得非常简单并且灵活。如果某一个 Broker 发生故障,可以立即用任何一个 Broker 来替代它。
存储
Kafka 的存储以 Partition 为单位,每个 Partition 包含一组消息文件(Segment file)和一组索引文件(Index),并且消息文件和索引文件一一对应,具有相同的文件名(但文件扩展名不一样),文件名就是这个文件中第一条消息的索引序号。每个索引中保存索引序号(也就是这条消息是这个分区中的第几条消息)和对应的消息在消息文件中的绝对位置。在索引的设计上,Kafka 采用的是稀疏索引:为了节省存储空间,它不会为每一条消息都创建索引,而是每隔几条消息创建一条索引。
- 写入消息的时候非常简单,就是在消息文件尾部连续追加写入,一个文件写满了再写下一个文件。
- 查找消息时,首先根据文件名找到所在的索引文件,然后用二分法遍历索引文件内的索引,在里面找到离目标消息最近的索引,再去消息文件中,找到这条最近的索引指向的消息位置,从这个位置开始顺序遍历消息文件,找到目标消息。可以看到,寻址过程还是需要一定时间的。一旦找到消息位置后,就可以批量顺序读取,不必每条消息都要进行一次寻址。
RocketMQ 的存储以 Broker 为单位。它的存储也是分为消息文件和索引文件,但是在 RocketMQ 中,每个 Broker 只有一组消息文件,它把在这个 Broker 上的所有主题的消息都存在这一组消息文件中。索引文件和 Kafka 一样,是按照主题和队列分别建立的,每个队列对应一组索引文件,这组索引文件在 RocketMQ 中称为 ConsumerQueue。RocketMQ 中的索引是定长稠密索引:它为每一条消息都建立索引,每个索引的长度(注意不是消息长度)是固定的 20 个字节。
- 写入消息的时候,Broker 上所有主题、所有队列的消息按照自然顺序追加写入到同一个消息文件中,一个文件写满了再写下一个文件。
- 查找消息的时候,可以直接根据队列的消息序号,计算出索引的全局位置(索引序号 x 索引固定长度 20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息。可以看到,这里两次寻址都是绝对位置寻址,比 Kafka 的查找是要快的。
异同
- 共通的地方,都是采用消息文件 + 索引文件的存储方式,索引文件的名字都是第一条消息的索引序号,索引中记录了消息的位置等等。
- 在消息文件的存储粒度上,Kafka 以分区为单位,粒度更细,优点是更加灵活,很容易进行数据迁移和扩容。RocketMQ 以 Broker 为单位,较粗的粒度牺牲了灵活性,带来的好处是,在写入的时候,同时写入的文件更少,有更好的批量(不同主题和分区的数据可以组成一批一起写入),更多的顺序写入,尤其是在 Broker 上有很多主题和分区的情况下,有更好的写入性能。
- 索引设计上,RocketMQ 和 Kafka 分别采用了稠密和稀疏索引,稠密索引需要更多的存储空间,但查找性能更好,稀疏索引能节省一些存储空间,代价是牺牲了查找性能。
「分」有两层解释,首先代表了模块和职责的分明,属于计算的逻辑应该封闭在计算模块,属于存储的逻辑应该下成到存储模块;第二层是计算和存储要支持分开部署,计算完全采用无状态的部署方式,存储是有状态的放式,来很好地解决在云上多租户场景面临的种种问题。「合」的前提是从代码设计上要先分开,至于是分开部署还是合并部署完全是业务的选择,新的架构必须要支持合并的部署形态,满足吞吐型的业务场景。 存储计算分离与就近计算和就近存储的理念是冲突的,但对多产品的业务形态(消息队列 RocketMQ、消息队列 Kafka、微消息队列 MQTT、消息队列 AMQP、消息服务 MNS、事件总线EventBridge)有诸多好处。PS:估计小厂用不着
- 比如计算节点共建,通过模型抽象支持多业务模型,多通信协议,释放重复建设的人力。通过存储节点并池,各产品打通内部存储节点,形成资源池合并,统一运维和管控,有助于降低成本、提高效率,加速存储创新,孵化消息中台。
- 存储集群足够抽象,满足通用的消息存取需求。计算集群多合一,足够的模块化,可插拔,满足多产品部署带来不同权限体系、不同协议、不同抽象模型等的需求。