跳到主要内容

至少一次死信

·阅读时长 20 分钟

RabbitMQ 3.10 中的 Quorum 队列提供了一种更安全的死信形式,它使用至少一次的保证来实现队列之间消息的传输。这篇博文解释了开始使用至少一次死信所需了解的一切。

这篇文章还介绍了 RabbitMQ 3.10 的另外两个特性:Quorum 队列的消息生存时间 (TTL) 和死信消息的 Prometheus 指标。

概述

存储在 RabbitMQ 队列中的某些消息会过期或被消费者否定确认。RabbitMQ 可以配置为“死信”它们,而不是默默地丢弃它们,即将这些消息重新发布到专用交换机。

在 RabbitMQ 3.10 之前,死信一直不安全。从队列(“源队列”)中死信的消息不能保证被传递到由 dead-letter-exchange 策略中配置的交换机(以下称为“目标队列”)路由到的队列。

这是因为消息在内部死信时没有启用发布者确认。我们称之为“最多一次”死信策略。死信消息可能会到达目标队列。它们也可能由于各种原因而丢失

  • 目标队列不可用。例如,经典队列的主机节点已关闭或正在升级,或者 Quorum 队列暂时失去了其大多数节点。
  • 目标队列的长度限制已达到,而其溢出行为设置为 reject-publish,拒绝任何传入消息。
  • 网络分区阻止了源队列和目标队列之间的通信。
  • 死信路由拓扑配置不正确。例如,配置的 dead-letter-exchange 不存在,或者没有目标队列绑定到 dead-letter-exchange

RabbitMQ 3.10 引入了一个名为“至少一次”死信的新功能。它是源队列作为 Quorum 队列的 可选功能。这项新功能确保在源 Quorum 队列中死信的所有消息最终都会到达目标队列(经典队列、Quorum 队列或流),即使在上述场景中,消息在使用“最多一次”策略时会丢失。

这篇博文介绍了如何启用至少一次死信的说明,提供了一个详细的示例,并描述了这项新功能的注意事项和最佳实践。

用法

要为源 Quorum 队列启用 at-least-once 死信,我们需要应用以下策略(或它们等效的以 x- 开头的队列参数)

  • dead-letter-strategy 设置为 at-least-once。默认值为 at-most-once
  • overflow 设置为 reject-publish。默认值为 drop-head
  • dead-letter-exchange 已配置。

此外,必须启用功能标志 stream_queue。默认情况下,自 3.9 以来创建的 RabbitMQ 集群已启用该功能标志。即使在至少一次死信中未使用流(除非目标队列恰好是流),也需要 stream_queue 功能标志,因为至少一次死信依赖于该功能标志带来的一些实现细节。

示例

以下示例需要安装 kubectl 客户端并指向任何正在运行的 Kubernetes 集群 v1.19 或更高版本。

如果您没有可用的 Kubernetes 集群,最快的方法是安装 kind 以在 Docker 中启动本地 Kubernetes 集群

> kind create cluster

安装 rabbitmq/cluster-operator

> kubectl apply -f https://github.com/rabbitmq/cluster-operator/releases/latest/download/cluster-operator.yml

部署一个 3 节点的 RabbitMQ 集群

> cat <<EOF | kubectl apply -f -
---
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
name: my-rabbit
spec:
replicas: 3
image: rabbitmq:3.10.0-management
EOF

一旦所有 3 个 pod 都准备就绪(不到 1 分钟),我们创建一个源队列和一个目标队列

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin declare queue name=my-source-queue \
durable=true queue_type=quorum arguments='{"x-dead-letter-exchange" : "",
"x-dead-letter-routing-key" : "my-target-queue" , "x-overflow" : "reject-publish"}'

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin declare queue name=my-target-queue \
durable=true queue_type=classic

最后两个命令通过在 pod my-rabbit-server-0 的 RabbitMQ 容器中执行 rabbitmqadmin 命令来声明一个队列。

rabbitmqadmin 命令是一个 Python 脚本,它与 RabbitMQ 管理 API 对话。rabbitmqadmin 命令不是声明队列和发送消息的推荐方式。我们在这篇博文中使用了它,因为它对于您来说是遵循示例的最简单方法。

