跳到主要内容
版本:4.1

消费者确认和发布者确认

概述

本指南涵盖了与数据安全相关的两个相关功能:消费者确认和发布者确认

以及更多内容。消费者和发布者端的确认对于使用消息传递的应用程序中的数据安全至关重要。

更多相关主题在发布者消费者指南中介绍。

基础知识

使用消息代理(如 RabbitMQ)的系统本质上是分布式的。由于协议方法(消息)的发送不能保证到达对等方或被其成功处理,因此发布者和消费者都需要一种用于交付和处理确认的机制。RabbitMQ 支持的几种消息传递协议都提供了此类功能。本指南介绍了 AMQP 0-9-1 中的功能,但其思想在其他支持的协议中基本相同。

消费者到 RabbitMQ 的交付处理确认在消息传递协议中被称为确认;代理到发布者的确认是一种协议扩展,称为 发布者确认。这两个功能都建立在相同的思想之上,并受到 TCP 的启发。

它们对于从发布者到 RabbitMQ 节点以及从 RabbitMQ 节点到消费者的可靠交付至关重要。换句话说,它们对于数据安全至关重要,应用程序和 RabbitMQ 节点对此负有同等责任。

发布者确认是否与消费者交付确认相关?

发布者确认消费者交付确认 是非常相似的功能,它们在不同的上下文中解决了类似的问题

  1. 顾名思义,消费者确认涵盖了 RabbitMQ 与消费者之间的通信
  2. 发布者确认涵盖了发布者与 RabbitMQ 之间的通信

然而,这两个功能完全是正交的,并且互不感知。

发布者确认不知道消费者:它们只涵盖发布者与其连接的节点以及队列(或)领导者副本的交互。

消费者确认不知道发布者:它们的目标是向 RabbitMQ 节点确认给定的交付已成功接收并成功处理,以便可以将已交付的消息标记为将来删除。

有时,发布和消费应用程序需要通过请求和响应进行通信,这需要来自对等方的显式确认。RabbitMQ 教程 #6 演示了如何完成此操作的基础知识,而 Direct Reply-to 提供了一种无需声明大量短生命周期临时响应队列即可实现此目的的方法。

然而,本指南不涵盖此类通信,提及它只是为了将其与本指南中描述的更集中的消息传递协议功能进行对比。

(消费者)交付确认

当 RabbitMQ 将消息传递给消费者时,它需要知道何时将消息视为已成功发送。哪种逻辑最优取决于系统。因此,这主要是应用程序的决定。在 AMQP 0-9-1 中,当使用 basic.consume 方法注册消费者或使用 basic.get 方法按需获取消息时,会做出此决定。

如果您更喜欢面向示例和分步的材料,RabbitMQ 教程 #2 也介绍了消费者确认。

交付标识符:交付标签

在我们继续讨论其他主题之前,重要的是解释如何识别交付(以及确认如何指示其各自的交付)。当注册消费者(订阅)时,RabbitMQ 将使用 basic.deliver 方法传递(推送)消息。该方法携带一个 delivery tag(交付标签),该标签唯一标识通道上的交付。因此,交付标签的作用域限定在每个通道。

交付标签是单调递增的正整数,并由客户端库以这种形式呈现。确认交付的客户端库方法将交付标签作为参数。

由于交付标签的作用域限定在每个通道,因此必须在接收交付的同一通道上确认交付。在不同的通道上确认将导致“未知交付标签”协议异常并关闭通道。

消费者确认模式和数据安全考虑

当节点将消息传递给消费者时,它必须决定是否应将该消息视为已被消费者处理(或至少已接收)。由于多种因素(客户端连接、消费者应用程序等等)可能会失败,因此此决定是数据安全问题。消息传递协议通常提供一种确认机制,允许消费者向与其连接的节点确认交付。是否使用该机制是在消费者订阅时决定的。

