RabbitMQ 4.0:新的 Quorum 队列功能
RabbitMQ 4.0(目前为 beta 版)包含新的 quorum 队列功能
- 消息优先级
- 消费者优先级与单一活动消费者结合使用
- 默认投递限制现在为 20(重大变更!)
- 更快速的长队列恢复
消息优先级
对消息优先级的支持可能是最受需求的 quorum 队列功能,主要是希望迁移到 quorum 队列的现有经典镜像队列用户提出的(请记住,对经典队列镜像的支持已在 4.0 版本中移除)。
然而,优先级支持的方式与经典队列处理优先级的方式显著不同。经典队列需要 x-max-priority
参数来定义给定队列的最大优先级数量(如果未提供此参数,队列将同等对待所有消息)。从 0 到 255 的值在技术上是允许的,但每个队列真正应该使用的优先级不应超过几个。Quorum 队列不需要任何预先声明(无需为给定队列启用优先级),但每个队列只有两个优先级:普通和高。此行为与 AMQP 1.0 规范的行为相匹配(请参阅 AMQP 1.0 规范的第 3.2.1 章)
- 介于 0 和 4(包括 0 和 4)之间的优先级值被视为普通优先级
- 任何高于 4 的值都被视为高优先级
- 如果发布者未指定消息的优先级,则假定值为
4
(普通优先级)
如果 quorum 队列同时包含普通优先级和高优先级消息,则消费者将收到两者的混合消息,高优先级消息与普通优先级消息的比例为 2:1。这种方法避免了饥饿,因为无论高优先级消息的数量如何,普通优先级消息的处理也会取得进展。这与经典队列的实现形成对比,经典队列总是优先投递更高优先级的消息(如果存在),因此普通优先级消息可能永远无法投递(或者,更可能的是,它们的投递延迟会非常高)。
以下是此工作原理的可视化表示。在为此测试做准备时,我们首先发布了 10 万条普通优先级消息,然后发布了 10 万条高优先级消息。由于 quorum 队列在 4.0 版本之前不感知优先级,如果我们在旧版本中这样做,然后启动消费者,它将首先接收普通优先级消息(因为它们较旧),然后接收所有高优先级消息。在 4.0 版本中,我们可以看到消费者立即开始接收大约每秒 1500 条普通优先级消息和两倍数量的高优先级消息的混合消息,总计约为每秒 4500 条消息(实际投递速率在这里并不重要,它们取决于许多因素;2:1 的高/普通优先级比率是优先级上下文中重要的)。一旦队列投递了所有高优先级消息,消费者就开始接收每秒约 4500 条普通优先级消息 - 与此测试场景中它可以处理的消息数量一样多。蓝色虚线(右侧有轴刻度)是队列中就绪消息的数量(两种优先级的总和) - 我们可以看到它从 20 万开始,最终降至零。

让我们考虑相反的情况 - 如果我们先发布所有高优先级消息,然后再发布所有普通优先级消息会怎样?在这种情况下,消费者将按照发布顺序接收消息。普通优先级消息根本没有理由超越更高优先级消息。

