消息队列二览
2023-05-07 15:21:37 阿炯

在计算机科学中,消息队列(英语:Message queue,也称消息中间件)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的资料,包含发生的时间,输入设备的种类,以及特定的输入参数,通信地讲:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。

消息队列是大型分布式系统不可缺少的中间件,也是高并发系统的基础组件,所以掌握好消息队列MQ就变得极其重要。本文简述了消息队列及其应用场景与选型。实际上,消息队列常常保存在链表结构中,拥有权限的进程可以向消息队列中写入或读取消息。目前有很多开源的实现的消息队列,包括RabbitMQ、Qpid、RocketMQ、Kafka。

消息队列本身是异步的,它允许接收者在消息发送很长时间后再取回消息,这和大多数通信协议是不同的。例如WWW中使用的HTTP协议(HTTP/2之前)是同步的,因为客户端在发出请求后必须等待服务器回应。然而很多情况下需要异步的通信协议。比如一个进程通知另一个进程发生了一个事件,但不需要等待回应。但消息队列的异步特点,也造成了一个缺点,就是接收者必须轮询消息队列,才能收到最近的消息。和信号相比,消息队列能够传递更多的信息。与管道相比,消息队列提供了有格式的数据,这可以减少开发人员的工作量。但消息队列仍然有大小限制,消息队列除了可以当不同线程或进程间的缓冲外,更可以透过消息队列当前消息数量来侦测接收线程或进程性能是否有问题。

消息中间件的发展已经有近 40 年历史,早在上个世纪 80 年代就诞生了第一款消息队列 The Information Bus。90 年代 IBM、Oracle、Microsoft 纷纷推出自家的 MQ,但都是收费且闭源的产品,主要面向高端的企业用户,这些 MQ 一般都采用高端硬件,软硬件一体机交付,需要采购专门的维护服务,MQ 本身的架构是单机的架构,用户的自主性较差。进入新世纪后,随着技术成熟,开始讨论 MQ 的协议,诞生了 JMS、AMPQ 两大协议标准,随之分别有 ActiveMQ、RabbitMQ 的具体实现,并且是开源共建的,这使得这两款 MQ 在当时迅速流行开来,其使用门槛也随之降低,越来越多系统融入了 MQ 作为基础能力。

再后来互联网的爆发式发展,由于传统的消息队列无法承受亿级用户的访问流量和海量数据传输,诞生了互联网消息中间件,核心能力是全面采用分布式架构、具备很强的横向扩展能力,开源典型代表有 Kafka、RocketMQ、Pulsar。Kafka 的诞生还将消息中间件从 Messaging 领域延伸到了 Streaming 领域,从分布式应用的异步解耦场景延伸到大数据领域的流存储和流计算场景。Pulsar 更是在 Kafka 之后集大家之成,在企业级应用上做得更好,存储和计算分离的设计使得拓展更加轻松。

如今,IoT、云计算、云原生引领了新的技术趋势。面向 IoT 的场景,消息队列开始从云内服务端应用通信,延伸到边缘机房和物联网终端设备,支持 MQTT 等物联网标准协议也成了各大消息队列的标配,看到 Pulsar、Kafka、RocketMQ 都在努力跟随时代步伐,拓展自己在各种使用场景下的能力。

什么是消息队列

可以把消息队列看作是一个存放消息的容器,当需要使用消息的时候,直接从容器中取出消息供自己使用即可。由于队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。


参与消息传递的双方称为生产者和消费者 ,生产者负责发送消息,消费者负责处理消息。


发布与订阅(Pub/Sub)模型

操作系统中的进程通信的一种很重要的方式就是消息队列,这里提到的消息队列稍微有点区别,更多指的是各个服务以及系统内部各个组件/模块之前的通信,属于一种中间件。系统产生的行为不需要通过接口等方式来通知到相关服务,只需要发布一次消息,订阅者都能消费到消息,执行服务自身的本职工作。

