跳到主要内容

使用 RabbitMQ 的消费者优先级

·4 分钟阅读
Álvaro Videla

在 RabbitMQ 3.2.0 中,我们引入了 消费者优先级,不出所料,它允许我们为消费者设置优先级。这为我们提供了一些关于 RabbitMQ 如何向消费者传递消息的控制,以便获得一种可能对我们的应用程序有益的不同类型的调度。

在你的代码中,你何时想要使用消费者优先级?

异构集群

假设我们的工作集群不在完全相同的硬件上运行。一些机器具有某些硬件特性,这些特性根据我们正在运行的任务类型,使它们比集群中的其他机器更具优势。例如,某些机器具有 SSD,而我们的任务需要大量的 I/O;或者,任务可能需要更快的 CPU 来执行计算;或者更多的 RAM 以便缓存结果以供将来计算。在任何情况下,如果当有两个消费者准备好接收更多消息,其中一个在更好的机器上,那么 RabbitMQ 应该选择更好的机器上的消费者并将消息传递给它,而不是另一台较差机器上的消费者,这将是有趣的。请记住,消费者优先级对准备好接收消息的消费者生效。因此,如果我们较差机器中的一个消费者已准备就绪,并且较好机器中没有准备就绪的消费者,那么 RabbitMQ 将直接向该特定消费者发送消息,而无需等待更快的消费者变为可用。

数据局部性

消费者优先级的另一个用途是从数据局部性中获益。在 RabbitMQ 中,队列内容存在于最初声明队列的节点中,并且在 镜像队列 的情况下,将有一个主节点来协调队列,因此虽然消费者可以连接到集群中的各个节点,并从镜像中获取消息,但最终关于谁消费了哪些消息的信息将传回主节点。在这种情况下,我们可以使用消费者优先级来告诉 RabbitMQ 首先将消息传递给连接到主节点的消费者。为此,连接到主节点的消费者将在发出 basic.consume 命令时为自己设置更高的优先级(前提是它有办法知道它已连接到主节点)。

声明消费者优先级

您可以在下面找到示例代码,展示如何使用 RabbitMQ Java 客户端 声明消费者优先级

import java.util.*;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {

private final static String EXCHANGE_NAME = "my_exchange";
private final static String QUEUE_NAME = "my_queue";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println("Waiting for messages. To exit press CTRL+C");

QueueingConsumer consumer = new QueueingConsumer(channel);

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", 10);
channel.basicConsume(QUEUE_NAME, false, "", false, false, args, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

此代码实现了一个非常简单的消费者,基于 教程 1 中的示例。有趣的部分是从第 25 行到第 27 行,我们首先创建一个 HashMap 来保存我们的 basicConsume 参数。我们创建一个名为 x-priority 的参数,值为 10(值越高,优先级越高)。当我们调用 basicConsume 时,我们将这些参数传递给 RabbitMQ,就是这样!一个非常强大的功能,但使用起来相当简单。像往常一样,明智的做法是运行性能测试,以确定最适合我们消费者的优先级策略。

© . All rights reserved.