消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自使用者。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的资料,包含发生的时间,输入装置的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。实际上,消息队列常常保存在链表结构中,拥有权限的进程可以向消息队列中写入或读取消息。目前有很多消息队列有很多开源的实现,包括Apache ActiveMQ、RabbitMQ和ZeroMQ。进程通信
消息队列(也叫做报文队列)能够克服早期unix通信机制的一些缺点。作为早期unix通信机制之一的信号能够传送的信息量有限,后来虽然POSIX 1003.1b在信号的实时性方面作了拓广,使得信号在传递信息量方面有了相当程度的改进,但是信号这种通信方式更像"即时"的通信方式,它要求接受信号的进程在某个时间范围内对信号做出反应,因此该信号最多在接受信号进程的生命周期内才有意义,信号所传递的信息是接近于随进程持续的概念(process-persistent);管道及有名管道则是典型的随进程持续IPC,并且,只能传送无格式的字节流无疑会给应用程序开发带来不便,另外,它的缓冲区大小也受到限制。
消息队列就是一个消息的链表。可以把消息看作一个记录,具有特定的格式以及特定的优先级。对消息队列有写权限的进程可以向消息队列中按照一定的规则添加新消息;对消息队列有读权限的进程则可以从消息队列中读走消息,消息队列是随内核持续的。目前主要有两种类型的消息队列:POSIX消息队列以及系统V消息队列,系统V消息队列目前被大量使用。考虑到程序的可移植性,新开发的应用程序应尽量使用POSIX消息队列。
系统V消息队列是随内核持续的,只有在内核重起或者显式删除一个消息队列时,该消息队列才会真正被删除。因此系统中记录消息队列的数据结构(struct ipc_ids msg_ids)位于内核中,系统中的所有消息队列都可以在结构msg_ids中找到访问入口。 消息队列就是一个消息的链表。每个消息队列都有一个队列头,用结构struct msg_queue来描述。队列头中包含了该消息队列的大量信息,包括消息队列键值、用户ID、组ID、消息队列中消息数目等等,甚至记录了最近对消息队列读写进程的ID。读者可以访问这些信息,也可以设置其中的某些信息。
使用消息队列的理由
1. 解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2. 冗余
有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。
3. 扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
4. 灵活性 & 峰值处理能力
当你的应用上了Hacker News的首页,你将发现访问流量攀升到一个不同寻常的水平。在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。请查看我们关于峰值处理能力的博客文章了解更多此方面的信息。
5. 可恢复性
当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。
6. 送达保证
消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个"只送达一次"保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是"预定"了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。
7.排序保证
在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。IronMO保证消息浆糊通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。
8.缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行--写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。
9. 理解数据流
在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。
10. 异步通信
很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。
消息队列本身是异步的,它允许接收者在消息发送很长时间后再取回消息,这和大多数通信协议是不同的。例如WWW中使用的HTTP协议是同步的,因为客户端在发出请求后必须等待服务器回应。然而,很多情况下我们需要异步的通信协议。比如一个进程通知另一个进程发生了一个事件,但不需要等待回应。但消息队列的异步特点,也造成了一个缺点,就是接收者必须轮询消息队列,才能收到最近的消息。
和信号相比,消息队列能够传递更多的信息。与管道相比,消息队列提供了有格式的数据,这可以减少开发人员的工作量。但消息队列仍然有大小限制。
相关应用产品
ActiveMQ
Java世界的中坚力量。它有很长的历史,而且被广泛的使用。它还是跨平台的,给那些非微软平台的产品提供了一个天然的集成接入点。
RabbitMQ
这个用Erlang写成的消息中间件有不少优秀的特性。它支持开放的高级消息队列协议 (AMQP,Advanced Message Queuing Protocol),从根本上避免了生产厂商的封闭,使用任何语言的各种客户都可以从中受益。这种协议提供了相当复杂的消息传输模式,所以基本上不需要 MassTransit 或 NServiceBus 的配合。它还具有“企业级”的适应性和稳定性。这些东西对我的客户来说十分的有吸引力。
ZeroMQ
ZeroMQ具有一个独特的非中间件的模式,也就是说,跟其它几个接受测试的产品不同,不需要安装和运行一个消息服务器,或中间件。只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后就可以的在应用程序之间发送消息了。非常有趣的是,也可同样使用这 方式在任何利用ZeroMQ进行强大的进程内通信的语言里创建Erlang风格的这种执行角色。
ZeroMQ没有中间件架构,不需要任何服务进程和运行时。事实上,应用程序端点扮演了这个服务角色。这让部署起来非常简单,但没有地方可以观察它是否有问题出现。ZeroMQ仅提供非持久性的队列,你可以在需要的地方实现自己的审计和数据恢复功能,它的运行原理和其它几种差别太大了。
各类消息队列简述
ActiveMQ 是 Apache 出品的、采用 Java 语言编写的完全基于 JMS1.1 规范的面向消息的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。不过由于历史原因包袱太重,目前市场份额没有后面三种消息中间件多,其最新架构被命名为 Apollo,号称下一代 ActiveMQ,有兴趣的同学可行了解。
RabbitMQ 是采用 Erlang 语言实现的 AMQP 协议的消息中间件,最初起源于金融系统,用于在分布式系统中存储转发消息。RabbitMQ 发展到今天,被越来越多的人认可,这和它在可靠性、可用性、扩展性、功能丰富等方面的卓越表现是分不开的。
Kafka 起初是由 LinkedIn 公司采用 Scala 语言开发的一个分布式、多分区、多副本且基于 zookeeper 协调的分布式消息系统,现已捐献给 Apache 基金会。它是一种高吞吐量的分布式发布订阅消息系统,以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如 Cloudera、Apache Storm、Spark、Flink 等都支持与 Kafka 集成。
RocketMQ 是阿里开源的消息中间件,目前已经捐献个 Apache 基金会,它是由 Java 语言开发的,具备高吞吐量、高可用性、适合大规模分布式系统应用等特点,经历过双 11 的洗礼,实力不容小觑。
ZeroMQ 号称史上最快的消息队列,基于 C 语言开发。ZeroMQ 是一个消息处理队列库,可在多线程、多内核和主机之间弹性伸缩,虽然大多数时候我们习惯将其归入消息队列家族之中,但是其和前面的几款有着本质的区别,ZeroMQ 本身就不是一个消息队列服务器,更像是一组底层网络通讯库,对原有的 Socket API 上加上一层封装而已。
目前市面上的消息中间件还有很多,比如腾讯系的 PhxQueue、CMQ、CKafka,又比如基于 Go 语言的 NSQ,有时人们也把类似 Redis 的产品也看做消息中间件的一种,当然它们都很优秀,但是本文篇幅限制无法穷极所有,下面会针对性的挑选 RabbitMQ 和 Kafka 两款典型的消息中间件来做分析,力求站在一个公平公正的立场来阐述消息中间件选型中的各个要点。
官网地址
Kafka:http://kafka.apache.org/
RabbitMQ:https://www.rabbitmq.com/
ZeroMQ:http://zeromq.org/
RocketMQ:http://rocketmq.apache.org/
ActiveMQ:http://activemq.apache.org/
下面大概围绕如下几点进行阐述:
为什么使用消息队列?
使用消息队列有什么缺点?
消息队列如何选型?
如何保证消息队列是高可用的?
如何保证消息不被重复消费?
如何保证消费的可靠性传输?
如何保证消息的顺序性?
我们围绕以上七点进行阐述。需要说明一下,本文不是《消息队列从入门到精通》此类教程,因此只是提供一个复习思路,而不是去教你们怎么调用消息队列的 API。建议对消息队列不了解的人,去找点消息队列的博客看看,再看本文,收获更大。消息队列中间件(简称消息中间件)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等等功能,其作为分布式系统架构中的一个重要组件,有着举足轻重的地位。在整体架构中引入消息中间件,势必要考虑很多因素,比如成本及收益问题,怎么样才能达到最优的性价比?虽然消息中间件种类繁多,但是各自都有各自的侧重点,选择合适自己、扬长避短无疑是最好的方式。如果对此感到无所适从,本文或许可以参考一二。
衡量一款消息中间件是否符合需求需要从多个维度进行考察,首要的就是功能维度,这个直接决定了你能否最大程度上的实现开箱即用,进而缩短项目周期、降低成本等。如果一款消息中间件的功能达不到想要的功能,那么就需要进行二次开发,这样会增加项目的技术难度、复杂度以及增大项目周期等。