此测试是如何执行的?
对于此测试,我们使用了 omq,这是一个用于 AMQP 1.0、MQTT 和 STOMP 的测试客户端。quorum 队列的行为不依赖于使用的协议 - 仅使用 AMQP 1.0 是因为 omq
按消息优先级发出消息消费指标。
# declare a quorum queue (you can use the Management UI or any other method)
rabbitmqadmin declare queue name=qq queue_type=quorum
# publish normal priority messages (10 publishers, 10k messages each)
omq amqp --publishers 10 --consumers 0 --publish-to /queues/qq --message-priority 1 --pmessages 10000
# publish high priority messages
omq amqp --publishers 10 --consumers 0 --publish-to /queues/qq --message-priority 10 --pmessages 10000
# consume all messages from the queue
omq amqp --publishers 0 --consumers 1 --consume-from /queues/qq --consumer-credits 100
对于第二种情况,只需以相反的顺序运行发布命令即可。
如果我需要更多控制怎么办?
如果具有 2:1 投递比率的两个优先级不能满足您的要求,我们可以推荐两件事
- 重新考虑您的需求。😄推理具有许多优先级的消息投递顺序真的很难。可能更容易确保所有消息都得到足够快的投递,并使用优先级来确保在偶尔出现长积压的情况下,重要消息可以跳过队列。
- 如果您确实需要更多优先级和/或更多地控制如何处理不同的优先级,那么使用多个队列是您的最佳选择。您可以开发一个订阅多个队列的消费者,然后决定从哪个队列消费。
消费者优先级与单一活动消费者结合使用
从 RabbitMQ 4.0 开始,quorum 队列在选择单一活动消费者时将考虑消费者优先级。如果更高优先级变得可用(订阅),quorum 队列将切换到它。如果您有多个队列,每个队列都应有一个消费者,但您不希望应用程序的单个实例成为所有这些队列的消费者,这在第一个启动的应用程序实例订阅所有这些单一活动消费者队列时很可能发生。现在,您可以在订阅不同的队列时选择不同的优先级,以确保每个实例仅从其“首选”队列消费,并且仅充当其他队列的备份消费者。
为了更好地解释此功能,让我们回顾一下所有移动部件。单一活动消费者是一个队列参数,它可以防止队列向多个消费者投递消息,无论有多少个消费者订阅了该队列。一个消费者是活动的,所有其他消费者都不是。如果活动消费者断开连接,则会激活其他消费者之一。如果需要保持严格的消息处理顺序,则使用此功能。
消费者优先级允许您指定,不是以公平的轮询方式向所有订阅的消费者投递消息(这是经典队列和 quorum 队列的默认行为),而是应该首选某个消费者。
在 4.0 版本之前,这些功能实际上是互斥的 - 如果启用了单一活动消费者,则新消费者永远不会变为活动状态,无论其优先级如何,只要之前的消费者保持活动状态即可。从 4.0 版本开始,如果新消费者的优先级高于当前活动消费者的优先级,则 quorum 队列将切换到更高优先级的消费者:它将停止向当前消费者投递消息,等待所有消息得到确认,然后将停用旧消费者,并激活更高优先级的消费者。
下图显示了此行为。此图上有三个指标
- 绿线显示第一个(默认优先级)消费者消费的消息数量(碰巧配置为每秒消费 10 条消息)
- 黄色,显示相同的值,但适用于第二个,更高优先级的消费者
- 蓝色,显示未确认消息的数量(右侧的轴刻度)

