跳到主要内容

AMQP 0-9-1 模型详解

概述

本指南概述了 AMQP 0-9-1 协议,RabbitMQ 支持的协议之一。

AMQP 0-9-1 和 AMQP 模型的高级概述

什么是 AMQP 0-9-1?

AMQP 0-9-1(高级消息队列协议)是一种消息传递协议,使符合标准的客户端应用程序能够与符合标准的消息传递中间件代理进行通信。

代理及其作用

消息代理从发布者(发布消息的应用程序,也称为生产者)接收消息,并将它们路由到消费者(处理消息的应用程序)。

由于它是一种网络协议,因此发布者、消费者和代理可以都驻留在不同的机器上。

AMQP 0-9-1 模型简述

AMQP 0-9-1 模型对世界的看法如下:消息发布到交换机,交换机通常比作邮局或邮箱。然后,交换机使用称为绑定的规则将消息副本分发到队列。然后,代理将消息传递给订阅队列的消费者,或者消费者按需从队列中获取/拉取消息。

Publish path from publisher to consumer via exchange and queue

发布消息时,发布者可以指定各种消息属性(消息元数据)。其中一些元数据可能被代理使用,但是,其余的元数据对代理完全不透明,仅供接收消息的应用程序使用。

网络是不可靠的,应用程序可能无法处理消息,因此 AMQP 0-9-1 模型具有消息确认的概念:当消息传递给消费者时,消费者通知代理,可以是自动通知,也可以在应用程序开发人员选择这样做时通知。当使用消息确认时,代理只有在收到该消息(或一组消息)的通知后,才会从队列中完全删除该消息。

在某些情况下,例如,当消息无法路由时,消息可能会返回给发布者、丢弃,或者,如果代理实现了扩展,则放入所谓的“死信队列”。发布者通过使用某些参数发布消息来选择如何处理这种情况。

队列、交换机和绑定统称为 AMQP 实体

AMQP 0-9-1 是一种可编程协议

AMQP 0-9-1 是一种可编程协议,因为 AMQP 0-9-1 实体和路由方案主要由应用程序本身定义,而不是由代理管理员定义。因此,为声明队列和交换机、定义它们之间的绑定、订阅队列等协议操作提供了规定。

这为应用程序开发人员提供了很大的自由度,但也要求他们意识到潜在的定义冲突。在实践中,定义冲突很少见,通常表明配置错误。

应用程序声明他们需要的 AMQP 0-9-1 实体,定义必要的路由方案,并且可以选择在不再使用 AMQP 0-9-1 实体时删除它们。

交换机和交换机类型

交换机是 AMQP 0-9-1 实体,消息发送到这里。交换机接收消息并将其路由到零个或多个队列。使用的路由算法取决于交换机类型和称为绑定的规则。AMQP 0-9-1 代理提供四种交换机类型

交换机类型默认预声明名称
直接交换机(空字符串)和 amq.direct
扇出交换机amq.fanout
主题交换机amq.topic
头部交换机amq.match(和 RabbitMQ 中的 amq.headers)

除了交换机类型外,交换机还声明了许多属性,其中最重要的属性是

  • 名称
  • 持久性(交换机在代理重启后仍然存在)
  • 自动删除(当最后一个队列从交换机解除绑定时,交换机被删除)
  • 参数(可选,由插件和特定于代理的功能使用)

交换机可以是持久的或瞬态的。持久交换机在代理重启后仍然存在,而瞬态交换机则不会(它们必须在代理重新上线时重新声明)。并非所有场景和用例都要求交换机是持久的。

默认交换机

默认交换机是一个没有名称(空字符串)的直接交换机,由代理预先声明。它有一个特殊的属性,使其对于简单的应用程序非常有用:创建的每个队列都会自动绑定到它,路由键与队列名称相同。