这里做了一个简单的业务场景:应用系统A与系统B、C、D要进行相关的数据交互。
1、为什么要使用消息队列
分析:一个用消息队列的人,不知道为啥用,这就有点尴尬。
回答:这个问题,咱只答三个最主要的应用场景 (不可否认还有其他的,但是只答三个主要的), 即以下六个字:解耦、异步、削峰。
(1) 解耦
传统模式的缺点:系统间耦合性太强,系统 A 在代码中直接调用系统 B 和系统 C 的代码,如果将来 D 系统接入,系统 A 还需要修改代码,过于麻烦!
中间件模式的的优点:将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统 A 不需要做任何修改。
(2) 异步
传统模式的缺点:一些非必要的业务逻辑以同步的方式运行,太耗费时间。
中间件模式的的优点:将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度。
(3) 削峰
传统模式的缺点:并发量大的时候,所有的请求直接请求到数据库,造成数据库连接异常。
中间件模式的的优点:系统 A 慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
消息队列协议对比
为了找到合适的消息队列系统,选定几个消息队列协议进行对比来找到合适方式:
MQTT(消息队列遥测传输),在物联网、小型设备、移动应用等方面有较广泛的应用
AMQP(级消息队列协议),是应用层协议的一个开放标准,为面向消息的中间件设计
Kafka(不是协议,是消息系统),解决有吞吐量要求而的日志和数据聚合作用
MQTT优势是实时性,Kafka优势是数据聚合,AMQP优势是应层解耦。
EMQ是百万级分布式开源物联网MQTT消息服务器。同与RabbitMQ基于高并发的Erlang/OTP语言平台设计,支持百万级连接和分布式集群,发布订阅模式的开源MQTT消息服务器。完整支持MQTT V3.1/V3.1.1协议规范,扩展支持WebSocket、Stomp、CoAP、MQTT-SN或私有TCP协议。MQTT协议应用场景大多是在物联网设备的接入,对短数据有好的适应实时性特点,Kafka和AMQP的实时性都不及MQTT。它专门为物联网设计,可以直接接入支持MQTT协议的设备,软件,系统或移动端。目前大多的物联网平台的做法是使用MQTT做设备上传协议。
2、使用了消息队列会有什么缺点
分析:一个使用了 MQ 的项目,如果连这个问题都没有考虑过,就把 MQ 引进去了,那就给自己的项目带来了风险。我们引入一个技术,要对这个技术的弊端有充分的认识,才能做好预防。要记住,不要给公司挖坑!
回答:回答也很容易,从以下两个个角度来答
系统可用性降低:你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低。
系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此需要考虑的东西更多,系统复杂性增大。
但是,该用还是要用的。
3、消息队列如何选型?
笔者建议从以下5个技术维度进行选型参考。
技术选型指标1:功能维度
功能维度又可以划分个多个子维度,大致可以分为以下这些。
优先级队列:
优先级队列不同于先进先出队列,优先级高的消息具备优先被消费的特权,这样可以为下游提供不同消息级别的保证。不过这个优先级也是需要有一个前提的:如果消费者的消费速度大于生产者的速度,并且消息中间件服务器(一般简单的称之为 Broker)中没有消息堆积,那么对于发送的消息设置优先级也就没有什么实质性的意义了,因为生产者刚发送完一条消息就被消费者消费了,那么就相当于 Broker 中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。
延迟队列:
当你在网上购物的时候是否会遇到这样的提示:“三十分钟之内未付款,订单自动取消”?这个是延迟队列的一种典型应用场景。延迟队列存储的是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。延迟队列一般分为两种:基于消息的延迟和基于队列的延迟。基于消息的延迟是指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,当然这也会对性能造成极大的影响。实际应用中大多采用基于队列的延迟,设置不同延迟级别的队列,比如 5s、10s、30s、1min、5mins、10mins 等,每个队列中消息的延迟时间都是相同的,这样免去了延迟排序所要承受的性能之苦,通过一定的扫描策略(比如定时)即可投递超时的消息。
死信队列:
由于某些原因消息无法被正确的投递,为了确保消息不会被无故的丢弃,一般将其置于一个特殊角色的队列,这个队列一般称之为死信队列。与此对应的还有一个“回退队列”的概念,试想如果消费者在消费时发生了异常,那么就不会对这一次消费进行确认(Ack),进而发生回滚消息的操作之后消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为了解决这个问题,可以为每个队列设置一个回退队列,它和死信队列都是为异常的处理提供的一种机制保障。实际情况下,回退队列的角色可以由死信队列和重试队列来扮演。
重试队列:
重试队列其实可以看成是一种回退队列,具体指消费端消费消息失败时,为防止消息无故丢失而重新将消息回滚到 Broker 中。与回退队列不同的是重试队列一般分成多个重试等级,每个重试等级一般也会设置重新投递延时,重试次数越多投递延时就越大。举个例子:消息第一次消费失败入重试队列 Q1,Q1 的重新投递延迟为 5s,在 5s 过后重新投递该消息;如果消息再次消费失败则入重试队列 Q2,Q2 的重新投递延迟为 10s,在 10s 过后再次投递该消息。以此类推,重试越多次重新投递的时间就越久,为此需要设置一个上限,超过投递次数就入死信队列。重试队列与延迟队列有相同的地方,都是需要设置延迟级别,它们彼此的区别是:延迟队列动作由内部触发,重试队列动作由外部消费端触发;延迟队列作用一次,而重试队列的作用范围会向后传递。
消费模式:
消费模式分为推(push)模式和拉(pull)模式。推模式是指由 Broker 主动推送消息至消费端,实时性较好,不过需要一定的流制机制来确保服务端推送过来的消息不会压垮消费端。而拉模式是指消费端主动向 Broker 端请求拉取(一般是定时或者定量)消息,实时性较推模式差,但是可以根据自身的处理能力而控制拉取的消息量。
广播消费:
消息一般有两种传递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。对于点对点的模式而言,消息被消费以后,队列中不会再存储,所以消息消费者不可能消费到已经被消费的消息。虽然队列可以支持多个消费者,但是一条消息只会被一个消费者消费。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。RabbitMQ 是一种典型的点对点模式,而 Kafka 是一种典型的发布订阅模式。但是 RabbitMQ 中可以通过设置交换器类型来实现发布订阅模式而达到广播消费的效果,Kafka 中也能以点对点的形式消费,你完全可以把其消费组(consumer group)的概念看成是队列的概念。不过对比来说,Kafka 中因为有了消息回溯功能的存在,对于广播消费的力度支持比 RabbitMQ 的要强。
消息回溯:
一般消息在消费完成之后就被处理了,之后再也不能消费到该条消息。消息回溯正好相反,是指消息在消费完成之后,还能消费到之前被消费掉的消息。对于消息而言,经常面临的问题是“消息丢失”,至于是真正由于消息中间件的缺陷丢失还是由于使用方的误用而丢失一般很难追查,如果消息中间件本身具备消息回溯功能的话,可以通过回溯消费复现“丢失的”消息进而查出问题的源头之所在。消息回溯的作用远不止与此,比如还有索引恢复、本地缓存重建,有些业务补偿方案也可以采用回溯的方式来实现。
消息堆积 + 持久化:
流量削峰是消息中间件的一个非常重要的功能,而这个功能其实得益于其消息堆积能力。从某种意义上来讲,如果一个消息中间件不具备消息堆积的能力,那么就不能把它看做是一个合格的消息中间件。消息堆积分内存式堆积和磁盘式堆积。RabbitMQ 是典型的内存式堆积,但这并非绝对,在某些条件触发后会有换页动作来将内存中的消息换页到磁盘(换页动作会影响吞吐),或者直接使用惰性队列来将消息直接持久化至磁盘中。Kafka 是一种典型的磁盘式堆积,所有的消息都存储在磁盘中。一般来说,磁盘的容量会比内存的容量要大得多,对于磁盘式的堆积其堆积能力就是整个磁盘的大小。从另外一个角度讲,消息堆积也为消息中间件提供了冗余存储的功能。援引 纽约时报的案例,其直接将 Kafka 用作存储系统。
消息追踪:
对于分布式架构系统中的链路追踪(trace)而言,大家一定不会陌生。对于消息中间件而言,消息的链路追踪(以下简称消息追踪)同样重要。对于消息追踪最通俗的理解就是要知道消息从哪来,存在哪里以及发往哪里去。基于此功能下,我们可以对发送或者消费完的消息进行链路追踪服务,进而可以进行问题的快速定位与排查。
消息过滤:
消息过滤是指按照既定的过滤规则为下游用户提供指定类别的消息。就以 kafka 而言,完全可以将不同类别的消息发送至不同的 topic 中,由此可以实现某种意义的消息过滤,或者 Kafka 还可以根据分区对同一个 topic 中的消息进行分类。不过更加严格意义上的消息过滤应该是对既定的消息采取一定的方式按照一定的过滤规则进行过滤。同样以 Kafka 为例,可以通过客户端提供的 ConsumerInterceptor 接口或者 Kafka Stream 的 filter 功能进行消息过滤。
多租户:
也可以称为多重租赁技术,是一种软件架构技术,主要用来实现多用户的环境下公用相同的系统或程序组件,并且仍可以确保各用户间数据的隔离性。RabbitMQ 就能够支持多租户技术,每一个租户表示为一个 vhost,其本质上是一个独立的小型 RabbitMQ 服务器,又有自己独立的队列、交换器及绑定关系等,并且它拥有自己独立的权限。vhost 就像是物理机中的虚拟机一样,它们在各个实例间提供逻辑上的分离,为不同程序安全保密地允许数据,它既能将同一个 RabbitMQ 中的众多客户区分开,又可以避免队列和交换器等命名冲突。
多协议支持:
消息是信息的载体,为了让生产者和消费者都能理解所承载的信息(生产者需要知道如何构造消息,消费者需要知道如何解析消息),它们就需要按照一种统一的格式描述消息,这种统一的格式称之为消息协议。有效的消息一定具有某种格式,而没有格式的消息是没有意义的。一般消息层面的协议有 AMQP、MQTT、STOMP、XMPP 等(消息领域中的 JMS 更多的是一个规范而不是一个协议),支持的协议越多其应用范围就会越广,通用性越强,比如 RabbitMQ 能够支持 MQTT 协议就让其在物联网应用中获得一席之地。还有的消息中间件是基于其本身的私有协议运转的,典型的如 Kafka。
跨语言支持:
对很多公司而言,其技术栈体系中会有多种编程语言,如 C/C++、JAVA、Go、PHP 等,消息中间件本身具备应用解耦的特性,如果能够进一步的支持多客户端语言,那么就可以将此特性的效能扩大。跨语言的支持力度也可以从侧面反映出一个消息中间件的流行程度。
流量控制:
流量控制(flow control)针对的是发送方和接收方速度不匹配的问题,提供一种速度匹配服务抑制发送速率使接收方应用程序的读取速率与之相适应。通常的流控方法有 Stop-and-wait、滑动窗口以及令牌桶等。
消息顺序性:
顾名思义,消息顺序性是指保证消息有序。这个功能有个很常见的应用场景就是 CDC(Change Data Chapture),以 MySQL 为例,如果其传输的 binlog 的顺序出错,比如原本是先对一条数据加 1,然后再乘以 2,发送错序之后就变成了先乘以 2 后加 1 了,造成了数据不一致。
安全机制:
在 Kafka 0.9 版本之后就开始增加了身份认证和权限控制两种安全机制。身份认证是指客户端与服务端连接进行身份认证,包括客户端与 Broker 之间、Broker 与 Broker 之间、Broker 与 ZooKeeper 之间的连接认证,目前支持 SSL、SASL 等认证机制。权限控制是指对客户端的读写操作进行权限控制,包括对消息或 Kafka 集群操作权限控制。权限控制是可插拔的,并支持与外部的授权服务进行集成。对于 RabbitMQ 而言,其同样提供身份认证(TLS/SSL、SASL)和权限控制(读写操作)的安全机制。
消息幂等性:
对于确保消息在生产者和消费者之间进行传输而言一般有三种传输保障(delivery guarantee):At most once,至多一次,消息可能丢失,但绝不会重复传输;At least once,至少一次,消息绝不会丢,但是可能会重复;Exactly once,精确一次,每条消息肯定会被传输一次且仅一次。对于大多数消息中间件而言,一般只提供 At most once 和 At least once 两种传输保障,对于第三种一般很难做到,由此消息幂等性也很难保证。
Kafka 自 0.11 版本开始引入了幂等性和事务,Kafka 的幂等性是指单个生产者对于单分区单会话的幂等,而事务可以保证原子性地写入到多个分区,即写入到多个分区的消息要么全部成功,要么全部回滚,这两个功能加起来可以让 Kafka 具备 EOS(Exactly Once Semantic)的能力。
不过如果要考虑全局的幂等,还需要与从上下游方面综合考虑,即关联业务层面,幂等处理本身也是业务层面所需要考虑的重要议题。以下游消费者层面为例,有可能消费者消费完一条消息之后没有来得及确认消息就发生异常,等到恢复之后又得重新消费原来消费过的那条消息,那么这种类型的消息幂等是无法有消息中间件层面来保证的。如果要保证全局的幂等,需要引入更多的外部资源来保证,比如以订单号作为唯一性标识,并且在下游设置一个去重表。
事务性消息:
事务本身是一个并不陌生的词汇,事务是由事务开始(Begin Transaction)和事务结束(End Transaction)之间执行的全体操作组成。支持事务的消息中间件并不在少数,Kafka 和 RabbitMQ 都支持,不过此两者的事务是指生产者发生消息的事务,要么发送成功,要么发送失败。消息中间件可以作为用来实现分布式事务的一种手段,但其本身并不提供全局分布式事务的功能。
下表是对 Kafka 与 RabbitMQ 功能的总结性对比及补充说明:



技术选型指标2:性能
功能维度是消息中间件选型中的一个重要的参考维度,但这并不是唯一的维度。有时候性能比功能还要重要,况且性能和功能很多时候是相悖的,鱼和熊掌不可兼得,Kafka 在开启幂等、事务功能的时候会使其性能降低,RabbitMQ 在开启 rabbitmq_tracing 插件的时候也会极大的影响其性能。消息中间件的性能一般是指其吞吐量,虽然从功能维度上来说,RabbitMQ 的优势要大于 Kafka,但是 Kafka 的吞吐量要比 RabbitMQ 高出 1 至 2 个数量级,一般 RabbitMQ 的单机 QPS 在万级别之内,而 Kafka 的单机 QPS 可以维持在十万级别,甚至可以达到百万级。
消息中间件的吞吐量始终会受到硬件层面的限制。就以网卡带宽为例,如果单机单网卡的带宽为 1Gbps,如果要达到百万级的吞吐,那么消息体大小不得超过 (1Gb/8)/100W,即约等于 134B,换句话说如果消息体大小超过 134B,那么就不可能达到百万级别的吞吐。这种计算方式同样可以适用于内存和磁盘。
时延作为性能维度的一个重要指标,却往往在消息中间件领域所被忽视,因为一般使用消息中间件的场景对时效性的要求并不是很高,如果要求时效性完全可以采用 RPC 的方式实现。消息中间件具备消息堆积的能力,消息堆积越大也就意味着端到端的时延也就越长,与此同时延时队列也是某些消息中间件的一大特色。那么为什么还要关注消息中间件的时延问题呢?消息中间件能够解耦系统,对于一个时延较低的消息中间件而言,它可以让上游生产者发送消息之后可以迅速的返回,也可以让消费者更加快速的获取到消息,在没有堆积的情况下可以让整体上下游的应用之间的级联动作更加高效,虽然不建议在时效性很高的场景下使用消息中间件,但是如果所使用的消息中间件的时延方面比较优秀,那么对于整体系统的性能将会是一个不小的提升。
技术选型指标3:可靠性 + 可用性
消息丢失是使用消息中间件时所不得不面对的一个同点,其背后消息可靠性也是衡量消息中间件好坏的一个关键因素。尤其是在金融支付领域,消息可靠性尤为重要。然而说到可靠性必然要说到可用性,注意这两者之间的区别,消息中间件的可靠性是指对消息不丢失的保障程度;而消息中间件的可用性是指无故障运行的时间百分比,通常用几个 9 来衡量。
从狭义的角度来说,分布式系统架构是一致性协议理论的应用实现,对于消息可靠性和可用性而言也可以追溯到消息中间件背后的一致性协议。对于 Kafka 而言,其采用的是类似 PacificA 的一致性协议,通过 ISR(In-Sync-Replica)来保证多副本之间的同步,并且支持强一致性语义(通过 acks 实现)。对应的 RabbitMQ 是通过镜像环形队列实现多副本及强一致性语义的。多副本可以保证在 master 节点宕机异常之后可以提升 slave 作为新的 master 而继续提供服务来保障可用性。Kafka 设计之初是为日志处理而生,给人们留下了数据可靠性要求不要的不良印象,但是随着版本的升级优化,其可靠性得到极大的增强,详细可以参考 KIP101。就目前而言,在金融支付领域使用 RabbitMQ 居多,而在日志处理、大数据等方面 Kafka 使用居多,随着 RabbitMQ 性能的不断提升和 Kafka 可靠性的进一步增强,相信彼此都能在以前不擅长的领域分得一杯羹。
同步刷盘是增强一个组件可靠性的有效方式,消息中间件也不例外,Kafka 和 RabbitMQ 都可以支持同步刷盘,但是笔者对同步刷盘有一定的疑问:绝大多数情景下,一个组件的可靠性不应该由同步刷盘这种极其损耗性能的操作来保障,而是采用多副本的机制来保证。
这里还要提及的一个方面是扩展能力,这里我狭隘地将此归纳到可用性这一维度,消息中间件的扩展能力能够增强其用可用能力及范围,比如前面提到的 RabbitMQ 支持多种消息协议,这个就是基于其插件化的扩展实现。还有从集群部署上来讲,归功于 Kafka 的水平扩展能力,其基本上可以达到线性容量提升的水平,在 LinkedIn 实践介绍中就提及了有部署超过千台设备的 Kafka 集群。
技术选型指标4:运维管理
在消息中间件的使用过程中难免会出现各式各样的异常情况,有客户端的,也有服务端的,那么怎样及时有效的进行监测及修复。业务线流量有峰值又低谷,尤其是电商领域,那么怎样前进行有效的容量评估,尤其是大促期间?脚踢电源、网线被挖等事件层出不穷,如何有效的做好异地多活?这些都离不开消息中间件的衍生产品——运维管理。运维管理也可以进行进一步的细分,比如:申请、审核、监控、告警、管理、容灾、部署等。
申请、审核很好理解,在源头对资源进行管控,既可以进行有效校正应用方的使用规范,配和监控也可以做好流量统计与流量评估工作,一般申请、审核与公司内部系统交融性较大,不适合使用开源类的产品。
监控、告警也比较好理解,对消息中间件的使用进行全方位的监控,即可以为系统提供基准数据,也可以在检测到异常的情况配合告警,以便运维、开发人员的迅速介入。除了一般的监控项(比如硬件、GC 等)之外,对于消息中间件还需要关注端到端时延、消息审计、消息堆积等方面。对于 RabbitMQ 而言,最正统的监控管理工具莫过于 rabbitmq_management 插件了,但是社区内还有 AppDynamics, Collectd, DataDog, Ganglia, Munin, Nagios, New Relic, Prometheus, Zenoss 等多种优秀的产品。Kafka 在此方面也毫不逊色,比如:Kafka Manager, Kafka Monitor, Kafka Offset Monitor, Burrow, Chaperone, Confluent Control Center 等产品,尤其是 Cruise 还可以提供自动化运维的功能。
不管是扩容、降级、版本升级、集群节点部署、还是故障处理都离不开管理工具的应用,一个配套完备的管理工具集可以在遇到变更时做到事半功倍。故障可大可小,一般是一些应用异常,也可以是机器掉电、网络异常、磁盘损坏等单机故障,这些故障单机房内的多副本足以应付。如果是机房故障就要涉及异地容灾了,关键点在于如何有效的进行数据复制,对于 Kafka 而言,可以参考 MirrorMarker、uReplicator 等产品,而 RabbitMQ 可以参考 Federation 和 Shovel。
技术选型指标5:社区力度及生态发展
对于目前流行的编程语言而言,如Java、Python,如果你在使用过程中遇到了一些异常,基本上可以通过搜索引擎的帮助来得到解决,因为一个产品用的人越多,踩过的坑也就越多,对应的解决方案也就越多。对于消息中间件也同样适用,如果你选择了一种“生僻”的消息中间件,可能在某些方面运用的得心应手,但是版本更新缓慢、遇到棘手问题也难以得到社区的支持而越陷越深;相反如果你选择了一种“流行”的消息中间件,其更新力度大,不仅可以迅速的弥补之前的不足,而且也能顺应技术的快速发展来变更一些新的功能,这样可以让你以“站在巨人的肩膀上”。在运维管理维度我们提及了 Kafka 和 RabbitMQ 都有一系列开源的监控管理产品,这些正是得益于其社区及生态的迅猛发展。
消息中间件选型误区总结
在进行消息中间件选型之前可以先问自己一个问题:是否真的需要一个消息中间件?在搞清楚这个问题之后,还可以继续问自己一个问题:是否需要自己维护一套消息中间件?很多初创型公司为了节省成本会选择直接购买消息中间件有关的云服务,自己只需要关注收发消息即可,其余的都可以外包出去。
很多人面对消息中间件时会有一种自研的冲动,你完全可以对 Java 中的 ArrayBlockingQueue 做一个简单的封装,你也可以基于文件、数据库、Redis等底层存储封装而形成一个消息中间件。消息中间件做为一个基础组件并没有想象中的那么简单,其背后还需要配套的管理运维整个生态的产品集。自研还有会交接问题,如果文档不齐全、运作不规范将会带给新人噩梦般的体验。是否真的有自研的必要?如果不是 KPI 的压迫可以先考虑下这 2 个问题:
1). 目前市面上的消息中间件是否都真的无法满足目前业务需求?
2). 团队是否有足够的能力、人力、财力、精力来支持自研?
很多人在做消息中间件选型时会参考网络上的很多对比类的文章,但是其专业性、严谨性、以及其政治立场问题都有待考证,需要带着怀疑的态度去审视这些文章。比如有些文章会在没有任何限定条件及场景的情况下直接定义某款消息中间件最好,还有些文章没有指明消息中间件版本及测试环境就来做功能和性能对比分析,诸如此类的文章都可以唾弃之。
消息中间件犹如小马过河,选择合适的才最重要,这需要贴合自身的业务需求,技术服务于业务,大体上可以根据上一节所提及的功能、性能等 6 个维度来一一进行筛选。更深层次的抉择在于你能否掌握其魂,笔者鄙见:RabbitMQ 在于 routing,而 Kafka 在于 streaming,了解其根本对于自己能够对症下药选择到合适的消息中间件尤为重要。
消息中间件选型切忌一味的追求性能或者功能,性能可以优化,功能可以二次开发。如果要在功能和性能方面做一个抉择的话,那么首选性能,因为总体上来说性能优化的空间没有功能扩展的空间大。然而对于长期发展而言,生态又比性能以及功能都要重要。
很多时候,对于可靠性方面也容易存在一个误区:想要找到一个产品来保证消息的绝对可靠,很不幸的是这世界上没有绝对的东西,只能说尽量趋于完美。想要尽可能的保障消息的可靠性也并非单单只靠消息中间件本身,还要依赖于上下游,需要从生产端、服务端和消费端这 3 个维度去努力保证,《RabbitMQ 消息可靠性分析》这篇文章就从这 3 个维度去分析了 RabbitMQ 的可靠性。消息中间件选型还有一个考量标准就是尽量贴合团队自身的技术栈体系,虽然说没有蹩脚的消息中间件只有蹩脚的程序员,但是让一个 C 栈的团队去深挖 PhxQueue 总比去深挖 Scala 编写的 Kafka 要容易的多。
消息中间件大道至简:一发一存一消费,没有最好的消息中间件,只有最合适的消息中间件。
既然在项目中用了 MQ,肯定事先要对业界流行的 MQ 进行调研,如果连每种 MQ 的优缺点都没了解清楚,就拍脑袋依据喜好,用了某种 MQ,就会给项目制造困难。
来一个性能对比表
| 特性 | ActiveMQ | RabbitMQ | RocketMQ | kafka |
|---|---|---|---|---|
| 开发语言 | java | erlang | java | scala |
| 单机吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
| 时效性 | ms级 | us级 | ms级 | ms级以内 |
| 可用性 | 高(主从架构) | 高(主从架构) | 非常高(分布式架构) | 非常高(分布式架构) |
| 功能特性 | 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。 |
综合上面的材料得出以下两点:
(1) 中小型软件公司,建议选 RabbitMQ。一方面,erlang 语言天生具备高并发的特性,而且他的管理界面用起来十分方便。正所谓,成也萧何,败也萧何!他的弊端也在这里,虽然 RabbitMQ 是开源的,然而国内有几个能定制化开发 erlang 的程序员呢?所幸,RabbitMQ 的社区十分活跃,可以解决开发过程中遇到的 bug,这点对于中小型公司来说十分重要。不考虑 rocketmq 和 kafka 的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以 kafka 排除。不考虑 rocketmq 的原因是,rocketmq 是阿里出品,如果阿里放弃维护 rocketmq,中小型公司一般抽不出人来进行 rocketmq 的定制化开发,因此不推荐。
(2) 大型软件公司,根据具体使用在 rocketMq 和 kafka 之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对 rocketMQ, 大型软件公司也可以抽出人手对 rocketMQ 进行定制化开发,毕竟国内有能力改 JAVA 源码的人,还是相当多的。至于 kafka,根据业务场景选择,如果有日志采集功能,肯定是首选 kafka 了。具体该选哪个,看使用场景。
4、如何保证消息队列是高可用的
分析:在第二点说过了,引入消息队列后,系统的可用性下降。在生产中,没人使用单机模式的消息队列。因此,作为一个合格的程序员,应该对消息队列的高可用有很深刻的了解。如果面试的时候,面试官问,你们的消息中间件如何保证高可用的?你的回答只是表明自己只会订阅和发布消息,面试官就会怀疑你是不是只是自己搭着玩,压根没在生产用过。请做一个爱思考,会思考,懂思考的程序员。
回答:这问题,其实要对消息队列的集群模式要有深刻了解,才好回答。以 rcoketMQ 为例,他的集群就有多 master 模式、多 master 多 slave 异步复制模式、多 master 多 slave 同步双写模式。多 master 多 slave 模式部署架构图