根据使用的确认模式,RabbitMQ 可以认为消息已成功交付,要么在消息发送出去后(写入 TCP 套接字)立即交付,要么在收到显式(“手动”)客户端确认时交付。手动发送的确认可以是肯定或否定,并使用以下协议方法之一

  • basic.ack 用于肯定确认
  • basic.nack 用于否定确认(注意:这是 RabbitMQ 对 AMQP 0-9-1 的扩展
  • basic.reject 用于否定确认,但与 basic.nack 相比有一个限制

这些方法如何在客户端库 API 中公开将在下面讨论。

肯定确认只是指示 RabbitMQ 将消息记录为已交付,并且可以丢弃。使用 basic.reject 的否定确认具有相同的效果。区别主要在于语义:肯定确认假定消息已成功处理,而其否定对应物则表明交付未处理但仍应删除。

在自动确认模式下,消息在发送后立即被视为已成功交付。这种模式以更高的吞吐量(只要消费者能够跟上)换取降低的交付和消费者处理安全性。这种模式通常被称为“即发即弃”。与手动确认模式不同,如果在成功交付之前消费者的 TCP 连接或通道关闭,则服务器发送的消息将丢失。因此,自动消息确认应被视为不安全,不适合所有工作负载。

使用自动确认模式时,另一个需要考虑的重要事项是消费者过载。手动确认模式通常与有界通道预取一起使用,这限制了通道上未完成(“正在进行中”)交付的数量。然而,对于自动确认,根据定义没有这样的限制。因此,消费者可能会被交付速率淹没,可能会在内存中累积积压,并耗尽堆空间或使其进程被操作系统终止。某些客户端库将应用 TCP 背压(停止从套接字读取,直到未处理交付的积压降至某个限制以下)。因此,自动确认模式仅建议用于可以高效且稳定地处理交付的消费者。

肯定确认交付

用于交付确认的 API 方法通常在客户端库中作为通道上的操作公开。Java 客户端用户将使用 Channel#basicAckChannel#basicNack 分别执行 basic.ackbasic.nack。这是一个演示肯定确认的 Java 客户端示例

// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge a single delivery, the message will
// be discarded
channel.basicAck(deliveryTag, false);
}
});

在 .NET 客户端中,方法分别为 IModel#BasicAckIModel#BasicNack。这是一个演示使用该客户端进行肯定确认的示例

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// positively acknowledge a single delivery, the message will
// be discarded
channel.BasicAck(ea.DeliveryTag, false);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);

一次确认多个交付

可以批量处理手动确认以减少网络流量。这通过将确认方法的 multiple 字段(参见上文)设置为 true 来完成。请注意,basic.reject 历史上没有该字段,这就是 RabbitMQ 引入 basic.nack 作为协议扩展的原因。

multiple 字段设置为 true 时,RabbitMQ 将确认所有未完成的交付标签,直到并包括确认中指定的标签。与与确认相关的所有其他内容一样,这也在通道范围内。例如,假设通道 Ch 上有交付标签 5、6、7 和 8 未确认,当一个确认帧到达该通道,delivery_tag 设置为 8multiple 设置为 true 时,将确认从 5 到 8 的所有标签。如果 multiple 设置为 false,则交付 5、6 和 7 仍将未确认。

要使用 RabbitMQ Java 客户端确认多个交付,请为 Channel#basicAckmultiple 参数传递 true

// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge all deliveries up to
// this delivery tag
channel.basicAck(deliveryTag, true);
}
});

这个想法与 .NET 客户端非常相似

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// positively acknowledge all deliveries up to
// this delivery tag
channel.BasicAck(ea.DeliveryTag, true);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);

否定确认和重新排队交付

有时,消费者无法立即处理交付,但其他实例可能能够处理。在这种情况下,可能需要将其重新排队,并让另一个消费者接收和处理它。basic.rejectbasic.nack 是用于此目的的两种协议方法。

这些方法通常用于否定确认交付。此类交付可以被代理丢弃、死信或重新排队。此行为由 requeue 字段控制。当该字段设置为 true 时,代理将使用指定的交付标签重新排队交付(或多个交付,稍后将解释)。或者,当此字段设置为 false 时,如果配置了 死信交换机,则消息将被路由到死信交换机,否则将被丢弃。

这两种方法通常在客户端库中作为通道上的操作公开。Java 客户端用户将使用 Channel#basicRejectChannel#basicNack 分别执行 basic.rejectbasic.nack

// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// negatively acknowledge, the message will
// be discarded
channel.basicReject(deliveryTag, false);
}
});
// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// requeue the delivery
channel.basicReject(deliveryTag, true);
}
});

在 .NET 客户端中,方法分别为 IModel#BasicRejectIModel#BasicNack

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// negatively acknowledge, the message will
// be discarded
channel.BasicReject(ea.DeliveryTag, false);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);
// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// requeue the delivery
channel.BasicReject(ea.DeliveryTag, true);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);

当消息被重新排队时,如果可能,它将被放置在其队列中的原始位置。如果不是(由于当多个消费者共享一个队列时来自其他消费者的并发交付和确认),则消息将被重新排队到更靠近队列头的位置。

