跳到主要内容
版本: 4.1

发布者

概述

本指南涵盖与发布者相关的各种主题

等等。

本指南侧重于 AMQP 0-9-1,并提及 RabbitMQ 支持的其他协议(AMQP 1.0、MQTT 和 STOMP)的关键协议特定差异。

术语

术语“发布者”在不同的上下文中意味着不同的事物。一般来说,在消息传递中,发布者(也称为“生产者”)是发布(生产)消息的应用程序(或应用程序实例)。同一个应用程序也可以消费消息,因此同时也是一个消费者

消息传递协议也有持久订阅消息传递的概念。订阅是通常用于描述此类实体的术语之一。消费者是另一个。RabbitMQ 支持的消息传递协议都使用这两个术语,但 RabbitMQ 文档倾向于首选后者。

基础知识

RabbitMQ 是一个消息代理。它接受来自发布者的消息,对其进行路由,如果存在要路由到的队列,则存储它们以供消费,或者立即传递给消费者(如果有)。

发布者发布到目标,目标因协议而异。

在 AMQP 0-9-1 中,发布者发布到交换机。在 AMQP 1.0 中,发布发生在链接上。在 MQTT 中,发布者发布到主题。最后,STOMP 支持多种目标类型:主题、队列、AMQP 0-9-1 交换机。

这在 协议特定差异 部分中更详细地介绍。

发布的消息必须路由到队列(主题等)。队列(主题)可能具有在线消费者。当消息成功路由到队列,并且有在线消费者可以接受更多传递时,消息将被发送给消费者。

尝试发布到不存在的队列(主题)将导致通道级异常,代码为 404 Not Found,并使尝试发布的通道关闭。

发布者生命周期

发布者通常是长期存在的:也就是说,在发布者的整个生命周期中,它会发布多条消息。打开连接或通道(会话)来发布单条消息不是最佳的。

发布者通常在应用程序启动期间打开其连接。它们的生命周期通常与它们的连接甚至应用程序的运行时间一样长。

发布者可以更加动态,并在响应系统事件时开始发布,并在不再需要时停止。这在使用通过 Web STOMPWeb MQTT 插件、移动客户端等使用的 WebSocket 客户端时很常见。

协议差异

在 RabbitMQ 支持的每个协议中,发布消息的过程都非常相似。所有四种协议都允许用户发布具有有效负载(正文)和一个或多个消息属性(标头)的消息。

所有四种协议也都支持发布者的确认机制,该机制允许发布应用程序跟踪已被代理成功接受或未被接受的消息,并继续发布下一批或重试发布当前批次。

差异通常更多地与使用的术语有关,而不是语义。消息属性也因协议而异。

AMQP 0-9-1

在 AMQP 0-9-1 中,发布发生在通道上,目标是交换机。交换机使用路由拓扑,该拓扑通过定义一个或多个队列与交换机之间的绑定,或者源交换机和目标交换机之间的绑定来设置。成功路由的消息存储在队列中。

每个实体的角色在AMQP 0-9-1 概念指南中介绍。

发布者确认是发布者确认机制。

有几种常见的发布者错误类型,它们使用不同的协议功能来处理

  • 发布到不存在的交换机会导致通道错误,从而关闭通道,以便不允许在其上进行进一步的发布(或任何其他操作)。
  • 当发布的消息无法路由到任何队列时(例如,因为没有为目标交换机定义绑定),并且发布者将 mandatory 消息属性设置为 false(这是默认设置),则该消息将被丢弃或重新发布到备用交换机(如果有)。
  • 当发布的消息无法路由到任何队列,并且发布者将 mandatory 消息属性设置为 true 时,该消息将返回给发布者。发布者必须设置返回消息处理程序才能处理返回(例如,通过记录错误或使用不同的交换机重试)

AMQP 1.0

在 AMQP 1.0 中,发布发生在链接的上下文中。

MQTT

在 MQTT 中,消息在连接上发布到主题。服务器端 MQTT 连接过程通过主题交换机将消息路由到队列

当发布者选择使用 QoS 1 时,发布的消息将由 RabbitMQ 使用 PUBACK 数据包确认。

