跳至主内容

RabbitMQ 3.13 将支持 MQTT 5.0

·阅读 29 分钟

RabbitMQ 3.12 中发布的原生 MQTT 为 IoT 用例带来了显著的可伸缩性和性能改进。

RabbitMQ 3.13 将支持MQTT 5.0,因此将是我们成为领先 MQTT 代理之一的旅程中的下一个重要里程碑。

这篇博文解释了 RabbitMQ 中如何使用新的 MQTT 5.0 功能。

MQTT 概述

MQTT 是物联网 (IoT) 的标准协议。

物联网远程设备在连接到代理时可能网络质量较差。因此,MQTT 是轻量级的:MQTT 协议头很小,以节省网络带宽。

由于物联网设备可能会频繁断开和重新连接——想象一下汽车驶过隧道——MQTT 也非常高效:客户端通过比其他消息协议更短的握手过程进行连接和身份验证。

MQTT 协议已经存在多年。如下表所示,最新的 MQTT 协议版本是 5.0。

MQTT 版本CONNECT 数据包中的协议版本MQTT 规范发布年份RabbitMQ 支持年份(版本)
3.1320102012 (3.0)
3.1.1420142014 (3.3)
5.0520192024 (3.13)

值得一提的是,用户可见的协议版本和“内部”协议版本(也称为协议级别)之间存在差异。后者在 CONNECT 数据包中从客户端发送到服务器。由于用户可见的协议版本 3.1.1 映射到内部协议版本 4,为了避免进一步混淆,MQTT 委员会决定跳过用户可见的版本 4.0,以便用户可见的版本 5.0 映射到内部协议版本 5。

MQTT 5.0 特性

附录 C. MQTT v5.0 新特性摘要 提供了 MQTT 5.0 新特性的完整列表。

由于您可以在网上找到出色的 MQTT 5.0 资源,包括说明性图表和使用模式,因此这篇博文仅关注 RabbitMQ 的特定内容。本节解释了在 PR #7263 中实现的最重要的特性。对于每个特性,我们提供了如何与 RabbitMQ 使用它的示例,或者概述了它在 RabbitMQ 中的实现方式。

要自行运行示例,请启动 RabbitMQ 服务器 3.13,例如,使用此 Docker 镜像标签

docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 rabbitmq:3.13.0-management

在另一个终端窗口中,启用 MQTT 插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt

由于 MQTT 插件是动态启用的,MQTT 插件定义的特性标志 是禁用的。启用所有特性标志,包括特性标志 mqtt_v5

docker exec rabbitmq rabbitmqctl enable_feature_flag all

列出特性标志现在应该显示所有特性标志都已启用

docker exec rabbitmq rabbitmqctl list_feature_flags --formatter=pretty_table

以下示例使用了 MQTTX CLI 版本 1.9.4。我们使用 CLI 而不是图形界面,以便您可以轻松地通过复制粘贴命令来运行示例。

所有新特性也适用于 RabbitMQ Web MQTT 插件

以下是本博文涵盖的 MQTT 5.0 特性列表

特性 1:消息过期

描述

可以为发布到代理的每条消息设置以秒为单位的过期间隔。如果消息在过期间隔内未被消耗,则消息将被丢弃或作为死信处理。

示例

创建一个主题 t/1 的订阅。这会在 RabbitMQ 中创建一个队列。通过在终端中输入 Ctrl+C 断开客户端连接。由于我们使用了 600 秒的会话过期间隔,此队列将存在 10 分钟。

mqttx sub --client-id sub-1 --topic t/1 --session-expiry-interval 600 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/1...
✔ Subscribed to t/1
^C

发布一条消息到同一主题,并设置消息过期间隔为 30 秒

mqttx pub --topic t/1 --message m1 --message-expiry-interval 30 --qos 1
… Connecting...
✔ Connected
… Message publishing...
✔ Message published

在接下来的 30 秒内,列出队列

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages
┌─────────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 1
└─────────────────────────────┴─────────┴──────────┘

等待 30 秒,然后再次列出队列

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues
┌─────────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 0
└─────────────────────────────┴─────────┴──────────┘

消息已过期,因为客户端 sub-1 未连接到代理来消耗消息。如果设置了死信策略,消息将被死信到交换机。在本例中,死信是禁用的。查询 Prometheus 端点可证明从经典队列中过期了 1 条消息。

