跳至主内容

AMQP 1.0 流控制的十个好处

·阅读 35 分钟

这篇博客文章概述了 AMQP 1.0 流控制相比 AMQP 0.9.1 的十个优势,并通过两个基准测试证明了显著的性能提升。此外,我们深入探讨了强大的 AMQP 1.0 流控制原语以及它们在 RabbitMQ 中的使用方法。

流控制 是指管理两个节点之间数据传输速率的过程,以防止快速发送方压垮慢速接收方。

AMQP 1.0 协议在两个不同级别定义了流控制:

  1. 链路流控制
  2. 会话流控制

在 AMQP 1.0 中,消息通过链路发送。链路连接了发送方客户端应用程序与 RabbitMQ 中的交换,或者连接了 RabbitMQ 中的队列与接收方客户端应用程序。

信息

虽然 AMQP 1.0 规范使用了“发送方”和“接收方”的术语,但 RabbitMQ 文档中使用了“发布者”(或“生产者”)和“消费者”。在讨论客户端应用程序时,这些术语可以互换使用。因此,一个客户端应用程序实例:

  • 将消息发送到 RabbitMQ 的是发送方 / 发布者 / 生产者(RabbitMQ 作为接收方)。
  • 从 RabbitMQ 接收消息的是接收方 / 消费者(RabbitMQ 作为发送方)。

AMQP 1.0 链路流控制的核心思想很简单:接收消息时,消费者必须向发送队列授予信用额度。

一个信用额度对应一条消息。例如,当消费者授予 10 个信用额度时,RabbitMQ 就可以发送 10 条消息。这个简单的原则,即接收方向发送方提供反馈,确保发送方永远不会压垮接收方。

接收方和发送方各自维护其“链路状态”。此状态的一部分是当前的链路信用额度。每次传输消息时,链路信用额度会减少 1。具体来说,发送方发送消息时链路信用额度减 1,接收方接收消息时链路信用额度减 1。当发送方的链路信用额度达到 0 时,它必须停止发送消息。

消息以传输(transfer)帧的形式发送。

信用额度在流(flow)帧中授予

<field name="link-credit" type="uint"/>

正如你可能猜到的,它们被称为“流”(flow)帧,因为这些帧携带流控制信息。`uint` 类型代表无符号整数,一个介于 0 和一个大数(2^32 - 1)之间的值。

即使在链路成功建立(AMQP 1.0 术语中是“已附加(attached)”)之后,RabbitMQ 也不能在消费者发送其第一个 flow 帧并授予发送队列链路信用额度之前开始向消费者发送消息。

最简单的情况下,当客户端(接收方)向队列(发送方)授予单个信用额度时,队列将发送单个消息,如 AMQP 1.0 规范的图 2.43:同步获取所示。

Receiver                                      Sender
=================================================================
...
flow(link-credit=1) ---------->
+---- transfer(...)
*block until transfer arrives* /
<---+
...
-----------------------------------------------------------------

一次同步获取一条消息将导致吞吐量低下。因此,客户端通常会向队列授予多个信用额度,如 AMQP 1.0 规范的图 2.45:异步通知所示。

Receiver                                          Sender
=====================================================================
...
<---------- transfer(...)
<---------- transfer(...)
flow(link-credit=delta) ---+ +--- transfer(...)
\ /
x
/ \
<--+ +-->
<---------- transfer(...)
<---------- transfer(...)
flow(link-credit=delta) ---+ +--- transfer(...)
\ /
x
/ \
<--+ +-->
...
---------------------------------------------------------------------

如果接收方授予 N 个信用额度,并在等待所有 N 条消息到达后再授予下一个 N 个信用额度,与图 2.43(N=1)相比,吞吐量会更高。但是,仔细查看图 2.45,你会发现接收方在收到所有之前的消息之前就授予了更多的信用额度。这种方法可以实现最高的吞吐量。例如,在图 2.45 中,接收方最初可能授予了 6 个信用额度,然后在收到 3 条消息后,再向 RabbitMQ 发送一个 flow 帧,其中 link-credit = 6

注意

授予链路信用额度不是累积的。

当接收方发送一个 flow 帧,其中 link-credit = N 时,接收方设置当前信用额度为 N,而不是增加 N 个信用额度。例如,如果接收方在没有任何消息传输的情况下发送了两个 flow 帧,其中 link-credit = 50,那么接收方将拥有 50 个信用额度,而不是 100 个。

接收方知道其当前的吞吐能力,因此总是接收方(而不是发送方)决定当前的链路信用额度。发送方通过发送更多消息来“消耗”接收方授予的链路信用额度。

接收方可以根据其当前的吞吐能力动态地增加或减少链路信用额度的数量。

优势 #1

消费客户端应用程序可以动态调整它希望从特定队列接收的消息数量。

