跳至主要内容
版本: 3.13

MQTT 插件

概述

RabbitMQ 通过一个在核心发行版中提供的插件支持 MQTT 版本 3.13.1.15.0

本指南涵盖以下主题

启用插件

MQTT 插件包含在 RabbitMQ 发行版中。在客户端能够成功连接之前,必须使用 rabbitmq-plugins 在所有集群节点上启用它。

rabbitmq-plugins enable rabbitmq_mqtt

支持的 MQTT 功能

RabbitMQ 支持大多数 MQTT 5.0 功能,包括以下功能

MQTT 5.0 博客文章 提供了支持的 MQTT 5.0 功能的完整列表,包括它们的用法和实现细节。

MQTT 客户端可以与其他协议互操作。例如,如果这些消费者从绑定到 MQTT 主题交换(通过 mqtt.exchange 配置,默认值为 amq.topic)的队列中消费,则 MQTT 发布者可以将消息发送到 AMQP 0.9.1 或 AMQP 1.0 消费者。同样,AMQP 0.9.1、AMQP 1.0 或 STOMP 发布者可以将消息发送到 MQTT 订阅者,如果发布者发布到 MQTT 主题交换。

限制

以下 MQTT 功能不受支持

以下 MQTT 功能支持有限制

  • 保留消息仅在节点本地存储和查询。请参阅保留消息和存储
  • 具有不同 QoS 级别但重叠的订阅会导致重复消息被传递。应用程序需要考虑这一点。例如,当同一个 MQTT 客户端为主题过滤器 /sports/football/epl/# 创建 QoS 0 订阅,并为主题过滤器 /sports/football/# 创建 QoS 1 订阅时,它将收到重复消息。

MQTT 插件的工作原理

RabbitMQ MQTT 插件针对 MQTT 3.1、3.1.1 和 5.0,支持广泛的 MQTT 客户端。它还使 MQTT 客户端能够与 AMQP 0-9-1、AMQP 1.0 和 STOMP 客户端互操作。此外还支持多租户。

将 MQTT 映射到 AMQP 0.9.1 模型

RabbitMQ 核心实现 AMQP 0.9.1 协议。插件建立在 AMQP 0.9.1 实体之上:交换队列 和绑定。发布到 MQTT 主题的消息由 AMQP 0.9.1 主题交换路由。MQTT 订阅者从绑定到主题交换的队列中消费。

MQTT 插件为每个 MQTT 订阅者创建一个专用队列。更准确地说,每个 MQTT 会话可能有 0、1 或 2 个队列

  • 如果 MQTT 客户端从未发送SUBSCRIBE 数据包,则 MQTT 会话有 0 个队列。MQTT 客户端只发布消息。
  • 如果 MQTT 客户端使用相同的 QoS 级别创建了一个或多个订阅,则 MQTT 会话有 1 个队列。
  • 如果 MQTT 客户端使用两种 QoS 级别创建订阅:QoS 0 和 QoS 1,则 MQTT 会话有 2 个队列。

列出队列时,您将观察到队列命名模式 mqtt-subscription-<MQTT client ID>qos[0|1],其中 <MQTT client ID> 是 MQTT 客户端标识符,而 [0|1] 对于 QoS 0 订阅为 0,对于 QoS 1 订阅为 1。为每个 MQTT 订阅者拥有一个单独的队列,允许每个 MQTT 订阅者接收应用程序消息的副本。

插件为 MQTT 订阅客户端透明地创建队列。MQTT 规范没有定义队列的概念,MQTT 客户端不知道这些队列的存在。队列是 RabbitMQ 实现 MQTT 协议的方式的实现细节。

队列类型

MQTT 客户端可以将消息发布到任何队列类型。为此,经典队列仲裁队列 必须绑定到主题交换,其绑定键与PUBLISH 数据包的主题匹配。

MQTT 插件为每个 MQTT 订阅者创建一个经典队列、仲裁队列或 MQTT QoS 0 队列。默认情况下,插件创建经典队列。