例如,当您声明一个名为“search-indexing-online”的队列时,AMQP 0-9-1 代理会使用“search-indexing-online”作为路由键(在这种情况下有时称为绑定键)将其绑定到默认交换机。因此,发布到默认交换机且路由键为“search-indexing-online”的消息将被路由到队列“search-indexing-online”。换句话说,默认交换机使它看起来可以像直接将消息传递到队列一样,即使从技术上讲,情况并非如此。

在 RabbitMQ 中,默认交换机不允许绑定/解除绑定操作。对默认交换机的绑定操作将导致错误。

直接交换机

直接交换机根据消息路由键将消息传递到队列。直接交换机非常适合消息的单播路由。它们也可以用于多播路由。

以下是它的工作原理

  • 队列使用路由键 K 绑定到交换机
  • 当路由键为 R 的新消息到达直接交换机时,如果 K = R,则交换机将其路由到队列
  • 如果多个队列使用相同的路由键 K 绑定到直接交换机,则交换机将消息路由到所有 K = R 的队列

直接交换机可以用图形表示如下

exchange delivering messages to  queues based on routing key

扇出交换机

扇出交换机将消息路由到绑定到它的所有队列,并且忽略路由键。如果 N 个队列绑定到扇出交换机,则当向该交换机发布新消息时,消息的副本将传递到所有 N 个队列。扇出交换机非常适合消息的广播路由。

由于扇出交换机将消息的副本传递到绑定到它的每个队列,因此它的用例非常相似

  • 大型多人在线 (MMO) 游戏可以使用它进行排行榜更新或其他全局事件
  • 体育新闻网站可以使用扇出交换机将比分更新近乎实时地分发到移动客户端
  • 分布式系统可以广播各种状态和配置更新
  • 群聊可以使用扇出交换机在参与者之间分发消息(尽管 AMQP 没有内置的存在概念,因此 XMPP 可能是更好的选择)

扇出交换机可以用图形表示如下

exchange delivering messages to three queues

主题交换机

主题交换机根据消息路由键与用于将队列绑定到交换机的模式之间的匹配,将消息路由到一个或多个队列。主题交换机类型通常用于实现各种发布/订阅模式变体。主题交换机通常用于消息的多播路由。

主题交换机具有非常广泛的用例。每当问题涉及选择性地选择他们想要接收的消息类型的多个消费者/应用程序时,都应考虑使用主题交换机。

用例示例

  • 分发与特定地理位置相关的数据,例如,销售点
  • 由多个工作者完成的后台任务处理,每个工作者都能够处理特定任务集
  • 股票价格更新(以及其他类型的金融数据更新)
  • 涉及分类或标记的新闻更新(例如,仅针对特定运动或团队)
  • 云中不同类型的服务的编排
  • 分布式架构/特定于操作系统的软件构建或打包,其中每个构建器只能处理一种架构或操作系统

头部交换机

头部交换机旨在基于多个属性进行路由,这些属性比路由键更容易表示为消息头。头部交换机忽略路由键属性。相反,用于路由的属性取自头部属性。如果头的值等于绑定时指定的值,则消息被视为匹配。

可以使用多个头进行匹配,将队列绑定到头部交换机。在这种情况下,代理需要来自应用程序开发人员的更多信息,即,它应该考虑任何头匹配的消息,还是所有头都匹配?这就是“x-match”绑定参数的用途。当“x-match”参数设置为“any”时,仅一个匹配的头值就足够了。或者,将“x-match”设置为“all”则强制要求所有值都必须匹配。

对于“any”和“all”,以字符串 x- 开头的头将不用于评估匹配项。将“x-match”设置为“any-with-x”或“all-with-x”也将使用以字符串 x- 开头的头来评估匹配项。

头部交换机可以看作是“增强型直接交换机”。由于它们基于头值进行路由,因此它们可以用作直接交换机,其中路由键不必是字符串;例如,它可以是整数或哈希(字典)。

队列