curl --silent localhost:15692/metrics | grep rabbitmq_global_messages_dead_lettered_expired_total
# TYPE rabbitmq_global_messages_dead_lettered_expired_total counter
# HELP rabbitmq_global_messages_dead_lettered_expired_total Total number of messages dead-lettered due to message TTL exceeded
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

另一个有趣的特性是以下要求

服务器发送到客户端的 PUBLISH 数据包必须包含一个消息过期间隔,设置为接收到的值减去应用程序消息在服务器中等待的时间。

将第二条消息发送到代理,并设置消息过期间隔为 60 秒

mqttx pub --topic t/1 --message m2 --message-expiry-interval 60 --qos 1

在订阅客户端重新连接之前等待 20 秒

mqttx sub --client-id sub-1 --topic t/1 --no-clean --session-expiry-interval 0  --qos 1 --output-mode clean
{
"topic": "t/1",
"payload": "m2",
"packet": {
...
"properties": {
"messageExpiryInterval": 40
}
}
}

根据 MQTT 5.0 协议规范的规定,客户端收到第二条消息时,消息过期间隔设置为 40 秒:代理接收到的 60 秒减去消息在代理中等待的 20 秒。

实现

MQTT 5.0 消息过期在 RabbitMQ 中使用每条消息 TTL 实现,类似于 AMQP 0.9.1 发布者的 expiration 字段。

特性 2:订阅标识符

描述

客户端可以在 SUBSCRIBE 数据包中设置订阅标识符。如果客户端因该订阅而收到消息,代理将在 PUBLISH 数据包中包含该订阅标识符。

订阅标识符的用例列在 SUBSCRIBE Actions 部分。

示例

从同一客户端向服务器发送 3 个单独的 SUBSCRIBE 数据包,每个数据包具有不同的主题过滤器和不同的订阅标识符

mqttx sub --client-id sub-2 --topic t/1 --subscription-identifier 1 --session-expiry-interval 600
^C
mqttx sub --client-id sub-2 --topic t/2 --subscription-identifier 2 --session-expiry-interval 600 --no-clean
^C
mqttx sub --client-id sub-2 --topic "t/#" --subscription-identifier 3 --session-expiry-interval 0 --no-clean --output-mode clean

在第二个终端窗口中,我们看到 3 个绑定从同一个队列到同一个主题交换机,每个绑定具有不同的路由键

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings \
source_name source_kind destination_name destination_kind routing_key
┌─────────────┬─────────────┬─────────────────────────────┬──────────────────┬─────────────────────────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ mqtt-subscription-sub-2qos0 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.# │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.1 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.2 │
└─────────────┴─────────────┴─────────────────────────────┴──────────────────┴─────────────────────────────┘

第一项是隐式绑定到默认交换机的。

每个 MQTT 订阅及其 MQTT 主题过滤器对应一个 AMQP 0.9.1 绑定,具有绑定键。精确地说,表列 routing_key 被错误命名:它应该被称为 binding_key。MQTT 中的主题级别分隔符是“/”字符,而 AMQP 0.9.1 主题交换机中的主题级别分隔符是“.”字符。

再次在第二个终端窗口中,向主题 t/1 发送一条消息

mqttx pub --topic t/1 --message m1

第一个终端窗口(订阅客户端)收到以下 PUBLISH 数据包

{
"topic": "t/1",
"payload": "m1",
"packet": {
...
"properties": {
"subscriptionIdentifier": [
1,
3
]
}
}
}

它包含订阅标识符 1 和 3,因为主题过滤器 t/1t/# 都匹配主题 t/1

同样,如果您向主题 t/2 发送第二条消息,订阅客户端将收到一个包含订阅标识符 2 和 3 的 PUBLISH 数据包。

实现

订阅标识符是 MQTT 会话状态的一部分。因此,订阅标识符必须在客户端断开连接期间以及 MQTT 会话结束之前存储在服务器的数据库中。RabbitMQ 在绑定参数中存储订阅标识符

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings routing_key arguments
┌─────────────────────────────┬───────────────────────────────────────────────────────────────────────────────────┐
│ routing_key │ arguments │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-2qos0 │ │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.# │ {mqtt_subscription_opts,0,false,false,0,3}{<<"x-binding-key">>,longstr,<<"t.#">>} │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.1 │ {mqtt_subscription_opts,0,false,false,0,1}{<<"x-binding-key">>,longstr,<<"t.1">>}
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.2 │ {mqtt_subscription_opts,0,false,false,0,2}{<<"x-binding-key">>,longstr,<<"t.2">>}
└─────────────────────────────┴───────────────────────────────────────────────────────────────────────────────────┘

