跳至主要内容

RabbitMQ 3.13 即将支持 MQTT 5.0

·27 分钟阅读

RabbitMQ 3.12 中发布的原生 MQTT 为物联网用例带来了显著的可扩展性和性能提升。

RabbitMQ 3.13 将支持 MQTT 5.0,因此将是我们使 RabbitMQ 成为领先的 MQTT Broker 之一的旅程中的下一个重要步骤。

这篇博文解释了 RabbitMQ 中如何使用新的 MQTT 5.0 功能。

MQTT 概述

MQTT 是物联网 (IoT) 的标准协议。

当连接到 Broker 时,物联网远程设备可能具有较差的网络质量。因此,MQTT 是轻量级的:MQTT 协议头很小,以节省网络带宽。

由于物联网设备可能经常断开和重新连接 - 想象一下汽车驶过隧道 - MQTT 也很高效:与其他消息传递协议相比,客户端通过更短的握手进行连接和身份验证。

MQTT 协议已经存在多年。如下表所示,最新的 MQTT 协议版本是 5.0。

MQTT 版本CONNECT 数据包中的协议版本MQTT 规范发布年份RabbitMQ 支持年份(版本)
3.1320102012 (3.0)
3.1.1420142014 (3.3)
5.0520192024 (3.13)

值得一提的是,用户面向的协议版本和“内部”协议版本(也称为协议级别)之间存在差异。后者在 CONNECT 数据包中从客户端发送到服务器。由于用户面向的协议版本 3.1.1 映射到内部协议版本 4,为了避免进一步的混淆,MQTT 委员会决定跳过用户面向的版本 4.0,使得用户面向的版本 5.0 映射到内部协议版本 5。

MQTT 5.0 功能

附录 C. MQTT v5.0 中新功能的摘要 提供了 MQTT 5.0 新功能的完整列表。

由于您可以在网络上找到包括说明性图表和使用模式在内的很棒的 MQTT 5.0 资源,因此这篇博文仅关注 RabbitMQ 的具体细节。本节解释了 PR #7263 中实现的最重要功能。对于每个功能,我们都提供了一个示例,说明如何在 RabbitMQ 中使用它,或概述了它在 RabbitMQ 中的高级实现描述。

要自行运行示例,请启动 RabbitMQ 服务器 3.13,例如,使用 此 Docker 镜像标签

docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 rabbitmq:3.13.0-management

在另一个终端窗口中,启用 MQTT 插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt

由于 MQTT 插件是动态启用的,因此 MQTT 插件定义的功能标志 已禁用。启用所有功能标志,包括功能标志 mqtt_v5

docker exec rabbitmq rabbitmqctl enable_feature_flag all

现在列出功能标志应显示所有功能标志均已启用

docker exec rabbitmq rabbitmqctl list_feature_flags --formatter=pretty_table

以下示例使用 MQTTX CLI 版本 1.9.4。我们使用 CLI 而不是图形 UI,以便您可以轻松地通过复制粘贴命令来运行示例。

所有新功能也适用于 RabbitMQ Web MQTT 插件

以下是这篇博文中涵盖的 MQTT 5.0 功能列表

功能 1:消息过期

描述

可以为发布到 Broker 的每条消息设置以秒为单位的过期间隔。如果在过期间隔内未消费该消息,则该消息将被丢弃或死信。

示例

为主题 t/1 创建订阅。这会在 RabbitMQ 中创建一个队列。通过在终端中键入 Ctrl+C 断开客户端连接。由于我们使用了 600 秒的 会话过期间隔,因此该队列将再存在 10 分钟。

mqttx sub --client-id sub-1 --topic t/1 --session-expiry-interval 600 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/1...
✔ Subscribed to t/1
^C

使用 30 秒的消息过期间隔向同一主题发布消息

mqttx pub --topic t/1 --message m1 --message-expiry-interval 30 --qos 1
… Connecting...
✔ Connected
… Message publishing...
✔ Message published