rabbitmq/cluster-operator 创建 pod 名称的格式为 <rabbitmq-cluster-name>-server-<index>。在上面的 YAML 中,我们将 <rabbitmq-cluster-name> 定义为 my-rabbit

第一个命令创建源队列。为了使至少一次死信工作,它必须是 queue_type=quorum。对于源队列,我们定义了进一步的队列参数(以 x- 开头),以 JSON 格式编码

  • x-dead-letter-exchange 设置为空字符串 ("") 表示源队列死信的消息发布到默认交换机。(虽然我们可以创建一个新的死信交换机,但死信到默认交换机使此示例更简单。)
  • x-dead-letter-routing-key 设置为 my-target-queue 表示死信消息将以路由密钥 my-target-queue 发布。由于此路由密钥与目标队列的队列名称(由第二个命令创建)匹配,因此死信消息将由默认交换机路由到目标队列,而无需创建任何进一步的绑定。
  • 如上所述,x-overflow 必须设置为 reject-publish,作为至少一次死信的先决条件。

第二个命令创建目标队列。它可以是任何队列类型。在本示例中,我们选择了一个经典队列。请注意,与在所有 3 个节点上具有 3 个副本的源 Quorum 队列相比,目标经典队列不是高可用的,并且驻留在单个节点上。

让我们发布我们的第一条消息 msg1

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin publish exchange=amq.default routing_key=my-source-queue \
payload=msg1 properties='{"expiration" : "1000", "delivery_mode" : 2}'

此命令演示了 RabbitMQ 3.10 的另一个新功能:Quorum 队列支持消息 TTL。下图说明了消息如何流动

  1. 我们将一条消息发布到默认交换机。
  2. 它被路由到源 Quorum 队列,并在 1 秒(1000 毫秒)后过期。
  3. 过期导致消息被死信到默认交换机。
  4. 它被路由到目标经典队列。

Figure 1: Dead letter routing topology (at-most-once)
图 1:死信路由拓扑(最多一次)

请注意,我们将 delivery_mode 设置为整数 2,表示消息是持久的。当最初将消息发布到源 Quorum 队列时,该标志并不重要,因为 Quorum 队列中的所有消息都会写入磁盘。但是,一旦消息被死信到目标队列(可能不是 Quorum 队列),持久性标志就变得重要。

我们可以验证消息是否已到达目标队列

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged

┌─────────────────┬─────────┬──────────┬────────────────┬─────────────────────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┤
│ my-target-queue │ classic │ 1 │ 1 │ 0 │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┤
│ my-source-queue │ quorum │ 0 │ 0 │ 0 │
└─────────────────┴─────────┴──────────┴────────────────┴─────────────────────────┘

接下来,让我们尝试一下当目标队列变得不可用时会发生什么。确定目标经典队列的主机节点的一种方法是列出队列的进程标识符 (PID)

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl list_queues --quiet name pid

name pid
my-target-queue <rabbit@my-rabbit-server-0.my-rabbit-nodes.default.1646297039.856.0>
my-source-queue <rabbit@my-rabbit-server-0.my-rabbit-nodes.default.1646297039.821.0>

PID 显示目标经典队列进程和源 Quorum 队列领导者进程都驻留在 pod my-rabbit-server-0 中。让我们停止该 RabbitMQ 服务器

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl stop_app

Stopping rabbit application on node rabbit@my-rabbit-server-0.my-rabbit-nodes.default ...

源 Quorum 队列仍然可用,因为大多数节点(3 个节点中的 2 个)可用,并且另一个节点成为新的领导者。

和以前一样,我们再次向源队列发送一条消息,并使其在 1 秒后过期。由于 pod my-rabbit-server-0 中的 RabbitMQ 节点已关闭,我们在 my-rabbit-server-1 中执行以下命令

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqadmin publish exchange=amq.default routing_key=my-source-queue \
payload=msg2 properties='{"expiration" : "1000", "delivery_mode" : 2}'