这是 AMQP 1.0 链路流控制相对于 AMQP 0.9.1 中消费者预取的一个巨大优势。在 AMQP 0.9.1 中,basic.qos 方法适用于给定AMQP 0.9.1 通道上的所有消费者。此外,动态更新消费者预取是不可能或不方便的,正如在#10174 中讨论的那样。

优势 #2

消费客户端应用程序可以动态地确定它希望从同一会话中的多个队列中接收消息的优先级。

这是 AMQP 1.0 链路流控制相对于 AMQP 0.9.1 中消费者预取的另一个优势。一旦 AMQP 0.9.1 客户端在多个队列上调用 basic.consume,它将持续从所有这些队列接收消息,直到它调用 basic.cancel

你可能会问:链路信用额度的好值是多少?多久应该补充一次链路信用额度?像往常一样,答案是你需要通过基准测试你的特定工作负载,并尝试不同的值来找出答案。

我建议从简单的开始,而不是实现复杂的算法:例如,客户端可以最初授予 200 个链路信用额度,并在剩余链路信用额度低于 100 时发送一个 flow 帧,其中 link-credit = 200

实际上,RabbitMQ 反向执行此操作:RabbitMQ AMQP 1.0 会话进程最初授予发布者170 个链路信用额度,当剩余链路信用额度低于一半(即 85)并且未确认消息的数量小于 170 时,它会再次授予 170 个链路信用额度。 (在代理内部,AMQP 1.0 会话进程与目标队列之间始终启用发布者确认,即使没有向发布客户端发送确认。这意味着如果目标队列未足够快地确认,RabbitMQ 将停止向发送应用程序授予链路信用额度。)请注意,这些 RabbitMQ 实现细节可能会随时更改。

170 的值可以通过 advanced.config 设置 rabbit.max_link_credit 进行配置。

优势 #3

当一个目标队列过载时,发布者可以继续高速发布到所有其他目标队列。

应用程序可以在同一个 AMQP 1.0 连接或会话中发送到多个队列(通过附加多个链路)。假设一个简单的场景:客户端打开两个链路。

  • 链路 1 发送到一个经典队列。
  • 链路 2 发送到一个 5 副本的仲裁队列。

在确认消息之前,仲裁队列必须将消息复制到大多数副本,每个副本fsync(同步)消息到其本地磁盘。

相比之下,经典队列不复制消息。此外,当消息被足够快地消费和确认时,经典队列可以(之后)将消息确认回发布者,而无需写入磁盘。因此,在这种情况下,经典队列的吞吐量将远高于仲裁队列。

AMQP 1.0 链路流控制的美妙之处在于,RabbitMQ 可以减缓向链路 2 授予信用额度,同时继续以高频率向链路 1 授予信用额度。因此,即使 5 副本仲裁队列的处理速度不如(单副本)经典队列,客户端也可以继续以全速发送到经典队列。

下面的图片是从之前的 AMQP 0.9.1 博客文章中复制的。

Flow control in AMQP 0.9.1
AMQP 0.9.1 中的流控制

图片中的“credit”(信用额度)一词指的是 RabbitMQ 对 AMQP 0.9.1 连接的内部流控制,与 AMQP 1.0 中的链路信用额度无关。

上图中的 reader 是从套接字读取 AMQP 0.9.1 帧的 Erlang 进程。图片显示,对于 AMQP 0.9.1 连接,RabbitMQ 会阻塞 reader,从而导致 TCP 反压应用于客户端。因此,当单个目标队列过载时,RabbitMQ 会限制 AMQP 0.9.1 连接,影响到所有其他目标队列的发布。

以下基准测试显示,当连接发送到多个目标队列时,AMQP 1.0 与 AMQP 0.9.1 相比可以提供数倍更高的吞吐量。

基准测试:两个发送方

为了将我们刚才讨论的理论付诸实践,two_senders 程序模拟了类似场景。

该程序打开了一个 AMQP 0.9.1 连接和通道,以及一个 AMQP 1.0 连接和会话。

在 AMQP 0.9.1 通道和 AMQP 1.0 会话上,有两个 goroutines 以最快速度发布到经典队列和仲裁队列。总共导致四个目标队列。

    main.go                                                      RabbitMQ
+-------------+ +----------------------------------+
| | AMQP 0.9.1 connection | |
| |#####################################| |
| +---+ |-------------------------------------| +------------------------+ |
| | P | | classic-queue-amqp-091 | |
| +---+ +------------------------+ |
| AMQP 0.9.1 channel |
| +---+ +------------------------+ |
| | P | | quorum-queue-amqp-091 | |
| +---+ |-------------------------------------| +------------------------+ |
| |#####################################| |
| | | |
| | | |
| | | |
| |#####################################| |
| +---+ |-------------------------------------| +-----------------------+ |
| | P |O============================================>O| classic-queue-amqp-10 | |
| +---+ +-----------------------+ |
| AMQP 1.0 session |
| +---+ +-----------------------+ |
| | P |O======================================+=====>O| quorum-queue-amqp-10 | |
| +-+-+ |----------------------------------|--| +-----------------------+ |
| | |##################################|##| |
| | | AMQP 1.0 connection | | |
+------|------+ | +----------------------------------+
| |
| |
Publisher AMQP 1.0 link
goroutine

