使用原生 MQTT 为数百万客户端提供服务
RabbitMQ 的核心协议一直是 AMQP 0.9.1。为了支持 MQTT、STOMP 和 AMQP 1.0,代理通过其核心协议进行透明代理。虽然这是扩展 RabbitMQ 以支持更多消息协议的简单方法,但它会降低可伸缩性和性能。
在过去 9 个月里,我们重写了 MQTT 插件,使其不再通过 AMQP 0.9.1 进行代理。取而代之的是,MQTT 插件会解析 MQTT 消息并直接将其发送到队列。这就是我们称之为原生 MQTT 的方式。
结果令人瞩目
- 内存使用量下降高达 95%,在数百万连接的情况下节省数百 GB 内存。
- RabbitMQ 首次能够处理数百万个连接。
- 端到端延迟下降 50% - 70%。
- 吞吐量提高 30% - 40%。
原生 MQTT 将 RabbitMQ 转变为一个 MQTT 代理,为更广泛的物联网用例打开了大门。
原生 MQTT 将在 RabbitMQ 3.12 中发布。
概述
如图 1 所示,在 RabbitMQ 3.11 及更早版本中,MQTT 插件通过解析 MQTT 消息并经由 AMQP 0.9.1 协议将其转发到 通道,然后通道将消息路由到 队列。图 1 中的每个蓝色圆点代表一个 Erlang 进程。每个传入的 MQTT 连接会创建总共 22 个 Erlang 进程。
在 MQTT 插件(即 Erlang 应用程序 rabbitmq_mqtt)中,每个 MQTT 连接会创建 16 个 Erlang 进程,包括一个负责 MQTT 心跳 的进程,以及一群充当 AMQP 0.9.1 客户端的进程。
在 Erlang 应用程序 rabbit 中,每个 MQTT 连接会创建 6 个 Erlang 进程。它们实现了核心 AMQP 0.9.1 服务器的每个客户端部分。
图 2 显示,原生 MQTT 每个传入的 MQTT 连接只需要一个 Erlang 进程。
这一个 Erlang 进程负责解析 MQTT 消息、遵守 MQTT 心跳、执行 身份验证和授权,并将消息路由到队列。
典型的 MQTT 工作负载包括许多物联网 (IoT) 设备定期向 MQTT 代理发送数据。例如,可能有数十万甚至数百万个设备每隔几秒钟或几分钟发送一次状态更新。
我们在去年的 RabbitMQ Summit 演讲 《RabbitMQ 性能改进》 中了解到,创建 Erlang 进程是廉价的。(Erlang 进程比 Java 线程轻量得多。Erlang 进程可以与 Golang 中的 Goroutine 相提并论。)然而,对于 100 万个传入的 MQTT 客户端连接,RabbitMQ 是创建 2200 万个 Erlang 进程(RabbitMQ 3.11 及以前)还是仅创建 100 万个 Erlang 进程(RabbitMQ 3.12),在可伸缩性方面会产生巨大差异。这种差异在“内存使用量”和“延迟和吞吐量”部分进行了分析。
不仅重写了 rabbitmq_mqtt 插件,还重写了 rabbitmq_web_mqtt 插件。这意味着,从 3.12 开始,每个传入的 MQTT over WebSocket 连接也将只占用一个 Erlang 进程。因此,本文博客中概述的所有性能改进同样适用于 MQTT over WebSockets。
“原生 MQTT”一词不是 MQTT 规范的官方术语。“原生 MQTT”指的是 RabbitMQ 3.12 中的新实现,其中 RabbitMQ “原生”支持 MQTT,即 MQTT 流量不再通过 AMQP 0.9.1 进行代理。
新的 MQTT QoS 0 队列类型
原生 MQTT 附带了一个名为 rabbit_mqtt_qos0_queue 的新 RabbitMQ 队列类型。要让 MQTT 插件创建该新队列类型的队列,必须启用同名的 3.12 功能标志 rabbit_mqtt_qos0_queue。(请记住,功能标志并非用于集群配置。成功进行滚动升级后,应启用所有功能标志。每个功能标志将在未来的 RabbitMQ 版本中变为强制性。)
在解释新队列类型之前,我们应该先理解队列与 MQTT 订阅者之间的关系。
在所有 RabbitMQ 版本中,MQTT 插件都会为每个 MQTT 订阅者创建一个专用队列。更确切地说,每个 MQTT 连接可能有 0、1 或 2 个队列。
- 如果 MQTT 客户端从未发送 SUBSCRIBE 数据包,则该 MQTT 连接没有队列。MQTT 客户端只负责发布消息。
- 如果 MQTT 客户端使用相同的 服务质量 (QoS) 级别 创建一个或多个订阅,则该 MQTT 连接有一个队列。
- 如果 MQTT 客户端使用 QoS 0(最多一次)和 QoS 1(至少一次)创建一个或多个订阅,则该 MQTT 连接有两个队列。
在列出队列时,您会看到队列命名模式 mqtt-subscription-<MQTT 客户端 ID>qos[0|1],其中 <MQTT 客户端 ID> 是 MQTT 客户端标识符,而 [0|1] 要么是 0(用于 QoS 0 订阅),要么是 1(用于 QoS 1 订阅)。为每个 MQTT 订阅者设置单独的队列是有意义的,因为每个 MQTT 订阅者都会接收应用程序消息的独立副本。
默认情况下,MQTT 插件会创建经典队列。
MQTT 插件会为 MQTT 订阅客户端透明地创建队列。MQTT 规范没有定义队列的概念,MQTT 客户端也不知道这些队列的存在。队列是 RabbitMQ 实现 MQTT 协议的实现细节。
图 3 显示了一个 MQTT 订阅者,它以 CleanSession=1 连接并以 QoS 0 进行订阅。干净会话意味着 MQTT 会话仅在客户端和服务器之间的网络连接持续期间有效。当会话结束时,服务器中的所有会话状态都会被删除,这意味着队列会被自动删除。
如图 3 所示,每个经典队列会产生两个额外的 Erlang 进程:一个 监督者进程和一个工作进程。
新队列类型优化工作如下:如果
- 功能标志
rabbit_mqtt_qos0_queue已启用,并且 - MQTT 客户端以
CleanSession=1连接,并且 - MQTT 客户端以 QoS 0 进行订阅
那么,MQTT 插件将创建 rabbit_mqtt_qos0_queue 类型的队列,而不是经典队列。
图 4 显示,新队列类型是一个“伪”队列或“虚拟”队列:它与您熟悉的队列类型(经典队列、法定数量队列和流)非常不同,因为这种新队列类型既不是独立的 Erlang 进程,也不在磁盘上存储消息。相反,这种队列类型是 Erlang 进程邮箱的一部分。MQTT 消息直接发送到订阅客户端的 MQTT 连接进程。换句话说,MQTT 消息被发送到任何“在线”的 MQTT 订阅者。
更准确地说,可以认为队列被跳过了(正如 RabbitMQ Summit 2022 演讲 这个幻灯片 中最后一行 no queue at all? 🤔 所指出的)。将消息直接发送到 MQTT 连接进程是通过队列类型实现的,目的是简化消息路由和协议互操作性,这样消息不仅可以从 MQTT 发布连接进程发送,也可以从通道进程发送。后者使得消息可以从 AMQP 0.9.1、AMQP 1.0 或 STOMP 客户端直接发送到 MQTT 订阅者连接进程,跳过专用队列进程。
现在我们明白了这种新队列类型会跳过队列进程。然而,这对 MQTT 有什么好处呢?MQTT 工作负载的特点是许多设备与代理进行数据通信。以下是新队列类型提供优化的四个原因(按重要性递减排序):
优势 1:大规模扇出
实现大规模扇出,将消息从“云”(MQTT 代理)发送到所有设备。
对于经典队列和法定数量队列,每个队列客户端(即通道进程或 MQTT 连接进程)会保留所有目标队列的状态,以实现流量控制。例如,通道进程在其 进程字典 中保存来自每个目标队列的信用额度。 (阅读我们的博客文章,了解基于信用的流量控制,以获取更多信息。)
- 如果有一个通道进程发送给 300 万个 MQTT 设备,则通道进程字典中会保留 300 万条条目(数百 MB 内存)。
- 如果有 100 个通道进程,每个进程发送给 300 万个设备,则进程字典中总共有 3 亿条条目(最好不要尝试)。
- 如果存在数千个通道或 MQTT 连接进程,每个进程向 300 万个设备发送消息,RabbitMQ 将耗尽内存并崩溃。
即使这些大规模扇出事件发生得极其罕见,例如每天一次,发送 Erlang 进程到所有目标队列的状态始终保存在内存中(直到目标队列被删除)。
新队列类型最重要的特点是其队列类型客户端是无状态的。这意味着 MQTT 连接或通道进程可以向 300 万个 MQTT 连接进程发送消息(“即时发送并忘记”)(这仍然需要临时大量的内存),而无需保留到目标队列的任何状态。一旦垃圾回收启动,队列客户端进程的内存使用量将降至 0 MB。
优势 2:更低的内存使用量
不仅对于大规模扇出,而且在 1:1 拓扑(其中每个发布者仅发送给一个订阅者)中,新队列类型 rabbit_mqtt_qos0_queue 通过跳过队列进程节省了大量的内存。
即使在 3.12 版本中使用原生 MQTT,并且 rabbit_mqtt_qos0_queue 功能标志已禁用,300 万个订阅 QoS 0 的 MQTT 设备仍会导致创建 900 万个 Erlang 进程。如果启用 rabbit_mqtt_qos0_queue 功能标志,相同的负载仅会导致创建 300 万个 Erlang 进程,因为每个 MQTT 订阅者额外的队列监督者和队列工作进程被跳过,从而节省了几 GB 的进程内存。
优势 3:更低的发布者确认延迟
尽管 MQTT 客户端订阅了 QoS 0,但另一个 MQTT 客户端仍然可以发送 QoS 1 消息(或者等效地,来自 AMQP 0.9.1 客户端的通道可以置于确认模式)。在这种情况下,发布客户端需要代理的发布者确认。
新队列类型的客户端(MQTT 发布者连接进程或通道进程的一部分)会直接代表“队列服务器进程”自动确认,因为最多一次的 QoS 0 消息在从代理到 MQTT 订阅者的过程中可能会丢失。这导致发布者确认延迟降低。
在 RabbitMQ 中,只有当 **所有** 目标队列都确认收到消息后,才会将消息确认回发布客户端。因此,更重要的是,发布进程只等待可能至少有一个消费者的队列的确认。使用新队列类型,在一条消息路由到一个重要的法定数量队列以及一个百万个 MQTT QoS 0 订阅者,同时其中一个 MQTT 连接进程过载(因此响应非常缓慢)的情况下,发布进程不会被阻塞而无法将确认发送回发布客户端。
优势 4:更低的端到端延迟
由于跳过了队列进程,减少了一个消息跳数,从而降低了端到端延迟。
过载保护
由于新队列类型没有流量控制,MQTT 消息可能会比 MQTT 连接进程发送到 MQTT 订阅客户端的速度更快地到达 MQTT 连接进程的邮箱。当 MQTT 订阅客户端与 RabbitMQ 之间的网络连接较差,或在大型扇入场景(许多发布者使单个 MQTT 订阅客户端过载)中,这种情况可能会发生。
为了防止由于 MQTT QoS 0 消息在 MQTT 连接进程邮箱中堆积而导致的内存使用量过高,如果同时满足以下两个条件,RabbitMQ 会故意丢弃 rabbit_mqtt_qos0_queue 中的 QoS 0 消息:
- MQTT 连接进程邮箱中的消息数量超过设置
mqtt.mailbox_soft_limit(默认为 200),并且 - 发送到 MQTT 客户端的套接字繁忙(发送速度不够快)。
请注意,进程邮箱中可能还有其他消息(例如,从 MQTT 订阅客户端发送到 RabbitMQ 的应用程序消息,或来自其他队列类型到 MQTT 连接进程的确认),这些消息显然不会被丢弃。然而,这些其他消息也计入 mqtt.mailbox_soft_limit。
将 mqtt.mailbox_soft_limit 设置为 0 会禁用过载保护机制,这意味着 QoS 0 消息永远不会被 RabbitMQ 故意丢弃。将 mqtt.mailbox_soft_limit 设置为非常高的值会降低有意丢弃 QoS 0 消息的可能性,同时增加了导致集群范围 内存警报 的风险(尤其是在消息负载很大或有许多类型为 rabbit_mqtt_qos0_queue 的过载队列时)。
mqtt.mailbox_soft_limit 可以被视为一个 队列长度限制,尽管不完全精确,因为如前所述,Erlang 进程邮箱可能包含比 MQTT 应用消息更多的其他消息。这就是为什么配置键 mqtt.mailbox_soft_limit 包含“soft”一词。描述的过载保护机制大致对应于您已知的经典队列和法定数量队列的溢出行为 drop-head。
队列类型命名
不要依赖这种新队列类型的名称。功能标志 rabbit_mqtt_qos0_queue 的名称不会改变。然而,如果我们将来决定将此设计的部分内容重用于其他 RabbitMQ 用例(例如 直接回复),我们可能会更改队列类型 rabbit_mqtt_qos0_queue 的名称。
事实上,MQTT 客户端应用程序和 RabbitMQ 核心(Erlang 应用程序 rabbit)都不知道这种新队列类型。终端用户也不会意识到这种新队列类型,除非在管理界面或通过 rabbitmqctl 列出队列时。
鉴于我们现在已经了解了原生 MQTT 的架构以及 MQTT QoS 0 队列类型的意图,我们可以继续进行性能基准测试。
内存使用量
本节比较了 RabbitMQ 3.11 中的 MQTT 和 RabbitMQ 3.12 中的原生 MQTT 的内存使用量。完整的测试设置可以在 ansd/rabbitmq-mqtt 中找到。
本节中的两个测试是针对一个 3 节点集群进行的,具有以下 RabbitMQ 配置子集:
mqtt.tcp_listen_options.sndbuf = 1024
mqtt.tcp_listen_options.recbuf = 1024
mqtt.tcp_listen_options.buffer = 1024
management_agent.disable_metrics_collector = true
TCP 缓冲区大小配置得很小,不会导致高二进制内存使用量。
在 rabbitmq_management 插件中禁用了指标收集。对于生产用例,应使用 Prometheus。rabbitmq_management 插件并非为处理大量统计数据发出对象(如队列和连接)而设计。
100 万个 MQTT 连接
第一个测试使用 1,000,000 个 MQTT 连接,这些连接在连接建立后仅发送 MQTT 心跳。不发送或接收 MQTT 应用消息。
如图 5 所示,在 3.11 中,3 节点集群需要 108.0 + 100.7 + 92.4 = 301.1 GiB 内存,而在 3.12 中仅需 6.1 + 6.3 + 6.3 = 18.7 GiB 内存。因此,3.11 比 3.12 需要多 16 倍(或 282 GiB)的内存。原生 MQTT 将内存使用量减少了 94%。