发布者可以向服务器提供提示,表明在主题上发布的消息必须保留(存储以供将来传递给新的订阅者)。每个主题只能保留最新的已发布消息。

MQTT 5.0 PUBACK 数据包包含一个 原因代码,该代码告诉发布者发布是否成功。RabbitMQ 返回的原因代码包括

  • 0 - Success:消息路由到的所有队列都成功接受了该消息。
  • 16 - No matching subscribers:RabbitMQ 无法将消息路由到任何队列(因为没有定义到主题交换机的绑定)。
  • 131 - Implementation specific error:RabbitMQ 拒绝了消息(例如,当目标经典队列不可用时)。

在 MQTT 3.1 和 3.1.1 中,除了关闭连接之外,服务器没有其他机制可以将发布错误传达给客户端。

请参阅 MQTTMQTT-over-WebSockets 指南以了解更多信息。

STOMP

STOMP 客户端在连接上发布到一个或多个目标,这些目标在 RabbitMQ 的情况下可以具有不同的语义。

STOMP 提供了一种服务器向发布者传达消息处理错误的方式。其 发布者确认 的变体称为收据,这是客户端在发布时启用的功能。

请参阅 STOMP 指南STOMP-over-WebSocketsSTOMP 1.2 规范以了解更多信息。

路由

AMQP 0-9-1

AMQP 0-9-1 中的路由由交换机执行。交换机是命名的路由表。表条目称为绑定。这在AMQP 0-9-1 概念指南中更详细地介绍。

有几种内置的交换机类型

  • 主题
  • 扇出
  • 直接(包括默认交换机)
  • 标头

前三种类型在教程中通过示例介绍。

更多交换机类型可以通过插件提供。一致性哈希交换机随机路由交换机内部事件交换机延迟消息交换机是随 RabbitMQ 提供的交换机插件。与所有插件一样,它们必须先启用才能使用。

不可路由消息处理

客户端可能会尝试将消息发布到不存在的目标(交换机、主题、队列)。本节介绍不同协议在处理此类情况时的不同之处。

RabbitMQ 收集并公开了指标,这些指标可用于检测发布不可路由消息的发布者。

AMQP 0-9-1

当发布的消息无法路由到任何队列时(例如,因为没有为目标交换机定义绑定),并且发布者将 mandatory 消息属性设置为 false(这是默认设置),则该消息将被丢弃或重新发布到备用交换机(如果有)。

当发布的消息无法路由到任何队列时,并且发布者将 mandatory 消息属性设置为 true 时,该消息将返回给发布者。发布者必须设置返回消息处理程序才能处理返回(例如,通过记录错误或使用不同的交换机重试)。

备用交换机是 AMQP 0-9-1 交换机功能,允许客户端处理交换机无法路由的消息(即,要么是因为没有绑定的队列,要么是因为没有匹配的绑定)。典型的例子是检测客户端何时意外或恶意发布无法路由的消息,“否则”路由语义,其中一些消息被特殊处理,其余消息由通用处理程序处理。

MQTT

发布到新主题将为其设置一个队列。不同的主题/QoS 级别组合将使用具有不同属性的不同队列。因此,发布者和消费者必须使用相同的 QoS 级别。

STOMP

STOMP 支持多种不同的目标,包括那些假设预先存在的拓扑结构的目标。

  • /topic:发布到尚未有消费者的主题将导致消息被丢弃。主题上的第一个订阅者将为其声明一个队列。
  • /exchange:目标交换机必须存在,否则服务器将报告错误
  • /amq/queue:目标队列必须存在,否则服务器将报告错误
  • /queue:发布到不存在的队列将设置该队列
  • /temp-queue:发布到不存在的临时队列将设置该队列

指标

有一个用于不可路由的丢弃消息的指标

Unroutable message metrics

在上面的示例中,所有发布的消息都作为不可路由消息(和非强制性消息)被丢弃。

消息属性

AMQP 0-9-1

每次传递都结合了消息元数据和传递信息。不同的客户端库使用略有不同的方式来提供对这些属性的访问。通常,传递处理程序可以访问传递数据结构。

以下属性是传递和路由详细信息;它们本身不是消息属性,而是在路由和传递时由 RabbitMQ 设置的

属性类型描述
传递标签正整数

