Skip to main content
版本:4.1

消费者预取

概述

消费者预取是 通道预取机制 的扩展。

AMQP 0-9-1 规范指定了 basic.qos 方法,以便在消费时(也称为“预取计数”)限制通道(或连接)上未确认的消息数量。不幸的是,通道不是此操作的理想范围 - 因为单个通道可能从多个队列消费,所以通道和队列需要针对发送的每条消息进行协调,以确保它们不超过限制。这在单台机器上很慢,而在跨集群消费时则非常慢。

此外,对于许多用途来说,更自然的做法是指定适用于每个消费者的预取计数。

因此,在将预取应用于通道上的多个消费者时,RabbitMQ 稍微偏离了 AMQP 0-9-1 规范

AMQP 0-9-1 中 prefetch_count 的含义RabbitMQ 中 prefetch_count 的含义
在通道上的所有消费者之间共享分别应用于通道上的每个新消费者

单个消费者

以下 Java 中的基本示例将一次最多接收 10 条未确认的消息

Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue", false, consumer);

0 被视为无限,允许任意数量的未确认消息。

Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(0); // No limit for this consumer
channel.basicConsume("my-queue", false, consumer);

独立消费者

此示例在同一通道上启动两个消费者,每个消费者将独立地一次最多接收 10 条未确认的消息

Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

多个消费者共享限制

AMQP 0-9-1 规范没有解释多次使用不同的 global 值调用 basic.qos 会发生什么。 RabbitMQ 将此解释为意味着应独立于彼此强制执行两个预取限制;只有当未确认消息的两个限制都未达到时,消费者才会收到新消息。

例如

Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

这两个消费者之间最多只有 15 条未确认的消息,每个消费者最多 10 条消息。由于通道和队列之间需要额外的协调开销来强制执行全局限制,因此这会比上面的示例慢。

可配置的默认预取

如果消费者未指定预取值,RabbitMQ 可以使用将应用的默认预取值。该值可以在 高级配置文件 中配置为 rabbit.default_consumer_prefetch

%% advanced.config file
[
{rabbit, [
{default_consumer_prefetch, {false,250}}
]
}
].
© . All rights reserved.