按如下方式运行基准测试:

  1. 使用 make run-broker 在 Ubuntu 机器上启动 RabbitMQ 服务器 v4.0.0-beta.6。(在 macOS 上,fsync 不会物理写入磁盘盘片)。
  2. 使用 go run two_senders/main.go 执行 Go 程序。10 秒后,Go 程序将完成。
  3. 列出每个队列中的消息数量
./sbin/rabbitmqctl --silent list_queues name type messages --formatter=pretty_table
┌────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├────────────────────────┼─────────┼──────────┤
│ classic-queue-amqp-091 │ classic │ 159077 │
├────────────────────────┼─────────┼──────────┤
│ quorum-queue-amqp-091 │ quorum │ 155782 │
├────────────────────────┼─────────┼──────────┤
│ classic-queue-amqp-10 │ classic │ 1089075 │
├────────────────────────┼─────────┼──────────┤
│ quorum-queue-amqp-10 │ quorum │ 148580 │
└────────────────────────┴─────────┴──────────┘

AMQP 1.0 基准测试博客文章中所述,仲裁队列会fsync(更准确地说,是 fdatasync),而经典队列则不会。因此,即使没有复制,仲裁队列也比经典队列慢得多,因为我使用的是消费者级磁盘,每次 fsync 需要至少 5 毫秒。对于生产集群,建议使用高端的企业级磁盘,其 fsync 速度更快。

结果显示,单个 AMQP 0.9.1 连接发送到目标经典队列和目标仲裁队列的消息数量大致相同。这是因为 quorum-queue-amqp-091 在我的基准测试中导致整个 AMQP 0.9.1 连接大约每秒被阻塞(和解除阻塞)80 次。因此,单个 AMQP 0.9.1 连接到多个目标队列(classic-queue-amqp-091quorum-queue-amqp-091)的发布速率受限于最慢的目标队列(quorum-queue-amqp-091)。总共,AMQP 0.9.1 连接发送了 159,077 + 155,782 = 314,859 条消息。

相反,得益于链路流控制,RabbitMQ 只限制到 quorum-queue-amqp-10 目标的链路,允许 AMQP 1.0 客户端继续以全速发布到 classic-queue-amqp-10 目标。总共,AMQP 1.0 连接发送了 1,089,075 + 148,580 = 1,237,655 条消息。

因此,在我们简单的基准测试中,AMQP 1.0 的总发送吞吐量是 AMQP 0.9.1 的四倍(!)

优势 #4

当一个目标队列过载时,客户端可以继续从所有源队列高速消费。因此,AMQP 1.0 客户端可以使用单个连接同时进行发布和消费,并实现高吞吐量。

优势 #3 描述了当一个目标队列过载时,RabbitMQ 会阻塞读取器进程读取任何 AMQP 0.9.1 帧。这意味着客户端不仅无法发布消息,而且也无法消费消息。这是因为客户端的消息确认不再被 RabbitMQ 处理,一旦其预取限制达到,就无法向消费者传递新消息。

尽管这种消费节流是暂时的(AMQP 0.9.1 reader 进程每秒会被阻塞和解除阻塞多次),但它会显著降低消费速率。

RabbitMQ AMQP 0.9.1 文档建议:

因此,建议在可能的情况下,发布者和消费者使用单独的连接,以便消费者不受可能应用于发布连接的流控制的影响,这会影响手动消费者确认。

这导致了整个 AMQP 0.9.1 客户端库生态系统都采纳了这种“最佳实践”,即使用单独的连接进行发布和消费。例如,RabbitMQ AMQP 0.9.1 C++ 库说明

发布和消费发生在不同的连接上

一个常见的应用程序陷阱是在同一个连接上进行消费和生产。这可能导致消费速率降低,因为 RabbitMQ 会对快速发布者施加反压——根据确切的队列消费/发布情况,这可能导致恶性循环。

相比之下,AMQP 1.0 链路流控制允许只减缓客户端应用程序中的链路发送方。所有其他链路(无论是发送还是消费)都可以继续以全速运行。

因此,在 AMQP 1.0 中,客户端可以使用单个连接同时进行发布和消费。

基准测试:一个发送方,一个接收方

程序 one_sender_one_receiver 模拟了一个场景:客户端打开两个链路。

  • 链路 1 从经典队列接收。
  • 链路 2 发送到仲裁队列。

该程序打开了一个 AMQP 0.9.1 连接和通道,以及一个 AMQP 1.0 连接和会话。

为了准备基准测试,该程序将一百万条消息写入每个经典队列。