传递标识符,请参阅确认

已重新传递布尔值如果此消息先前已传递并重新排队,则设置为 true
交换机字符串路由此消息的交换机
路由键字符串发布者使用的路由键
消费者标签字符串消费者(订阅)标识符

以下是消息属性。其中大多数是可选的。它们由发布者在发布时设置

属性类型描述必需?
传递模式枚举(1 或 2)

2 表示“持久”,1 表示“瞬态”。一些客户端库将此属性公开为布尔值或枚举。

类型字符串应用程序特定的消息类型,例如“orders.created”
标头映射(字符串 => 任意)具有字符串标头名称的任意标头映射
内容类型字符串内容类型,例如“application/json”。由应用程序使用,而不是核心 RabbitMQ
内容编码字符串内容编码,例如“gzip”。由应用程序使用,而不是核心 RabbitMQ
消息 ID字符串任意消息 ID
关联 ID字符串帮助将请求与响应关联,请参阅教程 6
回复到字符串携带响应队列名称,请参阅教程 6
过期时间字符串每条消息的 TTL
时间戳时间戳应用程序提供的时间戳
用户 ID字符串用户 ID,如果设置则已验证
应用 ID字符串应用程序名称

消息类型

消息上的类型属性是一个任意字符串,可帮助应用程序沟通消息的类型。它由发布者在发布时设置。该值可以是发布者和消费者约定的任何特定于领域的字符串。

RabbitMQ 不验证或使用此字段,它存在供应用程序和插件使用和解释。

实践中,消息类型自然地分为几组,点分隔的命名约定很常见(但 RabbitMQ 或客户端没有要求),例如 orders.createdlogs.lineprofiles.image.changed

如果消费者收到未知类型的传递,强烈建议记录此类事件,以便于故障排除。

内容类型和编码

内容(MIME 媒体)类型和内容编码字段允许发布者沟通消费者应如何反序列化和解码消息负载。

RabbitMQ 不验证或使用这些字段,它存在供应用程序和插件使用和解释。

例如,具有 JSON 负载的消息 应使用 application/json。如果负载使用 LZ77 (GZip) 算法压缩,则其内容编码应为 gzip

可以通过用逗号分隔来指定多个编码。

发布者确认(Confirms)和数据安全

确保数据安全是应用程序、客户端库和 RabbitMQ 集群节点的共同责任。本节介绍许多与数据安全相关的主题。

网络可能会以不太明显的方式发生故障,并且检测某些故障需要时间。因此,已将其协议帧或一组帧(例如,已发布的消息)写入其套接字的客户端不能假定该消息已到达服务器并已成功处理。它可能在途中丢失,或者其传递可能会显着延迟。

为了解决这个问题,开发了发布者端确认机制。它模仿协议中已有的消费者确认机制

使用发布者确认的策略

发布者确认为应用程序开发人员提供了一种机制,用于跟踪哪些消息已被 RabbitMQ 成功接受。有几种常用的使用发布者确认的策略

  • 单独发布消息并使用流式确认(异步 API 元素:确认事件处理程序、future/promise 等)
  • 发布一批消息并等待所有未完成的确认
  • 单独发布消息并在继续发布之前等待其被确认。由于其对发布者吞吐量的强烈负面影响,因此强烈建议不要使用此选项

它们在吞吐量影响和易用性方面有所不同。

流式确认

大多数客户端库通常为开发人员提供一种处理从服务器到达的单个确认的方式。确认将异步到达。由于发布在 AMQP 0-9-1 中本质上也是异步的,因此此选项允许以非常少的开销进行安全发布。该算法通常类似于这样

  • 在通道上启用发布者确认
  • 对于每条发布的消息,添加一个映射条目,将当前序列号映射到该消息
  • 当收到肯定确认时,删除该条目
  • 当收到否定确认时,删除该条目并安排其消息重新发布(或其他合适的处理方式)

在 RabbitMQ Java 客户端中,确认处理程序通过 ConfirmCallbackConfirmListener 接口公开。必须将一个或多个侦听器添加到通道

批量发布

