跳至主要内容
版本:3.13

消费者确认和发布者确认

概述

本指南介绍了与数据安全相关的两个功能:消费者确认和发布者确认

等等。消费者和发布者端的确认对于使用消息传递的应用程序的数据安全至关重要。

更多相关主题在发布者消费者指南中介绍。

基础知识

使用 RabbitMQ 等消息代理的系统本质上是分布式的。由于发送的协议方法(消息)不能保证到达对等方或被其成功处理,因此发布者和消费者都需要一种机制来确认传递和处理。RabbitMQ 支持的几种消息传递协议提供了此类功能。本指南介绍了 AMQP 0-9-1 中的功能,但其他支持的协议中的想法基本相同。

来自消费者到 RabbitMQ 的传递处理确认在消息传递协议中被称为确认;代理对发布者的确认是称为发布者确认的协议扩展。这两个功能都基于相同的想法,并受到 TCP 的启发。

它们对于从发布者到 RabbitMQ 节点以及从 RabbitMQ 节点到消费者的可靠传递至关重要。换句话说,它们对数据安全至关重要,应用程序和 RabbitMQ 节点都为此负责。

发布者确认是否与消费者传递确认相关?

发布者确认消费者传递确认是解决不同上下文中类似问题的非常相似的功能

  1. 消费者确认顾名思义,涵盖 RabbitMQ 与消费者的通信
  2. 发布者确认涵盖发布者与 RabbitMQ 的通信

然而,这两个功能完全正交且相互独立。

发布者确认不知道消费者:它们只涵盖发布者与其连接的节点以及队列(或)领导者副本之间的交互。

消费者确认不知道发布者:它们的目标是向 RabbitMQ 节点确认给定传递已成功接收并成功处理,以便可以标记已传递的消息以供将来删除。

有时发布和消费应用程序需要通过请求和响应进行通信,这些请求和响应需要对等方的明确确认。RabbitMQ 教程 #6演示了如何执行此操作的基础知识,而直接回复到提供了一种无需声明大量短暂的临时响应队列的方法。

然而,本指南并未介绍这种类型的通信,仅提及它与本指南中描述的更集中的消息传递协议功能形成对比。

(消费者)传递确认

当 RabbitMQ 将消息传递给消费者时,它需要知道何时认为消息已成功发送。最佳逻辑类型取决于系统。因此,这主要是一个应用程序决策。在 AMQP 0-9-1 中,它是在使用 basic.consume 方法注册消费者或使用 basic.get 方法按需获取消息时做出的。

如果您更喜欢面向示例和逐步的材料,消费者确认也在RabbitMQ 教程 #2中介绍。

传递标识符:传递标签

在继续讨论其他主题之前,有必要解释如何识别传递(以及确认指示各自的传递)。当注册消费者(订阅)时,RabbitMQ 将使用 basic.deliver 方法传递(推送)消息。该方法带有一个传递标签,它在通道上唯一地标识传递。因此,传递标签按通道进行范围限定。

传递标签是单调递增的正整数,并由客户端库以这种形式呈现。确认传递的客户端库方法将传递标签作为参数。

由于传递标签按通道进行范围限定,因此必须在接收传递的同一个通道上确认传递。在其他通道上确认会导致“未知传递标签”协议异常并关闭通道。

消费者确认模式和数据安全注意事项

当节点将消息传递给消费者时,它必须决定是否应将消息视为消费者已处理(或至少接收)。由于多个事物(客户端连接、消费者应用程序等)可能会失败,因此此决策是一个数据安全问题。消息传递协议通常提供一种确认机制,允许消费者向其连接的节点确认传递。是否使用该机制是在消费者订阅时决定的。

根据使用的确认模式,RabbitMQ 可以认为消息在发送后立即(写入 TCP 套接字)或在收到显式(“手动”)客户端确认时成功传递。手动发送的确认可以是肯定的或否定的,并使用以下协议方法之一

  • basic.ack 用于肯定确认
  • basic.nack 用于否定确认(注意:这是RabbitMQ 对 AMQP 0-9-1 的扩展)
  • basic.reject 用于否定确认,但与 basic.nack 相比有一个限制

将在下面讨论这些方法如何在客户端库 API 中公开。

肯定确认只是指示 RabbitMQ 将消息记录为已传递并可以丢弃。使用 basic.reject 的否定确认具有相同的效果。区别主要在于语义:肯定确认假设消息已成功处理,而否定确认则表示传递未处理,但仍应删除。