绑定参数的确切结构并不重要,并且很可能在未来的 RabbitMQ 版本中发生变化。但是,您可以在绑定参数中看到整数 1、2 和 3,它们对应于订阅标识符。

当主题交换机路由一条消息时,发布 Erlang 进程将包括所有匹配的绑定键到消息中。订阅 MQTT 客户端的 Erlang 进程会将匹配的绑定键与其已知的 MQTT 主题过滤器进行比较,并将订阅标识符包含在发送到 MQTT 客户端的 PUBLISH 数据包中。

发布 Erlang 进程可以是 MQTT 连接进程或 AMQP 0.9.1 通道进程。一如既往,RabbitMQ 在跨协议互操作性方面表现出色:当 AMQP 0.9.1(或 STOMP 或 AMQP 1.0)客户端向主题交换机发送消息时,正确的订阅标识符将包含在发送到 MQTT 客户端的 PUBLISH 数据包中。

特性 3:订阅选项

描述

MQTT 5.0 带来了 3 个新的订阅选项

  1. 无本地
  2. 保留已发布
  3. 保留处理

所有订阅选项都由 RabbitMQ 实现。这里,我们只关注保留处理选项

此选项指定在建立订阅时是否发送保留消息。
值如下
0 = 在订阅时发送保留消息
1 = 仅当订阅当前不存在时,在订阅时发送保留消息
2 = 在订阅时不要发送保留消息

示例

发送一条保留消息

mqttx pub --topic mytopic --message m --retain

保留处理值 0 将收到保留消息,而值 2 则不会

mqttx sub --topic mytopic --retain-handling 0
… Connecting...
✔ Connected
… Subscribing to mytopic...
✔ Subscribed to mytopic
payload: m
retain: true
^C

mqttx sub --topic mytopic --retain-handling 2
… Connecting...
✔ Connected
… Subscribing to mytopic...
✔ Subscribed to mytopic

特性 4:所有 ACK 上的原因码

描述

CONNACK、PUBACK、SUBACK、UNSUBACK 和 DISCONNECT 数据包包含原因码。

实现

一个实现示例是,如果消息未路由到任何队列,RabbitMQ 将在 PUBACK 数据包中回复原因码 No matching subscribers。MQTT 5.0 原因码 No matching subscribers 在概念上对应于 AMQP 0.9.1 中强制性消息属性和 BasicReturn 处理程序。

特性 5:用户属性

描述

大多数 MQTT 数据包都可以包含用户属性。用户属性的含义未在 MQTT 规范中定义。

PUBLISH 数据包示例

PUBLISH 数据包中的用户属性由客户端应用程序定义,并由服务器转发而未经更改。

在第一个终端窗口中订阅

mqttx sub --topic t/5

在第二个终端窗口中,发布带有用户属性的消息

mqttx pub --topic t/5 --message m --user-properties "key1: value1"

第一个终端窗口将原样接收用户属性

payload: m
userProperties: [ { key: 'key1', value: 'value1' } ]

MQTT 5.0 PUBLISH 数据包中的用户属性类似于 AMQP 0.9.1 中的 headers 消息属性。

CONNECT 数据包示例

使用用户属性连接

mqttx conn --client-id myclient --user-properties "connecting-from: London"

在浏览器中打开管理 UI https://:15672/#/connections(用户名和密码均为 guest),然后点击 MQTT 连接

Figure 1: User Property in the CONNECT packet
图 1:CONNECT 数据包中的用户属性

RabbitMQ 将在管理 UI 中显示 CONNECT 数据包中的用户属性。

特性 6:载荷格式内容类型

描述

发布者可以指定 MIME 内容类型。它还可以设置载荷格式指示器,指示载荷是 UTF-8 编码的字符数据还是未指定的二进制数据。

示例

在第一个终端窗口中,订阅一个主题

mqttx sub --topic t/6 --output-mode clean

在第二个终端窗口中,发送一个包含内容类型和载荷格式指示器的消息

mqttx pub --topic t/6 --message "my UTF-8 encoded data 🙂" --content-type text/plain --payload-format-indicator

第一个终端窗口将原样接收内容类型和载荷格式指示器

{
"topic": "t/6",
"payload": "my UTF-8 encoded data 🙂",
"packet": {
...
"properties": {
"payloadFormatIndicator": true,
"contentType": "text/plain"
}
}
}