AMQP 0-9-1 模型中的队列与其他消息和任务队列系统中的队列非常相似:它们存储由应用程序使用的消息。队列与交换机共享一些属性,但也具有一些附加属性

  • 名称
  • 持久性(队列将在代理重启后仍然存在)
  • 独占性(仅由一个连接使用,并且队列将在该连接关闭时删除)
  • 自动删除(至少有一个消费者的队列在最后一个消费者取消订阅时被删除)
  • 参数(可选;由插件和特定于代理的功能使用,例如消息 TTL、队列长度限制等)

队列在使用之前必须先声明。声明队列将导致队列在尚不存在时被创建。如果队列已经存在并且其属性与声明中的属性相同,则声明将不起作用。当现有队列属性与声明中的属性不同时,将引发通道级异常,代码为 406 (PRECONDITION_FAILED)。

队列名称

应用程序可以选择队列名称,也可以要求代理为它们生成名称。队列名称最多可以为 255 个字节的 UTF-8 字符。AMQP 0-9-1 代理可以代表应用程序生成唯一的队列名称。要使用此功能,请传递空字符串作为队列名称参数。生成的名称将与队列声明响应一起返回给客户端。

以“amq.”开头的队列名称保留供代理内部使用。尝试声明违反此规则的名称的队列将导致通道级异常,回复代码为 403 (ACCESS_REFUSED)。

队列持久性

在 AMQP 0-9-1 中,队列可以声明为持久或瞬态。持久队列的元数据存储在磁盘上,而瞬态队列的元数据尽可能存储在内存中。

对于发布时的消息也做了同样的区分。

在持久性很重要的环境和用例中,应用程序必须使用持久队列并且确保发布者将发布的消息标记为持久。

本主题在队列指南中更详细地介绍。

绑定

绑定是交换机使用的(除其他外)将消息路由到队列的规则。要指示交换机 E 将消息路由到队列 Q,Q 必须绑定到 E。绑定可能具有可选的路由键属性,某些交换机类型使用该属性。路由键的目的是选择发布到交换机的某些消息以路由到绑定的队列。换句话说,路由键充当过滤器。

为了进行类比

  • 队列就像您在纽约市的目的地
  • 交换机就像肯尼迪机场
  • 绑定是从肯尼迪机场到您的目的地的路线。可以有零个或多个到达目的地的途径

拥有这种间接层可以实现使用直接发布到队列无法实现或很难实现的路由方案,并且还消除了应用程序开发人员必须完成的某些重复工作。

如果消息无法路由到任何队列(例如,因为它发布到的交换机没有绑定),则该消息要么丢弃或返回给发布者,具体取决于发布者设置的消息属性。

消费者

除非应用程序可以消费它们,否则将消息存储在队列中是无用的。在 AMQP 0-9-1 模型中,应用程序可以通过两种方式执行此操作

  • 订阅以将消息传递给他们(“推送 API”):这是推荐的选项
  • 轮询(“拉取 API”):这种方式效率极低,在大多数情况下应避免使用

使用“推送 API”,应用程序必须表明有兴趣消费来自特定队列的消息。当他们这样做时,我们说他们注册了一个消费者,或者简单地说,订阅了一个队列。每个队列可以有多个消费者,也可以注册一个独占消费者(在消费者消费时,将所有其他消费者从队列中排除)。

每个消费者(订阅)都有一个称为消费者标签的标识符。它可以用于取消订阅消息。消费者标签只是字符串。

消息确认

消费者应用程序——即接收和处理消息的应用程序——有时可能会在处理单个消息时失败,丢失与服务器的连接,或以许多其他方式失败。

还可能存在网络问题导致问题。这就提出了一个问题:代理应该何时从队列中删除消息?AMQP 0-9-1 规范允许消费者控制这一点。有两种确认模式

  • 在代理将消息发送到应用程序后(使用 basic.deliverbasic.get-ok 方法)。
  • 在应用程序发回确认后(使用 basic.ack 方法)。

前一种选择称为自动确认模型,而后一种选择称为显式确认模型。使用显式模型,应用程序可以选择何时发送确认。它可以是在收到消息后立即发送,或者在将其持久化到数据存储之前处理,或者在完全处理消息之后(例如,成功获取网页、处理并将其存储到某个持久性数据存储中)。