在自动确认模式下,消息在发送后立即被视为已成功传递。这种模式以更高的吞吐量(只要消费者能够跟上)为代价,降低了传递和消费者处理的安全级别。这种模式通常被称为“即发即忘”。与手动确认模型不同,如果消费者的 TCP 连接或通道在成功传递之前关闭,则服务器发送的消息将丢失。因此,自动消息确认应被视为不安全,不适合所有工作负载。

在使用自动确认模式时,另一个需要考虑的事情是消费者超载。手动确认模式通常与有界通道预取一起使用,这限制了通道上未完成的(正在进行的)传递的数量。然而,在自动确认的情况下,默认情况下没有这种限制。因此,消费者可能会被传递速率所淹没,可能在内存中累积积压,耗尽堆空间或被操作系统终止其进程。一些客户端库将应用 TCP 反压(停止从套接字读取,直到未处理的传递积压量下降到某个特定限制以下)。因此,自动确认模式仅推荐用于能够有效地以稳定速率处理传递的消费者。

肯定确认传递

用于传递确认的 API 方法通常在客户端库中以通道上的操作形式公开。Java 客户端用户将使用 Channel#basicAckChannel#basicNack 分别执行 basic.ackbasic.nack。以下是一个演示肯定确认的 Java 客户端示例

// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge a single delivery, the message will
// be discarded
channel.basicAck(deliveryTag, false);
}
});

在 .NET 客户端中,这些方法分别为 IModel#BasicAckIModel#BasicNack。以下是一个演示使用该客户端进行肯定确认的示例

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// positively acknowledge a single delivery, the message will
// be discarded
channel.BasicAck(ea.DeliveryTag, false);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);

一次确认多个传递

可以对手动确认进行批处理以减少网络流量。这可以通过将确认方法的 multiple 字段(见上文)设置为 true 来实现。请注意,basic.reject 在历史上没有此字段,这就是 RabbitMQ 引入 basic.nack 作为协议扩展的原因。

multiple 字段设置为 true 时,RabbitMQ 将确认所有未确认的传递标签,直到和包括确认中指定的标签。与所有与确认相关的事物一样,这在每个通道上都有范围。例如,假设通道 Ch 上有 5、6、7 和 8 未确认的传递标签,当确认帧到达该通道时,delivery_tag 设置为 8multiple 设置为 true,则从 5 到 8 的所有标签都将被确认。如果 multiple 设置为 false,则 5、6 和 7 的传递将仍然未确认。

要使用 RabbitMQ Java 客户端确认多个传递,请将 Channel#basicAckmultiple 参数传递为 true

// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge all deliveries up to
// this delivery tag
channel.basicAck(deliveryTag, true);
}
});

.NET 客户端中的想法非常相似

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// positively acknowledge all deliveries up to
// this delivery tag
channel.BasicAck(ea.DeliveryTag, true);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);

否定确认和传递的重新入队

有时消费者无法立即处理传递,但其他实例可能能够处理。在这种情况下,可能希望重新入队它并让另一个消费者接收和处理它。basic.rejectbasic.nack 是用于此的两种协议方法。

这些方法通常用于对传递进行否定确认。此类传递可以被代理丢弃或放入死信队列,或者重新排队。此行为由requeue字段控制。当该字段设置为true时,代理将使用指定的传递标签将传递(或多个传递,稍后将解释)重新排队。或者,当该字段设置为false时,如果配置了死信交换机,则消息将被路由到死信交换机,否则将被丢弃。

两种方法通常在客户端库中作为对通道的操作公开。Java 客户端用户将使用Channel#basicRejectChannel#basicNack分别执行basic.rejectbasic.nack

// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// negatively acknowledge, the message will
// be discarded
channel.basicReject(deliveryTag, false);
}
});
// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// requeue the delivery
channel.basicReject(deliveryTag, true);
}
});

在 .NET 客户端中,这些方法分别为IModel#BasicRejectIModel#BasicNack

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// negatively acknowledge, the message will
// be discarded
channel.BasicReject(ea.DeliveryTag, false);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);
// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// requeue the delivery
channel.BasicReject(ea.DeliveryTag, true);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);

当消息重新排队时,它将尽可能地放置回队列中的原始位置。如果无法做到这一点(由于并发传递和来自共享队列的多个消费者的确认),则消息将重新排队到更靠近队列头的某个位置。