插件可以配置为为其 MQTT 会话持续时间超过其 MQTT 网络连接的订阅者创建仲裁队列(而不是经典队列)。这在部分仲裁队列中解释。

如果启用 功能标志 rabbit_mqtt_qos0_queue,插件将为 QoS 0 订阅者创建 MQTT QoS 0 队列,其 MQTT 会话持续时间与其 MQTT 网络连接一样长。这在部分MQTT QoS 0 队列类型中解释。

队列属性参数

自 RabbitMQ 3.12 以来,MQTT 插件创建的所有队列

  • 都是持久的,即队列元数据存储在磁盘上。
  • 如果 MQTT 会话持续时间与其 MQTT 网络连接一样长,则为独占的。在这种情况下,RabbitMQ 将在网络连接(和会话)结束时删除 MQTT 客户端的所有状态,包括其队列。只有订阅的 MQTT 客户端才能从其队列中消费。
  • 不是自动删除。例如,如果 MQTT 客户端订阅了一个主题并随后取消订阅,该队列将不会被删除。但是,当 MQTT 会话结束时,该队列将被删除。
  • 如果 MQTT 会话最终过期(即会话过期未被 RabbitMQ 操作员禁用,见下文)并持续时间超过 MQTT 网络连接,则具有设置的队列 TTL(队列参数 x-expires)。队列 TTL(以毫秒为单位)由 MQTT 客户端在CONNECT 数据包中请求的会话过期间隔(以秒为单位)和服务器端配置的 mqtt.max_session_expiry_interval_seconds 的最小值决定。

mqtt.max_session_expiry_interval_seconds 的默认值为 86400(1 天)。RabbitMQ 操作员可以通过将此参数设置为 0 来强制所有 MQTT 会话在其网络连接结束时立即结束。

RabbitMQ 操作员可以通过将此参数设置为 infinity 来允许 MQTT 会话永远持续。这存在风险:使用非干净会话的短暂客户端可能会留下队列和消息,这将消耗资源并需要手动清理。

当 MQTT 会话结束时,RabbitMQ 会删除 MQTT 客户端的任何状态。此状态包括客户端的队列(包括 QoS 0 和 QoS 1 消息)以及队列绑定(即客户端的订阅)。

主题层级分隔符和通配符

MQTT 协议规范将斜杠(“/”)定义为主题层级分隔符,而 AMQP 0-9-1 将点(“.”)定义为主题层级分隔符。此插件在幕后转换模式以弥合两者之间的差距。

例如,MQTT 主题 cities/london 变成 AMQP 0.9.1 主题 cities.london,反之亦然。因此,当 MQTT 客户端发布主题为 cities/london 的消息时,如果 AMQP 0.9.1 客户端想要接收该消息,它应该从其队列到主题交换机创建绑定,绑定键为 cities.london

反之,当 AMQP 0.9.1 客户端发布主题为 cities.london 的消息时,如果 MQTT 客户端想要接收该消息,它应该创建主题过滤器为 cities/london 的 MQTT 订阅。

这有一个重要的限制:包含点的 MQTT 主题将无法按预期工作,应避免使用,同样,包含斜杠的 AMQP 0-9-1 路由和绑定键也会出现问题。

此外,MQTT 将加号(“+”)定义为 单级通配符,而 AMQP 0.9.1 将星号(“*”)定义为匹配单个词。

MQTTAMQP 0.9.1描述
/ (斜杠). (点)主题级分隔符
+ (加号)* (星号)单级通配符(匹配单个词)
# (哈希)# (哈希)多级通配符(匹配零个或多个词)

使用仲裁队列

使用 mqtt.durable_queue_type 选项,可以选择对那些 MQTT 会话持续时间超过 MQTT 网络连接时间的订阅者使用 仲裁队列

此值只能在任何客户端声明持久订阅之前为新集群启用。由于队列类型在声明后无法更改,如果此设置的值对现有集群进行了更改,则具有现有持久状态的客户端将遇到队列类型不匹配错误,并无法订阅

下面是一个选择使用仲裁队列的 rabbitmq.conf 示例