在 AMQP 0.9.1 通道和 AMQP 1.0 会话上,都有两个 goroutines:

  • 一个 goroutine(链路 1)以 200 的预取值从经典队列接收消息并确认每一条。
  • 一个 goroutine(链路 2)以 10,000 条消息的批次发布到仲裁队列。(在收到所有 10,000 个确认后,发布下一批。)
    main.go                                                      RabbitMQ
+-------------+ +----------------------------------+
| | AMQP 0.9.1 connection | |
| |#####################################| |
| +---+ |-------------------------------------| +------------------------+ |
| | C | | classic-queue-amqp-091 | |
| +---+ +------------------------+ |
| AMQP 0.9.1 channel |
| +---+ +------------------------+ |
| | P | | quorum-queue-amqp-091 | |
| +---+ |-------------------------------------| +------------------------+ |
| |#####################################| |
| | | |
| | | |
| | | |
| |#####################################| |
| +---+ |-------------------------------------| +-----------------------+ |
| | C |O<============================================O| classic-queue-amqp-10 | |
| +---+ +-----------------------+ |
| AMQP 1.0 session |
| +---+ +-----------------------+ |
| | P |O======================================+=====>O| quorum-queue-amqp-10 | |
| +-+-+ |----------------------------------|--| +-----------------------+ |
| | |##################################|##| |
| | | AMQP 1.0 connection | | |
+------|------+ | +----------------------------------+
| |
| |
Publisher or Consumer AMQP 1.0 link
goroutine

按如下方式运行基准测试:

  1. 使用 make run-broker 在 Ubuntu 机器上启动 RabbitMQ 服务器 v4.0.0-beta.6
  2. 使用 go run one_sender_one_receiver/main.go 执行 Go 程序。
  3. 程序完成后,列出每个队列中的消息数量。
./sbin/rabbitmqctl --silent list_queues name type messages --formatter=pretty_table
┌────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├────────────────────────┼─────────┼──────────┤
│ classic-queue-amqp-091 │ classic │ 990932 │
├────────────────────────┼─────────┼──────────┤
│ quorum-queue-amqp-091 │ quorum │ 172800 │
├────────────────────────┼─────────┼──────────┤
│ classic-queue-amqp-10 │ classic │ 336229 │
├────────────────────────┼─────────┼──────────┤
│ quorum-queue-amqp-10 │ quorum │ 130000 │
└────────────────────────┴─────────┴──────────┘

虽然 AMQP 0.9.1 客户端仅消费了 1,000,000 - 990,932 = 9,068 条消息,但 AMQP 1.0 客户端消费了 1,000,000 - 336,229 = 663,771 条消息。

因此,在此基准测试中,AMQP 1.0 客户端接收的消息数量是 AMQP 0.9.1 客户端的 73 倍(!)

注意

在 AMQP 0.9.1 中,消费者预取限制了未确认消息的数量。当消费者通过发送 basic.ack 帧来确认消息时,RabbitMQ 会传递其他消息。

在 AMQP 1.0 中,消息确认独立于链路流控制。消费者通过发送处置(disposition)帧来确认消息,这不会促使 RabbitMQ 发送更多消息。相反,客户端必须通过发送 flow 帧来补充链路信用额度,RabbitMQ 才能继续发送消息。为了方便起见,一些 AMQP 1.0 客户端库会在您的应用程序确认消息时自动发送 dispositionflow 帧。

传递计数(Delivery-Count)

到目前为止,我们只理解了 flow 帧的一个字段:link-credit

在以下场景中会发生什么?

Receiver                                  Sender
=======================================================================
...
link state: link state:
link-credit = 3 link-credit = 3

flow(link-credit = 6) ---+ +--- transfer(...)
\ /
x
/ \
<--+ +-->

link state: link state:
link-credit = 5 link-credit = 6

最初,link-credit 是 3。接收方决定发送一个 flow 帧,将新的 link-credit 设置为 6。同时,发送方发送一个 transfer 帧。

由于接收方在发送 flow 帧后收到了 transfer 帧,它将计算其新的 link-credit 为 6 - 1 = 5。然而,由于发送方在发送 transfer 帧后收到了 flow 帧,它将信用额度设置为 6。结果,状态——以及因此对链路还剩下多少信用额度的视图——变得不一致。这是有问题的,因为发送方可能会溢出接收方。

为了防止这种不一致,链路状态和 flow 帧都需要第二个字段。

<field name="delivery-count" type="sequence-no"/>

每次传输消息时,传递计数(delivery-count)会增加 1。具体来说,发送方发送消息时递增传递计数,接收方接收消息时递增传递计数。

当发送方收到一个 flow 帧(包含 link-credit 和 delivery-count)时,发送方会根据此公式设置其 link-credit:

link-credit(snd) := delivery-count(rcv) + link-credit(rcv) - delivery-count(snd).