特性 7:请求/响应

描述

MQTT 5.0 正式定义了请求/响应模式。

在发布消息之前,MQTT 客户端(请求者)订阅一个响应主题。请求者将响应主题和一些关联数据包含在请求消息中。

另一个 MQTT 客户端(响应者)接收请求消息,执行一些操作,然后将具有相同关联数据的响应消息发布到响应主题。

MQTT 5.0 请求/响应特性对应于 AMQP 0.9.1 中的远程过程调用。然而,在 AMQP 0.9.1 中,请求者将在 AMQP 0.9.1 消息属性 reply_to 中包含一个回调队列的名称。MQTT 协议不定义队列的概念。因此,在 MQTT 中,回复的“地址”是主题名称。

尽管协议规范之间存在不兼容性,RabbitMQ 在协议互操作性方面表现出色:因此,请求/响应交互受到 RabbitMQ 的跨协议支持。

例如,MQTT 客户端可以在请求消息中包含响应主题和关联数据。如果 AMQP 0.9.1 客户端创建了一个绑定到主题交换机 amq.topic 且绑定键匹配请求消息主题的队列,它将收到一个 AMQP 0.9.1 消息,其 correlation_id 属性设置为 MQTT 客户端发送的关联数据,并且有一个名为 x-opt-reply-to-topic 的头部。AMQP 0.9.1 客户端然后可以通过使用相同的 correlation_id 并将响应消息发布到主题交换机 amq.topic,主题为 x-opt-reply-to-topic 头部中存在的主题来响应 MQTT 5.0 客户端。

示例

此示例仅关注 MQTT 客户端。

在第一个终端窗口中,响应的 MQTT 客户端订阅主题 t/7

mqttx sub --client-id responder --topic t/7 --session-expiry-interval 600 --output-mode clean --qos 1

在第二个终端窗口中,请求的 MQTT 客户端订阅一个名为 my/response/topic 的主题

mqttx sub --client-id requester --topic my/response/topic --session-expiry-interval 600 --qos 1
… Connecting...
✔ Connected
… Subscribing to my/response/topic...
✔ Subscribed to my/response/topic
^C

在第二个终端窗口中,请求者然后发布一个请求消息

mqttx pub --client-id requester --topic t/7 --message "my request" \
--correlation-data abc-123 --response-topic my/response/topic \
--session-expiry-interval 600 --no-clean

在第一个终端窗口中,响应者接收到请求消息

{
"topic": "t/7",
"payload": "my request",
"packet": {
...
"properties": {
"responseTopic": "my/response/topic",
"correlationData": {
"type": "Buffer",
"data": [
97,
98,
99,
45,
49,
50,
51
]
}
}
}
}
^C

在第一个终端窗口中,响应者通过复制关联数据并发布到响应主题来响应请求者

mqttx pub --client-id responder --topic my/response/topic --message "my response" --correlation-data abc-123

在第二个终端窗口中,请求者收到响应。

mqttx sub --client-id requester --topic my/response/topic --no-clean --qos 1 --output-mode clean
{
"topic": "my/response/topic",
"payload": "my response",
"packet": {
...
"properties": {
"correlationData": {
"type": "Buffer",
"data": [
97,
98,
99,
45,
49,
50,
51
]
}
}
}
}

关联数据对于将响应与请求关联很有用。请求者通常为它发布的每个请求选择唯一的关联数据。

特性 8:分配的客户端标识符

描述

如果客户端使用零长度的客户端标识符连接,服务器必须响应一个包含分配的客户端标识符的 CONNACK。

与 MQTT 3.1.1 相比,这取消了服务器分配的客户端 ID 只能与 Clean Session = 1 连接一起使用的限制。

实现

RabbitMQ 将生成一个随机客户端 ID(例如 dcGB2kSwS0JlXnaBa1A6QA)并将其返回到 CONNACK 数据包中。

特性 9:主题别名

描述

主题别名是一个整数值,用于标识主题而不是使用主题名称。这减小了 PUBLISH 数据包的大小,当主题名称很长且在网络连接中重复使用相同的​​主题名称时非常有用。

实现

RabbitMQ 中的默认主题别名最大值为 16。您可以在 rabbitmq.conf 中配置此值,例如

mqtt.topic_alias_maximum = 32