# must ONLY be enabled for new clusters before any clients declare durable subscriptions
mqtt.durable_queue_type = quorum

目前,此设置适用于所有满足以下条件的 MQTT 客户端

  1. 使用 QoS 1 订阅,以及
  2. 连接具有大于 0 的 会话过期时间间隔(MQTT 5.0)或将 CleanSession 设置为 0(MQTT 3.1.1)

第二个条件意味着 MQTT 会话持续时间超过 MQTT 网络连接时间。

虽然仲裁队列旨在保证数据安全性和从副本故障中进行可预测的有效恢复,但它们也有一些缺点。根据定义,仲裁队列需要集群中至少有三个副本。因此,仲裁队列的声明和删除需要更长时间,并且不适合具有 高客户端连接抖动 或具有大量(数十万)订阅者的环境。

仲裁队列非常适合少数(数百个)生命周期较长的客户端,这些客户端非常关心数据安全。

MQTT QoS 0 队列类型

如果满足以下三个条件,MQTT 插件将创建一个 MQTT QoS 0 队列

  1. 功能标记 rabbit_mqtt_qos0_queue 已启用。
  2. MQTT 客户端使用 QoS 0 订阅。
  3. MQTT 5.0 客户端连接具有 会话过期时间间隔 为 0,或者 MQTT 3.1.1 客户端连接具有 CleanSession 设置为 1。

第三个条件意味着 MQTT 会话只持续到网络连接时间。

MQTT QoS 0 队列类型可以看作是“伪”或“虚拟”队列:它与其他队列类型(经典队列、仲裁队列和流)有很大不同,因为这种新的队列类型既不是单独的 Erlang 进程,也不在磁盘上存储消息。相反,这种队列类型是 Erlang 进程邮箱的子集。MQTT 消息直接发送到订阅客户端的 MQTT 连接进程。换句话说,MQTT 消息发送到任何“在线”MQTT 订阅者。

更准确地说,应该认为队列被“跳过”。将消息直接发送到 MQTT 连接进程作为队列类型实现是为了简化消息路由和协议互操作性,以便消息不仅可以从 MQTT 发布连接进程发送,还可以从 AMQP 0.9.1 通道 进程发送。后者允许从 AMQP 0.9.1、AMQP 1.0 或 STOMP 客户端直接将消息发送到 MQTT 订阅者连接进程,跳过专用队列进程。

使用 MQTT QoS 0 队列类型的好处是

  1. 支持更大的扇出,例如,从“云”(RabbitMQ)向所有设备(MQTT 客户端)发送消息。
  2. 降低 内存使用量
  3. 降低发布者确认延迟
  4. 降低端到端延迟

由于 MQTT QoS 0 队列类型没有流量控制,因此 MQTT 消息可能比从 MQTT 连接进程传递到 MQTT 订阅客户端更快地到达 MQTT 连接进程邮箱。这种情况可能发生在 MQTT 订阅客户端和 RabbitMQ 之间的网络连接不佳,或者在许多发布者压倒单个 MQTT 订阅客户端的大扇入场景中。

过载保护

为了防止由于 MQTT QoS 0 消息在 MQTT 连接进程邮箱中堆积而导致 内存使用量 较高,RabbitMQ 会有意从 MQTT QoS 0 队列中删除 QoS 0 消息,前提是以下两个条件都满足

  1. MQTT 连接进程邮箱中的消息数量超过了配置的 mqtt.mailbox_soft_limit(默认为 200),并且
  2. 发送到 MQTT 客户端的套接字处于繁忙状态(由于 TCP 反压而没有足够快地发送)。

请注意,进程邮箱中可能还有其他消息(例如,从 MQTT 订阅客户端发送到 RabbitMQ 的应用程序消息或来自其他队列类型到 MQTT 连接进程的确认),这些消息显然不会被删除。但是,这些其他消息也会计入 mqtt.mailbox_soft_limit