由于目标队列已关闭,它不会报告其统计信息

>  kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged state

┌─────────────────┬──────────────────────┬──────────┬────────────────┬─────────────────────────┬─────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │ state │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-target-queue │ rabbit_classic_queue │ │ │ │ down │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-source-queue │ quorum │ 0 │ 0 │ 0 │ running │
└─────────────────┴──────────────────────┴──────────┴────────────────┴─────────────────────────┴─────────┘

但是,由于目标队列已关闭,并且源队列不包含任何消息,我们知道第二条消息在死信时丢失了!

由于我们在上面声明源队列时还没有将 dead-letter-strategy 定义为 at-least-once,因此源队列使用默认策略 at-most-once。我们可以做得更好。在本示例中,我们通过应用策略将死信策略动态切换为 at-least-once

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl set_policy --apply-to queues \
my-policy my-source-queue '{"dead-letter-strategy" : "at-least-once"}'

Setting policy "my-policy" for pattern "my-source-queue" to "{"dead-letter-strategy" : "at-least-once"}"
with priority "0" for vhost "/" ...

让我们发送第三条消息

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqadmin publish exchange=amq.default routing_key=my-source-queue \
payload=msg3 properties='{"expiration" : "1000", "delivery_mode" : 2}'

使用新的 at-least-once 策略,当第三条消息过期并死信时,它将被源队列存储,因为目标队列不可用

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged state

┌─────────────────┬──────────────────────┬──────────┬────────────────┬─────────────────────────┬─────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │ state │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-target-queue │ rabbit_classic_queue │ │ │ │ down │
├─────────────────┼──────────────────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-source-queue │ quorum │ 1 │ 0 │ 0 │ running │
└─────────────────┴──────────────────────┴──────────┴────────────────┴─────────────────────────┴─────────┘

该消息既不是“就绪”(即可供普通队列消费者使用),也不是“未确认”(即已被普通队列消费者消费但尚未确认)。但是,该消息安全地保存在源 Quorum 队列中,在一个单独的数据结构中,该数据结构仅可供特殊的 RabbitMQ 内部死信消费者进程使用。

让我们输出该死信消费者进程的日志。死信消费者进程与 Quorum 队列领导者节点位于同一位置。我们首先需要找出哪个节点成为新的领导者

> kubectl exec my-rabbit-server-1 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet name leader
┌─────────────────┬───────────────────────────────────────────────────┐
│ name │ leader │
├─────────────────┼───────────────────────────────────────────────────┤
│ my-target-queue │ │
├─────────────────┼───────────────────────────────────────────────────┤
│ my-source-queue │ rabbit@my-rabbit-server-1.my-rabbit-nodes.default │
└─────────────────┴───────────────────────────────────────────────────┘

在我们的示例中,新的领导者恰好在 pod my-rabbit-server-1 上。当您运行此示例时,它也可能是 my-rabbit-server-2,在这种情况下,您需要在以下命令中将 1 替换为 2

日志显示了一条描述性的警告消息

> kubectl logs my-rabbit-server-1 -c rabbitmq | grep dead-letter

[warn] <0.4156.0> Cannot forward any dead-letter messages from source quorum queue 'my-source-queue'
in vhost '/' with configured dead-letter-exchange exchange '' in vhost '/' and configured
dead-letter-routing-key 'my-target-queue'. This can happen either if the dead-letter routing topology is misconfigured
(for example no queue bound to dead-letter-exchange or wrong dead-letter-routing-key configured)
or if non-mirrored classic queues are bound whose host node is down.
Fix this issue to prevent dead-lettered messages from piling up in the source quorum queue.
This message will not be logged again.

我们通过重启目标经典队列的主机节点来解决此问题

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl start_app

Starting node rabbit@my-rabbit-server-0.my-rabbit-nodes.default ...

内部死信消费者进程会定期重试发送消息。当前的默认重试间隔为 3 分钟。在不迟于 3 分钟后,第三条消息应该已到达目标队列

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqctl list_queues --formatter=pretty_table --quiet \
name type messages messages_ready messages_unacknowledged state