此配置值映射到从 RabbitMQ 发送到客户端的 CONNACK 数据包中的主题别名最大值。它限制了双向主题别名的数量,即从客户端到 RabbitMQ 和从 RabbitMQ 到客户端。设置更高的值会在 RabbitMQ 中占用更多内存,如果客户端与许多不同的主题进行发送或接收。

RabbitMQ 操作员可以通过设置来禁止使用主题别名

mqtt.topic_alias_maximum = 0

特性 10:流量控制

描述

MQTT 5.0 属性 Receive Maximum 定义了未确认的 QoS 1 PUBLISH 数据包的上限。

实现

从 RabbitMQ 发送到客户端的未确认 QoS 1 PUBLISH 数据包的最大数量由 CONNECT 数据包中从客户端发送到 RabbitMQ 的 Receive Maximum 的最小值和配置的 mqtt.prefetch 值决定

mqtt.prefetch = 10

mqtt.prefetch 的默认值为 10。

mqtt.prefetch 值在 RabbitMQ 3.13 之前已存在于 MQTT 3.1 和 3.1.1 中。它映射到 RabbitMQ 中的消费者预取。换句话说,它定义了队列向其 MQTT 连接进程发送多少个正在进行的(in-flight)消息。

特性 11:最大数据包大小

描述

客户端和服务器可以独立指定它们支持的最大数据包大小。

示例

此示例演示了如何限制从客户端发送到 RabbitMQ 的最大 MQTT 数据包大小。

假设在成功认证后,RabbitMQ 操作员不希望 RabbitMQ 接受任何大于 1 KiB 的 MQTT 数据包。将以下配置写入 rabbitmq.conf(在您的当前工作目录中)

mqtt.max_packet_size_authenticated = 1024

停止 RabbitMQ 服务器后,使用新的配置启动 RabbitMQ 服务器

docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 \
--mount type=bind,source="$(pwd)"/rabbitmq.conf,target=/etc/rabbitmq/conf.d/11-blog-post.conf \
rabbitmq:3.13.0-beta.2-management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
docker exec rabbitmq rabbitmqctl enable_feature_flag all

在第一个终端窗口中,订阅一个主题

mqttx sub --topic t/11

在第二个终端窗口中,将一个 3 字节载荷的消息发送到该主题