mqtt.mailbox_soft_limit 设置为 0 将禁用过载保护机制,这意味着 QoS 0 消息永远不会被 RabbitMQ 有意删除。将 mqtt.mailbox_soft_limit 设置为非常高的值会降低有意删除 QoS 0 消息的可能性,同时会增加导致集群范围内存警报的风险(特别是如果消息有效负载很大,或者有许多类型为 rabbit_mqtt_qos0_queue 的过载队列)。

mqtt.mailbox_soft_limit 可以看作是 队列长度限制(虽然不完全是,因为如前所述,Erlang 进程邮箱可能包含除 MQTT 应用程序消息之外的其他消息)。这就是为什么配置键 mqtt.mailbox_soft_limit 包含单词 soft。所描述的过载保护机制大致对应于经典队列和仲裁队列中存在的 溢出行为 drop-head

由给定 RabbitMQ 节点报告的以下 Prometheus 指标显示在该节点的生命周期中,在所有类型为 rabbit_mqtt_qos0_queue 的队列中总共删除了多少个 QoS 0 消息

rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_mqtt_qos0_queue",dead_letter_strategy="disabled"} 0

原生 MQTT 博客文章更详细地描述了 MQTT QoS 0 队列类型。

用户和身份验证

只要 MQTT 客户端具有现有用户的凭据集,并且该用户具有相应的权限,他们就可以连接。

为了使 MQTT 连接成功,它必须成功进行身份验证,并且用户必须对插件使用的虚拟主机(见下文)具有 适当的权限

MQTT 客户端可以在连接时指定一组凭据(通常会这样做)。凭据可以是用户名和密码对,也可以是 x.509 证书(见下文)。

插件支持匿名身份验证,但强烈建议不要使用它,并且默认情况下它会受到某些限制(见下文)的限制,以保证合理的安全性。

可以使用 rabbitmqctl管理 UIHTTP API 管理用户及其权限。

例如,以下命令为具有对插件使用的默认 虚拟主机 的完全访问权限的 MQTT 连接创建一个新用户

# username and password are both "mqtt-test"
rabbitmqctl add_user mqtt-test mqtt-test
rabbitmqctl set_permissions -p "/" mqtt-test ".*" ".*" ".*"
rabbitmqctl set_user_tags mqtt-test management

请注意,用户名中不能包含冒号。

本地与远程客户端连接

当 MQTT 客户端不提供登录凭据时,插件默认使用 guest 帐户,该帐户不允许非 localhost 连接。从远程主机连接时,以下选项可以确保远程客户端能够成功连接

  • 创建一个或多个新用户,授予他们对 MQTT 插件使用的虚拟主机的完全权限,并让从远程主机连接的客户端使用这些凭据。这是推荐的选项。
  • 通过 插件配置default_userdefault_pass 设置为具有 适当权限 的非 guest 用户。

匿名连接

MQTT 支持可选身份验证(客户端可以不提供凭据),但 RabbitMQ 不支持。因此,匿名连接使用一组默认凭据。

mqtt.default_usermqtt.default_pass 配置键用于指定凭据

mqtt.default_user = some-user
mqtt.default_pass = s3kRe7

可以禁用匿名连接

mqtt.allow_anonymous = false

如果 mqtt.allow_anonymous 键设置为 false,则客户端必须提供凭据。

强烈建议不要使用匿名连接,并且默认情况下它会受到某些限制(见上文)的限制,以保证合理的安全性。

插件配置

以下是一个 配置 示例,演示了 MQTT 插件的许多设置

mqtt.listeners.tcp.default = 1883
## Default MQTT with TLS port is 8883
# mqtt.listeners.ssl.default = 8883

# anonymous connections, if allowed, will use the default
# credentials specified here
mqtt.allow_anonymous = true
mqtt.default_user = guest
mqtt.default_pass = guest

mqtt.vhost = /
mqtt.exchange = amq.topic
mqtt.prefetch = 10
# 24 hours by default
mqtt.max_session_expiry_interval_seconds = 86400

TCP 侦听器

如果没有指定配置,MQTT 插件将在端口 1883 上所有接口上侦听,并且默认用户的登录/密码为 guest/guest

