MQTT 插件
概述
RabbitMQ 通过随核心发行版提供的插件支持 MQTT 版本 3.1、3.1.1 和 5.0。
本指南涵盖以下主题
- 如何启用插件
- 支持的 MQTT 功能 和 限制
- MQTT 插件实现概述
- 何时(不)使用 仲裁队列
- MQTT QoS 0 队列类型
- 用户和身份验证 以及 远程连接
- 插件的 关键可配置设置
- TLS 支持
- 虚拟主机
- 指标
- 性能和可伸缩性检查列表
- 代理协议
- Sparkplug 支持
启用插件
MQTT 插件包含在 RabbitMQ 发行版中。在客户端成功连接之前,必须使用 rabbitmq-plugins 在所有集群节点上启用它。
rabbitmq-plugins enable rabbitmq_mqtt
支持的 MQTT 功能
RabbitMQ 支持大多数 MQTT 5.0 功能,包括以下内容
- QoS 0 (最多一次) 和 QoS 1 (至少一次) 发布和订阅
- TLS、OAuth 2.0
- Clean 和非 Clean 会话
- 消息过期间隔
- 订阅标识符 和 订阅选项
- 遗嘱消息,包括 遗嘱延迟间隔
- 请求/响应,包括与其他协议(如 AMQP 0.9.1 和 AMQP 1.0)的互操作性
- 主题别名
- 保留消息,存在 保留消息和存储 部分描述的限制
- 通过 WebSocket 的 MQTT,通过 RabbitMQ Web MQTT 插件
MQTT 5.0 博客文章 提供了支持的 MQTT 5.0 功能的完整列表,包括其用法和实现细节。
MQTT 客户端可以与其他协议互操作。例如,MQTT 发布者可以向 AMQP 0.9.1 或 AMQP 1.0 消费者发送消息,前提是这些消费者从绑定到 MQTT 主题交换(通过 mqtt.exchange 配置,默认为 amq.topic)的队列中消费。同样,AMQP 0.9.1、AMQP 1.0 或 STOMP 发布者也可以向 MQTT 订阅者发送消息,前提是发布者发布到 MQTT 主题交换。
MQTT 插件如何工作
RabbitMQ MQTT 插件支持 MQTT 3.1、3.1.1 和 5.0,支持广泛的 MQTT 客户端。它还使得 MQTT 客户端能够与 AMQP 0-9-1、AMQP 1.0 和 STOMP 客户端互操作。还支持多租户。
将 MQTT 映射到 AMQP 0.9.1 模型
RabbitMQ 核心实现了 AMQP 0.9.1 协议。该插件建立在 AMQP 0.9.1 实体之上:交换、队列和绑定。发布到 MQTT 主题的消息由 AMQP 0.9.1 主题交换路由。MQTT 订阅者从绑定到主题交换的队列中消费。
MQTT 插件为每个 MQTT 订阅者创建一个专用队列。更精确地说,每个 MQTT 会话可能有 0、1 或 2 个队列
- 如果 MQTT 客户端从未发送 SUBSCRIBE 数据包,则 MQTT 会话将有 0 个队列。MQTT 客户端仅发布消息。
- 如果 MQTT 客户端创建了一个或多个具有相同 QoS 级别的订阅,则 MQTT 会话将有 1 个队列。
- 如果 MQTT 客户端创建了 QoS 0 和 QoS 1 两种级别的订阅,则 MQTT 会话将有 2 个队列。
在列出队列时,您会看到队列命名模式 mqtt-subscription-<MQTT client ID>qos[0|1],其中 <MQTT client ID> 是 MQTT 客户端标识符,[0|1] 是 QoS 0 订阅的 0 或 QoS 1 订阅的 1。每个 MQTT 订阅者拥有一个单独的队列,允许每个 MQTT 订阅者接收其自己的应用程序消息副本。
该插件为 MQTT 订阅客户端透明地创建队列。MQTT 规范没有定义队列的概念,MQTT 客户端也不知道这些队列的存在。队列是 RabbitMQ 实现 MQTT 协议的一种内部实现细节。
队列类型
MQTT 客户端可以发布消息到任何队列类型。要使此工作正常进行,必须将 经典队列、仲裁队列 或 MQTT QoS 0 队列 绑定到主题交换,并使用与 PUBLISH 数据包主题匹配的绑定键。
MQTT 插件为每个 MQTT 订阅者创建一个经典队列、仲裁队列或 MQTT QoS 0 队列。默认情况下,该插件创建一个经典队列。
该插件可以配置为为主会话持续时间长于 MQTT 网络连接的订阅者创建仲裁队列(而不是经典队列)。这将在 仲裁队列 部分进行解释。
该插件为 MQTT 会话持续时间与其 MQTT 网络连接时间相同的 QoS 0 订阅者创建一个 MQTT QoS 0 队列。这将在 MQTT QoS 0 队列类型 部分进行解释。
队列属性 和 参数
自 RabbitMQ 3.12 起,MQTT 插件创建的所有队列
- 都是 持久化的,即队列元数据存储在磁盘上。
- 如果 MQTT 会话持续时间与其 MQTT 网络连接相同,则这些队列是 独占的。在这种情况下,RabbitMQ 将在网络连接(和会话)结束时删除 MQTT 客户端的所有状态,包括其队列。只有订阅的 MQTT 客户端可以从其队列中消费。
- 队列不是
auto-delete的。例如,如果 MQTT 客户端订阅了一个主题,然后取消订阅,队列不会被删除。但是,当 MQTT 会话结束时,队列将被删除。 - 当 MQTT 会话最终过期时(即,RabbitMQ 操作员未禁用会话过期,见下文),并且会话持续时间超过 MQTT 网络连接时,队列将设置 队列 TTL(队列参数
x-expires)。队列 TTL(以毫秒为单位)由 MQTT 客户端在 CONNECT 数据包中请求的 会话过期间隔(以秒为单位)和服务器端配置的mqtt.max_session_expiry_interval_seconds中的最小值决定。
mqtt.max_session_expiry_interval_seconds 的默认值为 86400(1 天)。RabbitMQ 操作员可以通过将此参数设置为 0 来强制所有 MQTT 会话在网络连接结束后立即结束。
RabbitMQ 操作员可以通过将此参数设置为 infinity 来允许 MQTT 会话永久存在。这会带来风险:生命周期短且不使用 Clean Session 的客户端可能会留下队列和消息,从而消耗资源并需要手动清理。
RabbitMQ 在 MQTT 会话结束时删除 MQTT 客户端的任何状态。这些状态包括客户端的队列(包括 QoS 0 和 QoS 1 消息)以及队列绑定(即客户端的订阅)。
主题级别分隔符和通配符
MQTT 协议规范定义斜杠 ("/") 为 主题级别分隔符,而 AMQP 0.9.1 定义点 (".") 为主题级别分隔符。此插件会在底层进行翻译以连接两者。
例如,MQTT 主题 cities/london 变为 AMQP 0.9.1 主题 cities.london,反之亦然。因此,当 MQTT 客户端发布主题为 cities/london 的消息时,如果 AMQP 0.9.1 客户端要接收该消息,它应该从其队列创建一个绑定到主题交换,绑定键为 cities.london。
反之,当 AMQP 0.9.1 客户端发布主题为 cities.london 的消息时,如果 MQTT 客户端要接收该消息,它应该创建一个主题过滤器为 cities/london 的 MQTT 订阅。
这有一个重要的限制:包含点的 MQTT 主题将无法按预期工作,应避免使用;同样,包含斜杠的 AMQP 0.9.1 路由和绑定键也应避免。
此外,MQTT 将加号 ("+") 定义为 单级通配符,而 AMQP 0.9.1 将星号 ("*") 定义为匹配单个单词
| MQTT | AMQP 0.9.1 | 描述 |
|---|---|---|
| / (斜杠) | . (点) | 主题级别分隔符 |
| + (加号) | * (星号) | 单级通配符(匹配单个单词) |
| # (井号) | # (井号) | 多级通配符(匹配零个或多个单词) |
使用仲裁队列
通过使用 mqtt.durable_queue_type 选项,可以为 MQTT 会话持续时间超过其 MQTT 网络连接的订阅者选择使用 仲裁队列。
此值仅应在新集群上启用,在任何客户端声明持久订阅之前。由于队列类型在声明后无法更改,如果此设置的值针对现有集群进行了更改,具有现有持久状态的客户端将遇到队列类型不匹配错误,并且订阅将失败。
下面是一个选择使用仲裁队列的 rabbitmq.conf 示例
# must ONLY be enabled for new clusters before any clients declare durable subscriptions
mqtt.durable_queue_type = quorum
目前,此设置适用于所有 MQTT 客户端,这些客户端
- 使用 QoS 1 订阅,并且
- 使用大于 0 的 会话过期间隔(MQTT 5.0)连接,或将 CleanSession 设置为 0(MQTT 3.1.1)
第二个条件意味着 MQTT 会话会持续比 MQTT 网络连接更长的时间。
虽然仲裁队列旨在确保数据安全并能从副本故障中进行可预测的高效恢复,但它们也有缺点。仲裁队列定义上要求集群中至少有三个副本。因此,仲裁队列声明和删除需要更长的时间,不适合 客户端连接频繁 或拥有大量(数十万)订阅者的环境。
仲裁队列非常适合少量(数百个)更长生命周期的客户端,这些客户端非常关心数据安全。
MQTT QoS 0 队列类型
如果满足以下两个条件,MQTT 插件将创建一个 MQTT QoS 0 队列
- MQTT 客户端订阅的 QoS 为 0。
- MQTT 5.0 客户端连接的 会话过期间隔 为 0,或者 MQTT 3.1.1 客户端连接的 CleanSession 设置为 1。
第二个条件意味着 MQTT 会话仅持续到网络连接结束。
MQTT QoS 0 队列类型可以被视为“伪”或“虚拟”队列:它与其他队列类型(经典队列、仲裁队列和流)有很大不同,因为它不是一个单独的 Erlang 进程,也不在磁盘上存储消息。相反,这种队列类型是 Erlang 进程邮箱的一部分。MQTT 消息直接发送到订阅客户端的 MQTT 连接进程。换句话说,MQTT 消息被发送到任何“在线”的 MQTT 订阅者。
更准确地说,可以认为队列被“跳过”了。将消息直接发送到 MQTT 连接进程作为队列类型实现,是为了简化消息路由和协议互操作性,使得消息不仅可以从 MQTT 发布连接进程发送,还可以从 AMQP 0.9.1 通道进程发送。后者使得 AMQP 0.9.1、AMQP 1.0 或 STOMP 客户端可以直接将消息发送到 MQTT 订阅者连接进程,跳过专用队列进程。
使用 MQTT QoS 0 队列类型的好处是
- 支持更大的扇出,例如将消息从“云”(RabbitMQ)发送到所有设备(MQTT 客户端)。
- 更低的 内存使用
- 更低的发布者确认延迟
- 更低的端到端延迟
由于 MQTT QoS 0 队列类型没有流控制,MQTT 消息可能会比从 MQTT 连接进程传递到 MQTT 订阅客户端的速度更快地到达 MQTT 连接进程邮箱。当 MQTT 订阅客户端与 RabbitMQ 之间的网络连接较差,或者在大量发布者使单个 MQTT 订阅客户端过载的大扇入场景下,这种情况可能会发生。
过载保护
为防止 MQTT QoS 0 消息在 MQTT 连接进程邮箱中堆积导致 内存使用 过高,RabbitMQ 会故意丢弃 MQTT QoS 0 队列中的 QoS 0 消息,如果同时满足以下两个条件
- MQTT 连接进程邮箱中的消息数量超过配置的
mqtt.mailbox_soft_limit(默认为 200),并且 - 发送到 MQTT 客户端的套接字很忙(由于 TCP 背压而无法足够快地发送)。
请注意,进程邮箱中可能还有其他消息(例如,从 MQTT 订阅客户端发送到 RabbitMQ 的应用程序消息,或来自其他队列类型的确认消息到 MQTT 连接进程),这些消息显然不会被丢弃。但是,这些其他消息也计入 mqtt.mailbox_soft_limit。
将 mqtt.mailbox_soft_limit 设置为 0 会禁用过载保护机制,这意味着 QoS 0 消息永远不会被 RabbitMQ 故意丢弃。将 mqtt.mailbox_soft_limit 设置为非常高的值会降低故意丢弃 QoS 0 消息的可能性,同时会增加导致集群范围内存警报的风险(特别是当消息负载很大或有许多类型为 rabbit_mqtt_qos0_queue 的过载队列时)。
mqtt.mailbox_soft_limit 可以被视为一个 队列长度限制(尽管不完全准确,因为如前所述,Erlang 进程邮箱可以包含 MQTT 应用程序消息以外的其他消息)。这就是为什么配置键 mqtt.mailbox_soft_limit 包含“soft”一词的原因。描述的过载保护机制大致对应于经典队列和仲裁队列中存在的 溢出行为 drop-head。
以下由给定 RabbitMQ 节点报告的 Prometheus 指标显示了在该节点生命周期内,在所有类型为 rabbit_mqtt_qos0_queue 的队列中总共丢弃了多少 QoS 0 消息
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_mqtt_qos0_queue",dead_letter_strategy="disabled"} 0
Native MQTT 博客文章更详细地描述了 MQTT QoS 0 队列类型。
用户和身份验证
前提是 MQTT 客户端拥有具有适当权限的现有用户的凭据集,它们才能连接。
要使 MQTT 连接成功,它必须成功通过身份验证,并且用户必须对插件使用的虚拟主机具有 适当的权限(见下文)。
MQTT 客户端可以在连接时(通常是这样)指定一组凭据。凭据可以是用户名和密码对,也可以是 x.509 证书(见下文)。
该插件支持匿名身份验证,但强烈不推荐使用,并且默认情况下会强制执行某些限制(如下所述),以确保合理的安全性。
可以使用 rabbitmqctl、管理 UI 或 HTTP API 来管理用户及其权限。
例如,以下命令创建一个用于 MQTT 连接的新用户,并授予其对该插件默认使用的 虚拟主机 的完全访问权限
# username and password are both "mqtt-test"
rabbitmqctl add_user mqtt-test mqtt-test
rabbitmqctl set_permissions -p "/" mqtt-test ".*" ".*" ".*"
rabbitmqctl set_user_tags mqtt-test management
请注意,用户名中不能包含冒号。
本地与远程客户端连接
当 MQTT 客户端不提供登录凭据时,插件默认使用 guest 账户,该账户不允许非 localhost 连接。从远程主机连接时,以下选项可确保远程客户端成功连接
- 创建一到多个新用户,授予它们对 MQTT 插件使用的虚拟主机的完全权限,并使从远程主机连接的客户端使用这些凭据。这是推荐的选项。
- 将
anonymous_login_user和anonymous_login_pass设置为具有 适当权限 的非guest用户。
匿名连接
MQTT 支持可选身份验证(客户端可以不提供凭据)。因此,匿名连接会使用一组默认凭据。
anonymous_login_user 和 anonymous_login_pass 配置键用于指定凭据。
anonymous_login_user = some-user
anonymous_login_pass = s3kRe7
可以禁用匿名连接
mqtt.allow_anonymous = false
anonymous_login_user = none
如果 mqtt.allow_anonymous 键设置为 false,则客户端必须提供凭据。
强烈不推荐使用匿名连接,并且默认情况下会强制执行某些限制(见上文),以确保合理的安全性。
插件配置
以下是一个示例 配置,演示了许多 MQTT 插件设置
mqtt.listeners.tcp.default = 1883
## Default MQTT with TLS port is 8883
# mqtt.listeners.ssl.default = 8883
# anonymous connections, if allowed, will use the default
# credentials specified here
mqtt.allow_anonymous = true
mqtt.vhost = /
mqtt.exchange = amq.topic
mqtt.prefetch = 10
# 24 hours by default
mqtt.max_session_expiry_interval_seconds = 86400
TCP 监听器
如果未指定配置,MQTT 插件将监听所有接口上的 1883 端口,并具有默认用户登录/密码 guest/guest。
要更改监听端口,请编辑您的 配置文件,使其包含 rabbitmq_mqtt 应用程序的 tcp_listeners 变量。
例如,一个最小化的配置文件,将监听端口更改为 12345,将如下所示
mqtt.listeners.tcp.1 = 12345
而一个仅监听 localhost(IPv4 和 IPv6)的配置将如下所示
mqtt.listeners.tcp.1 = 127.0.0.1:1883
mqtt.listeners.tcp.2 = ::1:1883
TCP 监听器选项
该插件支持 TCP 监听器选项配置。
这些设置使用一个共同的前缀 mqtt.tcp_listen_options,并控制 TCP 缓冲区大小、入站 TCP 连接队列长度、是否启用 TCP keepalive 等。有关详细信息,请参阅 网络指南。
mqtt.listeners.tcp.1 = 127.0.0.1:1883
mqtt.listeners.tcp.2 = ::1:1883
mqtt.tcp_listen_options.backlog = 4096
mqtt.tcp_listen_options.buffer = 131072
mqtt.tcp_listen_options.recbuf = 131072
mqtt.tcp_listen_options.sndbuf = 131072
mqtt.tcp_listen_options.keepalive = true
mqtt.tcp_listen_options.nodelay = true
mqtt.tcp_listen_options.exit_on_close = true
mqtt.tcp_listen_options.send_timeout = 120
TLS 支持
要为 MQTT 连接使用 TLS,必须在代理中 配置 TLS。要启用支持 TLS 的 MQTT 连接,请使用 mqtt.listeners.ssl.* 配置键为 MQTT 添加一个 TLS 监听器。
该插件将使用核心 RabbitMQ 服务器证书和密钥(就像 AMQP 0.9.1 和 AMQP 1.0 监听器一样)
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
# default TLS-enabled port for MQTT connections
mqtt.listeners.ssl.default = 8883
mqtt.listeners.tcp.default = 1883
请注意,RabbitMQ 默认拒绝 SSLv3 连接,因为该协议已知存在安全漏洞。
有关详细信息,请参阅 TLS 配置指南。
虚拟主机
RabbitMQ 是一个核心的多租户系统,每个连接都属于一个虚拟主机。某些消息传递协议具有 vhost 的概念,而其他协议则没有。MQTT 属于后者。因此,MQTT 插件需要提供一种将连接映射到 vhost 的方法。
vhost 选项控制适配器默认连接到哪个 RabbitMQ vhost。只有在建立连接时未提供 vhost 时,才会咨询 vhost 配置。有几种(可选)方法可以指定客户端将连接到的 vhost。
端口到虚拟主机映射
第一种方法是将 MQTT 插件(TCP 或 TLS)监听器端口映射到 vhost。映射通过 mqtt_port_to_vhost_mapping 全局运行时参数 指定。让我们看下面的插件配置
mqtt.listeners.tcp.1 = 1883
mqtt.listeners.tcp.2 = 1884
mqtt.listeners.ssl.1 = 8883
mqtt.listeners.ssl.2 = 8884
# (other TLS settings are omitted for brevity)
mqtt.vhost = /
请注意,插件监听端口 1883、1884、8883 和 8884。假设您希望连接到端口 1883 和 8883 的客户端连接到 vhost1 虚拟主机,而连接到端口 1884 和 8884 的客户端连接到 vhost2 虚拟主机。可以通过使用 rabbitmqctl 设置 mqtt_port_to_vhost_mapping 全局参数来创建端口到 vhost 的映射
- bash
- PowerShell
- HTTP API
rabbitmqctl set_global_parameter mqtt_port_to_vhost_mapping \
'{"1883":"vhost1", "8883":"vhost1", "1884":"vhost2", "8884":"vhost2"}'
rabbitmqctl.bat set_global_parameter mqtt_port_to_vhost_mapping ^
"{""1883"":""vhost1"", ""8883"":""vhost1"", ""1884"":""vhost2"", ""8884"":""vhost2""}"
PUT /api/global-parameters/mqtt_port_to_vhost_mapping
# => {"value": {"1883":"vhost1", "8883":"vhost1", "1884":"vhost2", "8884":"vhost2"}}
如果对于给定端口没有映射(因为该端口在 mqtt_port_to_vhost_mapping 全局参数的 JSON 文档中找不到,或者全局参数根本未设置),插件将尝试从用户名中提取虚拟主机(见下文),并最终使用 vhost 插件配置选项。
代理在连接时查询 mqtt_port_to_vhost_mapping 全局参数的值。如果值发生更改,已连接的客户端不会收到通知或断开连接。它们需要重新连接才能切换到新的虚拟主机。
虚拟主机作为用户名的一部分
另一种更具体地指定 vhost 的方法是,在用户名中预先加上 vhost,并用冒号分隔。
例如,连接使用
/:guest
相当于默认 vhost 和用户名,而
mqtt-vhost:mqtt-username
表示连接到 vhost mqtt-host,用户名为 mqtt-username。
在用户名中指定虚拟主机优先于使用 mqtt_port_to_vhost_mapping 全局参数指定的端口到 vhost 映射。
使用 TLS/x509 客户端证书进行身份验证
该插件可以通过从客户端的 TLS (x509) 证书中提取名称来验证启用了 TLS 的连接,而无需使用密码。
为了安全起见,服务器必须使用 TLS 选项 fail_if_no_peer_cert 设置为 true,并且 verify 设置为 verify_peer,以强制所有 TLS 客户端都拥有一个可验证的客户端证书。
要启用此功能,请为 rabbitmq_mqtt 应用程序设置 ssl_cert_login 为 true。例如
mqtt.ssl_cert_login = true
默认情况下,这将用户名设置为证书主体专有名称的 RFC4514 风格字符串格式,类似于 OpenSSL 的 "-nameopt RFC2253" 选项生成的格式。
要改用通用名称 (Common Name),请添加
ssl_cert_login_from = common_name
到您的配置中。
请注意
- 已通过身份验证的用户必须存在于配置的身份验证/授权后端中。
- 客户端不得提供用户名和密码。
您还可以通过使用 mqtt_default_vhosts 全局运行时参数为客户端证书指定一个可选的虚拟主机。此全局参数的值必须包含一个 JSON 文档,该文档将证书的主体专有名称映射到其目标虚拟主机。我们来看如何将两个证书 O=client,CN=guest 和 O=client,CN=rabbit 分别映射到 vhost1 和 vhost2 虚拟主机。
可以使用以下方法设置全局参数
- bash
- PowerShell
- HTTP API
rabbitmqctl set_global_parameter mqtt_default_vhosts \
'{"O=client,CN=guest": "vhost1", "O=client,CN=rabbit": "vhost2"}'
rabbitmqctl set_global_parameter mqtt_default_vhosts ^
"{""O=client,CN=guest"": ""vhost1"", ""O=client,CN=rabbit"": ""vhost2""}'
PUT /api/global-parameters/mqtt_default_vhosts
# => {"value": {"O=client,CN=guest": "vhost1", "O=client,CN=rabbit": "vhost2"}}
请注意
- 如果找不到证书的虚拟主机(因为在
mqtt_default_vhosts全局参数 JSON 文档中找不到证书主体 DN,或者全局参数根本未设置),则将使用vhost插件配置选项指定的虚拟主机。 - 代理在连接时查询
mqtt_default_vhosts全局参数的值。如果值发生更改,已连接的客户端不会收到通知或断开连接。它们需要重新连接才能切换到新的虚拟主机。 - 使用
mqtt_default_vhosts全局参数的证书到 vhost 映射被认为比使用mqtt_port_to_vhost_mapping全局参数的端口到 vhost 映射更具体,因此具有更高的优先级。
使用客户端证书中的 client_id 进行身份验证
可以配置节点以验证 MQTT 连接上设置的 client_id 是否与客户端证书中找到的 client_id 匹配。
如果匹配,RabbitMQ 将通过将用户身份连同 client_id 一起传递给已配置的身份验证后端来继续用户的身份验证。某些身份验证后端(如 rabbitmq_auth_backend_http)可能会使用 client_id 凭据以及 username 来做出身份验证和/或授权决策。如果 client_id 不匹配,RabbitMQ 将以原因码 2(“服务器不允许客户端标识符”)关闭连接。
为此,必须先指示 RabbitMQ 如何从证书中提取 client_id。这是通过 mqtt.ssl_cert_client_id_from 配置键完成的。可接受的值为
distinguished_name:这是证书的 DN,或 专有名称subject_alternative_name:主题备用名称或 SAN,它来自证书的扩展部分。由于存在不同类型的主题备用名称,可能需要指定类型
如果 mqtt.ssl_cert_client_id_from 设置为 subject_alternative_name,则可以使用 mqtt.ssl_cert_login_san_type 配置备用名称的类型。如果省略该设置,则使用默认类型 dns。可接受的值为
dnsipemailuriother_name
一个证书可以包含多个相同类型的 SAN 字段,例如,两个备用 DNS 名称。如果发生这种情况,请使用 mqtt.ssl_cert_login_san_index 配置键指定要使用的索引。默认情况下,RabbitMQ 将选择第一个值,即 mqtt.ssl_cert_login_san_index 的默认值为 0。
以下是一个示例,其中用户名从证书的专有名称中提取,而 client_id 从类型为 uri 的第一个 SAN(主题备用名称)中提取
ssl_cert_login_from = distinguished_name
mqtt.ssl_cert_client_id_from = subject_alternative_name
mqtt.ssl_cert_login_san_type = uri
流控制
prefetch 选项控制将传递的未确认的 QoS=1 的 PUBLISH 数据包的最大数量。此选项的解释方式与 消费者预取 相同。
MQTT 5.0 客户端可以通过在 CONNECT 数据包中设置 接收最大值 来定义一个较低的数字。
自定义交换
exchange 选项决定 MQTT 客户端发布的消息发送到哪个交换。在客户端发布任何消息之前,必须先创建该交换。该交换预计是一个 主题交换。
默认的主题交换 amq.topic 是预先声明的:因此,在 RabbitMQ 启动时就存在。
保留消息和存储
该插件支持保留消息,但存在本节所述的限制。消息存储实现是可插拔的,插件开箱即用地提供了两种实现
- 基于 ETS(内存中),在模块
rabbit_mqtt_retained_msg_store_ets中实现 - 基于 DETS(磁盘上),在模块
rabbit_mqtt_retained_msg_store_dets中实现
这两种实现都有局限性和权衡。对于前者,可以保留的最大消息数量受 RAM 限制。对于后者,每个 vhost 的限制为 2 GB。两者都是节点本地的:保留的消息既不复制到远程集群节点,也不从远程集群节点查询。
一个有效的例子是:MQTT 客户端将一条保留消息发布到节点 A,主题为 topic/1。之后,另一个客户端在节点 A 上订阅主题过滤器 topic/1。新订阅者将收到保留的消息。
但是,如果一个客户端在节点 A 上发布保留消息,而另一个客户端随后在节点 B 上订阅,则该订阅客户端将不会收到存储在节点 A 上的任何保留消息。
此外,如果主题过滤器包含通配符(多级通配符字符“#”或 单级通配符字符“+”),则不会发送保留消息。
要配置存储,请使用 mqtt.retained_message_store 配置键
## use DETS (disk-based) store for retained messages
mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets
## only used by DETS store (in milliseconds)
mqtt.retained_message_store_dets_sync_interval = 2000
值必须是实现该存储的模块
rabbit_mqtt_retained_msg_store_ets用于基于 RAM 的存储rabbit_mqtt_retained_msg_store_dets用于基于磁盘的存储(这是默认值)。
这些实现适用于开发,但有时不适用于生产环境。MQTT 规范没有定义保留消息存储的一致性或复制要求,因此 RabbitMQ 允许自定义存储以满足特定环境的一致性和可用性需求。例如,基于 Riak 和 Cassandra 的存储对于大多数生产环境来说都是合适的,因为这些数据存储提供了 可调一致性。
消息存储必须实现 rabbit_mqtt_retained_msg_store 行为。
指标
Prometheus
此插件会发出 全局计数器 中列出的 Prometheus 指标。
Prometheus 标签 protocol 的值分别是 mqtt310、mqtt311 和 mqtt50,取决于 MQTT 客户端使用的是 MQTT 3.1、MQTT 3.1.1 还是 MQTT 5.0。
Prometheus 标签 queue_type 的值分别是 rabbit_classic_queue、rabbit_quorum_queue 和 rabbit_mqtt_qos0_queue,取决于 MQTT 客户端从何种队列类型消费。(请注意,MQTT 客户端永远不会直接从 流 中消费,尽管它们可以向流发布消息。)
RabbitMQ 管理 API
管理 API 提供 MQTT 连接(例如,客户端的网络流量)以及经典队列和仲裁队列(例如,它们包含多少消息)的指标。但是,自 3.12 版起,与 AMQP 0.9.1 通道相关的管理 API 指标(如消息速率)已不可用。
性能和可伸缩性检查列表
MQTT 是物联网 (IoT) 的标准协议。常见的 IoT 工作负载是许多 MQTT 设备定期向 MQTT 代理发布传感器数据。可能有数十万,有时甚至数百万的 IoT 设备连接到 MQTT 代理。博客文章 Native MQTT 展示了此类工作负载的性能基准。
本节旨在提供一个非详尽的清单,其中包含将 RabbitMQ 配置为高效 MQTT 代理的技巧和窍门,该代理支持大量客户端连接
- 将
management_agent.disable_metrics_collector = true设置为在 管理插件 中禁用指标收集。RabbitMQ 管理插件并非为过度指标收集而设计。事实上,通过管理 API 进行指标传递已被 弃用。相反,请使用一个专门用于收集和查询大量指标的工具:Prometheus。 - QoS 0 的 MQTT 数据包和订阅比 QoS 1 性能更好。与 AMQP 0.9.1 和 AMQP 1.0 不同,MQTT 的设计目标不是最大化吞吐量:例如,没有 多重确认。每个 QoS 1 的 PUBLISH 数据包都需要单独确认。
- 降低 TCP 缓冲区大小,如 TCP 监听器选项 部分所述。
在具有大量并发连接客户端的环境中,这可以显著降低 内存使用。
- 较少的主题级别(在 MQTT 主题和 MQTT 主题过滤器中)比更多的主题级别性能更好。例如,如果可能,请优先将主题结构设置为
city/name,而不是continent/country/city/name。主题过滤器中的每个主题级别目前会在 RabbitMQ 使用的数据库中创建自己的条目。因此,创建和删除大量订阅在主题级别较少时会更快。此外,路由具有较少主题级别消息也更快。 - 在订阅频繁变化的工作负载中,增加 Mnesia 配置参数
dump_log_write_threshold(例如RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-mnesia dump_log_write_threshold 20000") - 当连接大量客户端时,增加 Erlang 进程的最大数量(例如
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 10000000)和打开的最大端口数(例如ERL_MAX_PORTS=10000000)。
代理协议
MQTT 插件支持 代理协议。此功能默认禁用,要为 MQTT 客户端启用它
mqtt.proxy_protocol = true
有关代理协议的更多信息,请参阅 网络指南。
Sparkplug 支持
Sparkplug 是一项规范,为 MQTT 系统的设计提供了指导。在 Sparkplug 中,MQTT 主题必须以 spAvM.N 或 spBvM.N 开头,其中 M 和 N 是整数。这不幸地与 RabbitMQ MQTT 插件 将 MQTT 主题转换为 AMQP 0.9.1 路由键 的方式冲突。
要解决此问题,可以将 sparkplug 配置项设置为 true
mqtt.sparkplug = true
启用 Sparkplug 支持后,MQTT 插件将不会转换主题名称的 spAvM.N/spBvM.N 部分。
限制
RabbitMQ MQTT 插件目前存在以下限制。
QoS 2
QoS 2:仅一次交付 不受支持。
如果 MQTT 3.0 或 3.1.1 客户端发布了 QoS 2 的消息,RabbitMQ 会将其 QoS 级别降级到 1。如果 MQTT 5.0 客户端发布了 QoS 2 的消息,RabbitMQ 将断开客户端连接,原因码为 155: QoS not supported。
重新认证
RabbitMQ 不支持 MQTT 5.0 的 AUTH 数据包,因此不支持 重新认证。
虽然 RabbitMQ 支持 AMQP 1.0、AMQP 0.9.1 和 RabbitMQ 流协议的 OAuth 2.0 令牌续订,但 RabbitMQ 不支持 MQTT 的 OAuth 2.0 令牌续订。如果令牌过期,RabbitMQ 将断开 MQTT 客户端连接,原因码为 160: Maximum connect time。
共享订阅
共享订阅 不受支持。
重叠订阅
具有不同 QoS 级别的重叠订阅可能导致消息重复传递。应用程序需要考虑这一点。例如,当同一个 MQTT 客户端为主题过滤器 /sports/football/# 创建 QoS 0 订阅,并为主题过滤器 /sports/# 创建 QoS 1 订阅时,它将接收重复的消息。
保留消息存储
如 保留消息和存储 中所述,保留消息仅在节点本地存储和查询。此外,如果主题过滤器包含通配符(多级通配符字符“#”或单级通配符字符“+”),则不会发送保留消息。
延迟和保留的遗嘱消息
禁用插件
在节点上禁用插件或从集群中移除节点之前,必须使用 rabbitmqctl 进行退役
rabbitmqctl decommission_mqtt_node <node>;
保留消息和存储
该插件支持保留消息。消息存储实现是可插拔的,插件开箱即用地提供了两种实现
- ETS(内存中)实现,在
rabbit_mqtt_retained_msg_store_ets模块中 - DETS(磁盘上)实现,在
rabbit_mqtt_retained_msg_store_dets中
这两种实现都有局限性和权衡。对于前者,可以保留的最大消息数量受 RAM 限制。对于后者,每个 vhost 的限制为 2 GB。两者都是节点本地的(在一个代理节点上保留的消息不会复制到集群中的其他节点)。
要配置存储,请使用 rabbitmq_mqtt.retained_message_store 配置键
mqtt.allow_anonymous = true
mqtt.vhost = /
mqtt.exchange = amq.topic
mqtt.max_session_expiry_interval_seconds = 1800
mqtt.prefetch = 10
## use DETS (disk-based) store for retained messages
mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets
## only used by DETS store
mqtt.retained_message_store_dets_sync_interval = 2000
mqtt.listeners.ssl = none
mqtt.listeners.tcp.default = 1883
值必须是实现该存储的模块
rabbit_mqtt_retained_msg_store_ets用于基于 RAM 的存储rabbit_mqtt_retained_msg_store_dets用于基于磁盘的存储
这些实现适用于开发,但有时不适用于生产环境。MQTT 3.1 规范没有定义保留消息存储的一致性或复制要求,因此 RabbitMQ 允许自定义存储以满足特定环境的一致性和可用性需求。例如,基于 Riak 和 Cassandra 的存储对于大多数生产环境来说都是合适的,因为这些数据存储提供了 可调一致性。
消息存储必须实现 rabbit_mqtt_retained_msg_store 行为。