payload=$(head --bytes 3 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"

第一行从特殊文件 /dev/zero 读取 3 字节(3 个空字符),将每个空字符转换为 ASCII 字符 x,并将结果 xxx 保存到变量 payload 中。

第一个终端窗口将收到该消息

payload: xxx

接下来,在第二个终端窗口中,发送一个 2,000 字节载荷的消息

payload=$(head --bytes 2000 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"

这次,第一个终端窗口没有收到消息,因为从客户端发送到 RabbitMQ 的 PUBLISH 数据包大于配置的最大数据包大小 1024 字节。

相反,RabbitMQ 日志记录了一条描述性的错误消息

[error] <0.836.0> MQTT packet size (2007 bytes, type 3) exceeds mqtt.max_packet_size_authenticated (1024 bytes)

日志消息显示 2,007 字节是因为 PUBLISH 数据包的固定头部和可变头部需要 7 字节(其中 4 字节用于主题名称 t/11)。

特性 12:服务器发起的 DISCONNECT

描述

在 MQTT 5.0 中,DISCONNECT 数据包不仅可以从客户端发送到服务器,还可以从服务器发送到客户端。

实现

在终止连接之前,RabbitMQ 将在以下情况下向客户端发送 DISCONNECT 数据包

DISCONNECT 原因码名称情况
会话接管另一个客户端使用相同的客户端 ID 连接。
服务器正在关闭RabbitMQ 进入维护模式
Keep Alive 超时客户端在Keep Alive时间内未能通信。
数据包过大RabbitMQ 接收到大小超过 mqtt.max_packet_size_authenticated 的数据包

特性 13:会话过期

描述

在 MQTT 5.0 中,客户端可以在 CONNECT 数据包中向服务器建议会话过期间隔。服务器可以在 CONNACK 数据包中接受提议的会话过期间隔或强制执行其他值。

会话可以跨一系列网络连接继续。它持续时间为最新的网络连接加上会话过期间隔。

当会话过期间隔到期时,客户端和服务器都将删除任何会话状态。

实现

客户端和服务器会保留会话状态,直到会话结束。

服务器中的会话状态包括已发送给客户端但尚未确认的消息、待发送给客户端的消息以及客户端的订阅。RabbitMQ 以队列和绑定的形式模拟此 MQTT 会话状态。

因此,会话过期间隔映射到 RabbitMQ 中的队列 TTL。当 MQTT 会话过期时,队列及其消息和绑定将被删除。

示例

默认情况下,服务器允许的最大会话过期间隔为 1 天。如果 MQTT 客户端在 1 天内未重新连接,其会话状态将在 RabbitMQ 中被删除。

此值是可配置的。为了本示例的目的,让我们在 rabbitmq.conf 中设置一个非常低的会话过期间隔:1 分钟

mqtt.max_session_expiry_interval_seconds = 60

设置名称包含前缀 max,因为 MQTT 5.0 客户端可以通过在 CONNECT 数据包中发送会话过期间隔来选择较低的值。如最大数据包大小示例中所述,重新启动 RabbitMQ 节点,以便应用新设置。

使用 20 秒的会话过期间隔连接到 RabbitMQ 并创建订阅

mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 20 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/13...
✔ Subscribed to t/13
^C

在终端中输入 Ctrl+C 断开客户端连接。

在接下来的 20 秒内,列出队列和绑定

docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name
mqtt-subscription-sub-13qos1

docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...
┌─────────────┬──────────────────────────────┬──────────────────────────────┐
│ source_name │ destination_name │ routing_key │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│ │ mqtt-subscription-sub-13qos1 │ mqtt-subscription-sub-13qos1 │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│ amq.topic │ mqtt-subscription-sub-13qos1 │ t.13 │
└─────────────┴──────────────────────────────┴──────────────────────────────┘

20 秒后,再次列出队列和绑定

docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...

docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...

队列及其绑定已被 RabbitMQ 删除,因为我们的客户端未在会话过期间隔 20 秒内使用 Clean Session = 0 连接到 RabbitMQ。

接下来,进行相同的测试,但使用较高的会话过期间隔,例如 1 小时

mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 3600 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/13...
✔ Subscribed to t/13
^C

您应该会注意到,队列及其绑定将在 1 分钟后被删除,因为有效的会话过期间隔是客户端请求的值(1 小时)和 RabbitMQ 中配置的 mqtt.max_session_expiry_interval_seconds 值(1 分钟)中的最小值。

特性 14:遗嘱消息延迟

描述

客户端可以在 CONNECT 数据包中定义遗嘱延迟间隔。

服务器延迟发布客户端的遗嘱消息,直到遗嘱延迟间隔过去或会话结束(以先发生者为准)。如果在遗嘱延迟间隔过去之前建立了到该会话的新网络连接,则服务器不得发送遗嘱消息。这的一种用途是避免在发生临时网络中断且客户端成功重新连接并继续其会话(在遗嘱消息发布之前)时发布遗嘱消息。

遗嘱延迟间隔的另一种用例是通知会话过期

客户端可以通过将遗嘱延迟间隔设置为长于会话过期间隔,并发送带有原因码 0x04(带遗嘱消息的断开连接)的 DISCONNECT 来安排遗嘱消息通知会话已过期。

实现

尽管遗嘱消息载荷通常很小,但 MQTT 规范允许遗嘱消息载荷大小高达 64 KiB。

为了避免在Khepri(RabbitMQ 的未来元数据存储)中存储大型二进制数据,RabbitMQ 会创建一个包含此单个遗嘱消息的经典队列。我们称此队列为遗嘱队列。此消息设置了每条消息 TTL,以毫秒为单位,对应于以秒为单位的遗嘱延迟间隔。此外,遗嘱队列设置了队列 TTL,以毫秒为单位,对应于以秒为单位的会话过期间隔。有效的每条消息 TTL 至少比队列 TTL 低几毫秒,以便消息在队列(会话)过期前不久发布。

遗嘱队列还将 amq.topic(MQTT 插件使用的默认主题交换机)定义为死信交换机,并将遗嘱主题定义为死信路由键

如果 MQTT 客户端在其遗嘱延迟间隔内未重新连接,遗嘱队列中的消息将被死信到主题交换机。

让我们通过一个例子来说明这一点。

示例

在第一个终端窗口中,创建一个将消耗遗嘱消息的订阅

mqttx sub --client-id sub-14 --topic t/14

在第二个终端窗口中,创建一个遗嘱延迟间隔为 20 秒的连接

mqttx conn --client-id conn-14 --will-topic t/14 --will-message my-will-message --will-delay-interval 20 --session-expiry-interval 40

在第三个终端窗口中,我们看到到目前为止,订阅的 MQTT 客户端只创建了一个队列

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬───────────┐
│ name │ type │ messages │ arguments │
├──────────────────────────────┼────────────┼──────────┼───────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 00 │ │
└──────────────────────────────┴────────────┴──────────┴───────────┘

在第二个终端窗口中,键入 Ctrl+C 以断开客户端 ID 为 conn-14 的 MQTT 连接。

这次,列出队列显示遗嘱队列已被创建

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬────────────────────────────────────────────────────────────┐
│ name │ type │ messages │ arguments │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 00 │ │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-will-conn-14 │ classic │ 1{<<"x-expires">>,long,40000}
│ │ │ │ {<<"x-dead-letter-exchange">>,longstr,<<"amq.topic">>}
│ │ │ │ {<<"x-dead-letter-routing-key">>,longstr,<<"t.14">>}
└──────────────────────────────┴────────────┴──────────┴────────────────────────────────────────────────────────────┘

遗嘱队列的命名模式为 mqtt-will-<MQTT Client ID>。它包含一条消息:遗嘱消息。

如上一节所述,队列 TTL(x-expires)为 40,000 毫秒,因此对应于我们上面命令中的 40 秒会话过期间隔。如果您等待 20 秒,您的第一个终端窗口应该会收到遗嘱消息,因为我们的客户端未在遗嘱延迟间隔内重新连接

› payload: my-will-message

特性 15:可选服务器功能可用性

描述

定义一组服务器不允许的功能,并提供一种机制供服务器将其指定给客户端。可以这样指定功能的是

  • 最大 QoS
  • 保留可用
  • 通配符订阅可用
  • 订阅标识符可用
  • 共享订阅可用

客户端使用服务器声明不可用的功能是错误的。

实现

RabbitMQ 3.13 在CONNACK 属性中包含最大 QoS = 1 和共享订阅可用 = 0。

QoS 2 未得到 RabbitMQ 的支持。

如下一节所述,共享订阅将在未来的 RabbitMQ 版本中得到支持。

限制

本节列出了 RabbitMQ MQTT 实现的局限性。

MQTT 5.0 特定限制

共享订阅

共享订阅将在未来的 RabbitMQ 版本中添加。尽管此功能与 RabbitMQ 中的队列很好地匹配,但共享订阅是会话状态的一部分,需要一些 RabbitMQ 数据库迁移才能有效地查询给定 MQTT 客户端 ID 的共享订阅。

延迟和保留的遗嘱消息

同时延迟保留的遗嘱消息将不会被保留。这是因为延迟的遗嘱消息将被死信到主题交换机,但保留进程目前不从队列消耗。此限制可以通过新的保留消息存储在将来解决。

非 MQTT 5.0 特定限制

为完整起见,本节列出了在 RabbitMQ 3.13 支持 MQTT 5.0 之前以及在 RabbitMQ 3.12 中发布原生 MQTT 之前就存在的限制。

保留消息

RabbitMQ 中的保留消息功能受限

保留消息仅在节点本地存储和查询。

以下是一个有效示例:MQTT 客户端向节点 A 发布主题为 topic/1 的保留消息。之后,另一个客户端在节点 A 上使用主题过滤器 topic/1 进行订阅。新订阅者将收到保留消息。

但是,如果主题过滤器包含通配符(多级通配符字符“#”或单级通配符字符“+”),则不会发送任何保留消息(问题 #8824)。

此外,如果一个客户端在节点 A 上发布了保留消息,而另一个客户端随后在节点 B 上订阅,则该订阅客户端将不会收到存储在节点 A 上的任何保留消息(问题 #8096)。

未来的 RabbitMQ 版本将复制集群中的保留消息,并发送匹配包含通配符的主题过滤器的保留消息。

总结

总而言之,RabbitMQ

  • 是领先的 AMQP 0.9.1 代理
  • 是一个流式代理
  • 在跨协议互操作性方面表现出色
  • 凭借 3.13 版本发布的 MQTT 5.0 支持和 3.12 版本发布的原生 MQTT,正成为领先的 MQTT 代理之一

我们将 RabbitMQ 打造成一个功能齐全的物联网代理的旅程尚未结束,在接下来的几个月和几年里,计划将进行更多的开发工作。敬请关注!

© . This site is unofficial and not affiliated with VMware.