(snd) 指的是发送方的链路状态,而接收方的链路状态 (rcv)flow 帧中发送给发送方。

在发送方,这个公式的意思是:“将新的 link-credit 设置为我在 flow 帧中收到的 link-credit,减去任何正在传输中的交付(in-flight deliveries)。”

传递计数(delivery-count)的目的是建立事件序列的顺序,这些事件是:

  • 发送方发送消息
  • 接收方接收消息
  • 接收方授予链路信用额度
  • 发送方计算接收方的链路信用额度

使用传递计数解决了我们之前讨论的不一致问题。

假设传递计数最初设置为 20。

Receiver                                      Sender
========================================================================================
...
link state: link state:
delivery-count = 20 delivery-count = 20
link-credit = 3 link-credit = 3

flow(delivery-count = 20,
link-credit = 6) ---+ +--- transfer(...)
\ /
x
/ \
<--+ +-->

link state: link state:
delivery-count = 21 delivery-count = 21
link-credit = 5 link-credit = 20+6-21 = 5 (above formula)
注意

一些 AMQP 1.0 字段,包括传递计数,是 sequence-no 类型。这些是 32 位 RFC-1982 序列号,范围为 [0 .. 4,294,967,295] 并会回绕:加 1 到 4,294,967,295 将得到 0。

注意

传递计数由发送方初始化,发送方在 attach 帧的 initial-delivery-count 字段中发送其选择的值。

发送方可以将传递计数初始化为它选择的任何值,例如 0、10 或 4,294,967,295。这个值没有内在含义,除了我们前面讨论的用途:比较接收方和发送方的传递计数,以确定有多少消息正在传输中。

信息

据我所知,AMQP 1.0 链路流控制似乎基于 1995 年的论文《ATM 网络基于信用的流控制》

AMQP 1.0 规范第 2.6.7 节中的公式

link-credit(snd) := delivery-count(rcv) + link-credit(rcv) - delivery-count(snd).

与论文中的公式

Crd_Bal = Buf_Alloc - (Tx_Cnt - Fwd_Cnt)(1)

匹配。

AMQP 1.0 规范甚至采纳了论文中的类似术语,例如“链路”(link)和“节点”(node)。此外,该规范可能指的是“传递计数”(delivery-count),因为它在论文中也被称为“计数”(count)(尽管规范澄清在 AMQP 中它实际上不是计数,而是序列号)。

论文中的图 2 很好地说明了该概念。

Credit-Based Flow Control for ATM Networks, Figure 2: Credit Update Protocol
ATM 网络基于信用的流控制,图 2:信用更新协议

在此图中:

  • Tx_Cnt 对应于 delivery-count(snd)
  • Fwd_Cnt 对应于 delivery-count(rcv)
  • Crd_Bal 对应于 link-credit(snd)
  • Buf_Alloc 对应于 link-credit(rcv)

该论文详细解释了接收方应该多久以及每次补充多少链路信用额度。如果你想成为 AMQP 1.0 流控制方面的专家,我建议阅读 AMQP 1.0 规范和该论文。

排水(Drain)

在理解了 flow 帧的两个最重要的链路控制字段(link-creditdelivery-count)之后,我们来介绍 drain 字段。

<field name="drain" type="boolean" default="false"/>

默认情况下,drain 字段设置为 false。由接收方决定是否启用排水模式,发送方将 drain 设置为接收方已知的最后一个值。

排水意味着发送方应该通过发送可用消息来使用接收方的所有链路信用额度。如果没有足够的消息发送,发送方仍然必须用尽所有链路信用额度,如下所示:

  1. 将传递计数(delivery-count)按剩余的 link-credit 递增。
  2. 将 link-credit 设置为 0。
  3. 发送一个 flow 帧到接收方。

通过将 drain 字段设置为 true,消费者请求 RabbitMQ“发送一个传输或一个帧。” 如果源队列为空,RabbitMQ 将只快速回复一个 flow 帧。

因此,drain 字段允许消费者在同步获取消息时设置超时,如 AMQP 1.0 规范的图 2.44:带超时的同步获取所示。

    Receiver                                      Sender
=================================================================
...
flow(link-credit=1) ---------->
*wait for link-credit <= 0*
flow(drain=True) ---+ +--- transfer(...)
\ /
x
/ \
(1) <--+ +-->
(2) <---------- flow(...)
...
-----------------------------------------------------------------
(1) If a message is available within the timeout, it will
arrive at this point.
(2) If a message is not available within the timeout, the
drain flag will ensure that the sender promptly advances the
delivery-count until link-credit is consumed.

由于 link-credit 被快速消耗,消费者可以明确地确定是收到了消息还是操作超时。

回显(Echo)

drain 字段类似,echo 字段:

  • 默认设置为 false
  • 由消费者决定,因为 RabbitMQ 在其当前实现中不设置此字段。
<field name="echo" type="boolean" default="false"/>