┌─────────────────┬─────────┬──────────┬────────────────┬─────────────────────────┬─────────┐
│ name │ type │ messages │ messages_ready │ messages_unacknowledged │ state │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-source-queue │ quorum │ 0 │ 0 │ 0 │ running │
├─────────────────┼─────────┼──────────┼────────────────┼─────────────────────────┼─────────┤
│ my-target-queue │ classic │ 2 │ 2 │ 0 │ running │
└─────────────────┴─────────┴──────────┴────────────────┴─────────────────────────┴─────────┘

我们的理解是,第一条和第三条消息在目标队列中,但第二条消息丢失了,因为它在使用 at-most-once 死信时目标队列已关闭

> kubectl exec my-rabbit-server-0 -c rabbitmq -- rabbitmqadmin get queue=my-target-queue count=2
+-----------------+----------+---------------+---------+---------------+------------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | redelivered |
+-----------------+----------+---------------+---------+---------------+------------------+-------------+
| my-target-queue | | 1 | msg1 | 4 | string | False |
| my-target-queue | | 0 | msg3 | 4 | string | False |
+-----------------+----------+---------------+---------+---------------+------------------+-------------+

payload 列验证了我们的理解是正确的,并且 at-least-once 死信按预期工作。即使目标队列不可用,死信消息在再次可用后也到达了目标队列。第一条消息仍然存储在目标队列中,因为我们发布到源 Quorum 队列时设置了持久性标志。如果我们没有设置持久性标志,第一条消息也会丢失。

下图总结了第三条消息的流程。

Figure 2: Dead letter routing topology (at-least-once)
图 2:死信路由拓扑(至少一次)

  1. 消息发布到默认交换机。
  2. 消息被路由到源 Quorum 队列。Quorum 队列是 Raft 共识算法中的复制状态机。Quorum 队列的状态不仅仅包含一个队列数据结构,发布者的消息在其中排队:状态还包括关于发布者、消费者以及发送给消费者(但尚未被消费者确认)的消息以及一些其他统计数据的数据。At-least-once 死信为 Quorum 队列的状态添加了另一个队列数据结构:一个仅包含死信消息的队列。因此,当消息在 1 秒后过期时,它会从“正常”消息队列移动到死信消息队列。消息安全地存储在那里,直到它被步骤 7 确认。
  3. 在 Quorum 队列领导者的节点上,有一个(RabbitMQ 内部)死信消费者进程与它位于同一位置。它的工作是从单个源 Quorum 队列的死信消息队列中消费消息,将它们转发到所有目标队列,等待直到收到所有发布者确认(步骤 6),最后将死信消息确认回源 Quorum 队列(步骤 7)。
  4. 死信消费者通过配置的 dead-letter-exchange 路由死信消息。在我们的示例中,我们将默认交换机配置为死信交换机。如果路由不存在,死信消费者将在一段时间后尝试再次路由。
  5. 如果路由存在,则消息将发送到目标队列。
  6. 目标队列向死信消费者发送发布者确认。
  7. 死信消费者向源 Quorum 队列发送消费者确认,死信消息将在其中被删除。

Prometheus 指标

RabbitMQ 3.10 附带了另一个新功能:死信消息的 Prometheus 指标。节点全局计数器将返回死信消息的数量,按以下维度细分

  1. 死信原因

    • expired:消息 TTL 超出(如我们的示例中所示)。
    • rejected:消费者发送 basic.rejectbasic.nack,没有 requeue 选项。
    • maxlen:队列长度超出,overflow 设置为 drop-headreject-publish-dlx。(后一个设置仅适用于经典队列。)
    • delivery_limit:超出传递限制。(仅适用于 Quorum 队列)。消息被重新排队太频繁,例如因为消费者发送了带有 requeue 选项的 basic.rejectbasic.nack,或者消费者与 Quorum 队列领导者断开连接。
  2. 源队列类型。即消息死信来自的队列类型

    • rabbit_classic_queue
    • rabbit_quorum_queue
    • (流不进行死信消息,因为它们是仅追加日志,其中的消息根据保留策略被截断。)
  3. 死信策略

    • disabled:队列没有配置 dead-letter-exchange,或者配置的 dead-letter-exchange 不存在,这意味着消息将被丢弃。
    • at_most_once:队列的配置的死信交换机存在。
    • at_least_once:队列类型为 rabbit_quorum_queue,配置了 dead-letter-exchangedead-letter-strategy 设置为 at-least-onceoverflow 设置为 reject-publish

