跳到主要内容

使用 RabbitMQ 的分布式信号量

·7 分钟阅读
Álvaro Videla

在这篇博文中,我们将讨论在分布式系统中控制对特定资源访问的问题。解决这个问题的方法在计算机科学中是众所周知的,它被称为信号量,由 Dijkstra 在 1965 年在他的论文 "协作顺序进程" 中发明。我们将看到如何使用 AMQP 的构建块(如消费者、生产者和队列)来实现它。

信号量的需求

在深入实际解决方案之前,让我们看看什么时候我们可能真正需要这样的东西

假设我们的应用程序有许多进程从队列中获取作业,然后将记录插入数据库,我们可能需要限制多少进程可以同时执行此操作。

同样,worker 正在调整大小的图像需要在准备好后存储在网络上的远程服务器上。我们希望防止图像传输溢出我们的网络链路,因此我们也限制了多少 worker 可以同时传输图像。这样,当我们的 worker 尽可能快地调整图像大小时,一旦轮到他们使用网络链路,他们就会分批次将图像移动到最终目的地。

另一个与 RabbitMQ 相关的例子可能是,您的应用程序可能只需要一组生产者中的一个向交换机发送消息,但是一旦该进程停止,您希望该组中的下一个生产者开始发送消息。您可能希望这样做有很多原因,容量控制可能是其中之一。

另一方面,可能需要消费者竞争访问队列,但虽然 AMQP 提供了一种拥有独占队列和独占消费者的方法,但空闲消费者无法知道何时队列访问被释放。因此,使用与上述类似的方法,我们可以让消费者在访问队列时轮流进行。

值得注意的是,没有什么可以阻止我们拥有多个进程访问特定资源。假设我们有十个生产者,但我们只希望其中五个同时发布消息。使用信号量,我们也可以实现这一点。

前面的例子都有一个共同的额外要求:竞争资源的进程不应该轮询 RabbitMQ 或其他协调器,以便知道何时可以开始工作。理想情况下,它们将空闲等待轮到它们,一旦资源被释放,RabbitMQ 将通知下一个进程,以便它可以自动开始工作。

现在让我们继续讨论实现。

实现信号量

我们的信号量将使用队列和消息来实现。惊喜,惊喜!

我们首先声明一个名为 resource.semaphore 的队列,其中 resource 将是我们信号量要控制的资源的名称,它可以是 "images"、"database"、"file_server" 或任何适合我们特定应用程序的名称。

我们向 resource.semaphore 队列发布 *一条* 消息。然后我们启动将寻求访问该消息的进程。每个进程将从 resource.semaphore 队列消费;第一个到达的进程将获得消息,而所有其他进程将空闲等待它。诀窍是这些进程将 *永远不确认* 消息,但它们将以 ack_mode=onresource.semaphore 队列消费。因此,RabbitMQ 将跟踪消息,如果进程崩溃或退出,消息将返回到队列,并将传递给从我们的信号量队列侦听的下一个进程。

通过这种简单的技术,我们将一次只有一个进程可以访问资源,并且我们确信进程崩溃时不会持有资源。当然,我们假设所有访问信号量的进程都表现良好,即:它们永远不会确认消息。如果它们这样做,RabbitMQ 将删除消息,并且组中的所有其他进程都将饿死。

当一个进程想要停止时,我们该怎么办,它如何返回 "令牌"?当然,进程可以突然关闭通道,RabbitMQ 会自动处理消息,但还有一种礼貌的方式来做到这一点。进程可以 *basic.reject* 消息,告诉 RabbitMQ 重新排队消息,以便它返回到信号量队列。

让我们看看这在代码中是如何实现的,我们假设我们已经获得了连接和通道

这是设置我们信号量的代码

channel.queueDeclare("resource.semaphore", true, false, false, null);
String message = "resource";
channel.basicPublish("", "resource.semaphore", null, message.getBytes());

我们创建一个名为 "resource.semaphore" 的持久队列,然后我们使用 *default* 交换机向其发布消息。

这是进程将用于访问信号量的代码

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicQos(1);
channel.basicConsume("resource.semaphore", false, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();

// here we access the resource controlled by the semaphore.

if(shouldStopProcessing()) {
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true);
}
}

在那里,我们创建一个 QueueingConsumer,它正在等待来自 "resource.semaphore" 队列的消息。我们通过在我们的 basicQos 调用中将 *prefetch-count* 设置为 1,确保我们的进程只从队列中选择一条消息。一旦消息到达,进程将开始使用资源。当满足条件 shouldStopProcessing() 时,进程将 basicReject 消息,告诉 RabbitMQ 重新排队它。请记住,消费者是在 ack 模式下启动的,并且它永远不会确认从信号量队列接收到的消息。如果它这样做,则被认为是错误的。

优先访问信号量

是否可以优先访问信号量?是的,自 3.2.0 版本以来,RabbitMQ 支持 消费者优先级。通过使用消费者优先级,我们可以告诉 RabbitMQ 在传递来自信号量的令牌消息时,应该优先考虑哪些进程。

二进制信号量与计数信号量

到目前为止,我们已经实现了所谓的 *二进制信号量*,也就是说,一个一次只允许一个进程访问资源的信号量。如果我们允许多个进程同时访问同一个资源,但我们仍然需要限制该操作,那么我们可以实现一个 *计数信号量*。为此,当我们设置信号量时,我们可以发布与允许同时工作的进程一样多的消息,而不是发布一条消息。我们需要确保我们的进程像之前一样将 *prefetch-count* 值设置为 1。

更改计数

请注意,设置信号量队列的进程可以随着时间的推移添加额外的消息,以提高处理能力。如果我们想减少可以同时访问资源的进程数量,那么我们将不得不停止正在运行的进程并清除队列。另一种方法是启动一个具有非常 高优先级 的额外消费者,以便它可以从信号量队列中获取尽可能多的消息并确认它们,以便它们从系统中删除。

一些阅读材料

正如您所看到的,使用 AMQP 基本结构实现信号量非常容易,并且使用 RabbitMQ,我们还可以优先访问资源。

最后,我想分享一些关于信号量作为并发结构的的文章。首先,是 Dijkstra 的开创性论文 协作顺序进程。最后是维基百科关于信号量的文章,其中解释了许多定义:信号量

编辑:2014年2月20日

正如 这里 和其他地方与我的同事讨论的那样,此设置对 网络分区 没有弹性,因此请谨慎处理。感谢 @aphyr 和其他人在博客文章中为我提供反馈。在 RabbitMQ 团队,我们始终喜欢保持诚实,并告诉用户服务器可以做什么和不能做什么。

编辑:2014年3月10日

值得注意的是,此设置通常对网络故障没有弹性。例如,可能会发生 worker 拥有令牌并且与服务器的连接突然关闭的情况。然后服务器将获取令牌并将其排队,以便将其传递给另一个 worker。与此同时,网络连接已关闭的 worker 仍会认为它拥有令牌,因此,它将继续访问它不应该访问的资源。

© . All rights reserved.