如果消费者在没有发送确认的情况下死亡,代理会将消息重新传递给另一个消费者,或者,如果在当时没有可用的消费者,代理将等待至少一个消费者注册到同一个队列,然后再尝试重新传递。

拒绝消息

当消费者应用程序收到消息时,该消息的处理可能会成功,也可能不会成功。应用程序可以向代理指示消息处理失败(或当时无法完成),方法是拒绝消息。拒绝消息时,应用程序可以要求代理丢弃或重新排队该消息。当队列上只有一个消费者时,请确保不要通过一遍又一遍地拒绝和重新排队来自同一消费者的消息来创建无限消息传递循环。

否定确认

消息通过 basic.reject 方法拒绝。basic.reject 有一个限制:无法像确认那样拒绝多条消息。但是,如果您使用的是 RabbitMQ,则有一个解决方案。RabbitMQ 提供了一个称为否定确认nack 的 AMQP 0-9-1 扩展。有关更多信息,请参阅确认basic.nack 扩展指南。

预取消息

对于多个消费者共享队列的情况,能够指定在发送下一个确认之前,每个消费者一次可以发送多少消息非常有用。这可以用作简单的负载平衡技术,或者如果消息倾向于批量发布,则可以提高吞吐量。例如,如果生产应用程序每分钟发送消息,因为这是它正在完成的工作的性质。

请注意,RabbitMQ 仅支持通道级预取计数,不支持基于连接或大小的预取。

消息属性和有效负载

AMQP 0-9-1 模型中的消息具有属性。某些属性非常常见,以至于 AMQP 0-9-1 规范定义了它们,应用程序开发人员不必考虑确切的属性名称。一些示例是

  • 内容类型
  • 内容编码
  • 路由键
  • 传递模式(持久或非持久)
  • 消息优先级
  • 消息发布时间戳
  • 过期时间段
  • 发布者应用程序 ID

某些属性由 AMQP 代理使用,但大多数属性可供接收它们的应用程序解释。某些属性是可选的,称为头部。它们类似于 HTTP 中的 X-Headers。消息属性在发布消息时设置。

消息还具有有效负载(它们携带的数据),AMQP 代理将其视为不透明的字节数组。代理不会检查或修改有效负载。消息可能仅包含属性,而不包含有效负载。通常使用 JSON、Thrift、Protocol Buffers 和 MessagePack 等序列化格式来序列化结构化数据,以便将其作为消息有效负载发布。协议对等方通常使用“content-type”和“content-encoding”字段来传达此信息,但这仅是约定俗成。

消息可以发布为持久消息,这使得代理将它们持久化到磁盘。如果服务器重新启动,系统将确保收到的持久消息不会丢失。仅将消息发布到持久交换机或将其路由到的队列是持久队列的事实并不能使消息持久化:这一切都取决于消息本身的持久性模式。将消息发布为持久消息会影响性能(就像数据存储一样,持久性会以一定的性能成本为代价)。

发布者指南中了解更多信息。

AMQP 0-9-1 方法

AMQP 0-9-1 结构化为许多方法。方法是操作(如 HTTP 方法),与面向对象编程语言中的方法无关。AMQP 0-9-1 中的协议方法分为。类只是 AMQP 0-9-1 方法的逻辑分组。AMQP 0-9-1 参考提供了所有 AMQP 0-9-1 方法(协议操作)的完整详细信息。

让我们看一下 exchange 类,这是一组与交换机操作相关的方法。它包括以下操作

  • exchange.declare
  • exchange.declare-ok
  • exchange.delete
  • exchange.delete-ok

(请注意,RabbitMQ 站点参考还包括 RabbitMQ 特定的 exchange 类的扩展,我们将在本指南中不讨论这些扩展)。

上述操作形成逻辑对:exchange.declareexchange.declare-okexchange.deleteexchange.delete-ok。这些操作是“请求”(由客户端发送)和“响应”(由代理响应上述“请求”发送)。