要更改侦听器端口,请编辑您的 配置文件,在 rabbitmq_mqtt 应用程序中包含 tcp_listeners 变量。

例如,一个将侦听器端口更改为 12345 的最小配置文件如下所示

mqtt.listeners.tcp.1 = 12345

而一个将侦听器更改为仅在 localhost(对于 IPv4 和 IPv6)上侦听的配置文件如下所示

mqtt.listeners.tcp.1 = 127.0.0.1:1883
mqtt.listeners.tcp.2 = ::1:1883

TCP 侦听器选项

插件支持 TCP 侦听器选项配置。

这些设置使用一个通用前缀 mqtt.tcp_listen_options,并控制诸如 TCP 缓冲区大小、入站 TCP 连接队列长度、是否启用 TCP 保持活动 等内容。有关详细信息,请参阅 网络指南

mqtt.listeners.tcp.1 = 127.0.0.1:1883
mqtt.listeners.tcp.2 = ::1:1883

mqtt.tcp_listen_options.backlog = 4096
mqtt.tcp_listen_options.buffer = 131072
mqtt.tcp_listen_options.recbuf = 131072
mqtt.tcp_listen_options.sndbuf = 131072

mqtt.tcp_listen_options.keepalive = true
mqtt.tcp_listen_options.nodelay = true

mqtt.tcp_listen_options.exit_on_close = true
mqtt.tcp_listen_options.send_timeout = 120

TLS 支持

要对 MQTT 连接使用 TLS,必须在代理中 配置 TLS。要启用支持 TLS 的 MQTT 连接,请使用 mqtt.listeners.ssl.* 配置键为 MQTT 添加 TLS 侦听器。

插件将使用核心 RabbitMQ 服务器证书和密钥(就像 AMQP 0-9-1 和 AMQP 1.0 侦听器一样)

ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true

# default TLS-enabled port for MQTT connections
mqtt.listeners.ssl.default = 8883
mqtt.listeners.tcp.default = 1883

请注意,RabbitMQ 默认情况下拒绝 SSLv3 连接,因为该协议已知存在漏洞。

有关详细信息,请参阅 TLS 配置指南

虚拟主机

RabbitMQ 在核心是一个多租户系统,每个连接都属于一个虚拟主机。一些消息协议有虚拟主机的概念,而另一些没有。MQTT 属于后者。因此,MQTT 插件需要提供一种将连接映射到虚拟主机的机制。

vhost 选项控制适配器默认连接到的 RabbitMQ 虚拟主机。只有在建立连接时没有提供虚拟主机的情况下才会使用 vhost 配置。有几种(可选)方法可以指定客户端将连接到的虚拟主机。

端口到虚拟主机映射

第一种方法是将 MQTT 插件(TCP 或 TLS)监听端口映射到虚拟主机。映射是通过 mqtt_port_to_vhost_mapping 全局运行时参数 指定的。让我们看看以下插件配置

mqtt.listeners.tcp.1 = 1883
mqtt.listeners.tcp.2 = 1884

mqtt.listeners.ssl.1 = 8883
mqtt.listeners.ssl.2 = 8884

# (other TLS settings are omitted for brevity)

mqtt.vhost = /

请注意,插件监听端口 1883、1884、8883 和 8884。假设您希望连接到端口 1883 和 8883 的客户端连接到 vhost1 虚拟主机,而连接到端口 1884 和 8884 的客户端连接到 vhost2 虚拟主机。可以通过使用 rabbitmqctl 设置 mqtt_port_to_vhost_mapping 全局参数来创建端口到虚拟主机映射

rabbitmqctl set_global_parameter mqtt_port_to_vhost_mapping \
'{"1883":"vhost1", "8883":"vhost1", "1884":"vhost2", "8884":"vhost2"}'

如果没有为给定端口找到映射(因为在 mqtt_port_to_vhost_mapping 全局参数 JSON 文档中找不到该端口,或者根本没有设置全局参数),插件将尝试从用户名中提取虚拟主机(见下文),并最终使用 vhost 插件配置选项。