按照我们的示例,让我们输出这些指标。在一个 shell 窗口中,端口转发 RabbitMQ 的 Prometheus 端口 15692

> kubectl port-forward pod/my-rabbit-server-1 15692

在另一个 shell 窗口中,抓取 Prometheus 端点

> curl --silent localhost:15692/metrics/ | grep rabbitmq_global_messages_dead_lettered

# TYPE rabbitmq_global_messages_dead_lettered_confirmed_total counter
# HELP rabbitmq_global_messages_dead_lettered_confirmed_total Total number of messages dead-lettered and confirmed by target queues
rabbitmq_global_messages_dead_lettered_confirmed_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1

# TYPE rabbitmq_global_messages_dead_lettered_delivery_limit_total counter
# HELP rabbitmq_global_messages_dead_lettered_delivery_limit_total Total number of messages dead-lettered due to
# delivery-limit exceeded
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

# TYPE rabbitmq_global_messages_dead_lettered_expired_total counter
# HELP rabbitmq_global_messages_dead_lettered_expired_total Total number of messages dead-lettered due to message TTL exceeded
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

# TYPE rabbitmq_global_messages_dead_lettered_maxlen_total counter
# HELP rabbitmq_global_messages_dead_lettered_maxlen_total Total number of messages dead-lettered due to overflow drop-head
# or reject-publish-dlx
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

# TYPE rabbitmq_global_messages_dead_lettered_rejected_total counter
# HELP rabbitmq_global_messages_dead_lettered_rejected_total Total number of messages dead-lettered due to basic.reject or basic.nack
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

我们只抓取了 pod my-rabbit-server-1 的 Prometheus 指标。由于这些计数器是“节点全局”的,这意味着上面的列表仅显示节点 my-rabbit-server-1 观察到的指标(但在该节点上的所有队列中是全局的)。

我们发送的第一条消息在停止该节点之前发送到 pod my-rabbit-server-0。此后,在我们的示例中,Quorum 队列领导者从 my-rabbit-server-0 更改为 my-rabbit-server-1。然后,我们使用 at-most-once 死信策略发送了第二条消息,并使用 at-least-once 死信策略发送了第三条消息。死信的第三条消息最终被死信消费者确认(或者换句话说,被目标队列确认)。这就是以下计数器的值为 1 的原因

rabbitmq_global_messages_dead_lettered_confirmed_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 1

如果您好奇,可以抓取 pod my-rabbit-server-0 的 Prometheus 指标。您期望看到什么?输出是否符合您的期望?提示:“Prometheus 计数器是一个累积指标,它表示一个单调递增的计数器,其值只能增加或在重启时重置为零。”

注意事项

我们看到了死信的消息如何最终从死信源队列到达死信目标队列,即使目标队列暂时不可用。那么,为什么至少一次死信不是新的默认死信策略呢?

启用至少一次死信时,需要注意一些注意事项

注意事项 1 - 源 Quorum 队列中的消息堆积

当目标队列暂时不可用或路由拓扑配置不正确时,至少一次死信在确保消息不丢失方面做得很好。但是,如果死信消费者进程长时间未从所有目标队列获得发布者确认,而源队列中不断有更多消息死信,则可能导致源队列中消息过度堆积。在最坏的情况下,源 Quorum 队列将仅包含死信消息。为了防止消息过度堆积,请为源队列设置队列长度限制max-lengthmax-length-bytes)。

注意事项 2 - 死信吞吐量

死信消费者有一个可配置的设置,称为 dead_letter_worker_consumer_prefetch,其当前默认值设置为 32。这意味着死信消费者进程在等待目标队列的发布者确认时,最多会预取和缓冲 32 条消息。