维基百科是这样介绍中间件的:中间件(英语:Middleware),又译中间件、中介层,是一类提供系统软件和应用软件之间连接、便于软件各部件之间的沟通的软件,应用软件可以借助中间件在不同的技术架构之间共享信息与资源。中间件位于客户机服务器的操作系统之上,管理着计算资源和网络通信。简单来说:中间件就是一类为应用软件服务的软件,应用软件是为用户服务的,用户不会接触或者使用到中间件。除了消息队列之外,常见的中间件还有 RPC 框架、分布式组件、HTTP 服务器、任务调度框架、配置中心、数据库层的分库分表工具和数据迁移工具等等。

随着分布式和微服务系统的发展,消息队列在系统设计中有了更大的发挥空间,使用消息队列可以降低系统耦合性、实现任务异步、有效地进行流量削峰,是分布式和微服务系统中重要的组件之一。早年间 MQ 一直被叫做消息队列,就可以定义为传递消息的容器,随着时代的发展,MQ 都在努力拓展出来越来越多的功能,越来越多需求加在其上,消息中间件的能力越来越强,应用的场景也越来越多,如果非要用一个定义来概括只能是抽象出来一些概念,概括为跨服务之间传递信息的软件。下文还会对模型进行较深的解析。

当然,一切收益都是有代价的,对于系统架构本身来说,会引入新组件,带来系统复杂度的提升,整体系统的可靠性也会是挑战,增加消息中间件的运维成本,还会带来整体系统一致性的问题。所以需要权衡自身系统是否有必要引入 MQ,能解决什么痛点,投入产出能否让组织满意,对于本身流量不大的系统来说,保持简单架构是皆大欢喜的事情,毕竟越简单越稳定,越耐用。

消息队列的作用

通常来说,使用消息队列能为我们的系统带来下面三点好处:
1).通过异步处理提高系统性能
2).削峰/限流
3).降低系统耦合性
4).实现分布式事务


通过异步处理提高系统性能(减少响应所需时间)

将用户的请求数据存储到消息队列之后就立即返回结果。随后,系统再对消息进行消费。可以把接口请求根据业务的时效性程度,将不紧急的处理逻辑生成消息、事件放到 MQ 当中,再由专门的系统处理该消息、事件;如日志上报、归档事件、数据推送、数据分析、触发策略、变更推荐、添加积分、发送通知消息等。

因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此,使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。

削峰/限流
作为系统内部的一个消息池,抵抗洪峰,对后端服务起到保护作用。流量洪峰进来的时候,会转换为消息落到 MQ 当中,后端服务可以根据自己的处理能力来,流量不会直接冲击到后端服务,特别是落库、IO 等操作。即先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。如下图所示:


降低系统耦合性(解耦)

使用消息队列还可以降低系统耦合性。我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。生产者(客户端)发送消息到消息队列中去,接受者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可而不需要和其他系统有耦合,这显然也提高了系统的扩展性。减少系统、模块之间直接对接带来的耦合,交互统一按 MQ 中消息的协议,按需生产和消费,耦合程度大大降低。

消息队列使用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。

消息接受者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消息接受者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程。为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。

备注:不要认为消息队列只能利用发布-订阅模式工作,只不过在解耦这个特定业务环境下是使用发布-订阅模式的。除了发布-订阅模式,还有点对点订阅模式(一个消息只有一个消费者),比较常用的是发布-订阅模式,这两种消息模型是 JMS 提供的,AMQP 协议还提供了另外 5 种消息模型。

实现分布式事务

分布式事务的解决方案之一就是 MQ 事务。RocketMQ、Kafka、Pulsar、QMQ 都提供了事务相关的功能。事务允许事件流应用将消费,处理,生产消息整个过程定义为一个原子操作。

使用消息队列会带来哪些问题

系统可用性降低:系统可用性在某种程度上降低,为什么这样说呢?在加入 MQ 之前,你不用考虑消息丢失或者说 MQ 挂掉等等的情况,但引入 MQ 之后你就需要去考虑了!
系统复杂性提高:加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
一致性问题:上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了!


消息模型

队列
一种是消息队列,生产者往队列写消息,消费者从这个队列消费消息,当然生产者可以是多个,消费者也可以是多个,但是一条消息只能被消费一次,具体怎么做的,这就涉及到具体的使用需求和每一款消息中间件的实现了,后面第二部分的时候会涉及到。这是最早的消息模型,这也是为什么消息队列 MQ 这个名字也一直有人在用吧。