消费者可以设置 echo 字段来请求 RabbitMQ 回复一个 flow 帧。一个用例如图 2.46:停止入站消息(Figure 2.46: Stopping Incoming Messages)所示。

    Receiver                                       Sender
================================================================
...
<---------- transfer(...)
flow(..., ---+ +--- transfer(...)
link-credit=0, \ /
echo=True) x
/ \
(1) <--+ +-->
(2) <---------- flow(...)
...
----------------------------------------------------------------
(1) In-flight transfers can still arrive until the flow state
is updated at the sender.
(2) At this point no further transfers will arrive.
优势 #5

在 AMQP 1.0 中,消费者可以停止/暂停,之后再恢复。

在 AMQP 0.9.1 中,消费者不能暂停和恢复。相反,消费者必须使用 basic.cancel 方法取消订阅,然后才能使用 basic.consume 注册新消费者。

优势 #6

AMQP 1.0 允许从一个单一活动消费者优雅地切换到下一个,同时保持消息顺序。

在 AMQP 1.0 中,消费者可以选择先停止链路,确认消息后再分离(detach)链路(首选),或者直接分离链路。直接分离链路会将任何未确认的消息重新排队。无论哪种方式,分离链路都会激活下一个消费者,并按原始顺序接收消息。

相比之下,AMQP 0.9.1 中的单一活动消费者无法优雅且安全地切换到下一个。当 AMQP 0.9.1 消费者通过 basic.cancel 取消订阅但仍有未确认的消息时,队列将激活下一个消费者。如果 AMQP 0.9.1 客户端随后崩溃,旧消费者签出的消息将被重新排队,可能违反消息顺序。为了保持消息顺序,AMQP 0.9.1 客户端必须关闭整个通道(在调用 basic.cancel 之前),以便在下一个消费者激活之前重新排队消息。

可用(Available)

RabbitMQ 在 flow 帧中设置 available 字段,以告知消费者有多少消息可用。

<field name="available" type="uint"/>

available 值仅为近似值,因为从队列发出此信息到 flow 帧到达消费者这段时间内,此信息可能已经过时,例如当其他客户端发布消息到该队列或从该队列消费消息时。

对于经典队列仲裁队列available 表示准备好传递的消息数量(即队列长度,不包括已签出给消费者的消息)。

对于流(streams)available 表示已提交偏移量与最后消费偏移量之间的差值。大致来说,已提交偏移量是 RabbitMQ 集群中不同流副本同意的流的结束点。最后消费的偏移量可能与已提交偏移量相同,也可能落后于它。available 值仅仅是一个估计,因为流偏移量不一定代表一条消息

如果启用了单一活动消费者功能,仲裁队列将为所有非活动(等待)的消费者返回 available = 0。这是有道理的,因为无论它们补充了多少信用额度,对于非活动消费者来说都没有可用的消息。

在前一节“回显”中,我们了解到消费者设置 echo 字段的一个用例是停止链路。另一个用例是当消费者想了解其正在消费的队列中可用消息的数量时。

优势 #7

AMQP 1.0 将队列中可用消息的数量告知消费者。

在从 RabbitMQ 发送给消费者的每个 flow 帧中包含此信息,这比 AMQP 0.9.1 具有优势。在 AMQP 0.9.1 中,查询可用消息的方法更繁琐且效率低下。

  • queue.declare 配合 passive=truequeue.declare_ok 回复将包含一个 message_count 字段。
  • basic.getbasic.get_ok 回复将包含一个 message_count 字段。

available 字段也可以由发布者设置,以告知 RabbitMQ 发布者有多少消息可用。目前 RabbitMQ 会忽略此信息。

属性(Properties)

flow 帧的 properties 字段可以携带应用程序特定的链路状态属性。

<field name="properties" type="fields"/>

目前 RabbitMQ 不使用此字段。

优势 #8

AMQP 1.0 链路流控制是可扩展的。

想象一下 RabbitMQ 希望允许发布者直接向仲裁队列领导者发送消息。由于所有 Ra 命令(包括入队消息)都必须首先通过领导者,因此客户端直接连接到承载仲裁队列领导者的 RabbitMQ 节点是有意义的。这种“队列局部性”将减少 RabbitMQ 集群内部流量,从而提高延迟和吞吐量。当领导者更改时,RabbitMQ 可以通过 properties 字段向发布者推送领导者更改通知(包括承载仲裁队列领导者的新 RabbitMQ 节点),而不是导致 RabbitMQ 分离链路(这可能会干扰发布应用程序)。应用程序可以决定何时方便分离链路,并在另一个连接上附加新链路以继续“本地”发布。

或者,RabbitMQ 可以在每个 flow 帧的 properties 字段中发送一个布尔值,键为 local,以指示链路当前是本地发布还是本地消费。如果 local 字段从 true 变为 false,客户端可以通过 HTTP over AMQP 1.0 查询新的队列拓扑和领导者。