例如,客户端使用 exchange.declare 方法请求代理声明一个新交换机

exchange.declare

如上图所示,exchange.declare 带有几个参数。它们使客户端能够指定交换机名称、类型、持久性标志等等。

如果操作成功,代理将使用 exchange.declare-ok 方法响应

exchange.declare-ok

exchange.declare-ok 除了通道号(通道将在本指南稍后介绍)之外,不携带任何参数。

AMQP 0-9-1 queue 方法类上的另一个方法对:queue.declarequeue.declare-ok 的事件序列非常相似

queue.declare

queue.declare-ok

并非所有 AMQP 0-9-1 方法都有对应项。有些(basic.publish 是最广泛使用的方法之一)没有相应的“响应”方法,而另一些(例如 basic.get)有多个可能的“响应”。

连接

AMQP 0-9-1 连接通常是长寿命的。AMQP 0-9-1 是一种应用程序级协议,使用 TCP 进行可靠传递。连接使用身份验证,并且可以使用 TLS 进行保护。当应用程序不再需要连接到服务器时,它应该优雅地关闭其 AMQP 0-9-1 连接,而不是突然关闭底层 TCP 连接。

通道

某些应用程序需要与代理建立多个连接。但是,不希望同时保持打开许多 TCP 连接,因为这样做会消耗系统资源,并使配置防火墙更加困难。AMQP 0-9-1 连接通过通道进行多路复用,通道可以被认为是“共享单个 TCP 连接的轻量级连接”。

客户端执行的每个协议操作都在通道上发生。特定通道上的通信与另一个通道上的通信完全分离,因此每个协议方法还携带一个通道 ID(也称为通道号),这是一个整数,代理和客户端都使用它来确定该方法用于哪个通道。

通道仅在连接的上下文中存在,而永远不会单独存在。当连接关闭时,其上的所有通道也关闭。

对于使用多个线程/进程进行处理的应用程序,通常为每个线程/进程打开一个新通道,而不是在它们之间共享通道。

虚拟主机

为了使单个代理能够托管多个隔离的“环境”(用户组、交换机、队列等),AMQP 0-9-1 包括虚拟主机(vhosts)的概念。它们类似于许多流行的 Web 服务器使用的虚拟主机,并提供完全隔离的环境,AMQP 实体在其中生存。协议客户端在连接协商期间指定他们想要使用的 vhosts。

AMQP 是可扩展的

AMQP 0-9-1 有几个扩展点

  • 自定义交换机类型使开发人员能够实现开箱即用的交换机类型无法很好地覆盖的路由方案,例如,基于地理数据的路由。
  • 交换机和队列的声明可以包括代理可以使用的其他属性。例如,RabbitMQ 中的每个队列的消息 TTL 就是以这种方式实现的。
  • 特定于代理的协议扩展。例如,请参阅RabbitMQ 实现的扩展
  • 每隔一段时间,就可以为特定情况引入新的 AMQP 0-9-1 方法类,例如 OAuth 2 (JWT) 令牌刷新。
  • 可以使用其他插件扩展代理,例如,RabbitMQ 管理前端和 HTTP API 作为插件实现。

这些功能使 AMQP 0-9-1 模型更加灵活,并适用于非常广泛的问题。

AMQP 0-9-1 客户端生态系统

许多 AMQP 0-9-1 客户端,适用于许多流行的编程语言和平台。其中一些客户端密切遵循 AMQP 术语,并且仅提供 AMQP 方法的实现。另一些客户端具有其他功能、便利方法和抽象。有些客户端是异步的(非阻塞),有些是同步的(阻塞),有些客户端同时支持这两种模型。有些客户端支持特定于供应商的扩展(例如,RabbitMQ 特定的扩展)。

由于 AMQP 的主要目标之一是互操作性,因此开发人员最好理解协议操作,而不要将自己限制在特定客户端库的术语中。这样,与使用不同库的开发人员进行交流将变得更加容易。

© . All rights reserved.