RabbitMQ 3.5.5 上的新信用流设置
这篇博文是为 2015 年发布的 RabbitMQ 3.5 撰写的。虽然有些部分仍然适用,但有很多过时的信息。例如,RabbitMQ 4.0 不再支持队列镜像,并且“将消息分页到磁盘”不再是 RabbitMQ 必须做的事情,因为消息几乎总是立即持久化到磁盘。
为了防止快速发布者用超过代理在任何特定时刻可以处理的消息来淹没代理,RabbitMQ 实现了一种称为信用流的内部机制,该机制将被 RabbitMQ 内部的各种系统使用,以节流发布者,同时允许消息消费者赶上进度。在这篇博文中,我们将了解信用流的工作原理,以及我们可以做些什么来调整其配置以获得最佳行为。
最新版本的 RabbitMQ 包含几个新的配置值,允许用户调整内部信用流设置。了解这些设置如何根据您的特定工作负载工作可以帮助您在性能方面充分利用 RabbitMQ,但请注意,增加这些值只是为了看看会发生什么可能会对 RabbitMQ 如何响应消息突发产生不利影响,从而影响 RabbitMQ 为了应对内存压力而采取的内部策略。谨慎处理。
要理解新的信用流设置,首先我们需要理解 RabbitMQ 的内部工作原理,关于消息发布和将消息分页到磁盘。让我们首先看看消息发布在 RabbitMQ 中是如何工作的。
消息发布
要查看 credit_flow
及其设置如何影响发布,让我们看看 RabbitMQ 中的内部消息流。请记住,RabbitMQ 是用 Erlang 实现的,进程之间通过相互发送消息进行通信。
每当 RabbitMQ 实例运行时,可能都有数百个 Erlang 进程交换消息以相互通信。例如,我们有一个 reader 进程,它从网络读取 AMQP 帧。这些帧被转换为 AMQP 命令,并转发到 AMQP 通道进程。如果此通道正在处理发布,则它需要向特定的交换机询问此消息应最终进入的队列列表,这意味着通道会将消息传递到这些队列中的每一个。最后,如果 AMQP 消息需要持久化,则 msg_store 进程将接收它并将其写入磁盘。因此,每当我们向 RabbitMQ 发布 AMQP 消息时,我们都有以下 erlang 消息流1
reader -> channel -> queue process -> message store.
为了防止任何这些进程淹没链中的下一个进程,我们实施了一个信用流机制。每个进程最初都会向向其发送消息的进程授予一定数量的信用。一旦进程能够处理 N 个消息,它将向发送消息的进程授予更多信用。在默认的信用流设置(rabbitmq.config
下的 credit_flow_default_credit
)下,这些值是 200 条消息的初始信用,并且在接收进程处理了 50 条消息后,发送消息的进程将获得 50 个额外的信用。
假设我们正在向 RabbitMQ 发布消息,这意味着 reader 将为接收到的每个 AMQP basic.publish
向通道进程发送一个 erlang 消息。这些消息中的每一个都将消耗通道的信用。一旦通道能够处理 50 条消息,它将向 reader 授予更多信用。到目前为止一切顺利。
反过来,通道会将消息发送到与消息路由规则匹配的队列进程。这将消耗队列进程授予通道的信用中的一个信用。在队列进程设法处理 50 个交付后,它将向通道授予 50 个额外的信用。
最后,如果消息被认为是持久性的(它是持久性的并且发布到持久队列),它将被发送到消息存储,在这种情况下,它也将消耗消息存储授予队列进程的信用。在这种情况下,初始值是不同的,并由 msg_store_credit_disc_bound
设置处理:2000 条消息的初始信用和 500 条消息在消息存储处理了 500 条消息后获得的额外信用。
因此,我们知道内部消息如何在 RabbitMQ 内部流动,以及何时将信用授予消息流中上方的进程。棘手的部分是进程之间授予信用。在正常情况下,通道将处理来自 reader 的 50 条消息,然后向 reader 授予 50 个额外的信用,但请记住,通道不仅处理发布,还向消费者发送消息、将消息路由到队列等等。
如果 reader 向通道发送消息的速度高于通道能够处理的速度,会发生什么情况?如果我们达到这种情况,那么通道将阻止 reader 进程,这将导致生产者被 RabbitMQ 节流。在默认设置下,reader 一旦向通道发送 200 条消息就会被阻止,但通道无法处理至少 50 条消息,以便向 reader 返回信用。
同样,在正常情况下,一旦通道设法处理完消息积压,它将向 reader 授予更多信用,但有一个问题。如果通道进程由于类似的原因而被队列进程阻止怎么办?那么本应授予 reader 进程的新信用将被延迟。reader 进程将保持阻塞。
一旦队列进程设法处理完来自通道的交付积压,它将向通道授予更多信用,解除对它的阻止,这将导致通道向 reader 授予更多信用,解除对它的阻止。再次强调,这是在正常情况下,但是,您猜对了,如果消息存储正在阻止队列进程怎么办?那么对通道的信用将被延迟,这将保持阻塞,延迟对 reader 的信用,使 reader 保持阻塞。在某个时候,消息存储将向队列进程授予消息,队列进程将向通道返回消息,然后通道最终将向 reader 授予消息并解除对 reader 的阻止
reader <--[grant]-- channel <--[grant]-- queue process <--[grant]-- message store.
拥有一个通道和一个队列进程使事情更容易理解,但这可能无法反映现实。RabbitMQ 用户通常在同一连接上有多个通道发布消息。更常见的是,一条消息被路由到多个队列。我们刚刚解释的信用流方案发生的情况是,如果其中一个队列阻止了通道,那么reader 也将被阻止。
问题在于,从 reader 的角度来看,当我们从网络读取帧时,我们甚至不知道它属于哪个通道。请记住,通道是 AMQP 连接之上的逻辑概念。因此,即使新的 AMQP 命令最终会进入一个没有阻止 reader 的通道,reader 也无法知道这一点。请注意,我们只阻止发布连接,消费者连接不受影响,因为我们希望消费者从队列中消耗消息。这是一个很好的理由,说明为什么最好拥有专用于发布消息的连接和仅用于消费者的连接。
以类似的方式,每当通道处理消息发布时,它都不知道消息最终将去哪里,直到它执行路由。因此,通道可能会接收到一条消息,该消息应该最终进入一个没有阻止通道的队列。由于在入口时我们不知道任何这些,那么实施的信用流策略是阻止 reader,直到链中的进程能够处理新消息。
RabbitMQ 3.5.5 中引入的新设置之一是能够修改 credit_flow_default_credit
的值。此设置采用 {InitialCredit, MoreCreditAfter}
形式的元组。InitialCredit 默认设置为 200,MoreCreditAfter 设置为 50。根据您的特定工作流程,您需要决定是否值得增加这些值。让我们再次看看消息流方案
reader -> channel -> queue process -> message store.
增加 {InitialCredit, MoreCreditAfter}
的值将意味着在链中的任何点,我们最终都可能拥有比代理在特定时间点可以处理的消息更多的消息。更多的消息意味着更多的 RAM 使用量。msg_store_credit_disc_bound
也是如此,但请记住,每个 RabbitMQ 实例只有一个消息存储2,并且可能有许多通道向同一个队列进程发送消息。因此,虽然队列进程从消息存储获得的 InitialCredit
值为 2000,但该队列可以从不同的通道/连接源摄取该值的许多倍。因此,200 个信用作为初始 credit_flow_default_credit
值可能被认为过于保守,但您需要了解,根据您的工作流程,这是否仍然足够好。
消息分页
让我们看一下 RabbitMQ 队列如何存储消息。当消息进入队列时,队列需要确定消息是否应该持久化。如果消息必须持久化,那么 RabbitMQ 将立即执行3。现在,即使消息已持久化到磁盘,这并不意味着消息已从 RAM 中删除,因为 RabbitMQ 在 RAM 中保留消息缓存,以便在向消费者传递消息时快速访问。每当我们谈论将消息分页到磁盘时,我们都在谈论 RabbitMQ 在必须将消息从此缓存发送到文件系统时所做的事情。
当 RabbitMQ 决定需要将消息分页到磁盘时,它将在内部队列实现中调用函数 reduce_memory_use
,以便将消息发送到文件系统。消息将分批分页;这些批次有多大取决于当前的内存压力状态。它基本上是这样工作的
函数 reduce_memory_use
将接收一个名为 target ram count
的数字,该数字告诉 RabbitMQ 它应该尝试分页消息,直到 RAM 中只剩下那么多消息。请记住,无论消息是否持久,它们仍然保存在 RAM 中,以便快速传递给消费者。只有当内存压力开始时,内存中的消息才会被分页到磁盘。引用我们的代码注释:消息是否在 RAM 中以及它是否持久是正交的问题。
在此块计算期间计算的消息数量是 RAM 中的消息(在前面提到的缓存中)加上 RAM 中保留的待处理确认的数量(即:已传递给消费者但正在等待确认的消息)。如果我们有 20000 条消息在 RAM 中(缓存 + 待处理确认),然后 target ram count
设置为 8000,我们将不得不分页 12000 条消息。这意味着分页将收到 12000 条消息的配额。每个分页到磁盘的消息都将消耗该配额中的一个单位,无论是待处理确认还是缓存中的实际消息。
一旦我们知道有多少消息需要分页,我们需要决定我们应该首先从哪里分页它们:待处理确认,还是消息缓存。如果待处理确认的增长速度快于消息缓存,即:传递给消费者的消息多于摄取的消息,这意味着该算法将首先尝试分页待处理确认,然后尝试将消息从缓存推送到文件系统。如果缓存的增长速度快于待处理确认,则缓存中的消息将首先被推送出去。
这里的关键是,从待处理确认(或者如果它先出现,则从缓存)分页消息可能会导致该过程的第一部分消耗所有需要推送到磁盘的消息配额。因此,如果在我们的示例中,待处理确认将 12000 个确认推送到磁盘,这意味着我们将不会分页缓存中的消息,反之亦然。
分页过程的第一部分将一定数量的消息(在确认 + 从缓存分页的消息之间)发送到磁盘。已分页出的消息刚刚将其内容分页出,但它们在队列中的位置仍然在 RAM 中。现在,队列需要决定是否也需要分页保存在 RAM 中的此额外信息,以进一步减少内存使用量。这就是最终 msg_store_io_batch_size
发挥作用的地方(也与 msg_store_credit_disc_bound
结合使用)。让我们尝试了解它们是如何工作的。
msg_store_credit_disc_bound
的设置影响在将消息发送到磁盘时如何处理内部信用流。rabbitmq_msg_store
模块实现了一个数据库,该数据库负责将消息持久化到磁盘。有关此实现原因的一些详细信息,请参见此处:RabbitMQ、后备存储、数据库和磁盘。
消息存储为每个向其发送写入的客户端提供信用系统。每个 RabbitMQ 队列都将是此存储的读/写客户端。消息存储具有信用机制,以防止特定的写入器用消息淹没其收件箱。假设当前的默认值,当写入器开始与消息存储对话时,它会收到 2000 条消息的初始信用,并且一旦处理了 500 条消息,它将收到更多信用。那么何时消耗此信用呢?每当我们写入消息存储时都会消耗信用,但这并非发生在每条消息上。情节变得复杂了。
自 3.5.0 版本以来,可以将小消息嵌入到队列索引中,而不是必须为此访问消息存储。小于可配置设置(当前为 4096 字节)的消息将在持久化时进入队列索引,因此这些消息不会消耗此信用。现在,让我们看看需要进入消息存储的消息会发生什么。
每当我们发布被确定为持久性的消息(发布到持久队列的持久消息)时,该消息将消耗这些信用中的一个信用。如果必须从上面提到的缓存中将消息分页到磁盘,它也会消耗一个信用。因此,如果在消息分页期间,我们消耗的信用超过了队列当前可用的信用,则分页过程的前半部分可能会停止,因为当消息存储不会接受写入时,发送写入是没有意义的。这意味着,从我们本来必须分页的 12000 个消息的初始配额中,我们只设法处理了其中的 2000 个(假设所有消息都需要进入消息存储)。
因此,我们设法分页了 2000 条消息,但我们仍然将它们在队列中的位置保存在 RAM 中。现在,分页过程将确定是否也需要将这些消息位置中的任何一个分页到磁盘。RabbitMQ 将计算其中有多少可以保留在 RAM 中,然后它将尝试将其余部分分页到磁盘。为了使第二次分页发生,必须分页到磁盘的消息量必须大于 msg_store_io_batch_size
。此数字越大,RabbitMQ 将在 RAM 中保留的消息位置就越多,因此,再次根据您的特定工作负载,您也需要调整此参数。
我们在 3.5.5 中显着改进的另一件事是将队列索引内容分页到磁盘的性能。如果您的消息通常小于 queue_index_embed_msgs_below
,那么您将看到这些更改的好处。这些更改还会影响消息位置如何分页到磁盘,因此您应该也能看到这方面的改进。因此,虽然具有较低的 msg_store_io_batch_size
可能意味着队列索引将有更多工作分页到磁盘,但请记住,此过程已得到优化。
队列镜像
为了使上述描述更简单一些,我们避免将队列镜像引入图中。从通道的角度来看,信用流也会影响镜像。当通道将 AMQP 消息传递到队列时,它会将消息发送到每个镜像,从而消耗每个镜像进程的一个信用。如果任何镜像处理消息的速度较慢,那么该特定镜像可能会导致通道被阻止。如果通道被镜像阻止,并且该队列镜像与网络断开连接,那么通道只有在 RabbitMQ 检测到镜像死亡后才会解除阻止。
信用流也在同步镜像队列时发挥作用,但这您不应太在意,主要是因为您对此无能为力,因为镜像同步完全由 RabbitMQ 处理。
结论
无论如何,我们希望这篇博文内容丰富,并能帮助您进行 RabbitMQ 调优。如果您对新的信用流设置有任何意见或问题,请随时通过 RabbitMQ 邮件列表与我们联系:rabbitmq-users。