这些只是假设性的例子,展示了 RabbitMQ 在未来如何利用链路流控制的可扩展性,这比 AMQP 0.9.1 具有优势。

总结

我们了解到 AMQP 1.0 链路流控制可以保护单个消费者或队列免受消息过载的侵害。接收方会定期向发送方提供其当前可以处理的消息数量的反馈。

每个链路端点(发送方和接收方)都维护完整的链路流控制状态,并通过流(flow)帧与另一方交换此状态。

以下链路流控制字段由接收方独立确定:

  • 链路信用额度(link-credit)
  • 排水(drain)

以下链路流控制字段由发送方独立确定:

  • 传递计数(delivery-count)
  • 可用(available)

会话流控制(Session Flow Control)

在深入研究会话流控制之前,让我们回顾一下会话到底是什么。

会话(Session)

客户端库为每个 AMQP 连接创建一个 TCP 连接。AMQP 1.0 连接使用打开(open)帧建立。

在一个 AMQP 1.0 连接中,客户端可以启动多个 AMQP 1.0 会话。这类似于 AMQP 0.9.1 客户端如何在 AMQP 0.9.1 连接中打开多个AMQP 0.9.1 通道。AMQP 1.0 会话使用开始(begin)帧启动。

在一个会话中,客户端可以通过发送附加(attach)帧来创建链路

  Client App                                       RabbitMQ
+-------------+ +-------------+
| |################################| |
| +---+ |--------------------------------| +---+ |
| | C |O<===============================+========O| Q | |
| +-+-+ \ |-------------------+-------|----| |+-+-+ |
| | \ |#######+###########|#######|####| | | |
+-----|-----\-+ | | | +---|--|------+
| \ | | | | |
| Target | | | Source |
| | | | |
Consumer | | Link Queue
| Session
Connection

客户端应用程序可以创建,例如:

  • 一个具有单个会话的连接,或者
  • 一个具有多个会话的连接,或者
  • 多个连接,每个连接带有一个或多个会话。

通常,一个连接带有一个会话就足够了。

打开 AMQP 连接会产生一些开销:必须建立 TCP 连接,分配客户端和服务器双方的操作系统资源(如套接字和 TCP 缓冲区),并产生 TCP 或 TLS 握手的延迟。此外,在 RabbitMQ 节点上,每个传入的 AMQP 连接都会创建一个监控树

会话可以被认为是“轻量级”连接。每个 AMQP 1.0 会话当前实现为自己的 Erlang 进程。因此,如果连接已经建立,创建会话的成本很低。

在 AMQP 连接中创建第二个会话可能在以下场景中有用:

  • 快速且廉价地创建另一个“虚拟连接”,而无需承担上述 AMQP 连接设置开销。
  • 确保高优先级的传输帧(需要低延迟)不会被其他传输帧阻塞。
  • 当 RabbitMQ 会话进程变得非常繁忙时(例如,路由消息到队列),增加并行性。

流字段(Flow Fields)

会话提供了一个基于传输帧数量的流控制方案。由于给定连接的帧有最大大小,这提供了基于传输字节数量的流控制。

记住,一个大消息会被分割成多个传输帧。

AMQP 1.0 中的会话流控制比链路流控制运行在更高的层级。链路流控制保护单个消费者和队列,而会话流控制旨在保护整个客户端应用程序和 RabbitMQ 整体。

就像每个链路端点维护链路流控制状态并通过 flow 帧交换此状态一样,每个会话端点维护会话流控制状态,并在相同的 flow 帧中交换此状态。因此,flow 帧中剩余的字段用于会话流控制。

<field name="next-incoming-id" type="transfer-number"/>
<field name="incoming-window" type="uint" mandatory="true"/>
<field name="next-outgoing-id" type="transfer-number" mandatory="true"/>
<field name="outgoing-window" type="uint" mandatory="true"/>

incoming-window 类似于 link-credit 字段,接收方告知发送方它可以容忍接收多少“单元”。区别在于,对于 link-credit,单元是潜在的大应用程序消息,而对于 incoming-window,单元是 transfer 帧。

为了提供一个极端的例子以便更好地理解:如果连接上协商的 max-frame-size 非常低,并且一条消息非常大,如果接收方授予发送方一个链路信用额度,发送方在发送整条消息时仍可能被会话流控制阻塞。

next-incoming-idnext-outgoing-id 是序列号,其作用与链路流控制中的传递计数相同:它们确保当 transfer 帧与 flow 帧并行发送时,另一端的窗口能够正确计算。会话流控制需要两个序列号,而链路流控制只需要一个,因为会话是双向的,而链路是单向的。

最初,RabbitMQ 允许发布者发送400 个传输帧。每当 RabbitMQ 会话进程处理了该数字的一半(200 个传输帧)时,RabbitMQ 就会通过向发布者发送一个包含 incoming_window = 400flow 帧来扩展此窗口。