代理在连接时查询 mqtt_port_to_vhost_mapping 全局参数值。如果值发生变化,已连接的客户端不会收到通知或断开连接。他们需要重新连接才能切换到新的虚拟主机。

虚拟主机作为用户名的一部分

另一种更具体的方法是在连接时在用户名前加上虚拟主机,并用冒号分隔。

例如,使用以下方法连接

/:guest

等效于默认虚拟主机和用户名,而

mqtt-vhost:mqtt-username

表示连接到虚拟主机 mqtt-host,用户名为 mqtt-username

在用户名中指定虚拟主机优先于使用 mqtt_port_to_vhost_mapping 全局参数指定的端口到虚拟主机映射。

使用 TLS/x509 客户端证书进行身份验证

插件可以通过从客户端的 TLS (x509) 证书中提取名称来验证启用 TLS 的连接,而无需使用密码。

为了安全起见,服务器必须使用 TLS 选项配置,将 fail_if_no_peer_cert 设置为 true,并将 verify 设置为 verify_peer,以强制所有 TLS 客户端拥有可验证的客户端证书。

要启用此功能,请将 rabbitmq_mqtt 应用程序的 ssl_cert_login 设置为 true。例如

mqtt.ssl_cert_login = true

默认情况下,这会将用户名设置为证书主体可分辨名称的 RFC4514 格式字符串,类似于 OpenSSL 的“-nameopt RFC2253”选项生成的字符串。

要使用通用名称,请添加

ssl_cert_login_from = common_name

到您的配置中。

请注意

  • 已验证的用户必须存在于配置的身份验证/授权后端中。
  • 客户端**不能**提供用户名和密码。

您可以选择使用 mqtt_default_vhosts 全局运行时参数 为客户端证书指定虚拟主机。此全局参数的值必须包含一个 JSON 文档,该文档将证书主体的可分辨名称映射到其目标虚拟主机。让我们看看如何将 2 个证书 O=client,CN=guestO=client,CN=rabbit 分别映射到 vhost1vhost2 虚拟主机。

全局参数可以使用以下方法设置

rabbitmqctl set_global_parameter mqtt_default_vhosts \
'{"O=client,CN=guest": "vhost1", "O=client,CN=rabbit": "vhost2"}'

请注意

  • 如果找不到证书的虚拟主机(因为在 mqtt_default_vhosts 全局参数 JSON 文档中找不到证书主体 DN,或者根本没有设置全局参数),将使用 vhost 插件配置选项指定的虚拟主机。
  • 代理在连接时查询 mqtt_default_vhosts 全局参数值。如果值发生变化,已连接的客户端不会收到通知或断开连接。他们需要重新连接才能切换到新的虚拟主机。
  • 使用 mqtt_default_vhosts 全局参数的证书到虚拟主机映射被认为比使用 mqtt_port_to_vhost_mapping 全局参数的端口到虚拟主机映射更具体,因此优先于后者。

流量控制

prefetch 选项控制将传递的具有 QoS=1 的未确认 PUBLISH 数据包的最大数量。此选项的解释方式与消费者预取相同。

MQTT 5.0 客户端可以通过在 CONNECT 数据包中设置接收最大值来定义较低的数字。

自定义交换机

exchange 选项确定将 MQTT 客户端的消息发布到哪个交换机。交换机必须在客户端发布任何消息之前创建。交换机应该是一个主题交换机.

默认主题交换机 amq.topic 是预先声明的:因此,它在 RabbitMQ 启动时存在。

保留消息和存储

插件支持保留消息,但有本节中描述的限制。消息存储实现是可插拔的,插件开箱即用地提供了两种实现

  • ETS(内存中),在模块 rabbit_mqtt_retained_msg_store_ets 中实现
  • DETS(磁盘上),在模块 rabbit_mqtt_retained_msg_store_dets 中实现

两种实现都有限制和权衡。对于第一个,可以保留的消息的最大数量受 RAM 限制。对于第二个,每个虚拟主机有 2 GB 的限制。两者都是**节点本地**的:保留消息不会复制到远程集群节点,也不会从远程集群节点查询。