在接下来的 30 秒内,列出队列

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages
┌─────────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 1
└─────────────────────────────┴─────────┴──────────┘

等待 30 秒,然后再次列出队列

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues
┌─────────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 0
└─────────────────────────────┴─────────┴──────────┘

消息已过期,因为客户端 sub-1 尚未连接到 Broker 以消费该消息。如果设置了 死信 策略,则该消息将被死信到交换机。在我们的例子中,死信被禁用。查询 Prometheus 端点证明 1 条消息从经典队列中过期。

curl --silent localhost:15692/metrics | grep rabbitmq_global_messages_dead_lettered_expired_total
# TYPE rabbitmq_global_messages_dead_lettered_expired_total counter
# HELP rabbitmq_global_messages_dead_lettered_expired_total Total number of messages dead-lettered due to message TTL exceeded
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

另一个有趣的功能是以下要求

服务器发送给客户端的 PUBLISH 数据包必须包含消息过期间隔,该间隔设置为接收值减去应用程序消息在服务器中等待的时间。

向 Broker 发送第二条消息,消息过期间隔为 60 秒

mqttx pub --topic t/1 --message m2 --message-expiry-interval 60 --qos 1

在重新连接订阅客户端之前等待 20 秒

mqttx sub --client-id sub-1 --topic t/1 --no-clean --session-expiry-interval 0  --qos 1 --output-mode clean
{
"topic": "t/1",
"payload": "m2",
"packet": {
...
"properties": {
"messageExpiryInterval": 40
}
}
}

按照 MQTT 5.0 协议规范的要求,客户端接收到的第二条消息的消息过期间隔设置为 40 秒:Broker 接收到的 60 秒减去消息在 Broker 中等待的 20 秒。

实现

MQTT 5.0 消息过期在 RabbitMQ 中使用 每消息 TTL 实现,类似于 AMQP 0.9.1 发布者中的 expiration 字段。

功能 2:订阅标识符

描述

客户端可以在 SUBSCRIBE 数据包中设置订阅标识符。如果由于该订阅而向客户端发送消息,则 Broker 会将该订阅标识符包含到 PUBLISH 数据包中。

订阅标识符的用例在 SUBSCRIBE 操作 部分列出。

示例

从同一客户端向服务器发送 3 个单独的 SUBSCRIBE 数据包,每个数据包具有不同的主题过滤器和不同的订阅标识符

mqttx sub --client-id sub-2 --topic t/1 --subscription-identifier 1 --session-expiry-interval 600
^C
mqttx sub --client-id sub-2 --topic t/2 --subscription-identifier 2 --session-expiry-interval 600 --no-clean
^C
mqttx sub --client-id sub-2 --topic "t/#" --subscription-identifier 3 --session-expiry-interval 0 --no-clean --output-mode clean

在第二个终端窗口中,我们看到来自同一队列到同一主题交换机的 3 个绑定,每个绑定都具有不同的路由键

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings \
source_name source_kind destination_name destination_kind routing_key
┌─────────────┬─────────────┬─────────────────────────────┬──────────────────┬─────────────────────────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ mqtt-subscription-sub-2qos0 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.# │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.1 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.2 │
└─────────────┴─────────────┴─────────────────────────────┴──────────────────┴─────────────────────────────┘

第一个条目是到默认交换机的隐式绑定。

每个带有 MQTT 主题过滤器的 MQTT 订阅都对应一个带有绑定键的 AMQP 0.9.1 绑定。确切地说,表列 routing_key 名称有误:它应该称为 binding_key。MQTT 中的主题级别分隔符是“/”字符,而 AMQP 0.9.1 主题交换机中的主题级别分隔符是“.”字符。

再次在第二个终端窗口中,向主题 t/1 发送消息

mqttx pub --topic t/1 --message m1

第一个终端窗口(订阅客户端的终端窗口)接收到以下 PUBLISH 数据包

