消费者确认和发布者确认 (Publisher Confirms)
概述
本指南涵盖了与数据安全相关的两个特性:消费者确认和发布者确认。
等等。在应用程序中使用消息传递时,消费者和发布者两端的确认机制对于数据安全至关重要。
基础知识
使用 RabbitMQ 这样的消息代理的系统,按照定义来说是分布式的。由于发送的协议方法(消息)不保证能够到达对端或被其成功处理,因此发布者和消费者都需要一种机制来确认投递和处理。RabbitMQ 支持的几种消息协议提供了这些功能。本指南介绍的是 AMQP 0-9-1 中的功能,但对于其他支持的协议,其原理基本相同。
从消费者到 RabbitMQ 的投递处理确认在消息传递协议中被称为确认(acknowledgements),而从消息代理到发布者的确认是协议扩展,称为发布者确认(publisher confirms)。这两种功能都基于相同的理念,并受到 TCP 的启发。
它们对于从发布者到 RabbitMQ 节点以及从 RabbitMQ 节点到消费者的可靠投递至关重要。换句话说,它们是**数据安全的关键**,应用程序和 RabbitMQ 节点在这个方面同样负有责任。
发布者确认是否与消费者投递确认相关?
发布者确认和消费者投递确认是非常相似的功能,它们在不同的上下文中解决了类似的问题。
- 顾名思义,消费者确认涵盖了 RabbitMQ 与消费者的通信。
- 发布者确认涵盖了发布者与 RabbitMQ 的通信。
然而,这两种功能完全正交,互不感知。
发布者确认不感知消费者:它们只覆盖发布者与其连接的节点以及队列(或流)领导副本的交互。
消费者确认不感知发布者:它们的目标是向 RabbitMQ 节点确认某个投递已被成功接收和处理,以便已投递的消息可以被标记为将来删除。.
有时,发布和消费应用程序需要通过请求-响应模式进行通信,这需要对端进行显式确认。 RabbitMQ 教程 #6 演示了如何完成此操作的基础知识,而直接回复(Direct Reply-to)提供了一种无需创建响应队列的方式。
然而,本指南不涵盖此类通信,仅提及它是为了与本指南中描述的更具针对性的消息传递协议功能进行对比。
(消费者) 投递确认
当 RabbitMQ 向消费者投递消息时,它需要知道何时认为消息已成功发送。哪种逻辑最合适取决于系统。因此,这主要是一个应用程序的决定。在 AMQP 0-9-1 中,它是在使用 basic.consume 方法注册消费者时,或使用 basic.get 方法按需获取消息时做出的。
如果您更喜欢面向示例和循序渐进的材料,RabbitMQ 教程 #2 也涵盖了消费者确认。
投递标识符:投递标签
在我们继续讨论其他主题之前,解释投递如何被识别(以及确认如何指示其各自的投递)非常重要。当注册了一个消费者(订阅)时,RabbitMQ 将使用 basic.deliver 方法进行投递(推送)消息。该方法带有一个 delivery tag,它在通道上唯一标识了该投递。因此,投递标签是按通道范围限定的。
投递标签是单调递增的正整数,并且客户端库会以这种方式呈现。确认投递的客户端库方法将投递标签作为参数。
由于投递标签是按通道范围限定的,因此必须在接收消息的同一通道上确认投递。在不同的通道上确认将导致“未知的投递标签”协议异常并关闭该通道。
消费者确认模式和数据安全注意事项
当节点向消费者投递消息时,它必须决定消息是否被视为已被消费者处理(或至少接收)。由于多种因素(客户端连接、消费者应用程序等)可能失败,此决定关系到数据安全。消息传递协议通常提供一种确认机制,允许消费者向其连接的节点确认投递。是否使用该机制是在消费者订阅时决定的。
根据使用的确认模式,RabbitMQ 可以立即认为消息已成功投递(写入 TCP 套接字),或者在收到显式(“手动”)客户端确认后才认为已成功投递。手动发送的确认可以是肯定的或否定的,并使用以下协议方法之一:
basic.ack用于肯定确认。basic.nack用于否定确认(注意:这是RabbitMQ 对 AMQP 0-9-1 的扩展)。basic.reject用于否定确认,但与basic.nack相比有一个限制。
这些方法在客户端库 API 中如何暴露将在下面讨论。
肯定确认只是指示 RabbitMQ 将消息记录为已投递,并且可以丢弃。使用 basic.reject 的否定确认具有相同的效果。主要区别在于语义:肯定确认假定消息已成功处理,而否定确认则表明投递未被处理但仍应被删除。
在自动确认模式下,消息在发送后(写入 TCP 套接字)立即被视为成功投递。此模式牺牲了更高的吞吐量(只要消费者能够跟上)来换取较低的投递和消费者处理安全性。此模式通常被称为“fire-and-forget”。与手动确认模式不同,如果消费者的 TCP 连接或通道在成功投递之前关闭,服务器发送的消息将丢失。因此,自动消息确认**应被认为是不安全的**,不适用于所有工作负载。
使用自动确认模式时还需要考虑的另一件事是消费者过载。手动确认模式通常与有界通道预取一起使用,该预取限制了通道上未处理(“进行中”)投递的数量。然而,在自动确认模式下,根据定义没有这样的限制。因此,消费者可能会被投递速率淹没,可能在内存中积累待处理消息并耗尽堆空间,或者被操作系统终止进程。一些客户端库会应用 TCP 背压(在未处理投递的积压量低于某个限制之前停止从套接字读取)。因此,自动确认模式仅推荐用于能够高效且稳定地处理投递的消费者。
肯定确认投递
用于投递确认的 API 方法通常在客户端库中作为通道上的操作暴露。Java 客户端用户将使用 Channel#basicAck 和 Channel#basicNack 来分别执行 basic.ack 和 basic.nack。这是一个演示肯定确认的 Java 客户端示例:
// this example assumes an existing channel instance
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge a single delivery, the message will
// be discarded
channel.basicAck(deliveryTag, false);
}
});
在 .NET 客户端中,这些方法分别是 IModel#BasicAck 和 IModel#BasicNack。这是一个使用该客户端演示肯定确认的示例:
// this example assumes an existing channel (IModel) instance
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// positively acknowledge a single delivery, the message will
// be discarded
channel.BasicAck(ea.DeliveryTag, false);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);
一次性确认多条投递
手动确认可以批量处理以减少网络流量。这是通过将确认方法(如上所述)的 multiple 字段设置为 true 来实现的。请注意,basic.reject 从历史上没有该字段,这就是 RabbitMQ 引入 basic.nack 作为协议扩展的原因。
当 multiple 字段设置为 true 时,RabbitMQ 将确认到当前指定的投递标签为止的所有未确认的投递标签。与其他与确认相关的所有内容一样,这也是按通道范围限定的。例如,假设通道 Ch 上有未确认的投递标签 5、6、7 和 8,当在该通道上收到一个确认帧,其中 delivery_tag 设置为 8 且 multiple 设置为 true 时,将确认 5 到 8 的所有标签。如果 multiple 设置为 false,则投递 5、6 和 7 仍将未确认。
要使用 RabbitMQ Java 客户端一次性确认多条投递,请将 Channel#basicAck 的 multiple 参数设置为 true。
// this example assumes an existing channel instance
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge all deliveries up to
// this delivery tag
channel.basicAck(deliveryTag, true);
}
});
.NET 客户端的想法也基本相同。
// this example assumes an existing channel (IModel) instance
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// positively acknowledge all deliveries up to
// this delivery tag
channel.BasicAck(ea.DeliveryTag, true);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);
否定确认和重新排队投递
有时消费者无法立即处理投递,但其他实例可能可以。在这种情况下,可能希望将其重新排队,让另一个消费者接收并处理它。basic.reject 和 basic.nack 是用于此的两种协议方法。
这些方法通常用于否定确认投递。此类投递可以被丢弃、发送到死信队列或由代理重新排队。此行为由 requeue 字段控制。当字段设置为 true 时,代理将使用指定的投递标签重新排队该投递(或多条投递,稍后将详细说明)。或者,当此字段设置为 false 时,消息将被路由到死信交换(Dead Letter Exchange)(如果已配置),否则将被丢弃。
这两种方法通常在客户端库中作为通道上的操作暴露。Java 客户端用户将分别使用 Channel#basicReject 和 Channel#basicNack 来执行 basic.reject 和 basic.nack。
// this example assumes an existing channel instance
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// negatively acknowledge, the message will
// be discarded
channel.basicReject(deliveryTag, false);
}
});
// this example assumes an existing channel instance
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// requeue the delivery
channel.basicReject(deliveryTag, true);
}
});
在 .NET 客户端中,这些方法分别是 IModel#BasicReject 和 IModel#BasicNack。
// this example assumes an existing channel (IModel) instance
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// negatively acknowledge, the message will
// be discarded
channel.BasicReject(ea.DeliveryTag, false);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);
// this example assumes an existing channel (IModel) instance
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// requeue the delivery
channel.BasicReject(ea.DeliveryTag, true);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);
当消息被重新排队时,如果可能,它将被放回队列中的原始位置。如果不能(由于多个消费者共享一个队列时,其他消费者的并发投递和确认),消息将被重新排队到更靠近队列头部的位置。
重新排队的消息可能已准备好立即重新投递,具体取决于它们在队列中的位置以及通道使用的预取值(对于有活动消费者的通道)。这意味着,如果所有消费者都因为瞬态原因无法处理投递而重新排队,它们将创建一个重新排队/重新投递循环。此类循环可能在网络带宽和 CPU 资源方面成本高昂。消费者实现可以跟踪重新投递次数,然后拒绝消息(丢弃)或在延迟后安排重新排队。
可以使用 basic.nack 方法一次性拒绝或重新排队多条消息。这正是它与 basic.reject 的区别之处。它接受一个额外的参数 multiple。这是一个 Java 客户端示例:
// this example assumes an existing channel instance
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// requeue all unacknowledged deliveries up to
// this delivery tag
channel.basicNack(deliveryTag, true, true);
}
});
.NET 客户端的工作方式非常相似。
// this example assumes an existing channel (IModel) instance
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// requeue all unacknowledged deliveries up to
// this delivery tag
channel.BasicNack(ea.DeliveryTag, true, true);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);
通道预取设置(QoS)
消息是异步交付(发送)到客户端的,在任何给定时刻,通道上可能有多条消息“在途”。客户端的手动确认本质上也是异步的,但方向相反。
这意味着存在一个未确认投递的滑动窗口。
对于大多数消费者来说,限制此窗口的大小是有意义的,以避免在消费者端出现无界缓冲区(堆)增长的问题。这是通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义了在通道上允许的未确认投递的最大数量。当数量达到配置的计数时,RabbitMQ 将停止在该通道上投递更多消息,直到至少一条已发送的消息被确认。
值为 0 表示“无限制”,允许任意数量的未确认消息。
例如,假设通道 Ch 上有未确认的投递标签 5、6、7 和 8,并且通道 Ch 的预取计数设置为 4,那么除非至少一条未确认的投递被确认,否则 RabbitMQ 不会在 Ch 上推送更多投递。
当在该通道上收到一个确认帧,其中 delivery_tag 设置为 5(或 6、7、8)时,RabbitMQ 将注意到并投递一条更多消息。一次性确认多条消息将使多条消息可用于投递。
值得重申的是,投递和手动客户端确认的流程完全是异步的。因此,如果预取值在已有消息在途时被更改,就会出现自然的竞态条件,并且通道上可能会暂时出现超过预取计数数量的未确认消息。
“无限制”预取
在 AMQP 0-9-1 协议中,值为 0 表示“无限制”,允许在通道上任意数量的未确认消息。
消息源,例如队列和流,仍然可以引入有限的限制,使得通道上的“无限制预取”在没有来自消费者的投递确认的情况下服从某个限制。
例如,仲裁队列(quorum queues)将消费者预取上限限制为 2,000,以限制潜在的 Raft 日志增长失控,类似于仲裁队列对消费者强制执行投递确认超时。
正如消费者确认模式、预取和吞吐量下面解释的,2,000 的预取值在消费者吞吐量方面与 300 的预取值没有实质性区别。
在吞吐量是最高优先级的场景中,请使用流和分区流配合 RabbitMQ 流协议客户端,而不是增加通道预取。
按通道、按消费者和全局预取
QoS 设置可以针对特定通道或特定消费者进行配置。消费者预取指南解释了这种范围界定的影响。
预取和轮询消费者
QoS 预取设置对使用 basic.get(“拉取 API”)获取的消息没有影响,即使在手动确认模式下也是如此。
消费者确认模式、预取和吞吐量
确认模式和 QoS 预取值对消费者吞吐量有显著影响。一般来说,增加预取会提高消息投递到消费者的速率。自动确认模式可以实现最佳的投递速率。但是,在这两种情况下,已投递但尚未处理的消息数量也会增加,从而增加消费者的 RAM 消耗。
自动确认模式或无限预取的上手动确认模式应谨慎使用。大量消耗消息而不确认的消费者会导致其连接节点的内存消耗增长。找到合适的预取值需要反复试验,并且因工作负载而异。100 到 300 的值通常能提供最佳吞吐量,并且不会显著冒着压垮消费者的风险。更高的值通常会遇到收益递减法则。
预取值为 1 是最保守的。它将显著降低吞吐量,尤其是在消费者连接延迟较高的环境中。对于许多应用程序来说,更高的值将是适当和最优的。
当消费者失败或失去连接时:自动重新排队
当使用手动确认时,任何未确认的投递(消息)将在发生投递的通道(或连接)关闭时自动重新排队。这包括客户端的 TCP 连接丢失、消费者应用程序(进程)故障以及通道级协议异常(下面会介绍)。
请注意,检测不可用客户端需要一段时间。
由于这种行为,消费者必须准备好处理重投递,并且在实现时应考虑幂等性。重投递将有一个特殊的布尔属性 redeliver,由 RabbitMQ 设置为 true。对于首次投递,它将设置为 false。请注意,消费者可能会收到之前投递给另一个消费者的消息。
客户端错误:重复确认和未知标签
如果客户端多次确认同一个投递标签,RabbitMQ 将会产生一个通道错误,例如 PRECONDITION_FAILED - unknown delivery tag 100。如果使用未知的投递标签,也会抛出相同的通道异常。
在另一种情况下,代理会因为“未知的投递标签”而报错,那就是在与接收到投递的通道不同的通道上尝试进行确认(无论是肯定的还是否定的)。投递必须在同一通道上确认。
发布者确认
网络可能以不易察觉的方式出现故障,并且检测某些故障需要时间。因此,一个写入了协议帧或一组帧(例如,已发布的的消息)到其套接字中的客户端不能假定消息已到达服务器并被成功处理。它可能在途中丢失,或者其投递可能会被严重延迟。
使用标准的 AMQP 0-9-1,保证消息不丢失的唯一方法是使用事务——使通道事务化,然后对每条消息或一组消息进行发布、提交。在这种情况下,事务的开销过大,并将吞吐量降低了 250 倍。为了解决这个问题,引入了一种确认机制。它模仿了协议中已有的消费者确认机制。
要启用确认,客户端需要发送 confirm.select 方法。根据是否设置了 no-wait,代理可能会响应 confirm.select-ok。一旦在通道上使用了 confirm.select 方法,该通道就处于确认模式。事务性通道不能置于确认模式,一旦通道处于确认模式,就不能使其事务化。
一旦通道处于确认模式,代理和客户端都会对消息进行计数(计数从第一次 confirm.select 开始,从 1 开始)。然后,代理在处理消息时会通过在同一通道上发送 basic.ack 来确认消息。delivery-tag 字段包含已确认消息的序列号。代理还可以在 basic.ack 中设置 multiple 字段,以指示包括具有序列号的消息在内的所有消息都已被处理。
发布的消息的否定确认
在代理无法成功处理消息的特殊情况下,代理将发送 basic.nack 而不是 basic.ack。在此上下文中,basic.nack 的字段与 basic.ack 中相应的字段具有相同的含义,并且 requeue 字段应被忽略。通过 nack 一条或多条消息,代理表明它无法处理这些消息并拒绝承担责任;此时,客户端可以选择重新发布这些消息。
将通道置于确认模式后,所有后续发布的消息都将得到一次确认或 nack。不保证消息何时被确认。消息不会同时被确认和 nack。
basic.nack 仅在负责队列的 Erlang 进程中发生内部错误时才会发送。
何时会收到代理对已发布消息的确认?
对于无法路由的消息,一旦交换器确认消息不会路由到任何队列(返回一个空队列列表),代理将发出确认。如果消息也以 mandatory 标志发布,则在 basic.ack 之前会将 basic.return 发送到客户端。对于否定确认(basic.nack)也是如此。
对于可以路由的消息,当消息被所有队列接受后,将发送 basic.ack。对于路由到持久化队列的持久化消息,这意味着**持久化到磁盘**。对于仲裁队列,这意味着仲裁副本已接受消息并已向选定的领导者确认。
持久化消息的确认延迟
对于路由到持久化队列的持久化消息,将在消息持久化到磁盘后发送 basic.ack。RabbitMQ 消息存储会以批次形式在一定间隔(几百毫秒)后将消息持久化到磁盘,以最小化 fsync(2) 调用次数,或者在队列空闲时进行持久化。
这意味着在恒定负载下,basic.ack 的延迟可能达到几百毫秒。为了提高吞吐量,强烈建议应用程序异步处理确认(作为流)或发布消息批次并等待未完成的确认。具体的 API 因客户端库而异。
发布者确认的顺序考虑
在大多数情况下,RabbitMQ 会以与发布者发布消息相同的顺序确认消息(这适用于在单个通道上发布的消息)。然而,发布者确认是异步发出的,并且可以确认单个消息或一组消息。确认发出的确切时间取决于消息的投递模式(持久化与瞬态)以及消息所路由到的队列的属性(见上文)。也就是说,不同的消息可能在不同时间准备好进行确认。这意味着确认可能会以与相应消息不同的顺序到达。如果可能,应用程序不应依赖于确认的顺序。
发布者确认和保证投递
RabbitMQ 节点在将持久化消息写入磁盘之前可能会丢失它们。例如,考虑以下场景:
- 客户端将一条持久化消息发布到一个持久化队列。
- 客户端从队列中消费该消息(请注意,该消息是持久化的,队列是持久化的,但确认未激活),
- 代理节点失败并重启,并且
- 客户端重新连接并开始消费消息
此时,客户端可以合理地假设该消息将被再次投递。但事实并非如此:重启导致代理丢失了消息。为了保证持久性,客户端应使用确认。如果发布者的通道处于确认模式,那么发布者将**不会**收到丢失消息的 ack(因为消息尚未写入磁盘)。
限制
最大投递标签
投递标签是一个 64 位长值,因此其最大值为 9223372036854775807。由于投递标签是按通道范围限定的,因此发布者或消费者在实践中极不可能超出此值。