此策略涉及发布批量消息并等待整个批次得到确认。重试在批次上执行。

  • 在通道上启用发布者确认
  • 对于每个发布的批次消息,等待所有未完成的确认
  • 当所有确认都为肯定时,发布下一个批次
  • 如果存在否定确认或超时,则重新发布整个批次或仅重新发布相关消息

一些客户端为等待所有未完成的确认提供了便捷的 API 元素。例如,在 Java 客户端中,有 Channel#waitForConfirms(timeout)

由于此方法涉及等待确认,因此会对发布者吞吐量产生负面影响。批次越大,影响越小。

发布并等待

此策略可以被认为是反模式,并且主要为了完整性而记录。它涉及发布消息并立即等待未完成的确认到达。它可以被认为是上述策略的批量发布,其中批次大小等于 1。

这种方法将对吞吐量产生非常显着的负面影响,因此不建议使用。

从连接故障中恢复

客户端和 RabbitMQ 节点之间的网络连接可能会失败。应用程序如何处理此类故障直接影响整个系统的数据安全。

一些 RabbitMQ 客户端支持连接和拓扑(队列、交换机、绑定和消费者)的自动恢复:Java、.NET、Bunny 就是一些例子。

其他客户端不提供自动恢复作为一项功能,但确实提供了应用程序开发人员如何实现恢复的示例。

许多应用程序的自动恢复过程遵循以下步骤

  1. 重新连接到可访问的节点
  2. 恢复连接侦听器
  3. 重新打开通道
  4. 恢复通道侦听器
  5. 恢复通道 basic.qos 设置、发布者确认和事务设置

连接和通道恢复后,拓扑恢复可以开始。拓扑恢复包括以下操作,对每个通道执行

  1. 重新声明交换机(预定义的交换机除外)
  2. 重新声明队列
  3. 恢复所有绑定
  4. 恢复所有消费者

异常处理

发布者通常可以预期两种类型的异常

  • 由于写入失败或超时而导致的网络 I/O 异常
  • 确认传递超时

请注意,此处的“异常”是指一般意义上的错误;某些编程语言根本没有异常,因此那里的客户端将以不同的方式传达错误。本节中的讨论和建议应同样适用于大多数客户端库和编程语言。

前一种类型的异常可能会在写入期间立即发生,或者会延迟一段时间发生。这是因为某些类型的 I/O 故障(例如,高网络拥塞或数据包丢失率)可能需要时间才能检测到。在连接恢复后,发布可以继续,但如果连接因警报而被阻止,则所有进一步的尝试都将失败,直到警报清除为止。这在下面的资源警报的影响部分中更详细地介绍。

后一种类型的异常仅在应用程序开发人员提供超时时才会发生。给定应用程序的合理超时值由开发人员决定。它不应低于有效的心跳超时

资源警报的影响

当集群节点资源警报生效时,集群中所有尝试发布消息的连接都将被阻止,直到集群中的所有警报都清除为止。

当连接被阻止时,将不会读取、解析或处理此连接发送的更多数据。当连接解除阻止时,所有客户端流量处理将恢复。

兼容的 AMQP 0-9-1 客户端在被阻止和解除阻止时将收到通知

阻止连接上的写入将超时或因 I/O 写入异常而失败。

指标

指标收集和监控对于发布者与对于应用程序或组件中的任何其他应用程序或组件一样重要。当涉及到发布者时,RabbitMQ 收集的几个指标尤其值得关注

发布和确认速率在很大程度上是不言自明的。波动率如此重要,是因为它们有助于检测未最佳地使用连接或通道的应用程序,从而提供次优的发布速率并浪费资源。

不可路由的消息速率可以帮助检测发布无法路由到任何队列的消息的应用程序。例如,这可能表明配置错误。

客户端库也可以收集指标。RabbitMQ Java 客户端就是一个例子。这些指标可以提供对特定于应用程序的架构(例如,哪个发布组件发布不可路由的消息)的深入了解,而 RabbitMQ 节点无法推断出这些信息。

并发注意事项

并发主题都与客户端库实现细节有关,但可以提供一些通用建议。一般来说,应避免并认为在共享的“发布上下文”(AMQP 0-9-1 中的通道,STOMP 中的连接,AMQP 1.0 中的会话等等)上发布是不安全的。

这样做可能会导致在线路上错误地构建数据帧。这会导致连接关闭。