订阅
后来上个世纪 80 年代有人提出发布订阅模式,就是 Topic 模式,生产者发布的消息,消息中间件会把消息投递给每一个订阅者,这个投递的过程有可能是推也可能是拉,支持哪一种也要看每一款的具体实现。


消息协议


常见的消息协议:


接下来举例 AMPQ 协议的生产、消费过程标准。

AMQP 协议

高级消息队列协议(Advanced Message Queuing Protocol),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端 / 中间件不同产品,不同的开发语言等条件的限制。

生产消息


消费消息


MQTT 协议
MQTT (消息队列遥测传输) 是 ISO 标准 (ISO/IEC PRF 20922) 下基于发布 / 订阅范式的消息协议。它工作在  TCP/IP 协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计所的协议。MQTT 协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。  

其他协议
另外还有 STOMP、OpenMessaging 等,这里不做展开。当前市面上主流的消息中间件多是有自定义的协议发展起来的,如 Kafka 在最开始并不算是一个消息中间件,而是用于日志记录系统的一部分,所以并不是基于某种中间件消息协议来做的,而是基于 TCP/IP,根据自定义的消息格式,来传递日志消息,为满足对于消息丢失是有一定容忍度的;在后来逐步发展到可以支持正好一次(Exactly Once)语义,实际上是通过 At Least Once + 幂等性 = Exactly Once 。

将服务器的 ACK 设置为 -1,可以保证 Procedure 到 Broker 不会丢失数据即 At Least Once;相对的,服务器级别设置为 0,可以保证生产者发送消息只会发一次,即 At Most Once 语义但是,一些非常重要的消息,如交易数据,下游消费者要求消息不重不漏,即 Exactly Once,精准一次,在 0.11 版本之前,Kafka 是无能为力的,只能通过设置 ACK=-1,然后业务消费者自己去重。

0.11 版本之后,Kafka 引入了幂等性概念,Procedure 无论向 Broker 发送多少次消息,Broker 只会持久化一条:At Least Once + 幂等性 = Exactly Once。要启用幂等性,只需要将 Procedure 参数中的 enable.idempotence 设置为 True 即可,Kafka 的幂等性实现其实就是将原来在下游做的去重放在了数据上游。开启幂等性的 Procedure 在初始化的时候会分配一个 PID,发往同一个 Partition 的消息会带一个 Sequence Number,而 Broker 端会对做缓存,当相同主键消息提交时,Broker 只会持久化一条。  

基于这个理解看下 Kafka 的消息报文格式定义,

协议概要:


再展开看 Message 的定义:


基于 TCP/IP 协议,通过定义消息格式,在请求和响应中做可靠性保证。且随着发展在修改协议,比如 Timestamp 是为了增加时间索引,在 0.10.0 版本后增加的,用于根据时间戳快速查找特定消息的位移值,优化 Kafka 读取历史消息缓慢的问题。

Streaming、Eventing 场景下目前还没有看到有公认消息协议的出现。


消息队列技术选型
对比方向概要
吞吐量万级的 ActiveMQ 和 RabbitMQ 的吞吐量(ActiveMQ 的性能最差)要比十万级甚至是百万级的 RocketMQ 和 Kafka 低一个数量级。
可用性都可以实现高可用。ActiveMQ 和 RabbitMQ 都是基于主从架构实现高可用性。RocketMQ 基于分布式架构。 Kafka 也是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
时效性RabbitMQ 基于 Erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级,其他几个都是 ms 级。
功能支持Pulsar 的功能更全面,支持多租户、多种消费模式和持久性模式等功能,是下一代云原生分布式消息流平台。
消息丢失ActiveMQ 和 RabbitMQ 丢失的可能性非常低, Kafka、RocketMQ 和 Pulsar 理论上可以做到 0 丢失。


常见的消息队列实现

Kafka

Kafka 是 LinkedIn 开源的一个分布式流式处理平台,已经成为 Apache 顶级项目,早期被用来用于处理海量的日志,后面才慢慢发展成了一款功能全面的高性能消息队列。流式处理平台具有三个关键功能:
消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
容错的持久方式存储记录消息流: Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。
流式处理平台: 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。