由于 RabbitMQ 3.10 Quorum 队列始终将所有消息体/有效负载存储在磁盘上。对于 Quorum 队列中的每条消息,仍然存在非常小的每消息内存开销,因为 Quorum 队列在内存中保存每条消息的一些元数据(例如 Raft 索引和消息有效负载大小)。

另一方面,死信消费者进程将消息体保存在内存中。为了防止数百个 Quorum 队列启用了至少一次死信并且未收到发布者确认的最坏情况,此预取值设置为适度的默认值 32,以避免死信消费者导致高内存使用率。

但是,较低的预取值会导致较低的吞吐量。如果您有每秒需要持续数千条消息死信吞吐量的场景(例如,每秒有数千条消息过期或被拒绝),您可以在高级配置文件中增加预取设置。

以下示例说明了如何在 Kubernetes 中增加预取

---
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
name: my-rabbit
spec:
replicas: 3
rabbitmq:
advancedConfig: |
[
{rabbit, [
{dead_letter_worker_consumer_prefetch, 512}
]}
].

注意事项 3 - 资源使用量增加

为每个 Quorum 队列启用至少一次死信将增加资源使用量。将消耗更多内存和更多 CPU。比较图 1(最多一次死信)和图 2(至少一次死信),我们观察到至少一次死信将需要发送更多消息(包括确认)。

注意事项 4 - 溢出 drop-head

用法部分所述,启用 at-least-once 死信需要将 overflow 设置为 reject-publish。将 overflow 设置为 drop-head 将使死信策略回退到 at-most-once。不支持 drop-head,因为从源 Quorum 队列中删除死信消息会违反 at-least-once 语义。

注意事项 5 - 切换死信策略

对于 Quorum 队列,可以通过策略将死信策略从 at-most-once 切换为 at-least-once,反之亦然。如果死信策略被更改,无论是直接从 at-least-once 更改为 at-most-once,还是间接更改,例如通过将 overflow 从 reject-publish 更改为 drop-head 或取消设置 dead-letter-exchange,任何尚未被所有目标队列确认的死信消息都将在源 Quorum 队列中永久删除。

最佳实践

根据我们上面学到的知识,至少一次死信的最佳实践包括

最佳实践 1

在源 Quorum 队列中设置 max-lengthmax-length-bytes,以防止消息过度堆积。

最佳实践 2

将目标 Quorum 队列或流绑定到死信交换机。这比目标经典队列提供更高的可用性。对于目标 Quorum 队列或目标流,死信消息的重新传递也比目标经典队列更快。这是因为 Quorum 队列和流有自己的客户端、传递协议和重试机制。请记住,经典镜像队列已弃用。

最佳实践 3

在发布到源 Quorum 队列的所有消息上设置持久性标志。如果未设置持久性标志,则死信消息也不会设置该标志。当死信消息路由到目标经典队列时,这将变得很重要。

总结

在 RabbitMQ 3.10 之前,RabbitMQ 中的死信一直不安全。死信的消息可能会由于各种原因而丢失 - 尤其是在多节点 RabbitMQ 集群中。

至少一次死信确保死信的消息最终到达目标队列,即使在滚动升级和临时故障(如网络分区或路由拓扑错误配置)的情况下也是如此。

除了 Quorum 队列的至少一次死信之外,我们还了解了 RabbitMQ 3.10 中的另外两个新功能:Quorum 队列中的消息 TTL 和死信消息的 Prometheus 指标。

由于至少一次死信会增加资源使用量,因此只有在死信消息在原始意义上不是“死的”,而是“活着的”并且对您的业务逻辑至关重要时才应启用它。如果您的用例中死信消息仅具有信息性质,则应使用最多一次死信。

Quorum 队列的至少一次死信功能为新的用例铺平了道路,在这些用例中,您知道您有可能会被否定确认但仍需要处理的消息,或者您不能丢失具有过期 TTL 的消息。以前使用 RabbitMQ 实现这些场景是不安全或难以实现的。

© . All rights reserved.