跳到主要内容

使用 RabbitMQ 防止无限增长的缓冲区

·9 分钟阅读
Álvaro Videla

我们架构中不同的服务将需要一定数量的资源才能运行,无论是 CPU、RAM 还是磁盘空间,我们都需要确保我们有足够的资源。如果我们不对服务器将要使用的资源数量设置限制,那么在某个时候我们会遇到麻烦。如果您的数据库耗尽文件系统空间,您的媒体存储空间被图像填满并且从不将它们移动到其他地方,或者您的 JVM 耗尽 RAM,就会发生这种情况。如果您没有过期/删除旧备份的策略,即使您的备份解决方案也会成为问题。嗯,队列也不例外。我们必须确保我们的应用程序不会允许队列永远增长。我们需要制定一些策略来删除/驱逐/迁移旧消息。

为什么会发生这个问题?

我们的队列可能被消息填满的原因有很多。首要原因是我们的数据生产者超过了消费者的速度。幸运的是,解决方案很简单:添加更多消费者。

如果我们的应用程序仍然无法处理负载会发生什么?例如,您的消费者处理每条消息需要太长时间,并且您无法添加更多消费者,因为您已经耗尽了服务器。那么您的队列将开始被消息填满。RabbitMQ 已经针对快速消息传递进行了优化,队列中的消息尽可能少。虽然 RabbitMQ 配备了各种流量控制机制,但您可能当然希望找到一种方法来防止进入流量控制被激活的情况。让我们看看 RabbitMQ 如何在这方面帮助我们。

每个队列的消息 TTL

RabbitMQ 允许设置每个队列的消息 TTL,这将使服务器不传递在队列中存在时间超过定义的每个队列 TTL 的消息。此外,服务器将尝试尽快过期或死信处理这些消息。

当您的数据仅在按时到达时才与生产者相关时,这非常有用。如果您的数据不能被丢弃,但您仍然希望队列尽可能保持为空,请参阅下面的“死信处理”部分

有两种方法设置队列 TTL,一种是在 queue.declare 期间传递一些额外的参数,如下所示

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

之前的代码将告诉 RabbitMQ 在 60 秒后过期队列 myqueue 上的消息。

也可以通过向我们的队列添加策略来设置相同的策略

rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

此策略将匹配默认虚拟主机中的所有队列,并将使消息在 60 秒后过期。请注意,Windows 命令有点不同。当然,您可以使该策略仅匹配一个队列。有关它的更多详细信息,请参见此处:参数和策略

如果我们想要更精细地控制哪些消息正在过期怎么办?

每个消息的 TTL

RabbitMQ 还支持设置每个消息的 TTL。我们可以通过在 basic.publish 方法调用中设置 expiration 字段来设置消息的 TTL。与前一种情况一样,该值应以毫秒为单位表示。以下代码将发布一条将在 60 秒后过期的消息

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

如果我们将每个消息的 TTL 与每个队列的 TTL 结合使用,则最短的 TTL 将占上风。RabbitMQ 将确保消费者永远不会收到过期的消息,但在每个消息的 ttl 的情况下,在这些消息到达队列头部之前,它们不会过期。

队列 TTL

使用 RabbitMQ,我们还可以让整个队列过期,也就是说,在一段时间未使用后被 RabbitMQ 删除。假设我们设置为在一小时后过期我们的队列。如果在一小时内,该队列上没有消费者,没有发出 basic.get 命令,或者队列没有被重新声明,那么 RabbitMQ 将认为它未使用并将其删除。

例如,如果您在用户在线时为他们创建队列,但在不活动 15 分钟后您想删除这些队列,您可能需要使用此功能。考虑一个聊天应用程序,该应用程序为每个连接的用户保留一个队列。您可以声明一个 auto_delete 队列,该队列将在用户关闭通道后立即消失,但这可能对某些场景有用,但如果用户实际上因为他们在移动网络中连接质量差而断开连接,会发生什么?当然,您不希望在他们断开连接后立即删除他们的所有消息。使用此功能,您可以让这些队列存活更长时间。