重新排队的消息可能立即准备重新传递,具体取决于它们在队列中的位置以及具有活动消费者的通道使用的预取值。这意味着,如果所有消费者都由于瞬态条件而无法处理传递而重新排队,它们将创建一个重新排队/重新传递循环。此类循环在网络带宽和 CPU 资源方面可能非常昂贵。消费者实现可以跟踪重新传递次数,并永远拒绝消息(丢弃它们)或安排在延迟后重新排队。

可以使用basic.nack方法一次拒绝或重新排队多条消息。这就是它与basic.reject的不同之处。它接受一个额外的参数,multiple。以下是一个 Java 客户端示例

// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// requeue all unacknowledged deliveries up to
// this delivery tag
channel.basicNack(deliveryTag, true, true);
}
});

.NET 客户端的工作原理非常相似

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// requeue all unacknowledged deliveries up to
// this delivery tag
channel.BasicNack(ea.DeliveryTag, true, true);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);

通道预取设置 (QoS)

消息是异步地传递(发送)给客户端的,在任何给定时刻,通道上可能有多个消息“在空中”。客户端的手动确认本质上也是异步的,但方向相反。

这意味着一个未确认传递的滑动窗口。

对于大多数消费者来说,限制此窗口的大小是有意义的,以避免消费者端出现无界缓冲区(堆)增长问题。这可以通过使用basic.qos方法设置“预取计数”值来完成。该值定义了通道上允许的最大未确认传递数。当数量达到配置的计数时,RabbitMQ 将停止在通道上传递更多消息,直到至少确认了其中一个未确认的消息。

0表示“无限制”,允许任意数量的未确认消息。

例如,假设通道Ch上有四个未确认的传递,传递标签分别为 5、6、7 和 8,并且通道Ch的预取计数设置为 4,那么 RabbitMQ 不会将任何更多传递推送到Ch,除非至少确认了其中一个未确认的传递。

当确认帧到达该通道时,delivery_tag设置为5(或678),RabbitMQ 将会注意到并传递一条更多消息。确认一次确认多条消息将使不止一条消息可用于传递。

值得重复的是,传递和手动客户端确认的流程完全是异步的。因此,如果在已有传递在空中时更改了预取值,就会出现自然的竞争条件,并且通道上可能暂时存在超过预取计数的未确认消息。

每个通道、每个消费者和全局预取

QoS 设置可以为特定通道或特定消费者配置。消费者预取指南解释了此范围的影响。

预取和轮询消费者

QoS 预取设置对使用basic.get(“拉取 API”)获取的消息没有影响,即使在手动确认模式下也是如此。

消费者确认模式、预取和吞吐量

确认模式和 QoS 预取值对消费者吞吐量有重大影响。一般来说,增加预取将提高消息传递给消费者的速率。自动确认模式提供最佳的传递速率。但是,在这两种情况下,已传递但尚未处理的消息数量也将增加,从而增加消费者 RAM 消耗。

自动确认模式或具有无限预取的手动确认模式应谨慎使用。大量消费消息而不进行确认的消费者会导致连接到它们的节点上的内存消耗增长。找到合适的预取值是一个反复试验的过程,并且会因工作负载而异。100 到 300 范围内的值通常提供最佳吞吐量,并且不会有压倒消费者的重大风险。更高的值通常会遇到边际效益递减规律

预取值为 1 是最保守的。它会显著降低吞吐量,尤其是在消费者连接延迟较高的环境中。对于许多应用程序来说,更高的值会更合适且更佳。

当消费者故障或失去连接时:自动重新排队

当使用手动确认时,任何未确认的传递(消息)将在发生传递的通道(或连接)关闭时自动重新排队。这包括客户端的 TCP 连接丢失、消费者应用程序(进程)故障以及通道级协议异常(在下文中讨论)。

请注意,需要一段时间才能检测到不可用的客户端

由于这种行为,消费者必须准备好处理重新传递,并且在实现时要牢记幂等性。重新传递将有一个特殊的布尔属性,redeliver,由 RabbitMQ 设置为true。对于首次传递,它将设置为false。请注意,消费者可以接收以前传递给其他消费者的消息。

客户端错误:双重确认和未知标签

如果客户端对同一个传递标签进行多次确认,RabbitMQ 将导致通道错误,例如PRECONDITION_FAILED - unknown delivery tag 100。如果使用未知传递标签,也会引发相同的通道异常。

在代理抱怨“未知传递标签”的另一种情况下,是在与接收传递的通道不同的通道上尝试进行确认(无论正向还是负向)。必须在同一个通道上确认传递。

发布者确认