Kafka 是一个分布式系统,由通过高性能 TCP 网络协议进行通信的服务器和客户端组成,可以部署在在本地和云环境中的裸机硬件、虚拟机和容器上。

在 2.8 之前,Kafka 最被大家诟病的就是其重度依赖于 Zookeeper 做元数据管理和集群的高可用。之后引入了基于 Raft 协议的 KRaft 模式,不再依赖 Zookeeper,大大简化了 Kafka 的架构,可以以一种轻量级的方式来使用 Kafka。不过要提示一下:如果要使用 KRaft 模式的话,建议选择较高版本的 Kafka,因为这个功能还在持续完善优化中。Kafka 3.3.1 版本是第一个将 KRaft(Kafka Raft)共识协议标记为生产就绪的版本。

RocketMQ

RocketMQ 是阿里开源的一款云原生“消息、事件、流”实时数据处理平台,借鉴了 Kafka,已经成为 Apache 顶级项目。核心特性:
云原生:生与云,长与云,无限弹性扩缩,K8s 友好
高吞吐:万亿级吞吐保证,同时满足微服务与大数据场景。
流处理:提供轻量、高扩展、高性能和丰富功能的流计算引擎。
金融级:金融级的稳定性,广泛用于交易核心链路。
架构极简:零外部依赖,Shared-nothing 架构。
生态友好:无缝对接微服务、实时计算、数据湖等周边生态。

根据官网介绍:Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。

RabbitMQ


RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。其被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是分不开的。具体特点可以概括为以下几点:
可靠性:使用一些机制来保证消息的可靠性,如持久化、传输确认及发布确认等。
灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
扩展性:多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
支持多种协议:除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间件协议。
多语言客户端:几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、JavaScript 等。
易用的管理界面:提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。在安装的时候就会安装好就自带管理界面。
插件机制:提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。有点类似 Dubbo 的 SPI 机制。

Pulsar

Pulsar 是下一代云原生分布式消息流平台,最初由 Yahoo 开发 ,已经成为 Apache 顶级项目。集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。关键特性如下:
是下一代云原生分布式消息流平台。
Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
极低的发布延迟和端到端延迟。
可无缝扩展到超过一百万个 topic。
简单的客户端 API,支持 Java、Go、Python 和 C++。
主题的多种订阅模式(独占、共享和故障转移)。
通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。
分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如 S3、GCS)中。


小结:
ActiveMQ 的社区算是比较成熟,但是较目前来说,其性能比较差,而且版本迭代很慢,不推荐使用,已经被淘汰了。
RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 、RocketMQ 和 Pulsar,但它基于 Erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因其基于 Erlang 开发,所以国内很少有公司有实力做 Erlang 源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这几种消息队列中,RabbitMQ 或许是首选。
RocketMQ 和 Pulsar 支持强一致性,对消息一致性要求比较高的场景可以使用。
RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。
Kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,毫秒级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 Kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。Kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。


此图摘抄自《面渣逆袭:RocketMQ 二十三问》

该图没有 Pulsar 的信息,从网上看到的压测报告来看,Pulsar 吞吐量大概是 Kafka 的两倍左右,延迟表现比 Kafka 低不少,Pulsar 的 I/O 隔离显著优于 Kafka。比较详实的 Pulsar 和 Kafka 的比对可以查阅 StreamNative 的文章《Pulsar 和 Kafka 基准测试:Pulsar 性能精准解析(完整版)》,StreamNative 作为 Apache Pulsar 的商业化公司,数据和结果还是比较可靠的。

最好的学习方法是带着问题去寻找答案,在路上捡拾更多果实,增加经验值,快速升级。推荐费曼学习法,以教代学,按可以教别人的标准来学习,最终产出教学内容为目的来学习一个知识,能让自己高效学习。所以在这里给出几个问题,读者可以根据自己的兴趣爱好带着问题去寻找答案:
如何保证消息的可用性/可靠性/不丢失呢?
如何处理消息重复的问题呢?
顺序消息如何实现?
怎么处理消息积压?
怎么实现分布式消息事务的?半消息?
如何实现消息过滤?