重新排队的消息可能立即准备好重新交付,具体取决于它们在队列中的位置以及具有活动消费者的通道使用的预取值。这意味着,如果所有消费者都因为无法处理由于瞬态条件导致的交付而重新排队,他们将创建一个重新排队/重新交付循环。此类循环在网络带宽和 CPU 资源方面可能会很昂贵。消费者实现可以跟踪重新交付的次数,并彻底拒绝消息(丢弃它们)或安排在延迟后重新排队。

可以使用 basic.nack 方法一次拒绝或重新排队多条消息。这就是它与 basic.reject 的区别。它接受一个额外的参数,multiple。这是一个 Java 客户端示例

// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// requeue all unacknowledged deliveries up to
// this delivery tag
channel.basicNack(deliveryTag, true, true);
}
});

在 .NET 客户端中,工作方式非常相似

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// requeue all unacknowledged deliveries up to
// this delivery tag
channel.BasicNack(ea.DeliveryTag, true, true);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);

通道预取设置 (QoS)

消息以异步方式交付(发送)到客户端,并且在任何给定时刻,通道上可能有多条消息“正在传输”。来自客户端的手动确认本质上也是异步的,但方向相反。

这意味着一个未确认交付的滑动窗口。

对于大多数消费者来说,限制此窗口的大小以避免消费者端的无界缓冲区(堆)增长问题是有意义的。这通过使用 basic.qos 方法设置“预取计数”值来完成。该值定义了通道上允许的最大未确认交付数量。当数量达到配置的计数时,RabbitMQ 将停止在该通道上交付更多消息,直到至少确认一个未完成的消息。

0 表示“无限制”,允许任意数量的未确认消息。

例如,假设通道 Ch 上有四个交付,交付标签为 5、6、7 和 8 未确认,并且通道 Ch 的预取计数设置为 4,除非至少确认一个未完成的交付,否则 RabbitMQ 不会在 Ch 上推送更多交付。

当一个确认帧到达该通道,delivery_tag 设置为 5(或 678)时,RabbitMQ 将注意到并交付另一条消息。一次确认多条消息将使多条消息可用于交付。

值得重申的是,交付流和手动客户端确认完全是异步的。因此,如果在已经有正在传输的交付时更改预取值,则会产生自然的竞争条件,并且通道上可能暂时存在超过预取计数的未确认消息。

每通道、每消费者和全局预取

QoS 设置可以为特定通道或特定消费者配置。消费者预取指南解释了此作用域的效果。

预取和轮询消费者

QoS 预取设置对使用 basic.get(“拉取 API”)获取的消息没有影响,即使在手动确认模式下也是如此。

消费者确认模式、预取和吞吐量

确认模式和 QoS 预取值对消费者吞吐量有重大影响。一般来说,增加预取将提高向消费者交付消息的速率。自动确认模式产生最佳的交付速率。然而,在这两种情况下,已交付但尚未处理的消息数量也会增加,从而增加消费者 RAM 消耗。

应谨慎使用自动确认模式或具有无限预取的手动确认模式。消费者在不确认的情况下消费大量消息将导致与其连接的节点上的内存消耗增长。找到合适的预取值是一个反复试验的过程,并且会因工作负载而异。100 到 300 范围内的值通常提供最佳吞吐量,并且不会带来压垮消费者的重大风险。更高的值通常会遇到收益递减规律

预取值 1 是最保守的。它将显着降低吞吐量,尤其是在消费者连接延迟很高的环境中。对于许多应用程序,更高的值将是适当和最佳的。

当消费者失败或失去连接时:自动重新排队

当使用手动确认时,任何未确认的交付(消息)在发生交付的通道(或连接)关闭时都会自动重新排队。这包括客户端的 TCP 连接丢失、消费者应用程序(进程)故障以及通道级协议异常(在下面介绍)。

请注意,检测不可用客户端需要一段时间。

由于这种行为,消费者必须准备好处理重新交付,并且在实现时要牢记幂等性。重新交付将具有一个特殊的布尔属性 redeliver,由 RabbitMQ 设置为 true。对于首次交付,它将设置为 false。请注意,消费者可能会收到先前已交付给另一个消费者的消息。

客户端错误:双重确认和未知标签

如果客户端多次确认相同的交付标签,RabbitMQ 将导致通道错误,例如 PRECONDITION_FAILED - unknown delivery tag 100。如果使用未知交付标签,也会抛出相同的通道异常。

代理将抱怨“未知交付标签”的另一种情况是,当在与接收交付的通道不同的通道上尝试确认时,无论是肯定还是否定。交付必须在同一通道上确认。

发布者确认

