消费者预取
概述
消费者预取是 通道预取机制 的扩展。
AMQP 0-9-1 规范指定了 basic.qos
方法,以便在消费时(也称为“预取计数”)限制通道(或连接)上未确认的消息数量。不幸的是,通道不是此操作的理想范围 - 因为单个通道可能从多个队列消费,所以通道和队列需要针对发送的每条消息进行协调,以确保它们不超过限制。这在单台机器上很慢,而在跨集群消费时则非常慢。
此外,对于许多用途来说,更自然的做法是指定适用于每个消费者的预取计数。
因此,在将预取应用于通道上的多个消费者时,RabbitMQ 稍微偏离了 AMQP 0-9-1 规范
AMQP 0-9-1 中 prefetch_count 的含义 | RabbitMQ 中 prefetch_count 的含义 |
---|---|
在通道上的所有消费者之间共享 | 分别应用于通道上的每个新消费者 |
单个消费者
以下 Java 中的基本示例将一次最多接收 10 条未确认的消息
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue", false, consumer);
值 0
被视为无限,允许任意数量的未确认消息。
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(0); // No limit for this consumer
channel.basicConsume("my-queue", false, consumer);
独立消费者
此示例在同一通道上启动两个消费者,每个消费者将独立地一次最多接收 10 条未确认的消息
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
多个消费者共享限制
AMQP 0-9-1 规范没有解释多次使用不同的 global
值调用 basic.qos
会发生什么。 RabbitMQ 将此解释为意味着应独立于彼此强制执行两个预取限制;只有当未确认消息的两个限制都未达到时,消费者才会收到新消息。
例如
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
这两个消费者之间最多只有 15 条未确认的消息,每个消费者最多 10 条消息。由于通道和队列之间需要额外的协调开销来强制执行全局限制,因此这会比上面的示例慢。
可配置的默认预取
如果消费者未指定预取值,RabbitMQ 可以使用将应用的默认预取值。该值可以在 高级配置文件 中配置为 rabbit.default_consumer_prefetch
%% advanced.config file
[
{rabbit, [
{default_consumer_prefetch, {false,250}}
]
}
].