以下是如何使用 Java Client 设置 15 分钟队列过期时间

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 900000);
channel.queueDeclare("myqueue", false, false, false, args);

以及通过策略

rabbitmqctl set_policy expiry ".*" '{"expires":900000}' --apply-to queues

队列长度限制

如果我们希望我们的队列接收的消息不超过某个阈值,我们可以在声明队列时通过 x-max-length 参数来配置它。这是一种相当简洁且简单的方式来控制容量;如果我们的队列达到阈值并且有新消息到达,那么队列前端的消息(“较旧的消息”)将被删除,从而为新到达的消息腾出空间。这种行为的原因之一是旧消息可能与您的应用程序无关,因此新消息被允许进入队列。

请记住,队列长度仅考虑准备好传递的消息。未确认的消息不会添加到计数中。拥有正确的 basic.qos 设置将在此处帮助您的应用程序,因为默认情况下,RabbitMQ 将尽可能多地向消费者发送消息,从而造成您的队列看起来是空的,但实际上您有很多未确认的消息也在占用资源的情况。

设置队列长度限制非常容易,这是一个 Java 示例,它将限制设置为 10 条消息

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 10);
channel.queueDeclare("myqueue", false, false, false, args);

以及通过策略

rabbitmqctl set_policy Ten ".*" '{"max-length":10}' --apply-to queues

混合策略

请记住,在任何给定时间最多只有一个策略应用于队列。因此,如果您连续运行之前的 set_policy 命令,则只有最后一个命令会生效。对同一资源应用多个策略的技巧在于在同一个 JSON 对象上传递所有策略,例如

rabbitmqctl set_policy capped_queues "^capped\." \ 
'{"max-length":10, "expires":900000, "message-ttl":60000}' --apply-to queues

完全不排队

等等,我没看错吧?是的。不排队。

想象一下,在一个非常繁忙的日子里,您到达邮局,却发现每个柜台都很忙。由于您没有时间浪费在排队上,您只是回去继续做您之前正在做的事情。换句话说:您有一个必须立即提供的请求,也就是说:不排队。嗯,RabbitMQ 可以使用您的应用程序消息和队列做类似的事情。

诀窍在于将 per-queue-TTL 设置为 0(零)。如果消息无法立即传递给消费者,那么它们将立即过期。如果您设置了死信交换机,那么您可以让消息死信处理到单独的队列。

死信处理

我们已经多次提到死信处理。此功能的作用是您可以为一个队列设置一个死信交换机 (DLX),然后当该队列上的消息过期或超出队列限制时,该消息将被发布到 DLX。您可以将单独的队列绑定到该交换机,然后在以后处理发送到那里的消息。

这是一个用于设置 DLX 的 queue.declare 示例

channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

死信处理消息将使您的队列保持正确的尺寸和预期的消息数量,但这并不能阻止您用消息填满节点。如果这些消息正在同一节点上的不同队列中排队,那么在某个时候,这个新的死信队列可能会出现问题。在这种情况下,您可以做的是使用 交换机联邦 将这些消息发送到单独的节点,并与应用程序的主流程分开处理它们。

结论

关于到达我们系统的请求的排队论的基本问题之一可以表述如下1

λ = 平均到达时间
µ = 平均服务速率
如果 λ > µ 会发生什么?
队列长度随着时间推移趋于无限。

我们知道,如果我们在架构中的任何时候遇到这个问题,我们的应用程序迟早会遇到麻烦。幸运的是,RabbitMQ 提供了许多功能,例如队列和消息 TTL、队列过期和队列长度,专门用于避免此问题。更有趣的是,我们不需要仅仅因为使用这些功能而丢失消息。死信交换机可以帮助我们将消息重新路由到更合适的位置。现在是我们让这些技术成为我们排队和消息传递武器库的一部分的时候了。

脚注

  1. 计算机系统性能建模与设计:行动中的排队论

© . All rights reserved.