{
"topic": "t/1",
"payload": "m1",
"packet": {
...
"properties": {
"subscriptionIdentifier": [
1,
3
]
}
}
}

它包含订阅标识符 1 和 3,因为主题过滤器 t/1t/# 都匹配主题 t/1

同样,如果您向主题 t/2 发送第二条消息,则订阅客户端将收到一个 PUBLISH 数据包,其中包含订阅标识符 2 和 3。

实现

订阅标识符是 MQTT 会话状态的一部分。因此,当客户端断开连接时以及在 MQTT 会话结束之前,必须将订阅标识符持久存储在服务器的数据库中。RabbitMQ 将订阅标识符存储在绑定参数中

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings routing_key arguments
┌─────────────────────────────┬───────────────────────────────────────────────────────────────────────────────────┐
│ routing_key │ arguments │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-2qos0 │ │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.# │ {mqtt_subscription_opts,0,false,false,0,3}{<<"x-binding-key">>,longstr,<<"t.#">>} │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.1 │ {mqtt_subscription_opts,0,false,false,0,1}{<<"x-binding-key">>,longstr,<<"t.1">>}
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.2 │ {mqtt_subscription_opts,0,false,false,0,2}{<<"x-binding-key">>,longstr,<<"t.2">>}
└─────────────────────────────┴───────────────────────────────────────────────────────────────────────────────────┘

绑定参数的确切结构并不重要,并且可能会在未来的 RabbitMQ 版本中更改。但是,您可以在绑定参数中看到整数 1、2 和 3,它们对应于订阅标识符。

当主题交换机路由消息时,发布 Erlang 进程会将所有匹配的绑定键包含到消息中。订阅 MQTT 客户端的 Erlang 进程会将匹配的绑定键与它知道的 MQTT 主题过滤器进行比较,并将订阅标识符包含到发送给 MQTT 客户端的 PUBLISH 数据包中。

发布 Erlang 进程可以是 MQTT 连接进程或 AMQP 0.9.1 通道进程。与往常一样,RabbitMQ 在跨协议互操作性方面表现出色:当 AMQP 0.9.1(或 STOMP 或 AMQP 1.0)客户端向主题交换机发送消息时,正确的订阅标识符将包含在发送给 MQTT 客户端的 PUBLISH 数据包中。

功能 3:订阅选项

描述

MQTT 5.0 带有 3 个新的订阅选项

  1. No Local(本地不发送)
  2. Retain as Published(按发布保留)
  3. Retain Handling(保留处理)

所有订阅选项均由 RabbitMQ 实现。在这里,我们仅关注保留处理选项

此选项指定在建立订阅时是否发送保留消息。
这些值是
0 = 在订阅时发送保留消息
1 = 仅当订阅当前不存在时才在订阅时发送保留消息
2 = 在订阅时不要发送保留消息

示例

发送保留消息

mqttx pub --topic mytopic --message m --retain

保留处理值 0 将接收保留消息,而值 2 将不会接收

mqttx sub --topic mytopic --retain-handling 0
… Connecting...
✔ Connected
… Subscribing to mytopic...
✔ Subscribed to mytopic
payload: m
retain: true
^C

mqttx sub --topic mytopic --retain-handling 2
… Connecting...
✔ Connected
… Subscribing to mytopic...
✔ Subscribed to mytopic

功能 4:所有 ACK 上的原因代码

描述

数据包 CONNACK、PUBACK、SUBACK、UNSUBACK 和 DISCONNECT 包含原因代码。

实现

一个实现示例是,如果消息未路由到任何队列,RabbitMQ 将在 PUBACK 数据包中回复原因代码 No matching subscribers。MQTT 5.0 原因代码 No matching subscribers 在概念上对应于 AMQP 0.9.1 中的 mandatory 消息属性和 BasicReturn 处理程序。

功能 5:用户属性

描述

大多数 MQTT 数据包都可以包含用户属性。用户属性的含义未在 MQTT 规范中定义。