网络可能以不明显的方式发生故障,并且检测某些故障需要时间。因此,向其套接字写入协议帧或一组帧(例如已发布的消息)的客户端不能假设该消息已到达服务器并成功处理。它可能在途中丢失,或者其传递可能会被显着延迟。

使用标准 AMQP 0-9-1,保证消息不会丢失的唯一方法是使用事务——使通道可进行事务处理,然后为每个消息或一组消息发布,进行提交。在这种情况下,事务不必要地很繁重,并且会将吞吐量降低 250 倍。为了解决这个问题,引入了确认机制。它模仿了协议中已经存在的消费者确认机制。

为了启用确认,客户端发送confirm.select方法。根据no-wait是否设置,代理可能会响应confirm.select-ok。一旦在通道上使用confirm.select方法,该通道就被认为处于确认模式。事务性通道不能处于确认模式,并且一旦通道处于确认模式,它就不能再成为事务性的。

一旦通道处于确认模式,代理和客户端都会对消息进行计数(在第一个confirm.select上,计数从 1 开始)。然后,代理在处理消息时通过在同一个通道上发送basic.ack来确认消息。delivery-tag字段包含已确认消息的序列号。代理还可以设置basic.ack中的multiple字段,以指示包含该序列号的所有消息都已处理。

发布的否定确认

在代理无法成功处理消息的特殊情况下,代理将发送basic.nack,而不是basic.ack。在此上下文中,basic.nack的字段与basic.ack中的对应字段具有相同的含义,并且应忽略requeue字段。通过对一个或多个消息进行否定确认,代理表明它无法处理这些消息,并且拒绝对其负责;此时,客户端可以选择重新发布这些消息。

通道处于确认模式后,所有随后发布的消息都将被确认或否定确认一次。不保证消息确认的时间。不会对任何消息进行确认和否定确认。

basic.nack仅在负责队列的 Erlang 进程中发生内部错误时才会被传递。

代理何时确认已发布的消息?

对于不可路由消息,一旦交换机验证消息不会路由到任何队列(返回空队列列表),代理将发出确认。如果消息也是以强制模式发布的,则在basic.ack之前将basic.return发送给客户端。否定确认(basic.nack)也是如此。

对于可路由消息,当消息被所有队列接受时,将发送basic.ack。对于路由到持久队列的持久消息,这意味着将其持久化到磁盘。对于仲裁队列,这意味着仲裁副本已接受并向选定的领导者确认了消息。

持久消息的确认延迟

路由到持久队列的持久消息的basic.ack将在将消息持久化到磁盘后发送。RabbitMQ 消息存储会在间隔(几百毫秒)后以批处理方式将消息持久化到磁盘,以最大程度地减少 fsync(2) 调用的次数,或者在队列空闲时进行持久化。

这意味着在持续负载下,basic.ack的延迟可能达到几百毫秒。为了提高吞吐量,强烈建议应用程序异步处理确认(作为流)或发布消息批次并等待未完成的确认。此操作的确切 API 因客户端库而异。

发布者确认的排序注意事项

在大多数情况下,RabbitMQ 会以发布者发布消息的相同顺序向发布者确认消息(这适用于在单个通道上发布的消息)。但是,发布者确认是异步发出的,可以确认单个消息或一组消息。确认发出确切的时间取决于消息的传递模式(持久性与瞬态)以及消息路由到的队列的属性(见上文)。也就是说,不同的消息可以在不同的时间被视为准备确认。这意味着确认可能以与各自消息不同的顺序到达。应用程序不应在可能的情况下依赖确认顺序。

发布者确认和保证交付

如果 RabbitMQ 节点在将持久性消息写入磁盘之前发生故障,则可能会丢失持久性消息。例如,考虑以下场景

  1. 客户端将持久性消息发布到持久性队列
  2. 客户端从队列中消费消息(注意消息是持久性的,队列是持久的),但确认未启用
  3. 代理节点发生故障并重新启动,并且
  4. 客户端重新连接并开始消费消息

此时,客户端可以合理地假设该消息将再次被传递。事实并非如此:重启导致代理丢失了该消息。为了保证持久性,客户端应该使用确认。如果发布者的通道处于确认模式,则发布者将不会为丢失的消息收到确认(因为该消息尚未写入磁盘)。

限制

最大交付标签

交付标签是一个 64 位长值,因此其最大值为 9223372036854775807。由于交付标签是针对每个通道进行范围限定的,因此在实践中发布者或消费者不太可能超过此值。