以下示例有效:MQTT 客户端将保留消息发布到具有主题 topic/1 的节点 A。之后,另一个客户端在节点 A 上使用主题过滤器 topic/1 订阅。新的订阅者将收到保留的消息。

但是,如果客户端在节点 A 上发布保留消息,而另一个客户端随后在节点 B 上订阅,则该订阅客户端将不会收到存储在节点 A 上的任何保留消息。

此外,如果主题过滤器包含通配符(多级通配符字符“#”或单级通配符字符“+”),则不会发送任何保留的消息。

要配置存储,请使用 mqtt.retained_message_store 配置键

## use DETS (disk-based) store for retained messages
mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets
## only used by DETS store (in milliseconds)
mqtt.retained_message_store_dets_sync_interval = 2000

该值必须是实现存储的模块

  • rabbit_mqtt_retained_msg_store_ets 用于基于 RAM 的存储
  • rabbit_mqtt_retained_msg_store_dets 用于基于磁盘的存储(这是默认值。)

这些实现适合开发,但有时不适合生产需求。MQTT 规范没有定义保留消息存储的一致性或复制要求,因此 RabbitMQ 允许使用自定义消息存储来满足特定环境的一致性和可用性需求。例如,基于RiakCassandra的存储适合大多数生产环境,因为这些数据存储提供可调整的一致性.

消息存储必须实现 rabbit_mqtt_retained_msg_store 行为。

指标

Prometheus

此插件会发出全局计数器中列出的 Prometheus 指标。

Prometheus 标签 protocol 的值分别为 mqtt310mqtt311mqtt50,具体取决于 MQTT 客户端使用的是 MQTT 3.1、MQTT 3.1.1 还是 MQTT 5.0。

Prometheus 标签 queue_type 的值分别为 rabbit_classic_queuerabbit_quorum_queuerabbit_mqtt_qos0_queue,具体取决于 MQTT 客户端从中消费的队列类型。(请注意,MQTT 客户端永远不会直接从中消费,尽管它们可以将消息发布到流。)

RabbitMQ 管理 API

管理 API 提供了 MQTT 连接(例如,来自/到客户端的网络流量)以及经典队列和仲裁队列(例如,它们包含的消息数量)的指标。但是,从 3.12 开始,与 AMQP 0.9.1 通道相关的管理 API 指标(例如消息速率)不可用。

性能和可扩展性检查列表

MQTT 是物联网 (IoT) 的标准协议。常见的物联网工作负载是许多 MQTT 设备定期将传感器数据发布到 MQTT 代理。可能有数十万,有时甚至数百万个 IoT 设备连接到 MQTT 代理。博客文章原生 MQTT展示了此类工作负载的性能基准测试。

本节旨在提供一个非详尽的检查清单,其中包含配置 RabbitMQ 作为高效 MQTT 代理的提示和技巧,以支持许多客户端连接

  1. management_agent.disable_metrics_collector = true 设置为在管理插件中禁用指标收集。RabbitMQ 管理插件并非为过度收集指标而设计。实际上,通过管理 API 传递指标已弃用。相反,请使用专门为收集和查询大量指标而设计的工具:Prometheus。
  2. 具有 QoS 0 的 MQTT 数据包和订阅的性能远高于 QoS 1。与 AMQP 0.9.1 和 AMQP 1.0 不同,MQTT 并非旨在最大限度地提高吞吐量:例如,没有多重确认。每个具有 QoS 1 的PUBLISH数据包都需要单独确认。
  3. 降低 TCP 缓冲区大小,如TCP 监听器选项部分所述。

这大大减少了在具有许多并发连接客户端的环境中的内存使用量.

  1. 更少的主题级别(在 MQTT 主题和 MQTT 主题过滤器中)的性能优于更多的主题级别。例如,如果可能,请将主题结构化为 city/name 而不是 continent/country/city/name。主题过滤器中的每个主题级别当前都会在 RabbitMQ 使用的数据库中创建自己的条目。因此,在主题级别较少的情况下,创建和删除许多订阅的速度会更快。此外,路由具有较少主题级别的消息的速度更快。
  2. 在订阅变化率高的工作负载中,增加 Mnesia 配置参数 dump_log_write_threshold(例如,RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-mnesia dump_log_write_threshold 20000"
  3. 当连接多个客户端时,请增加最大 Erlang 进程数(例如:RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 10000000)以及最大打开端口数(例如:ERL_MAX_PORTS=10000000)。
  4. **不要** 使用 配置策略 queue_master_locator = min-mastersmin-masters 会从 RabbitMQ 数据库中读取所有队列记录以决定将经典队列放置在哪个节点上,这对包含大量队列的集群来说会很昂贵。请注意,queue_master_locator = min-masters 是从 rabbitmq/cluster-operator v2.7.0 版本开始的默认配置。

请查看 网络配置 指南以获取更多信息。

代理协议

MQTT 插件支持 代理协议。该功能默认情况下处于禁用状态,要为 MQTT 客户端启用它

mqtt.proxy_protocol = true

有关代理协议的更多信息,请参见 网络指南

Sparkplug 支持

Sparkplug 是一种规范,它为 MQTT 系统的设计提供了指导。在 Sparkplug 中,MQTT 主题必须以 spAvM.NspBvM.N 开头,其中 MN 是整数。不幸的是,这与 RabbitMQ MQTT 插件 将 MQTT 主题转换为 AMQP 0.9.1 路由键的方式冲突

为了解决这个问题,可以将 sparkplug 配置项设置为 true

mqtt.sparkplug = true

启用 Sparkplug 支持后,MQTT 插件将不会转换主题名称中的 spAvM.N/spBvM.N 部分。

限制

不支持 QoS 2

QoS 2 订阅将被视为 QoS 1 订阅。

重叠订阅

来自同一客户端的重叠订阅(例如 /sports/football/epl/#/sports/football/#)可能会导致重复消息被传递。应用程序需要考虑到这一点。

保留消息存储

请参阅上面的保留消息。不同的保留消息存储具有不同的优势、权衡和限制。

禁用插件

在节点上禁用插件或从集群中移除节点之前,必须使用 rabbitmqctl 对其进行退役。

rabbitmqctl decommission_mqtt_node <node>;

保留消息和存储

该插件支持保留消息。消息存储实现是可插拔的,该插件开箱即用地提供了两种实现

  • 基于 ETS(内存中),在 rabbit_mqtt_retained_msg_store_ets 模块中实现
  • 基于 DETS(磁盘上),在 rabbit_mqtt_retained_msg_store_dets 中实现

两种实现都有局限性和权衡。对于第一个,可以保留的最大消息数受 RAM 的限制。对于第二个,每个 vhost 有 2 GB 的限制。两者都是节点本地的(在一个代理节点上保留的消息不会复制到集群中的其他节点)。

要配置存储,请使用 rabbitmq_mqtt.retained_message_store 配置键

mqtt.default_user                        = guest
mqtt.default_pass = guest
mqtt.allow_anonymous = true
mqtt.vhost = /
mqtt.exchange = amq.topic
mqtt.max_session_expiry_interval_seconds = 1800
mqtt.prefetch = 10

## use DETS (disk-based) store for retained messages
mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets
## only used by DETS store
mqtt.retained_message_store_dets_sync_interval = 2000

mqtt.listeners.ssl = none
mqtt.listeners.tcp.default = 1883

该值必须是实现存储的模块

  • rabbit_mqtt_retained_msg_store_ets 用于基于 RAM 的存储
  • rabbit_mqtt_retained_msg_store_dets 用于基于磁盘的

这些实现适合开发,但有时不适合生产需求。MQTT 3.1 规范没有定义保留消息存储的一致性或复制要求,因此 RabbitMQ 允许使用自定义存储来满足特定环境的一致性和可用性需求。例如,基于 RiakCassandra 的存储适合大多数生产环境,因为这些数据存储提供了 可调一致性

消息存储必须实现 rabbit_mqtt_retained_msg_store 行为。