3.11 版本中导致内存使用量最大的部分是**进程**内存。如“概述”部分所述,3.12 版本中的原生 MQTT 每个 MQTT 连接使用 1 个 Erlang 进程,而 3.11 版本每个 MQTT 连接使用 22 个 Erlang 进程。其中一些 3.11 进程会保留大量状态,导致内存使用量过高。
原生 MQTT 的低内存使用量不仅通过使用单个 Erlang 进程实现,还通过减少该单个 Erlang 进程的状态实现。PR #5895 中实现了许多内存优化,例如从进程状态中删除长列表和不必要的函数引用。
在开发环境中的进一步测试表明,对于 3 节点集群中的 900 万个 MQTT 连接,原生 MQTT 每个节点大约需要 56 GB 内存。
10 万发布者,10 万订阅者
第二个测试使用 100,000 个发布者和 100,000 个订阅者。它们以 1:1 拓扑进行发送和接收,这意味着每个发布者每 2 分钟向一个 QoS 0 订阅者发送一个 QoS 0 应用消息。
如图 6 所示,在 3.11 中,3 节点集群需要 21.6 + 21.5 + 21.7 = 64.8 GiB 内存,而在 3.12 中仅需 2.6 + 2.6 + 2.6 = 7.8 GiB 内存。

在开发环境中的进一步测试表明,对于以下场景,3 节点集群中的原生 MQTT 每个节点大约需要 47 GB 内存:
- 总共 300 万个 MQTT 连接
- 1:1 拓扑
- 150 万个发布者每 3 分钟发送一次 64 字节有效负载的 QoS 1 消息
- 150 万个 QoS 1 订阅者(即 150 万个经典队列 版本 2)
在 RabbitMQ 3.11.6 中发布的 PR #6684 大幅降低了许多经典队列的内存使用量,这有利于许多 MQTT QoS 1 或 CleanSession=0 订阅者用例。
延迟和吞吐量
为了比较 3.11 中的 MQTT 和 3.12 中的原生 MQTT 之间的延迟,我们使用了 mqtt-bm-latency。
以下延迟基准测试是在一台物理 Ubuntu 22.04 机器上执行的,该机器具有 8 个 CPU 和 32 GB RAM。客户端和单节点 RabbitMQ 服务器运行在同一台机器上。
在 rabbitmq-server 的根目录下,使用 4 个调度器线程启动服务器。
make run-broker PLUGINS="rabbitmq_mqtt" RABBITMQ_CONFIG_FILE="rabbitmq.conf" RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 4"
服务器使用以下 rabbitmq.conf 运行:
mqtt.mailbox_soft_limit = 0
mqtt.tcp_listen_options.nodelay = true
mqtt.tcp_listen_options.backlog = 128
mqtt.tcp_listen_options.sndbuf = 87380
mqtt.tcp_listen_options.recbuf = 87380
mqtt.tcp_listen_options.buffer = 87380
classic_queue.default_version = 2
第一行仅在 RabbitMQ 3.12 中使用,并禁用过载保护,以确保所有 QoS 0 消息都传递给订阅者。
3.11 基准测试使用 Git 标签 v3.11.10。3.12 基准测试使用 Git 标签 v3.12.0-beta.2。
延迟和吞吐量 QoS 1
第一个基准测试比较了发送到 QoS 1 订阅者的 QoS 1 消息的延迟和吞吐量。
./mqtt-bm-latency -clients 100 -count 10000 -pubqos 1 -subqos 1 -size 100 -keepalive 120 -topic t
此基准测试使用 100 个 MQTT 客户端“对”以 1:1 拓扑进行发送。换句话说,总共有 200 个 MQTT 客户端:100 个发布者和 100 个订阅者。每个发布者向单个订阅者发送 10,000 条消息。
所有 MQTT 客户端并发运行。然而,应用程序消息是从发布客户端同步发送到 RabbitMQ 的:每个发布者发送一个 QoS 1 消息,并在收到服务器的 PUBACK 之前,不会发送下一条消息。
订阅客户端还会为收到的每个 PUBLISH 数据包回复 PUBACK 数据包。
上面命令中的主题 t 只是一个主题前缀。每个发布客户端通过在其后附加其索引来发布到不同的主题(例如,第一个发布者发布到主题 t-0,第二个发布者发布到 t-1,依此类推)。
3.11 的结果
================= TOTAL PUBLISHER (100) =================
Total Publish Success Ratio: 100.000% (1000000/1000000)
Total Runtime (sec): 75.835
Average Runtime (sec): 75.488
Pub time min (ms): 0.167
Pub time max (ms): 101.331
Pub time mean mean (ms): 7.532
Pub time mean std (ms): 0.037
Average Bandwidth (msg/sec): 132.475
Total Bandwidth (msg/sec): 13247.470
================= TOTAL SUBSCRIBER (100) =================
Total Forward Success Ratio: 100.000% (1000000/1000000)
Forward latency min (ms): 0.177
Forward latency max (ms): 94.045
Forward latency mean std (ms): 0.032
Total Mean forward latency (ms): 6.737
3.12 的结果
================= TOTAL PUBLISHER (100) =================
Total Publish Success Ratio: 100.000% (1000000/1000000)
Total Runtime (sec): 55.955
Average Runtime (sec): 55.570
Pub time min (ms): 0.175
Pub time max (ms): 96.725
Pub time mean mean (ms): 5.550
Pub time mean std (ms): 0.031
Average Bandwidth (msg/sec): 179.959
Total Bandwidth (msg/sec): 17995.888
================= TOTAL SUBSCRIBER (100) =================
Total Forward Success Ratio: 100.000% (1000000/1000000)
Forward latency min (ms): 0.108
Forward latency max (ms): 51.421
Forward latency mean std (ms): 0.028
Total Mean forward latency (ms): 2.880
Total Publish Success Ratio 和 Total Forward Success Ratio 验证了总共 100 个发布者 * 10,000 条消息 = 1,000,000 条消息 被 RabbitMQ 处理。
Total Runtime 和 Average Runtime 显示了第一个巨大的性能差异:使用原生 MQTT,每个发布者在 55 秒内收到服务器发送的所有 10,000 个发布者确认(PUBACK)。3.11 中的 MQTT 需要 75 秒。
Pub time mean mean 表明,平均发布者确认延迟从 3.11 的 7.532 毫秒下降了 1.982 毫秒(26%),在 3.12 中为 5.550 毫秒。
如图 7 所示,100 个并发客户端(每个客户端同步向 RabbitMQ 发布 MQTT QoS 1 消息)的吞吐量从 3.11 的每秒 13,247 条消息增加了 4,748 条消息(36%),达到 3.12 的每秒 17,995 条消息。
forward latency 统计数据表示端到端延迟,即从发布消息到在客户端接收消息所需的时间。它通过让发布者在每条消息的有效负载中包含时间戳来测量。
图 8 说明,Total mean forward latency 从 3.11 的 6.737 毫秒下降了 3.857 毫秒(57%),在 3.12 中为 2.880 毫秒。
延迟 QoS 0
第二个延迟基准测试与第一个非常相似,但使用了 QoS 0 进行发布消息和 QoS 0 进行订阅。
./mqtt-bm-latency -clients 100 -count 10000 -pubqos 0 -subqos 0 -size 100 -keepalive 120 -topic t
由于客户端以 CleanSession=1 连接,RabbitMQ 为每个订阅者创建了一个 rabbit_mqtt_qos0_queue。因此,队列进程被跳过。
此处省略了 3.11 和 3.12 的发布者结果,因为它们非常低(Pub time mean mean 为 5 微秒)。发布客户端一次性将所有 100 万条消息发送(“即时发送并忘记”)到服务器,而不等待 PUBACK 响应。
3.11 的结果
================= TOTAL SUBSCRIBER (100) =================
Total Forward Success Ratio: 100.000% (1000000/1000000)
Forward latency min (ms): 3.907
Forward latency max (ms): 12855.600
Forward latency mean std (ms): 883.343
Total Mean forward latency (ms): 7260.071
3.12 的结果
================= TOTAL SUBSCRIBER (100) =================
Total Forward Success Ratio: 100.000% (1000000/1000000)
Forward latency min (ms): 0.461
Forward latency max (ms): 5003.936
Forward latency mean std (ms): 596.133
Total Mean forward latency (ms): 2426.867
Total mean forward latency 从 3.11 的 7,260.071 毫秒下降了 4,833.204 毫秒(66%),在 3.12 中为 2,426.867 毫秒。
与 QoS 1 基准测试相比,7.2 秒(3.11)和 2.4 秒(3.12)的端到端延迟非常高,因为代理暂时被所有 100 万条 MQTT 消息淹没。
禁用信用流(通过在 advanced.config 中设置 {credit_flow_default_credit, {0, 0}})并不能提高 3.11 的延迟。
留给感兴趣的读者一项练习是运行相同的基准测试,但针对禁用了 rabbit_mqtt_qos0_queue 功能标志的 3.12 版本,以查看跳过队列进程所带来的延迟差异。
原生 MQTT 在 3.12 中还有哪些改进?
以下列表包含 3.12 版本原生 MQTT 的一些杂项更改:
- 实现了 MQTT 3.1.1 的一个功能,允许 SUBACK 数据包包含一个失败返回码 (0x80)。
- 移除了 AMQP 0.9.1 头部
x-mqtt-dup,因为其在 MQTT 插件中的用法截至 3.11 都是错误的。如果您的 AMQP 0.9.1 客户端依赖于此头部,这将是一个破坏性更改。 - MQTT 插件创建的所有队列都是持久化的。这样做是为了便于将来迁移到 Khepri。
- MQTT 插件为干净会话创建独占队列(而不是自动删除队列)。
- MQTT 客户端 ID 跟踪(如果新客户端使用相同的客户端 ID 连接,则终止现有的 MQTT 连接)已在 pg 中实现。以前用于跟踪 MQTT 客户端 ID 的 Ra 集群在启用功能标志
delete_ra_cluster_mqtt_node时将被删除。旧的 Ra 集群需要大量内存,并在一次性断开过多客户端时成为瓶颈。 - Prometheus 指标使用 全局计数器 实现。协议标签的值为
mqtt310或mqtt311。此类指标的一个示例是:rabbitmq_global_messages_routed_total{protocol="mqtt311"} 10。 - MQTT 解析器得到了优化。
- 从未发布过应用消息的 MQTT 客户端在 内存和磁盘警报期间不会被阻塞,以便订阅者可以继续清空队列。
我应该将 RabbitMQ 用作 MQTT 代理吗?
原生 MQTT 实现了许多新用例,因为它允许更多的物联网设备连接到 RabbitMQ。用例范围非常广泛。
RabbitMQ 只是市场上的一个 MQTT 代理。市面上还有其他不错的 MQTT 代理,有些甚至能处理比 RabbitMQ 更多的 MQTT 客户端连接,因为其他代理专门针对 MQTT。
RabbitMQ 的优势在于它不仅仅是一个 MQTT 代理。它是一个通用的多协议代理。RabbitMQ 的真正强大之处在于协议互操作性,以及灵活的路由和队列类型的选择。
举个例子:作为一家支付处理公司,您可以将分布在全球的数十万甚至数百万台现金终端连接到一个中央 RabbitMQ 集群,每个现金终端每隔几分钟向 RabbitMQ 发送 MQTT QoS 1 消息(每当客户进行支付时)。
这些包含支付数据的 MQTT 消息对您的业务至关重要。在任何情况下(节点故障、磁盘故障、网络分区),都不能丢失这些消息。它们还需要被一个或多个微服务处理。
一种解决方案是将几个法定数量队列绑定到主题交换机,其中每个法定数量队列使用不同的路由键绑定,从而将负载分散到各个队列。每个法定数量队列在 3 个或 5 个 RabbitMQ 节点之间复制,通过使用底层的 Raft 共识算法来提供高数据安全性。每个法定数量队列中的消息可以被不同的微服务使用比 MQTT 更专业的报文协议(如 AMQP 0.9.1 或 AMQP 1.0)进行处理。
或者,MQTT 插件可以将 MQTT 支付消息转发到一个在多个 RabbitMQ 节点之间复制的超级流。其他客户端可以使用 RabbitMQ Streams 协议RabbitMQ Streams 协议,通过多次消费相同的消息来进行不同的数据分析。
RabbitMQ 提供的灵活性几乎是无限的,市场上没有其他消息代理能提供如此广泛的协议互操作性、数据安全性、容错能力、可伸缩性和性能。
未来工作
原生 MQTT 在 3.12 中的发布是 RabbitMQ 作为 MQTT 代理使用的一个重要步骤。然而,MQTT 的旅程并未在此结束。未来,我们计划(不作承诺!)添加以下 MQTT 改进:
- 添加对 MQTT 5.0 (v5) 的支持 - 这是社区高度要求的特性。您可以跟踪 PR #7263 中的当前进展。截至 3.12,RabbitMQ 仅支持 MQTT 3.1.1 (v4) 和 MQTT 3.1.0 (v3)。原生 MQTT 是开发 MQTT 5.0 支持的先决条件。
- 提高升级处理数百万 MQTT 连接的 RabbitMQ 节点的弹性,并提高大规模客户端断开连接(例如,由于负载均衡器崩溃)的弹性。
- 支持 QoS 1 的大规模扇出,至少一次地向所有设备发送消息。新的队列类型
rabbit_mqtt_qos0_queue通过不保留发送和接收 Erlang 进程中的任何状态来改进大规模扇出。然而,截至目前,向数百万个队列发送需要队列确认的消息(MQTT 中的 QoS 1 或 AMQP 0.9.1 中的发布者确认)应格外谨慎:始终使用相同的发布者,并非常罕见地执行(每隔几分钟)。请注意,1:1 拓扑和大型扇入对 RabbitMQ 并非问题。
总结
尽管 RabbitMQ 在 3.11 中已支持 MQTT,但就客户端连接数量而言,其可伸缩性一直很差,内存使用量也很高。
原生 MQTT 在 3.12 中的发布将 RabbitMQ 转变为一个 MQTT 代理。它允许数百万个客户端连接到 RabbitMQ。即使您不打算连接如此多的客户端,通过将您的 MQTT 工作负载升级到 3.12,您也将大幅节省基础设施成本,因为内存使用量最多可降低 95%。
下一步,我们计划添加对 MQTT 5.0 的支持。