最初,我们只有一个消费者,正如预期的那样,它每秒消费 9-10 条消息(9 和 10 之间的跳跃仅仅是指标的发出和显示方式的结果)。此消费者配置了 1000 条消息的预取,并且由于队列中有很多消息 - 预取缓冲区已最大程度地使用。然后出现黄线,最初为每秒 0 条消息。这是更高优先级的消费者,它已经连接,但尚未激活。从它连接的那一刻起,我们可以看到未确认消息的数量正在减少,因为队列不再向原始消费者投递消息。一旦所有消息都得到确认,新消费者将成为单一活动消费者并接收 1000 条消息,因为那是它的预取值。然后,它愉快地按照配置每秒消费大约 10 条消息。
此测试是如何执行的?
对于此测试,我们使用了 perf-test,这是一个用于 AMQP 0.9.1 的测试客户端。
# Publish 5000 messages to have a backlog (perf-test will declare a quorum queue `qq-sac`)
perf-test --quorum-queue --queue qq-sac --pmessages 5000 --confirm 100 -qa x-single-active-consumer=true --consumers 0
# Start a consumer with the default priority and prefetch of 1000; consume ~10 msgs/s
perf-test --producers 0 --predeclared --queue qq-sac --consumer-latency 100000 --qos 1000
# In another window, some time after starting the first consumer, start a higher priority consumer
perf-test --producers 0 --predeclared --queue qq-sac --consumer-latency 100000 --qos 1000 --consumer-args x-priority=10
一段时间后,您应该看到第一个消费者停止接收消息(不再有来自 perf-test
的输出),而第二个消费者接收消息。
本示例中使用的设置旨在突出显示切换过程,并不适用于真实场景。如果消费者每秒只能处理 10 条消息,通常没有理由将预取值配置为 1000 这么高。
默认投递限制现在为 20
对于某些应用程序来说,这可能是一个重大变更
Quorum 队列现在默认将投递限制设置为 20。过去,未设置限制,因此 quorum 队列会永远尝试投递,直到消息被消费者确认或丢弃。这可能导致消息卡在队列中且永远无法投递的情况。
此更改的缺点是,如果未配置死信队列,则消息将在 20 次尝试后被丢弃。因此,强烈建议为所有 quorum 队列配置死信队列。
更快速的长队列恢复
这与其说是一项功能,不如说是一项内部更改,但肯定值得一提。到目前为止,如果 RabbitMQ 节点重新启动,则该节点上的所有 quorum 队列都必须读取自上次快照以来的所有数据(Raft 日志)以重建其内存状态。例如,如果您现在向 quorum 队列发布数百万条消息,然后重新启动节点,您将看到在节点启动后,队列将在相当长一段时间内(至少几秒钟)报告 0
条就绪消息,并且您将无法开始消费这些消息。队列只是尚未准备好为流量提供服务 - 它仍在从磁盘读取数据(注意:这并不意味着所有数据随后都保存在内存中,绝大多数数据不是,但队列数据的索引/摘要是)。从 RabbitMQ 4.0 开始,quorum 队列创建检查点文件,其中包含队列在特定时间点的状态。启动时,队列可以读取最新的检查点,并且仅读取自该时间点以来的 Raft 日志部分。这意味着 quorum 队列启动所需的时间大大减少。
例如,在我的机器上,具有一个包含 1000 万条 12 字节消息的 quorum 队列的 RabbitMQ 节点大约需要 30 秒才能启动。使用 RabbitMQ 4.0,只需不到一秒的时间。
您可能想知道快照和检查点之间的区别。在许多方面,它们是相同的 - 它们实际上共享将它们写入磁盘的代码。不同之处在于,快照仅在 Raft 日志被截断时创建。对于许多常见的队列用例,这就是全部 - 较旧的消息被消费,我们创建一个不再包含它们的快照,并且我们截断日志。此时,队列不再记得这些消息曾经存在过。另一方面,检查点是在我们无法截断日志时定期创建的。测试用例场景就是一个很好的例子 - 由于我们没有消费任何消息,最旧的消息仍然存在,我们不能仅仅忘记它们。但是检查点仍然允许队列更快地启动。当日志被截断时(在本例中 - 在消费一些较旧的消息之后),检查点可以升级为快照。
如何尝试?
再次,我们将使用 perf-test 来声明队列和发布消息
# Publish 10 million 12-byte messages (feel free to play with other values)
perf-test --quorum-queue --queue qq --consumers 0 --pmessages 5000000 --confirm 1000 --producers 2
# restart the node
rabbitmqctl stop_app && rabbitmqctl start_app
# list the queues (repeat this command until the number of messages is 10 million instead of 0)
rabbitmqctl list_queues
总结
RabbitMQ 4.0 是 RabbitMQ 的一个重要里程碑。随着经典队列镜像的移除,quorum 队列成为高可用性、复制队列的唯一选择(注意:流也是高可用性和复制的,但从技术上讲不是队列;尽管如此,对于过去使用经典镜像队列的某些用例,它们仍然可能是不错的选择)。多年来,Quorum 队列提供了比镜像队列更高的数据安全保证和更好的性能,并且随着这些最新的改进,它们在更广泛的场景中变得更加健壮和高性能。
您现在可以使用 RabbitMQ 4.0 beta 版:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v4.0.0-beta.5