消费者
概述
本指南涵盖了与消费者相关的各种主题
- 基础知识
- 消费者生命周期
- 如何注册消费者(订阅,“推送 API”)
- 确认模式
- 消息属性和传递元数据
- 如何使用预取限制未完成的传递数量
- 传递确认超时
- 消费者容量指标
- 如何取消消费者
- 消费者独占性
- 单一活跃消费者
- 消费者活动
- 消费者优先级
- 连接失败恢复
- 异常处理
- 并发考虑
以及更多。
术语
术语“消费者”在不同的上下文中意味着不同的事物。一般来说,在消息和流媒体的上下文中,消费者是一个应用程序(或应用程序实例),它消费并确认消息。同一个应用程序也可以发布消息,从而同时成为发布者。
消息协议也有持久订阅的概念,用于消息传递。订阅是通常用于描述这种实体的术语之一。消费者是另一个。RabbitMQ 支持的消息协议同时使用这两个术语,但 RabbitMQ 文档倾向于使用后者。
从这个意义上说,消费者是消息传递的订阅,必须在开始传递之前注册,并且可以被应用程序取消。
基础知识
RabbitMQ 是一个消息代理。它接受来自发布者的消息,路由它们,并且,如果存在要路由到的队列,则存储它们以供消费,或者立即传递给消费者(如果有)。
消费者从队列中消费。为了消费消息,必须有一个队列。当添加新的消费者时,假设队列中已经有准备好的消息,传递将立即开始。
目标队列在消费者注册时可以为空。在这种情况下,首次传递将在新消息入队时发生。
尝试从不存在的队列中消费将导致通道级异常,代码为 404 Not Found
,并使尝试的通道关闭。
消费者标签
每个消费者都有一个标识符,客户端库使用该标识符来确定为给定的传递调用哪个处理程序。它们的名称因协议而异。消费者标签和订阅 ID 是两个最常用的术语。RabbitMQ 文档倾向于使用前者。
消费者标签也用于取消消费者。
消费者生命周期
消费者旨在长期存在:也就是说,在消费者的整个生命周期中,它会接收多次传递。注册消费者以消费单条消息不是最佳选择。
消费者通常在应用程序启动期间注册。它们的生命周期通常与它们的连接甚至应用程序运行的时间一样长。
消费者可以更加动态,并响应系统事件注册,并在不再需要时取消订阅。这在使用通过 Web STOMP 和 Web MQTT 插件、移动客户端等使用的 WebSocket 客户端时很常见。
连接恢复
客户端可能会丢失与 RabbitMQ 的连接。当 检测到连接丢失时,消息传递停止。
一些客户端库提供自动连接恢复功能,其中涉及消费者恢复。Java、.NET 和 Bunny 是此类库的示例。虽然连接恢复无法涵盖 100% 的场景和工作负载,但它通常非常适用于消费应用程序,并且是推荐的。
对于其他客户端库,应用程序开发人员负责执行连接恢复。通常,以下恢复顺序效果很好
- 恢复连接
- 恢复通道
- 恢复队列
- 恢复交换机
- 恢复绑定
- 恢复消费者
换句话说,消费者通常最后恢复,在它们的目标队列和这些队列的绑定到位之后。
请注意,自动恢复使用自动删除和独占队列的连接应确保这些队列是服务器命名的。
注册消费者(订阅,“推送 API”)
应用程序可以订阅,以便 RabbitMQ 将入队的消息(传递)推送到它们。这是通过在队列上注册消费者(订阅)来完成的。订阅到位后,RabbitMQ 将开始传递消息。对于每次传递,都将调用用户提供的处理程序。根据使用的客户端库,这可以是用户提供的函数或符合特定接口的对象。
成功的订阅操作返回订阅标识符(消费者标签)。它稍后可用于取消消费者。
Java 客户端
有关示例,请参阅 Java 客户端指南。
.NET 客户端
有关示例,请参阅 .NET 客户端指南。
消息属性和传递元数据
每次传递都结合了消息元数据和传递信息。不同的客户端库使用略有不同的方式来提供对这些属性的访问。通常,传递处理程序可以访问传递数据结构。
以下属性是传递和路由详细信息;它们本身不是消息属性,而是在路由和传递时由 RabbitMQ 设置的
属性 | 类型 | 描述 |
传递标签 | 正整数 | 传递标识符,请参阅 确认。 |
重新传递 | 布尔值 | 如果此消息先前已传递并重新入队,则设置为 true |
交换机 | 字符串 | 路由此消息的交换机 |
路由键 | 字符串 | 发布者使用的路由键 |
消费者标签 | 字符串 | 消费者(订阅)标识符 |
以下是消息属性。它们大多数是可选的。它们由发布者在发布时设置
属性 | 类型 | 描述 | 是否必需? |
传递模式 | 枚举(1 或 2) | 2 表示“持久”,1 表示“瞬态”。一些客户端库将此属性公开为布尔值或枚举。 | 是 |
类型 | 字符串 | 应用程序特定的消息类型,例如“orders.created” | 否 |
标头 | 映射(字符串 => 任意) | 带有字符串标头名称的任意标头映射 | 否 |
内容类型 | 字符串 | 内容类型,例如“application/json”。由应用程序使用,而不是核心 RabbitMQ | 否 |
内容编码 | 字符串 | 内容编码,例如“gzip”。由应用程序使用,而不是核心 RabbitMQ | 否 |
消息 ID | 字符串 | 任意消息 ID | 否 |
关联 ID | 字符串 | 帮助关联请求与响应,请参阅教程 6 | 否 |
回复到 | 字符串 | 携带响应队列名称,请参阅教程 6 | 否 |
过期时间 | 字符串 | 每消息 TTL | 否 |
时间戳 | 时间戳 | 应用程序提供的时间戳 | 否 |
用户 ID | 字符串 | 用户 ID,如果设置则已验证 | 否 |
应用 ID | 字符串 | 应用程序名称 | 否 |
消息类型
消息上的类型属性是一个任意字符串,用于帮助应用程序沟通消息的类型。它由发布者在发布时设置。该值可以是发布者和消费者约定的任何特定于域的字符串。
RabbitMQ 不会验证或使用此字段,它存在是为了供应用程序和插件使用和解释。
实践中的消息类型自然地分为几组,点分隔的命名约定很常见(但不是 RabbitMQ 或客户端要求的),例如 orders.created
或 logs.line
或 profiles.image.changed
。
如果消费者收到无法处理的类型的传递,强烈建议记录此类事件,以便更轻松地进行故障排除。
内容类型和编码
内容(MIME 媒体)类型和内容编码字段允许发布者沟通消费者应如何反序列化和解码消息有效负载。
RabbitMQ 不会验证或使用这些字段,它存在是为了供应用程序和插件使用和解释。
例如,带有 JSON 有效负载的消息应使用 application/json
。如果有效负载使用 LZ77 (GZip) 算法压缩,则其内容编码应为 gzip
。
可以通过用逗号分隔来指定多个编码。
确认模式
注册消费者时,应用程序可以选择两种传递模式之一
- 自动(传递不需要确认,又名“即发即弃”)
- 手动(传递需要客户端确认)
消费者确认是单独文档指南的主题,与发布者确认一起,这是发布者的密切相关概念。
使用预取限制同时传递
使用手动确认模式,消费者有一种限制可以“在飞行中”(在网络上传输或已传递但未确认)的传递数量的方法。这可以避免消费者过载。
此功能以及消费者确认是单独文档指南的主题。
消费者容量指标
RabbitMQ 管理 UI 以及 监控数据 端点(例如 Prometheus 抓取的端点)显示名为消费者容量(以前称为消费者利用率)的指标,用于单个队列。
该指标计算为队列能够立即向消费者传递消息的时间的百分比。它帮助操作员注意到以下情况:可能值得向队列添加更多消费者(应用程序实例)。
如果此数字小于 100%,则队列领导者副本可能能够更快地传递消息,如果
- 有更多消费者,或者
- 消费者花费更少的时间处理传递,或者
- 消费者通道使用更高的预取值
对于没有消费者的队列,消费者容量将为 0%。对于有在线消费者但没有消息流的队列,该值将为 100%:这个想法是任何数量的消费者都可以维持这种传递速率。
请注意,消费者容量仅是一个提示。消费者应用程序可以并且应该收集有关其操作的更具体指标,以帮助进行大小调整和任何可能的容量更改。
取消消费者(取消订阅)
要取消消费者,必须知道其标识符(消费者标签)。
取消消费者后,将不会再向其分派未来的传递。请注意,仍然可能存在先前分派的“在飞行中”的传递。取消消费者既不会丢弃也不会重新排队它们。
除了在 RabbitMQ 处理 basic.cancel
方法时正在进行的传递之外,取消的消费者将不会观察到任何新的传递。所有先前未确认的传递都不会受到任何影响。要重新排队飞行中的传递,应用程序必须关闭通道。
Java 客户端
有关示例,请参阅 Java 客户端指南。
.NET 客户端
有关示例,请参阅 .NET 客户端指南。
轮询单个消息(“拉取 API”)
本节中描述的机制是一种轮询形式。与分布式系统中的任何基于轮询的方法一样,它效率非常低,尤其是在队列可能会长时间为空的情况下。
除了集成测试外,强烈建议不要使用此 AMQP 0-9-1 消费机制。
RabbitMQ 管理 和 Prometheus 插件提供了几个指标,可帮助检测使用轮询(basic.get
)的应用程序。
使用长期存在的消费者而不是轮询。
使用 AMQP 0-9-1,可以使用 basic.get
协议方法逐个获取消息。消息按 FIFO 顺序获取。可以像消费者(订阅)一样使用自动或手动确认。
强烈不建议逐个获取消息,因为它与常规长期存在的消费者相比,效率非常低下。与任何基于轮询的算法一样,在消息发布是零星的且队列可能长时间保持为空的系统中,它将极其浪费资源。
如有疑问,请首选使用常规的长期存在的消费者。
Java 客户端
有关示例,请参阅 Java 客户端指南。
.NET 客户端
有关示例,请参阅 .NET 客户端指南。
传递确认超时
RabbitMQ 对消费者传递确认强制执行超时。这是一种保护机制,用于检测消费者何时未确认消息传递。配置传递确认超时可以帮助防止磁盘数据压缩和驱动节点超出磁盘空间。
工作原理
如果消费者未在超时值内确认其传递,则其通道将关闭,并显示 PRECONDITION_FAILED
通道异常。消息将如下所示
Consumer 'consumer-tag-998754663370' on channel 1 and queue 'qq.1' in vhost '/' has timed out
waiting for a consumer acknowledgement of a delivery with delivery tag = 10. Timeout used: 180000 ms.
This timeout value can be configured, see consumers doc guide to learn more
该错误由消费者连接到的节点记录。然后,该通道上所有消费者发出的所有后续传递都将重新排队。要解决 PRECONDITION_FAILED
通道异常,请重新评估您的消费者并考虑增加超时值。
RabbitMQ 的默认超时值为 30 分钟。是否应强制执行超时会定期评估,间隔为一分钟。不支持低于一分钟的值,不建议使用低于五分钟的值。
按节点配置
超时值可在 rabbitmq.conf 中配置(以毫秒为单位)
# 30 minutes in milliseconds
consumer_timeout = 1800000
# one hour in milliseconds
consumer_timeout = 3600000
可以使用 advanced.config
禁用超时。不建议这样做
%% advanced.config
[
{rabbit, [
{consumer_timeout, undefined}
]}
].
与其完全禁用超时,不如考虑使用较高的值(例如,几个小时)。
按队列配置
从 RabbitMQ 3.12 开始,超时值也可以按队列配置。
使用策略的按队列传递超时
设置 consumer-timeout
策略键。
该值必须以毫秒为单位。是否应强制执行超时会定期评估,间隔为一分钟。
# override consumer timeout for a group of queues using a policy
rabbitmqctl set_policy queue_consumer_timeout "with_delivery_timeout\.*" '{"consumer-timeout":3600000}' --apply-to classic_queues
使用可选队列参数的按队列传递超时
在声明队列时,在队列上设置 x-consumer-timeout
可选队列参数。超时以毫秒为单位指定。是否应强制执行超时会定期评估,间隔为一分钟。
限制每个通道的消费者数量
在某些可能发生消费者泄漏的场景中,最好限制每个通道上可以处于活动状态的消费者数量。这可以在 rabbitmq.conf 中使用设置 consumer_max_per_channel
进行配置
consumer_max_per_channel = 100
独占性
仅对于经典队列,当使用 AMQP 0-9-1 客户端注册消费者时,可以将 basic.consume
方法的 exclusive
标志设置为 true,以请求消费者成为目标队列上的唯一消费者。仅当当时队列中没有已注册的消费者时,调用才会成功。这允许确保每次只有一个消费者从队列中消费。
如果独占消费者被取消或死亡,则应用程序有责任注册一个新的消费者以继续从队列中消费。
如果需要独占消费和消费连续性,请使用单一活跃消费者。
单一活跃消费者
单一活跃消费者允许每次只有一个消费者从队列中消费,并在活跃消费者被取消或死亡的情况下故障转移到另一个已注册的消费者。当消息必须按照它们到达队列的相同顺序消费和处理时,仅使用一个消费者进行消费非常有用。
典型的事件序列如下
- 声明一个队列,并且一些消费者在大致相同的时间注册到该队列。
- 第一个注册的消费者成为单一活跃消费者:消息被分派给它,而其他消费者被忽略。
- 如果队列是仲裁队列,并且新注册的消费者具有更高的优先级,则队列将停止向当前活跃消费者传递消息。当所有消息都被确认后,新消费者将成为活跃消费者。
- 当单一活跃消费者因某种原因被取消或简单地死亡时,另一个消费者将被选为活跃消费者。换句话说,队列会自动故障转移到另一个消费者。有关如何选择新消费者的更多详细信息,请参阅SAC 行为。
请注意,如果未启用单一活跃消费者功能,消息将使用轮询调度分派给所有消费者。
本节介绍了可用于经典队列和仲裁队列上的 AMQP 0-9-1 和 AMQP 1.0 客户端的单一活跃消费者。它与流的单一活跃消费者功能有意义上的不同。
尝试在流上使用 AMQP 0-9-1 客户端启用 SAC 将不起作用。要在流上使用 SAC,必须使用原生 RabbitMQ 流协议客户端。
在仲裁队列和经典队列上启用单一活跃消费者
当声明队列时,可以通过将 x-single-active-consumer
参数设置为 true
来启用单一活跃消费者,例如,使用 Java 客户端
Channel ch = ...;
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-single-active-consumer", true);
ch.queueDeclare("my-queue", false, false, false, arguments);
与独占消费者的区别
与AMQP 0-9-1 独占消费者相比,单一活跃消费者对应用程序端维护消费连续性的压力较小。消费者只需要注册,故障转移会自动处理,无需检测活跃消费者故障并注册新消费者。
确定当前哪个消费者处于活动状态
管理 UI 和 CLI 可以报告哪个消费者是在启用该功能的队列上的当前活跃消费者。
初始 SAC 选择
当与经典队列一起使用时,即使消费者优先级正在使用,初始活跃消费者也是随机选择的。
如果队列是仲裁队列,并且新注册的消费者具有更高的优先级,则队列将停止向当前活跃消费者传递消息。当所有消息都被确认后,新消费者将成为活跃消费者。
在宣布此功能的博客文章中了解有关此行为的更多信息。
SAC 和独占消费者是互斥的
尝试使用 SAC 注册独占消费者将导致错误。根据定义,SAC 假定将有多个消费者在线。
SAC 和通道预取
消息始终传递给活跃消费者,即使它在某个时候太忙也是如此。当使用手动确认和 basic.qos
时,可能会发生这种情况,消费者可能正忙于处理它使用 basic.qos
请求的最大未确认消息数。在这种情况下,其他消费者将被忽略,并且传递将返回到队列。
无法使用策略启用 SAC
无法使用策略启用单一活跃消费者功能。由于 RabbitMQ 中的策略本质上是动态的,它们可能会来来去去,启用和禁用它们声明的功能。想象一下突然在一个队列上禁用单一活跃消费者:代理将开始向非活跃消费者发送消息,并且消息将并行处理,这与单一活跃消费者试图实现的目标完全相反。由于单一活跃消费者的语义与策略的动态性质不兼容,因此只能在声明队列时使用队列参数启用此功能。
消费者活动
管理 UI 和 list_consumers
CLI 命令报告消费者的 active
标志。此标志的值取决于几个参数。
- 对于经典队列,当未启用单一活跃消费者时,该标志始终为
true
。 - 对于仲裁队列,并且当未启用单一活跃消费者时,该标志默认设置为
true
,如果怀疑消费者连接到的节点已关闭,则设置为false
。 - 如果启用了单一活跃消费者,则该标志仅对当前单一活跃消费者设置为
true
,队列上的其他消费者正在等待提升,如果活跃消费者消失,则它们的活动设置为false
。
优先级
通常,连接到队列的活跃消费者以轮询方式从队列接收消息。
消费者优先级允许您确保高优先级消费者在活动时接收消息,而只有当高优先级消费者被阻塞时(例如,通过有效的预取设置)消息才会发送给低优先级消费者。
当使用消费者优先级时,如果存在多个具有相同高优先级的活跃消费者,则消息以轮询方式传递。
消费者优先级在单独指南中介绍。
异常处理
消费者应处理在处理传递或任何其他消费者操作期间产生的任何异常。此类异常应被记录、收集和忽略。
如果消费者由于依赖项不可用或类似原因而无法处理传递,则应清楚地记录下来并取消自身,直到它能够再次处理传递为止。这将使消费者的不可用性对 RabbitMQ 和监控系统可见。
并发考虑
消费者并发性主要是客户端库实现细节和应用程序配置的问题。对于大多数客户端库(例如 Java、.NET、Go、Erlang),传递都分派给线程池(或类似机制),该线程池处理所有异步消费者操作。该池通常具有可控制的并发度。
Java 和 .NET 客户端保证,无论并发程度如何,单个通道上的传递都将按照接收的相同顺序分派。请注意,一旦分派,并发处理传递将在执行处理的线程之间导致自然竞争条件。
某些客户端(例如 Bunny)和框架可能会选择将消费者分派池限制为单个线程(或类似机制),以避免在并发处理传递时出现自然竞争条件。某些应用程序依赖于传递的严格顺序处理,因此必须使用 1 的并发因子或在其自己的代码中处理同步。可以并发处理传递的应用程序可以使用高达其可用内核数的并发度。
队列并行性考虑
一个 RabbitMQ 队列绑定到一个核心。使用多个队列可以提高节点上的 CPU 利用率。诸如 分片 (sharding) 和 一致性哈希交换 (consistent hash exchange) 等插件有助于提高并行性。