AMQP 1.0 修改的 Outcome
这篇博客文章探讨了 AMQP 1.0 修改的 Outcome 的用例。
修改后的结果是 AMQP 1.0 独有的 功能,AMQP 0.9.1 中不可用。它支持 仲裁队列,但不支持 经典队列。
此功能允许消费者在重新排队或 死信 消息之前添加或更新 消息注解。
重新排队
在重新排队消息时包含附加元数据,对于提高消息处理过程中的可追溯性和调试非常有用。
例如,使用 RabbitMQ AMQP 1.0 Java 客户端 的应用程序可以在将消息重新排队到仲裁队列的头部之前设置特定的消息注解,如下所示:
Consumer consumer = connection.consumerBuilder()
.queue(ordersQueue)
.messageHandler((context, message) -> {
Map<String, Object> annotations = new HashMap<>();
annotations.put("x-opt-requeue-reason", "external_service_unavailable");
annotations.put("x-opt-requeue-time", System.currentTimeMillis());
annotations.put("x-opt-requeued-by", "consumer_1");
context.requeue(annotations);
}).build();
这些注解可以使用不同的类型,包括 map、list 或 array。这种灵活性不仅可以设置上次重新排队的原因、时间、消费者等详细信息,还可以跟踪重新排队事件的历史记录。维护这样的历史记录可以揭示模式,例如识别更频繁重新排队消息的消费者,或发现系统中常见的重新排队原因。但是,请注意,仲裁队列会在内存中保留修改后的消息注解,这会增加每个重新排队消息的内存开销。
在 AMQP 0.9.1 中,不支持在将消息重新排队到队列头部之前设置自定义标头。
无论通过 AMQP 1.0 还是 AMQP 0.9.1 将消息重新排队到仲裁队列,x-delivery-count 注解始终会递增。
死信
在死信消息时,消费者可以在消息注解中包含死信的自定义原因。
Consumer consumer = connection.consumerBuilder()
.queue(ordersQueue)
.messageHandler((context, message) -> {
Map<String, Object> annotations = new HashMap<>();
annotations.put("x-opt-dead-letter-reason", "Incompatible Message Format");
context.discard(annotations);
}).build();
在死信到 headers 交换 时,消费者甚至可以决定消息将路由到哪个目标队列。
在此示例中,两个死信仲裁队列绑定到死信 headers 交换。
transient-failures-dlqbusiness-logic-failures-dlq
不同的死信队列可以由不同的应用程序或团队处理,并根据故障的性质采取不同的操作。例如,transient-failures-dlq 中的所有消息都可以重新发布到原始的 orders 队列,而 business-logic-failures-dlq 中的消息可能需要人工干预。
可以添加更多死信队列,例如:
data-integrity-dlq用于模式未知消息resource-limit-dlq用于超出速率限制的情况critical-errors-dlq用于需要管理员注意的情况。
所有从 orders 队列死信的消息都必须可路由。上图中的 备用交换 提供“否则”路由语义,确保在未设置 x-opt-dead-letter-category 注解时,消息最终会进入 uncategorised-dlq。这可能发生,例如,如果发布者设置了 ttl 头 但没有消费者授予 链接信用,导致消息过期并被死信。
上面描绘的场景在 modified-outcome 示例应用程序 中得到了演示。
modified-outcome 示例应用程序
该示例应用程序使用 RabbitMQ AMQP 1.0 Java 客户端。
您可以按以下方式运行此示例应用程序:
- 通过
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management启动 RabbitMQ 服务器。 - 在 示例应用程序 的根目录下,通过
mvn clean compile exec:java启动客户端。
在将消息发布到 orders 队列后,客户端应用程序会消耗该消息并在控制台上输出以下内容:
publisher: received ACCEPTED outcome
consumer: setting annotations {x-opt-dead-letter-reason=Customer Not Eligible for Discount, x-opt-dead-letter-category=business-logic} and dead lettering...
消息将被死信到 business-logic-failures-dlq。
为防止消息在死信过程中丢失,示例应用程序使用了 至少一次死信。
死信与重新发布
AMQP 0.9.1 消费者在死信消息之前无法设置自定义标头。但是,与其使用 basic.nack 或 basic.reject 并设置 requeue=false 来死信消息,AMQP 0.9.1 客户端可以采用以下方法:
- 将消息直接重新发布到特定的“死信”队列,并带有新的自定义标头。
- 等待 RabbitMQ 确认重新发布的消息。
- 通过
basic.ack确认原始消息。
AMQP 1.0 客户端可以选择使用自定义消息注解死信消息或重新发布消息。这两种方法各有利弊。
| 标准 | 带有自定义原因的死信 | 带有自定义原因的重新发布 |
|---|---|---|
| 简单性 | 对消费者来说更简单。 | 更复杂,因为消费者必须处理重新发布过程。 |
| 开销 | 低开销。 | 客户端开销更高:消息负载必须从客户端重新发布到 RabbitMQ,由于额外的发布和确认步骤,延迟会增加。 |
| 客户端和 RabbitMQ 之间在确认已消耗的消息之前发生网络故障。 | 消息被重新排队。 | 消息可能已被重新发布和重新排队,导致一份副本进入“死信”队列,另一份进入原始队列。 |
| 灵活性 | 只能修改消息注解并通过死信 headers 交换进行路由。 | 允许修改消息的任何部分并重新发布到任何交换。 |
总结
AMQP 1.0 的修改结果功能允许消费者在重新排队或死信消息之前修改消息注解。
消费者可以通过自定义死信事件跟踪,甚至选择将消息发送到哪个死信队列,而不仅仅依赖 RabbitMQ 通过 x-opt-deaths 进行内置的死信跟踪。