对于单个应用程序中少量并发发布者,每个发布者使用一个线程(或类似线程)是最佳解决方案。对于大量(例如,数百或数千个),请使用线程池。

暂时阻止发布

可以通过将内存高水位线设置为 0 来有效地阻止集群中的所有发布,从而使资源警报立即触发

rabbitmqctl set_vm_memory_high_watermark 0

发布者故障排除

本节介绍发布者的一些常见问题,以及如何识别和解决这些问题。分布式系统中的故障有多种形式,因此此列表绝非详尽无遗。

连接失败

像任何客户端一样,发布者必须首先成功连接并成功进行身份验证。

潜在的连接问题数量非常广泛,并且有一个专门的指南

身份验证和授权

像任何客户端一样,发布者可能无法进行身份验证,或者没有访问其目标虚拟主机或发布到目标交换机的权限。

RabbitMQ会将此类失败记录为错误。

请参阅身份验证授权的故障排除部分,在访问控制指南中。

连接抖动

有些应用程序为发布的每条消息打开一个新连接。这是非常低效的,也不是消息传递协议的设计用途。可以使用连接指标检测到这种情况。

尽可能首选长连接。

连接中断

网络连接可能会失败。一些客户端库支持自动连接和拓扑恢复,另一些客户端库则可以轻松地在应用程序代码中实现连接恢复。

当连接断开时,发布将不会通过,也不会在客户端内部排队(延迟)。此外,先前序列化并写入套接字的消息不能保证到达目标节点。因此,对于需要可靠发布和数据安全的发布者来说,至关重要的是使用发布者确认来跟踪RabbitMQ确认了哪些发布。未确认的消息应在一段时间后被视为未送达。如果对于应用程序来说是安全的,则可以重新发布这些消息。这在教程 7和本指南的数据安全部分中进行了介绍。

有关详细信息,请参阅从网络连接故障中恢复

路由问题

发布者可以成功连接、身份验证并被授予发布到交换机(主题、目标)的权限。但是,这些消息可能不会路由到任何队列或消费者。这可能是由于

  • 应用程序之间的配置不匹配,例如,发布者和消费者使用的主题不匹配
  • 发布者配置错误(交换机、主题、路由键不是它们应该是什么)
  • 对于 AMQP 0-9-1,目标交换机上缺少绑定
  • 资源警报正在生效:请参阅以下部分
  • 网络连接已失败,客户端未恢复:请参阅以上部分

检查拓扑和指标通常有助于快速缩小问题范围。例如,可以使用管理 UI中的单个交换机页面来确认是否存在入站消息活动(入口速率高于零)以及绑定是什么。

在以下示例中,交换机没有绑定,因此不会将任何消息路由到任何地方

An exchange without bindings

也可以使用rabbitmq-diagnostics列出绑定

# note that the implicit default exchange bindings won't
# be listed as of RabbitMQ 3.8
rabbitmq-diagnostics list_bindings --vhost "/"
=> Listing bindings for vhost /...

在上面的示例中,该命令没有产生任何结果。

从 RabbitMQ 3.8 开始,有一个新的指标用于无法路由的丢弃消息

Unroutable message metrics

在上面的示例中,所有发布的消息都作为无法路由(且非强制性)的消息被丢弃。请参阅本指南中的无法路由的消息处理部分。

集群范围和连接指标以及服务器日志将有助于发现正在生效的资源警报。

资源警报

当资源警报生效时,所有发布的连接都将被阻止,直到警报清除。客户端可以选择接收通知,当它们被阻止时。在资源警报指南中了解更多信息。

协议异常

对于某些协议,例如 AMQP 0-9-1 和 STOMP,发布者可能会遇到称为协议错误(异常)的情况。例如,发布到不存在的交换机或将交换机绑定到不存在的交换机将导致通道异常,并将导致通道关闭。在关闭的通道上无法发布。RabbitMQ 节点会记录发布者连接到的此类事件。失败的发布尝试也会导致客户端异常或返回错误,具体取决于所使用的客户端库。

在共享通道上并发发布

客户端库不支持在共享通道上并发发布。在并发注意事项部分中了解更多信息。

© . All rights reserved.