PUBLISH 数据包示例

PUBLISH 数据包中的用户属性由客户端应用程序定义,并由服务器原封不动地转发。

在第一个终端窗口中订阅

mqttx sub --topic t/5

在第二个终端窗口中发布带有用户属性的消息

mqttx pub --topic t/5 --message m --user-properties "key1: value1"

第一个终端窗口将原封不动地接收用户属性

payload: m
userProperties: [ { key: 'key1', value: 'value1' } ]

MQTT 5.0 PUBLISH 数据包中的用户属性类似于 AMQP 0.9.1 中的 headers 消息属性。

CONNECT 数据包示例

连接时带有用户属性

mqttx conn --client-id myclient --user-properties "connecting-from: London"

在浏览器中打开管理 UI http://localhost:15672/#/connections(用户名和密码均为 guest),然后单击 MQTT 连接

Figure 1: User Property in the CONNECT packet
图 1:CONNECT 数据包中的用户属性

RabbitMQ 将在管理 UI 中显示来自 CONNECT 数据包的用户属性。

功能 6:有效载荷格式内容类型

描述

发布者可以指定 MIME 内容类型。它还可以设置有效载荷格式指示符,指示有效载荷是否由 UTF-8 编码的字符数据或未指定的二进制数据组成。

示例

在第一个终端窗口中,订阅一个主题

mqttx sub --topic t/6 --output-mode clean

在第二个终端窗口中,发送一条带有内容类型和有效载荷格式指示符的消息

mqttx pub --topic t/6 --message "my UTF-8 encoded data 🙂" --content-type text/plain --payload-format-indicator

第一个终端窗口将原封不动地接收内容类型和有效载荷格式指示符

{
"topic": "t/6",
"payload": "my UTF-8 encoded data 🙂",
"packet": {
...
"properties": {
"payloadFormatIndicator": true,
"contentType": "text/plain"
}
}
}

功能 7:请求/响应

描述

MQTT 5.0 正式确定了请求/响应模式。

在发布消息之前,MQTT 客户端(请求者)订阅响应主题。请求者将响应主题和一些关联数据包含在请求消息中。

另一个 MQTT 客户端(响应者)接收请求消息,执行某些操作,并将带有相同关联数据的响应消息发布到响应主题。

MQTT 5.0 请求/响应功能对应于 AMQP 0.9.1 中的远程过程调用。但是,在 AMQP 0.9.1 中,请求者将在 AMQP 0.9.1 消息属性 reply_to 中包含回调队列的名称。MQTT 协议未定义队列的概念。因此,在 MQTT 中,要回复的“地址”是主题名称。

尽管协议规范之间存在不兼容性,但 RabbitMQ 在协议互操作性方面表现出色:因此,RabbitMQ 跨协议支持请求/响应交互。

例如,MQTT 客户端可以在请求消息中包含响应主题和关联数据。如果 AMQP 0.9.1 客户端创建了一个绑定到主题交换机 amq.topic 的队列,其绑定键与请求消息的主题匹配,它将收到一条 AMQP 0.9.1 消息,其属性 correlation_id 设置为 MQTT 客户端发送的关联数据,并且一个名为 x-opt-reply-to-topic 的标头。然后,AMQP 0.9.1 客户端可以通过使用相同的 correlation_id 并将响应消息发布到主题交换机 amq.topic 以及 x-opt-reply-to-topic 标头中存在的主题来响应 MQTT 5.0 客户端。

示例

此示例仅关注 MQTT 客户端。

在第一个终端窗口中,响应 MQTT 客户端订阅主题 t/7

mqttx sub --client-id responder --topic t/7 --session-expiry-interval 600 --output-mode clean --qos 1

在第二个终端窗口中,请求 MQTT 客户端订阅一个名为 my/response/topic 的主题

mqttx sub --client-id requester --topic my/response/topic --session-expiry-interval 600 --qos 1
… Connecting...
✔ Connected
… Subscribing to my/response/topic...
✔ Subscribed to my/response/topic
^C