网络可能会以不太明显的方式发生故障,并且检测某些故障需要时间。因此,已将其协议帧或一组帧(例如,已发布的消息)写入其套接字的客户端不能假定消息已到达服务器并已成功处理。它可能在途中丢失,或者其交付可能会显着延迟。

使用标准 AMQP 0-9-1,保证消息不丢失的唯一方法是使用事务——使通道成为事务性的,然后对于每条消息或一组消息发布、提交。在这种情况下,事务是不必要的重量级,并将吞吐量降低 250 倍。为了解决这个问题,引入了一种确认机制。它模仿协议中已存在的消费者确认机制。

要启用确认,客户端发送 confirm.select 方法。根据是否设置了 no-wait,代理可能会响应 confirm.select-ok。一旦在通道上使用了 confirm.select 方法,就称其处于确认模式。事务通道不能置于确认模式,一旦通道处于确认模式,就不能使其成为事务性的。

一旦通道处于确认模式,代理和客户端都会对消息进行计数(计数从第一个 confirm.select 开始为 1)。然后,代理在处理消息时通过在同一通道上发送 basic.ack 来确认消息。delivery-tag 字段包含已确认消息的序列号。代理还可以设置 basic.ack 中的 multiple 字段,以指示已处理所有消息,直到并包括具有序列号的消息。

发布否认

在代理无法成功处理消息的特殊情况下,代理将发送 basic.nack 而不是 basic.ack。在这种情况下,basic.nack 的字段与 basic.ack 中相应的字段具有相同的含义,并且应忽略 requeue 字段。通过否定确认一条或多条消息,代理指示它无法处理这些消息并拒绝承担责任;此时,客户端可以选择重新发布消息。

在通道置于确认模式后,所有后续发布的消息都将被确认或否定确认一次。对于消息何时被确认,不作任何保证。任何消息都不会同时被确认和否定确认。

只有当负责队列的 Erlang 进程中发生内部错误时,才会交付 basic.nack

发布的消息何时将被代理确认?

对于无法路由的消息,一旦交换机验证消息不会路由到任何队列(返回一个空队列列表),代理将发出确认。如果消息也发布为强制性的,则 basic.return 将在 basic.ack 之前发送到客户端。否定确认(basic.nack)也是如此。

对于可路由的消息,当消息被所有队列接受时,将发送 basic.ack。对于路由到持久队列的持久消息,这意味着持久化到磁盘。对于仲裁队列,这意味着仲裁副本已接受消息并将其确认给选定的领导者。

持久消息的确认延迟

路由到持久队列的持久消息的 basic.ack 将在将消息持久化到磁盘后发送。RabbitMQ 消息存储在间隔一段时间(几百毫秒)后批量将消息持久化到磁盘,以最大限度地减少 fsync(2) 调用次数,或者在队列空闲时进行持久化。

这意味着在恒定负载下,basic.ack 的延迟可能达到几百毫秒。为了提高吞吐量,强烈建议应用程序异步处理确认(作为流)或发布批量消息并等待未完成的确认。此操作的确切 API 因客户端库而异。

发布者确认的排序注意事项

在大多数情况下,RabbitMQ 将按照消息发布的顺序向发布者确认消息(这适用于在单个通道上发布的消息)。但是,发布者确认是异步发出的,并且可以确认单个消息或一组消息。发出确认的确切时刻取决于消息的传递模式(持久与瞬态)以及消息路由到的队列的属性(参见上文)。也就是说,不同的消息可以在不同的时间被认为可以确认。这意味着确认的到达顺序可能与它们各自的消息不同。应用程序应尽可能不依赖确认的顺序。

发布者确认和保证交付

如果 RabbitMQ 节点在持久消息写入磁盘之前发生故障,则可能会丢失这些持久消息。例如,考虑以下场景

  1. 客户端将持久消息发布到持久队列
  2. 客户端从队列中消费消息(注意消息是持久的,队列是持久的),但确认未激活
  3. 代理节点失败并重新启动,并且
  4. 客户端重新连接并开始消费消息

此时,客户端可以合理地假设消息将再次交付。事实并非如此:重新启动导致代理丢失消息。为了保证持久性,客户端应使用确认。如果发布者的通道处于确认模式,则发布者将不会收到丢失消息的确认(因为消息尚未写入磁盘)。

局限性

最大交付标签

交付标签是一个 64 位长整型值,因此其最大值为 9223372036854775807。由于交付标签的作用域限定在每个通道,因此在实践中发布者或消费者不太可能超出此值。

© . All rights reserved.