其实第一眼看到这个图,就觉得和 kafka 很像,只是 NameServer 集群,在 kafka 中是用 zookeeper 代替,都是用来保存和发现 master 和 slave 用的。通信过程如下:
Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave 建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。
那么 kafka 呢,为了对比说明直接上 kafka 的拓朴架构图

如上图所示,一个典型的 Kafka 集群中包含若干 Producer(可以是 web 前端产生的 Page View,或者是服务器日志,系统 CPU、Memory 等),若干 broker(Kafka 支持水平扩展,一般 broker 数量越多,集群吞吐率越高),若干 Consumer Group,以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 管理集群配置,选举 leader,以及在 Consumer Group 发生变化时进行 rebalance。Producer 使用 push 模式将消息发布到 broker,Consumer 使用 pull 模式从 broker 订阅并消费消息。
至于 rabbitMQ,也有普通集群和镜像集群模式,可自行去了解,比较简单,两小时即懂。要求在回答高可用的问题时,应该能逻辑清晰的画出自己的 MQ 集群架构或清晰的叙述出来。
5、如何保证消息不被重复消费
分析:这个问题其实换一种问法就是,如何保证消息队列的幂等性?这个问题可以认为是消息队列领域的基本问题。换句话来说,是在考察你的设计能力,这个问题的回答可以根据具体的业务场景来答,没有固定的答案。
回答:先来说一下为什么会造成重复消费
其实无论是那种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发送的确认信息形式不同,例如 RabbitMQ 是发送一个 ACK 确认消息,RocketMQ 是返回一个 CONSUME_SUCCESS 成功标志,kafka 实际上有个 offset 的概念,简单说一下(如果还不懂,出门找一个 kafka 入门到精通教程),就是每一个消息都有一个 offset,kafka 消费过消息后,需要提交 offset,让消息队列知道自己已经消费过了。那造成重复消费的原因?,就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。
如何解决?这个问题针对业务场景来答分以下几点:
(1) 比如,你拿到这个消息做数据库的 insert 操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
(2) 再比如,你拿到这个消息做 redis 的 set 的操作,那就容易了,不用解决,因为你无论 set 几次结果都是一样的,set 操作本来就算幂等操作。
(3) 如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以 redis 为例,给消息分配一个全局 id,只要消费过该消息,将 < id,message > 以 K-V 形式写入 redis。那消费者开始消费前,先去 redis 中查询有没消费记录即可。
6、如何保证消费的可靠性传输
分析:我们在使用消息队列的过程中,应该做到消息不能多消费,也不能少消费。如果无法做到可靠性传输,可能给公司带来财产损失。同样的,如果可靠性传输在使用过程中,没有考虑到,也会为公司带来损失。
回答:其实这个可靠性传输,每种 MQ 都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据
RabbitMQ
(1) 生产者丢数据
从生产者弄丢数据这个角度来看,RabbitMQ 提供 transaction 和 confirm 模式来确保生产者不丢消息。
transaction 机制就是说,发送消息前,开启事物 (channel.txSelect ()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚 (channel.txRollback ()),如果发送成功则提交事物 (channel.txCommit ())。
然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用 confirm 模式的居多。一旦 channel 进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ 就会发送一个 Ack 给生产者 (包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。如果 rabiitMQ 没能处理该消息,则会发送一个 Nack 消息给你,你可以进行重试操作。
(2) 消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和 confirm 机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个 Ack 信号。这样,如果消息持久化磁盘之前,rabbitMQ 阵亡了,那么生产者收不到 Ack 信号,生产者会自动重发。那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步:
1、将 queue 的持久化标识 durable 设置为 true, 则代表是一个持久的队列
2、发送消息的时候将 deliveryMode=2
这样设置以后,rabbitMQ 就算挂了,重启后也能恢复数据。
(3) 消费者丢数据
消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时 rahbitMQ 会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息,就会丢失该消息。至于解决方案,采用手动确认消息即可。
Kafka
这里先引一张 kafka Replication 的数据流向图

Producer 在发布消息到某个 Partition 时,先通过 ZooKeeper 找到该 Partition 的 Leader,然后无论该 Topic 的 Replication Factor 为多少(也即该 Partition 有多少个 Replica),Producer 只将该消息发送到该 Partition 的 Leader,Leader 会将该消息写入其本地 Log。每个 Follower 都从 Leader 中 pull 数据。
针对上述情况,得出如下分析:
(1) 生产者丢数据
在 kafka 生产中,基本都有一个 leader 和多个 follwer。follwer 会去同步 leader 的信息。因此,为了避免生产者丢数据,做如下两点配置
第一个配置要在 producer 端设置 acks=all。这个配置保证了,follwer 同步完成后,才认为消息发送成功。
在 producer 端设置 retries=MAX,一旦写入失败,这无限重试
(2) 消息队列丢数据
针对消息队列丢数据的情况,无外乎就是,数据还没同步,leader 就挂了,这时 zookpeer 会将其他的 follwer 切换为 leader, 那数据就丢失了。针对这种情况,应该做两个配置。
replication.factor 参数,这个值必须大于 1,即要求每个 partition 必须有至少 2 个副本
min.insync.replicas 参数,这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系
这两个配置加上上面生产者的配置联合起来用,基本可确保 kafka 不丢数据
(3) 消费者丢数据
这种情况一般是自动提交了 offset,然后你处理程序过程中挂了。kafka 以为你处理好了。再强调一次 offset 的用途:
offset:指的是 kafka 的 topic 中的每个消费组消费的下标。简单的来说就是一条消息对应一个 offset 下标,每次消费数据的时候如果提交 offset,那么下次消费就会从提交的 offset 加一那里开始消费。比如一个 topic 中有 100 条数据,我消费了 50 条并且提交了,那么此时的 kafka 服务端记录提交的 offset 就是 49 (offset 从 0 开始),那么下次消费的时候 offset 就从 50 开始消费。
解决方案也很简单,改成手动提交即可。
ActiveMQ 和 RocketMQ
自行查阅吧。
7、如何保证消息的顺序性
分析:其实并非所有的公司都有这种业务需求,但是还是对这个问题要有所复习。
回答:针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中 (kafka 中就是 partition,rabbitMq 中就是 queue)。然后只用一个消费者去消费该队列。
有的人会问:那如果为了吞吐量,有多个消费者去消费怎么办?这个问题,没有固定回答的套路。比如我们有一个微博的操作,发微博、写评论、删除微博,这三个异步操作。如果是这样一个业务场景,那只要重试就行。比如你一个消费者先执行了写评论的操作,但是这时候,微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行写评论的操作后,再执行,就可以成功。总之,针对这个问题,个人的观点是保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定用法。
看完上文的对比,如果还存在疑问的话,可以参考下文,JackJiang从17个维度进行了深入的对比,相信对选择哪个产品组件会有很大的帮助。
维度1:资料文档
1)Kafka:资料数量中等。有Kafka作者自己写的书,网上资料也有一些。
2)RabbitMQ:资料数量多。有一些不错的书,网上资料多。
3)ZeroMQ:资料数量少。专门写ZeroMQ的书较少,网上的资料多是一些代码的实现和简单介绍。
4)RocketMQ:资料数量少。专门写RocketMQ的书目前有了两本;网上的资料良莠不齐,官方文档很简洁,但是对技术细节没有过多的描述。
5)ActiveMQ:资料数量多。没有专门写ActiveMQ的书,网上资料多。
维度2:开发语言
1)Kafka:Scala
2)RabbitMQ:Erlang
3)ZeroMQ:C语言
4)RocketMQ:Java
5)ActiveMQ:Java
维度3:支持的协议
1)Kafka:自己定义的一套…(基于TCP)
2)RabbitMQ:AMQP
3)ZeroMQ:TCP、UDP
4)RocketMQ:自己定义的一套…
5)ActiveMQ:OpenWire、STOMP、REST、XMPP、AMQP
维度4:消息存储
1)Kafka:
内存、磁盘、数据库。支持大量堆积。Kafka的最小存储单元是分区,一个topic包含多个分区,Kafka创建主题时,这些分区会被分配在多个服务器上,通常一个broker一台服务器。分区首领会均匀地分布在不同的服务器上,分区副本也会均匀的分布在不同的服务器上,确保负载均衡和高可用性,当新的broker加入集群的时候,部分副本会被移动到新的broker上。根据配置文件中的目录清单,Kafka会把新的分区分配给目录清单里分区数最少的目录。默认情况下,分区器使用轮询算法把消息均衡地分布在同一个主题的不同分区中,对于发送时指定了key的情况,会根据key的hashcode取模后的值存到对应的分区中。
2)RabbitMQ:
内存、磁盘。支持少量堆积。RabbitMQ的消息分为持久化的消息和非持久化消息,不管是持久化的消息还是非持久化的消息都可以写入到磁盘。持久化的消息在到达队列时就写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中清除。非持久化的消息一般只存在于内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存。
引入镜像队列机制,可将重要队列“复制”到集群中的其他broker上,保证这些队列的消息不会丢失。配置镜像的队列,都包含一个主节点master和多个从节点slave,如果master失效,加入时间最长的slave会被提升为新的master,除发送消息外的所有动作都向master发送,然后由master将命令执行结果广播给各个slave,RabbitMQ会让master均匀地分布在不同的服务器上,而同一个队列的slave也会均匀地分布在不同的服务器上,保证负载均衡和高可用性。
3)ZeroMQ:
消息发送端的内存或者磁盘中。不支持持久化。
4)RocketMQ:
磁盘。支持大量堆积。commitLog文件存放实际的消息数据,每个commitLog上限是1G,满了之后会自动新建一个commitLog文件保存数据。
ConsumeQueue队列只存放offset、size、tagcode,非常小,分布在多个broker上。
ConsumeQueue相当于CommitLog的索引文件,消费者消费时会从consumeQueue中查找消息在commitLog中的offset,再去commitLog中查找元数据。
ConsumeQueue存储格式的特性,保证了写过程的顺序写盘(写CommitLog文件),大量数据IO都在顺序写同一个commitLog,满1G了再写新的。加上RocketMQ是累计4K才强制从PageCache中刷到磁盘(缓存),所以高并发写性能突出。
5)ActiveMQ:
内存、磁盘、数据库。支持少量堆积。
维度5:消息事务
1)Kafka:支持
2)RabbitMQ:支持。客户端将信道设置为事务模式,只有当消息被RabbitMQ接收,事务才能提交成功,否则在捕获异常后进行回滚。使用事务会使得性能有所下降
3)ZeroMQ:不支持
4)RocketMQ:支持
5)ActiveMQ:支持
维度6:负载均衡
Kafka
支持负载均衡。
1)一个broker通常就是一台服务器节点。对于同一个Topic的不同分区,Kafka会尽力将这些分区分布到不同的Broker服务器上,zookeeper保存了broker、主题和分区的元数据信息。分区首领会处理来自客户端的生产请求,Kafka分区首领会被分配到不同的broker服务器上,让不同的broker服务器共同分担任务。每一个broker都缓存了元数据信息,客户端可以从任意一个broker获取元数据信息并缓存起来,根据元数据信息知道要往哪里发送请求。
2)Kafka的消费者组订阅同一个topic,会尽可能地使得每一个消费者分配到相同数量的分区,分摊负载。
3)当消费者加入或者退出消费者组的时候,还会触发再均衡,为每一个消费者重新分配分区,分摊负载。Kafka的负载均衡大部分是自动完成的,分区的创建也是Kafka完成的,隐藏了很多细节,避免了繁琐的配置和人为疏忽造成的负载问题。
4)发送端由topic和key来决定消息发往哪个分区,如果key为null,那么会使用轮询算法将消息均衡地发送到同一个topic的不同分区中。如果key不为null,那么会根据key的hashcode取模计算出要发往的分区。
RabbitMQ
对负载均衡的支持不好。
1)消息被投递到哪个队列是由交换器和key决定的,交换器、路由键、队列都需要手动创建。
RabbitMQ客户端发送消息要和broker建立连接,需要事先知道broker上有哪些交换器,有哪些队列。通常要声明要发送的目标队列,如果没有目标队列,会在broker上创建一个队列,如果有,就什么都不处理,接着往这个队列发送消息。假设大部分繁重任务的队列都创建在同一个broker上,那么这个broker的负载就会过大。可以在上线前预先创建队列,无需声明要发送的队列,但是发送时不会尝试创建队列,可能出现找不到队列的问题,RabbitMQ的备份交换器会把找不到队列的消息保存到一个专门的队列中,以便以后查询使用。
使用镜像队列机制建立RabbitMQ集群可以解决这个问题,形成master-slave的架构,master节点会均匀分布在不同的服务器上,让每一台服务器分摊负载。slave节点只是负责转发,在master失效时会选择加入时间最长的slave成为master。当新节点加入镜像队列的时候,队列中的消息不会同步到新的slave中,除非调用同步命令,但是调用命令后,队列会阻塞,不能在生产环境中调用同步命令。
2)当RabbitMQ队列拥有多个消费者的时候,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者,不会重复。这种方式非常适合扩展,而且是专门为并发程序设计的。如果某些消费者的任务比较繁重,那么可以设置basicQos限制信道上消费者能保持的最大未确认消息的数量,在达到上限时,RabbitMQ不再向这个消费者发送任何消息。
3)对于RabbitMQ而言,客户端与集群建立的TCP连接不是与集群中所有的节点建立连接,而是挑选其中一个节点建立连接。但是RabbitMQ集群可以借助HAProxy、LVS技术,或者在客户端使用算法实现负载均衡,引入负载均衡之后,各个客户端的连接可以分摊到集群的各个节点之中。
客户端均衡算法:
a. 轮询法:按顺序返回下一个服务器的连接地址。
b. 加权轮询法:给配置高、负载低的机器配置更高的权重,让其处理更多的请求;而配置低、负载高的机器,给其分配较低的权重,降低其系统负载。
c. 随机法:随机选取一个服务器的连接地址。
d. 加权随机法:按照概率随机选取连接地址。
e. 源地址哈希法:通过哈希函数计算得到的一个数值,用该数值对服务器列表的大小进行取模运算。
f. 最小连接数法:动态选择当前连接数最少的一台服务器的连接地址。
ZeroMQ
去中心化,不支持负载均衡。本身只是一个多线程网络库。
RocketMQ
支持负载均衡。
一个broker通常是一个服务器节点,broker分为master和slave,master和slave存储的数据一样,slave从master同步数据。
1)nameserver与每个集群成员保持心跳,保存着Topic-Broker路由信息,同一个topic的队列会分布在不同的服务器上。
2)发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。发送消息指定topic、tags、keys,无法指定投递到哪个队列(没有意义,集群消费和广播消费跟消息存放在哪个队列没有关系)。
tags选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只支 持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概念。
keys选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后, 可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能 保证 key 唯一,例如订单号,商品 Id 等。
3)RocketMQ的负载均衡策略规定:
Consumer数量应该小于等于Queue数量,如果Consumer超过Queue数量,那么多余的Consumer 将不能消费消息。
这一点和Kafka是一致的,RocketMQ会尽可能地为每一个Consumer分配相同数量的队列,分摊负载。
ActiveMQ
支持负载均衡。可以基于zookeeper实现负载均衡。
维度7:集群方式
1)Kafka:
天然的'Leader-Slave'无状态集群,每台服务器既是Master也是Slave。
分区首领均匀地分布在不同的Kafka服务器上,分区副本也均匀地分布在不同的Kafka服务器上,所以每一台Kafka服务器既含有分区首领,同时又含有分区副本。每一台Kafka服务器是某一台Kafka服务器的Slave,同时也是某一台Kafka服务器的leader。
Kafka的集群依赖于zookeeper,zookeeper支持热扩展,所有的broker、消费者、分区都可以动态加入移除,而无需关闭服务,与不依靠zookeeper集群的mq相比,这是最大的优势。
2)RabbitMQ:
支持简单集群,'复制'模式,对高级集群模式支持不好。
RabbitMQ的每一个节点,不管是单一节点系统或者是集群中的一部分,要么是内存节点,要么是磁盘节点,集群中至少要有一个是磁盘节点。
在RabbitMQ集群中创建队列,集群只会在单个节点创建队列进程和完整的队列信息(元数据、状态、内容),而不是在所有节点上创建。引入镜像队列,可以避免单点故障,确保服务的可用性,但是需要人为地为某些重要的队列配置镜像。
3)ZeroMQ:
去中心化,不支持集群。
4)RocketMQ:
常用 多对'Master-Slave' 模式,开源版本需手动切换Slave变成Master。
Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。
Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。
Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。
Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
客户端先找到NameServer,然后通过NameServer再找到 Broker。一个topic有多个队列,这些队列会均匀地分布在不同的broker服务器上。RocketMQ队列的概念和Kafka的分区概念是基本一致的,Kafka同一个topic的分区尽可能地分布在不同的broker上,分区副本也会分布在不同的broker上。RocketMQ集群的slave会从master拉取数据备份,master分布在不同的broker上。
5)ActiveMQ:
支持简单集群模式,比如'主-备',对高级集群模式支持不好。
维度8:管理界面
1)Kafka:一般
2)RabbitMQ:好
3)ZeroMQ:无
4)RocketMQ:有管理后台, 但不是项目里自带, 需要自己启动一个单独的管理后台实例
5)ActiveMQ:一般
维度9:可用性
1)Kafka:非常高(分布式)
2)RabbitMQ:高(主从)
3)ZeroMQ:高
4)RocketMQ:非常高(分布式)
5)ActiveMQ:高(主从)
维度10:消息重复
1)Kafka:支持at least once、at most once
2)RabbitMQ:支持at least once、at most once
3)ZeroMQ:只有重传机制,但是没有持久化,消息丢了重传也没有用。既不是at least once、也不是at most once、更不是exactly only once
4)RocketMQ:支持at least once
5)ActiveMQ:支持at least once
维度11:吞吐量TPS
1)Kafka:极大。Kafka按批次发送消息和消费消息。发送端将多个小消息合并,批量发向Broker,消费端每次取出一个批次的消息批量处理。
2)RabbitMQ:比较大
3)ZeroMQ:极大
4)RocketMQ:大。RocketMQ接收端可以批量消费消息,可以配置每次消费的消息数,但是发送端不是批量发送。
5)ActiveMQ:比较大
维度12:订阅形式和消息分发
Kafka
基于topic以及按照topic进行正则匹配的发布订阅模式。
1)发送:
发送端由topic和key来决定消息发往哪个分区,如果key为null,那么会使用轮询算法将消息均衡地发送到同一个topic的不同分区中。如果key不为null,那么会根据key的hashcode取模计算出要发往的分区。
2)接收:
consumer向群组协调器broker发送心跳来维持他们和群组的从属关系以及他们对分区的所有权关系,所有权关系一旦被分配就不会改变除非发生再均衡(比如有一个consumer加入或者离开consumer group),consumer只会从对应的分区读取消息。
Kafka限制consumer个数要少于分区个数,每个消息只会被同一个 Consumer Group的一个consumer消费(非广播)。
Kafka的 Consumer Group订阅同一个topic,会尽可能地使得每一个consumer分配到相同数量的分区,不同 Consumer Group订阅同一个主题相互独立,同一个消息会被不同的 Consumer Group处理。
RabbitMQ
提供了4种:direct、topic、Headers和fanout。
1)发送:
先要声明一个队列,这个队列会被创建或者已经被创建,队列是基本存储单元。
由exchange和key决定消息存储在哪个队列。
direct>发送到和bindingKey完全匹配的队列。
topic>路由key是含有"."的字符串,会发送到含有“*”、“#”进行模糊匹配的bingKey对应的队列。
fanout>与key无关,会发送到所有和exchange绑定的队列
headers>与key无关,消息内容的headers属性(一个键值对)和绑定键值对完全匹配时,会发送到此队列。此方式性能低一般不用
2)接收:
RabbitMQ的队列是基本存储单元,不再被分区或者分片,对于我们已经创建了的队列,消费端要指定从哪一个队列接收消息。当RabbitMQ队列拥有多个消费者的时候,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者,不会重复。这种方式非常适合扩展,而且是专门为并发程序设计的。如果某些消费者的任务比较繁重,那么可以设置basicQos限制信道上消费者能保持的最大未确认消息的数量,在达到上限时,RabbitMQ不再向这个消费者发送任何消息。
ZeroMQ
点对点(p2p)。
RocketMQ
基于topic/messageTag以及按照消息类型、属性进行正则匹配的发布订阅模式。
1)发送:
发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。发送消息指定topic、tags、keys,无法指定投递到哪个队列(没有意义,集群消费和广播消费跟消息存放在哪个队列没有关系)。
tags选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只支 持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概念。
keys选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后, 可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能 保证 key 唯一,例如订单号,商品 Id 等。
2)接收:
广播消费。一条消息被多个Consumer消费,即使Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次。
集群消费。一个 Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有 9 条消息,其中一个Consumer Group有3个实例,那么每个实例只消费其中的 3 条消息。即每一个队列都把消息轮流分发给每个consumer。
ActiveMQ
点对点(p2p)、广播(发布-订阅)。
点对点模式,每个消息只有1个消费者;发布/订阅模式,每个消息可以有多个消费者。
1)发送:
点对点模式:先要指定一个队列,这个队列会被创建或者已经被创建。
发布/订阅模式:先要指定一个topic,这个topic会被创建或者已经被创建。
2)接收:
点对点模式:对于已经创建了的队列,消费端要指定从哪一个队列接收消息。
发布/订阅模式:对于已经创建了的topic,消费端要指定订阅哪一个topic的消息。
维度13:顺序消息
Kafka:支持。设置生产者的max.in.flight.requests.per.connection为1,可以保证消息是按照发送顺序写入服务器的,即使发生了重试。Kafka保证同一个分区里的消息是有序的,但是这种有序分两种情况:
①key为null,消息逐个被写入不同主机的分区中,但是对于每个分区依然是有序的
②key不为null , 消息被写入到同一个分区,这个分区的消息都是有序。
RabbitMQ:不支持
ZeroMQ:不支持
RocketMQ:支持
ActiveMQ:不支持
维度14:消息确认
Kafka
支持。
1)发送方确认机制:
ack=0,不管消息是否成功写入分区
ack=1,消息成功写入首领分区后,返回成功
ack=all,消息成功写入所有分区后,返回成功。
2)接收方确认机制:
自动或者手动提交分区偏移量,早期版本的Kafka偏移量是提交给Zookeeper的,这样使得zookeeper的压力比较大,更新版本的Kafka的偏移量是提交给Kafka服务器的,不再依赖于zookeeper群组,集群的性能更加稳定。
RabbitMQ
支持。
1)发送方确认机制,消息被投递到所有匹配的队列后,返回成功。如果消息和队列是可持久化的,那么在写入磁盘后,返回成功。支持批量确认和异步确认。
2)接收方确认机制,设置autoAck为false,需要显式确认,设置autoAck为true,自动确认。
当autoAck为false的时候,RabbitMQ队列会分成两部分,一部分是等待投递给consumer的消息,一部分是已经投递但是没收到确认的消息。
如果一直没有收到确认信号,并且consumer已经断开连接,RabbitMQ会安排这个消息重新进入队列,投递给原来的消费者或者下一个消费者。
未确认的消息不会有过期时间,如果一直没有确认,并且没有断开连接,RabbitMQ会一直等待,RabbitMQ允许一条消息处理的时间可以很久很久。
ZeroMQ
支持。
RocketMQ
支持。
ActiveMQ
支持。
维度15:消息回溯
1)Kafka:支持指定分区offset位置的回溯
2)RabbitMQ:不支持
3)ZeroMQ:不支持
4)RocketMQ:支持指定时间点的回溯
5)ActiveMQ:不支持
维度16:消息重试
Kafka
不支持,但是可以实现。Kafka支持指定分区offset位置的回溯,可以实现消息重试。
RabbitMQ
不支持,但是可以利用消息确认机制实现。
RabbitMQ接收方确认机制,设置autoAck为false。
当autoAck为false的时候,RabbitMQ队列会分成两部分,一部分是等待投递给consumer的消息,一部分是已经投递但是没收到确认的消息。
如果一直没有收到确认信号,并且consumer已经断开连接,RabbitMQ会安排这个消息重新进入队列,投递给原来的消费者或者下一个消费者。
ZeroMQ
不支持。
RocketMQ
支持。
消息消费失败的大部分场景下,立即重试99%都会失败,所以RocketMQ的策略是在消费失败时定时重试,每次时间间隔相同。
1)发送端的 send 方法本身支持内部重试,重试逻辑如下:
至多重试3次;如果发送失败,则轮转到下一个broker;这个方法的总耗时不超过sendMsgTimeout 设置的值,默认 10s,超过时间不在重试。
2)接收端:
Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以分为以下两种情况:
由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。定时重试机制,比如过 10s 秒后再重试。
由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况可以 sleep 30s,再消费下一条消息,减轻 Broker 重试消息的压力。
ActiveMQ
不支持。
维度17:并发度
Kafka
并发度高。一个线程一个消费者,Kafka限制消费者的个数要小于等于分区数,如果要提高并行度,可以在消费者中再开启多线程,或者增加consumer实例数量。
RabbitMQ
并发度极高。本身是用Erlang语言写的,并发性能高。可在消费者中开启多线程,最常用的做法是一个channel对应一个消费者,每一个线程把持一个channel,多个线程复用connection的tcp连接,减少性能开销。
当RabbitMQ队列拥有多个消费者的时候,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者,不会重复。这种方式非常适合扩展,而且是专门为并发程序设计的。如果某些消费者的任务比较繁重,那么可以设置basicQos限制信道上消费者能保持的最大未确认消息的数量,在达到上限时,RabbitMQ不再向这个消费者发送任何消息。
ZeroMQ
并发度高。
RocketMQ
并发度高。
1)RocketMQ限制消费者的个数少于等于队列数,但是可以在消费者中再开启多线程,这一点和Kafka是一致的,提高并行度的方法相同。修改消费并行度方法:
同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度,超过订阅队列数的 Consumer实例无效。提高单个 Consumer 的消费并行线程,通过修改参数consumeThreadMin、consumeThreadMax
2)同一个网络连接connection,客户端多个线程可以同时发送请求,连接会被复用,减少性能开销。
ActiveMQ
并发度高。
单个ActiveMQ的接收和消费消息的速度在1万笔/秒(持久化 一般为1-2万, 非持久化 2 万以上),在生产环境中部署10个ActiveMQ就能达到10万笔/秒以上的性能,部署越多的ActiveMQ broker 在MQ上latency也就越低,系统吞吐量也就越高。