在第二个终端窗口中,请求者然后发布请求消息

mqttx pub --client-id requester --topic t/7 --message "my request" \
--correlation-data abc-123 --response-topic my/response/topic \
--session-expiry-interval 600 --no-clean

在第一个终端窗口中,响应者接收请求消息

{
"topic": "t/7",
"payload": "my request",
"packet": {
...
"properties": {
"responseTopic": "my/response/topic",
"correlationData": {
"type": "Buffer",
"data": [
97,
98,
99,
45,
49,
50,
51
]
}
}
}
}
^C

在第一个终端窗口中,响应者通过复制关联数据并发布到响应主题来响应请求者

mqttx pub --client-id responder --topic my/response/topic --message "my response" --correlation-data abc-123

在第二个终端窗口中,请求者接收响应。

mqttx sub --client-id requester --topic my/response/topic --no-clean --qos 1 --output-mode clean
{
"topic": "my/response/topic",
"payload": "my response",
"packet": {
...
"properties": {
"correlationData": {
"type": "Buffer",
"data": [
97,
98,
99,
45,
49,
50,
51
]
}
}
}
}

关联数据对于将响应与请求关联起来很有用。请求者通常为它发布的每个请求选择唯一的关联数据。

功能 8:分配的客户端标识符

描述

如果客户端使用零长度客户端标识符连接,则服务器必须使用包含分配的客户端标识符的 CONNACK 进行响应。

与 MQTT 3.1.1 相比,这取消了服务器分配的客户端 ID 只能与 Clean Session = 1 连接一起使用的限制。

实现

RabbitMQ 将生成一些随机客户端 ID(例如 dcGB2kSwS0JlXnaBa1A6QA)并在 CONNACK 数据包中返回它。

功能 9:主题别名

描述

主题别名是一个整数值,用于标识主题,而不是使用主题名称。这减小了 PUBLISH 数据包的大小,并且在主题名称很长且在网络连接中重复使用相同的主题名称时非常有用。

实现

RabbitMQ 中的默认 主题别名最大值 为 16。您可以在 rabbitmq.conf 中配置此值,例如

mqtt.topic_alias_maximum = 32

此配置值映射到从 RabbitMQ 发送到客户端的 CONNACK 数据包中的主题别名最大值。它限制了两个方向上的主题别名数量,即从客户端到 RabbitMQ 以及从 RabbitMQ 到客户端。如果客户端发送到或接收来自许多不同主题的消息,则设置更高的值将需要 RabbitMQ 中更多的内存使用量。

RabbitMQ 运营商可以通过设置以下内容来禁止使用主题别名

mqtt.topic_alias_maximum = 0

功能 10:流量控制

描述

MQTT 5.0 属性接收最大值定义了未确认的 QoS 1 PUBLISH 数据包的上限。

实现

从 RabbitMQ 发送到客户端的未确认 QoS 1 PUBLISH 数据包的最大数量由客户端在 CONNECT 数据包中发送到 RabbitMQ 的接收最大值与配置的 mqtt.prefetch 值中的最小值确定

mqtt.prefetch = 10

mqtt.prefetch 的默认值为 10。

mqtt.prefetch 值在 RabbitMQ 3.13 之前已经存在于 MQTT 3.1 和 3.1.1 中。它映射到 RabbitMQ 中的 消费者预取。换句话说,它定义了队列发送到其 MQTT 连接进程的正在传输的消息数量。

功能 11:最大数据包大小

描述

客户端和服务器可以独立指定它们支持的最大数据包大小。

示例

此示例演示如何限制从客户端发送到 RabbitMQ 的最大 MQTT 数据包大小。

假设在成功身份验证后,RabbitMQ 运营商不希望 RabbitMQ 接受任何大于 1 KiB 的 MQTT 数据包。将以下配置写入 rabbitmq.conf(在您当前的工作目录中)