400 的值可以通过 advanced.config 设置 rabbit.max_incoming_window 进行配置。

RabbitMQ 警报(RabbitMQ Alarms)

这个规则的唯一例外是当触发内存或磁盘警报时。为了保护 RabbitMQ 免于耗尽内存或磁盘空间,每个会话将通过向发布者发送一个 incoming_window = 0flow 帧来关闭其入站窗口,从而有效地阻止它们发送任何进一步的 transfer 帧。

优势 #9

在集群范围内的内存或磁盘警报发生时,RabbitMQ 将只阻止 AMQP 1.0 客户端发布transfer帧。其他操作,例如同一会话上的 AMQP 1.0 客户端消费,或创建仅用于清空 RabbitMQ 中队列(从而减少内存和磁盘使用)的新连接,仍然是允许的。

在 AMQP 0.9.1 中,RabbitMQ 将完全阻止从连接套接字读取,并阻止打开新连接,直到警报清除。AMQP 0.9.1 客户端必须使用单独的连接进行发布和消费,才能在警报期间继续消费。

优势 #10

正如在优势 #4 和 #9 中所述,由于客户端可以安全有效地使用单个 AMQP 1.0 连接进行发布和消费,因此 RabbitMQ 中的内存使用量减少了

AMQP 0.9.1 实际上需要 AMQP 1.0 两倍数量的连接。 AMQP 1.0 基准测试提供了关于可以节省多少内存的见解。

入站窗口(Incoming-Window)

AMQP 1.0 流控制的缺点是其复杂性。虽然链路流控制和会话流控制背后的思想和动机是合理的,但在所有情况下,实现一个层(会话流控制)在另一个层(链路流控制)之上都可能很棘手且难以高效实现。请考虑客户端可以:

  • 独立修改会话流控制和链路流控制(在同一个 flow 帧中或在单独的 flow 帧中)。
  • 随时以不同的频率发送 flow 帧。
  • 动态地增加或减少会话窗口或链路信用额度。
  • 授予 0 的链路信用额度(导致队列停止发送)或巨量的链路信用额度(最高可达 40 亿)。
  • 关闭其会话 incoming-window(导致服务器会话停止发送)或广泛打开它(最高可达 40 亿)。
  • 停止从其套接字读取,将 TCP 反压应用于服务器。
  • 将其他特殊逻辑(如 drain=true)加入其中。

在 AMQP 1.0 程序中,并发是理想的。根据编程语言,客户端或代理的不同部分由不同的线程或进程实现。

RabbitMQ 通过在不同 Erlang 进程中运行连接读取器进程(解析帧)、连接写入器进程(序列化帧)、会话进程(例如,路由消息)和队列进程(存储和转发消息)来实现并发。

在客户端授予发送队列巨量 link-credit 但维护非常小的会话 incoming-window 的场景中,允许队列进程传递消息,而会话进程不允许将它们转发给客户端。在这种情况下,消息会缓冲在会话进程中,直到客户端的 incoming-window 允许它们被转发。

为了防范这种情况——大量消息可能会缓冲在会话进程中——RabbitMQ 在其当前实现中,内部以最多 256 个链路信用额度的批次向队列进程授予链路信用额度。即使客户端授予了大量的链路信用额度,队列最多也只能看到给定消费者的 256 个信用额度。一旦服务器会话进程发送了这 256 条消息,服务器会话进程将授予队列下一批 256 个信用额度。

256 的值可以通过 advanced.config 设置 rabbit.max_queue_credit 进行配置。

总结

这篇博客文章解释了 AMQP 1.0 链路流控制和会话流控制的工作原理。

我们了解到 RabbitMQ 内的各个进程如何保护自己免受过载:

  • 队列进程受链路流控制保护。
  • 会话进程和 RabbitMQ 整体受会话流控制保护。
  • 连接读取器进程通过应用 TCP 反压来保护(当它们无法足够快地读取帧时,这种情况应该很少见)。

这篇博客文章还强调了 AMQP 1.0 流控制相对于 AMQP 0.9.1 流控制的十个优势。主要优势包括:

  • 对消费者进行精细控制,允许它们在任何时间点确定它们希望从特定队列消费的消息数量。
  • 当目标队列达到其极限时,在单个 AMQP 连接上发布和消费可以实现更高的吞吐量。我们进行了两次基准测试:
    1. 在发送到两个不同目标队列时,AMQP 1.0 的总发送吞吐量是 AMQP 0.9.1 的四倍。
    2. 在极端情况下,当从一个队列接收消息而同时又快速发送到另一个队列时,AMQP 1.0 的消费速率是 AMQP 0.9.1 的 73 倍。
  • 安全有效地使用单个连接进行发布和消费。
  • 消费者可以随时停止和恢复的能力。
  • 可扩展性以适应未来用例。
© . This site is unofficial and not affiliated with VMware.