mqtt.max_packet_size_authenticated = 1024

停止 RabbitMQ 服务器后,启动 RabbitMQ 服务器并应用新配置

docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 \
--mount type=bind,source="$(pwd)"/rabbitmq.conf,target=/etc/rabbitmq/conf.d/11-blog-post.conf \
rabbitmq:3.13.0-beta.2-management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
docker exec rabbitmq rabbitmqctl enable_feature_flag all

在第一个终端窗口中,订阅一个主题

mqttx sub --topic t/11

在第二个终端窗口中,向该主题发送有效载荷为 3 字节的消息

payload=$(head --bytes 3 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"

第一行从特殊文件 /dev/zero 读取 3 个字节(3 个空字符),将每个空字符转换为 ASCII 字符 x,并将结果 xxx 保存在变量 payload 中。

第一个终端窗口将接收该消息

payload: xxx

接下来,在第二个终端窗口中,发送有效载荷为 2,000 字节的消息

payload=$(head --bytes 2000 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"

这次,第一个终端窗口没有收到消息,因为从客户端发送到 RabbitMQ 的 PUBLISH 数据包大于配置的最大数据包大小 1024 字节。

相反,RabbitMQ 记录了一条描述性错误消息

[error] <0.836.0> MQTT packet size (2007 bytes, type 3) exceeds mqtt.max_packet_size_authenticated (1024 bytes)

日志消息显示 2,007 字节,因为 PUBLISH 数据包的固定和可变标头需要 7 个字节(其中主题名称 t/11 需要 4 个字节)。

功能 12:服务器发起的 DISCONNECT

描述

在 MQTT 5.0 中,DISCONNECT 数据包不仅可以从客户端发送到服务器,还可以从服务器发送到客户端。

实现

在终止连接之前,RabbitMQ 将在以下情况下向客户端发送 DISCONNECT 数据包

DISCONNECT 原因代码名称情况
会话被接管另一个客户端使用相同的客户端 ID 连接。
服务器正在关闭RabbitMQ 进入 维护模式
Keep Alive 超时客户端在 Keep Alive 时间内未能通信。
数据包过大RabbitMQ 接收到大小超过 mqtt.max_packet_size_authenticated 的数据包

功能 13:会话过期

描述

在 MQTT 5.0 中,客户端可以在 CONNECT 数据包中向服务器建议会话过期间隔。服务器可以在 CONNACK 数据包中接受建议的会话过期间隔或强制使用不同的会话过期间隔。

会话可以跨越一系列网络连接继续进行。它的持续时间为最新的网络连接加上会话过期间隔。

当会话过期间隔到期时,客户端和服务器都将删除任何会话状态。

实现

客户端和服务器将保持会话状态,只要会话持续存在。

服务器中的会话状态包括已发送给客户端但尚未确认的消息、正在等待发送给客户端的消息以及客户端的订阅。RabbitMQ 以队列和绑定的形式对 MQTT 会话状态进行建模。

因此,会话过期间隔映射到 RabbitMQ 中的 队列 TTL。当 MQTT 会话过期时,队列及其消息和绑定将被删除。

示例

默认情况下,服务器允许的最大会话过期间隔为 1 天。如果 MQTT 客户端在 1 天内未重新连接,则其会话状态将在 RabbitMQ 中删除。

此值是可配置的。对于此示例,让我们在 rabbitmq.conf 中设置非常低的会话过期间隔,例如 1 分钟

mqtt.max_session_expiry_interval_seconds = 60

设置名称包含前缀 max,因为 MQTT 5.0 客户端可以通过在 CONNECT 数据包中发送会话过期间隔来选择较低的值。如 最大数据包大小示例 中所示,重新启动 RabbitMQ 节点,以便应用新设置。

使用 20 秒的会话过期间隔连接到 RabbitMQ 并创建订阅

mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 20 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/13...
✔ Subscribed to t/13
^C

在终端中键入 Ctrl+C 以断开客户端连接。

在接下来的 20 秒内,列出队列和绑定

docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name
mqtt-subscription-sub-13qos1

docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...
┌─────────────┬──────────────────────────────┬──────────────────────────────┐
│ source_name │ destination_name │ routing_key │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│ │ mqtt-subscription-sub-13qos1 │ mqtt-subscription-sub-13qos1 │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│ amq.topic │ mqtt-subscription-sub-13qos1 │ t.13 │
└─────────────┴──────────────────────────────┴──────────────────────────────┘

20 秒后,再次列出队列和绑定

docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...

docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...

队列及其绑定已被 RabbitMQ 删除,因为我们的客户端未在 20 秒的会话过期间隔内使用 Clean Session = 0 连接到 RabbitMQ。

接下来,执行相同的测试,但会话过期间隔较长,例如 1 小时

mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 3600 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/13...
✔ Subscribed to t/13
^C

您应该观察到队列及其绑定将在 1 分钟后删除,因为有效的会话过期间隔是客户端请求的值(1 小时)和 RabbitMQ 中配置的 mqtt.max_session_expiry_interval_seconds 值(1 分钟)中的最小值。

功能 14:遗嘱延迟

描述

客户端可以在 CONNECT 数据包中定义遗嘱延迟间隔。

服务器延迟发布客户端的遗嘱消息,直到遗嘱延迟间隔过去或会话结束,以先发生者为准。如果在遗嘱延迟间隔过去之前建立了与此会话的新网络连接,则服务器不得发送遗嘱消息。这样做的一个用途是避免在发生临时网络断开连接并且客户端在遗嘱消息发布之前成功重新连接并继续其会话的情况下发布遗嘱消息。

遗嘱延迟间隔的另一个用例是通知会话过期

客户端可以通过将遗嘱延迟间隔设置为长于会话过期间隔,并发送带有原因代码 0x04(断开连接并带有遗嘱消息)的 DISCONNECT 来安排遗嘱消息通知会话过期。

实现

尽管遗嘱消息有效载荷通常很小,但 MQTT 规范允许遗嘱消息有效载荷大小最大为 64 KiB。

为了避免在 Khepri (RabbitMQ 未来的元数据存储) 中存储大型二进制数据,RabbitMQ 创建了一个经典队列,其中包含此单个遗嘱消息。我们称此队列为遗嘱队列。此消息设置了单消息 TTL,以毫秒为单位定义,并对应于遗嘱延迟间隔(以秒为单位)。此外,遗嘱队列还设置了队列 TTL,以毫秒为单位定义,并对应于会话过期间隔(以秒为单位)。有效的单消息 TTL 至少比队列 TTL 低几毫秒,这样消息将在队列(会话)过期前不久发布。

遗嘱队列还定义 amq.topic (MQTT 插件使用的默认主题交换机) 为死信交换机,并将 遗嘱主题 定义为 死信路由键

如果 MQTT 客户端未在其遗嘱延迟间隔内重新连接,则遗嘱队列中的消息将被死信到主题交换机。

让我们用一个例子来说明这一点。

示例

在第 1 个终端窗口中,创建一个订阅,它将消费遗嘱消息

mqttx sub --client-id sub-14 --topic t/14

在第 2 个终端窗口中,创建一个遗嘱延迟间隔为 20 秒的连接

mqttx conn --client-id conn-14 --will-topic t/14 --will-message my-will-message --will-delay-interval 20 --session-expiry-interval 40

在第 3 个终端窗口中,我们看到到目前为止,订阅 MQTT 客户端创建了一个队列

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬───────────┐
│ name │ type │ messages │ arguments │
├──────────────────────────────┼────────────┼──────────┼───────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 00 │ │
└──────────────────────────────┴────────────┴──────────┴───────────┘

在第 2 个终端窗口中,键入 Ctrl+C 以断开客户端 ID 为 conn-14 的 MQTT 连接。

这次,列出队列显示遗嘱队列已创建

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬────────────────────────────────────────────────────────────┐
│ name │ type │ messages │ arguments │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 00 │ │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-will-conn-14 │ classic │ 1{<<"x-expires">>,long,40000}
│ │ │ │ {<<"x-dead-letter-exchange">>,longstr,<<"amq.topic">>}
│ │ │ │ {<<"x-dead-letter-routing-key">>,longstr,<<"t.14">>}
└──────────────────────────────┴────────────┴──────────┴────────────────────────────────────────────────────────────┘

遗嘱队列的命名模式为 mqtt-will-<MQTT 客户端 ID>。它包含一条消息:遗嘱消息。

如上一节所述,队列 TTL (x-expires) 为 40,000 毫秒,因此与上述命令中的 40 秒会话过期间隔匹配。如果您等待 20 秒,您的第 1 个终端窗口应该会收到遗嘱消息,因为我们的客户端未在遗嘱延迟间隔内重新连接

› payload: my-will-message

功能 15:可选服务器功能可用性

描述

定义服务器不允许的一组功能,并提供一种机制供服务器向客户端指定此项。可以以这种方式指定的功能包括

  • 最大 QoS
  • 保留可用
  • 通配符订阅可用
  • 订阅标识符可用
  • 共享订阅可用

客户端使用服务器声明为不可用的功能是错误的。

实施

RabbitMQ 3.13 在 CONNACK 属性中包含了 最大 QoS = 1 和 共享订阅可用 = 0。

RabbitMQ 尚不支持 QoS 2

如下一节所述,未来的 RabbitMQ 版本将支持 共享订阅

局限性

本节列出了 RabbitMQ MQTT 实施的局限性。

MQTT 5.0 特定局限性

共享订阅

未来的 RabbitMQ 版本将添加 共享订阅。尽管此功能可以很好地映射到 RabbitMQ 中的队列,但共享订阅是会话状态的一部分,并且需要进行一些 RabbitMQ 数据库迁移才能有效地查询给定 MQTT 客户端 ID 的共享订阅。

延迟和保留的遗嘱消息

延迟保留的遗嘱消息将不会被保留。这是因为延迟的遗嘱消息将被死信到主题交换机,但保留器进程目前不从队列中消费。未来可以使用新的保留消息存储来解决此限制。

非 MQTT 5.0 特定局限性

为了完整起见,本节列出了在 RabbitMQ 3.13 中支持 MQTT 5.0 之前以及 RabbitMQ 3.12 中发布 Native MQTT 之前就存在的局限性。

保留消息

RabbitMQ 中保留消息的功能受到限制

保留消息仅存储和查询在本地节点。

一个可行的例子如下:MQTT 客户端向节点 A 发布主题为 topic/1 的保留消息。此后,另一个客户端在节点 A 上订阅主题过滤器 topic/1。新的订阅者将收到保留消息。

但是,如果主题过滤器包含通配符(多级通配符 "#" 或单级通配符 "+"),则不会发送任何保留消息(问题 #8824)。

此外,如果客户端在节点 A 上发布保留消息,而另一个客户端随后在节点 B 上订阅,则该订阅客户端将不会收到节点 A 上存储的任何保留消息(问题 #8096)。

未来的 RabbitMQ 版本将在集群中复制保留消息,并发送与包含通配符的主题过滤器匹配的保留消息。

总结

总而言之,RabbitMQ

  • 是领先的 AMQP 0.9.1 代理
  • 是一个流式代理
  • 擅长跨协议互操作性
  • 由于在 3.13 中发布了对 MQTT 5.0 的支持以及在 3.12 中发布了 Native MQTT,正成为领先的 MQTT 代理之一

我们将 RabbitMQ 转变为功能完善的 IoT 代理的旅程尚未结束,并且计划在未来几个月和几年内完成更多的开发工作。敬请期待!

© . All rights reserved.