跳至主要内容
版本: 3.13

什么是流

RabbitMQ 流是一个持久化的复制数据结构,它可以完成与队列相同的功能:它们缓冲来自生产者的消息,这些消息被消费者读取。但是,流与队列在两个重要方面有所不同:消息的存储和消费方式。

流模拟消息的追加式日志,可以重复读取,直到它们过期。流始终是持久化的并且被复制。对这种流行为的更技术性的描述是“非破坏性消费者语义”。

为了从 RabbitMQ 流中读取消息,一个或多个消费者订阅它并根据需要多次读取相同的消息。

可以通过 RabbitMQ 客户端库或通过 专用的二进制协议 插件和关联的客户端来使用流中的数据。后一种选择 **强烈推荐**,因为它提供了对所有流特定功能的访问并提供了最佳的吞吐量(性能)。

现在,您可能在问以下问题

  • 那么流会取代队列吗?
  • 我应该停止使用队列吗?

为了回答这些问题,流的引入不是为了取代队列,而是为了补充队列。流为 RabbitMQ 的新用例打开了許多可能性,这些用例在 使用流的用例 中进行了描述。

以下信息详细介绍了流的使用以及流的管理和维护操作。

您还应该查看 流插件 信息以详细了解如何使用二进制 RabbitMQ 流协议使用流以及 流核心和流插件比较页面 以了解功能矩阵。

使用流的用例

流的开发最初是为了覆盖 4 个消息传递用例,这些用例是现有的队列类型无法提供或提供的有缺点的用例。

  1. 大型扇出

    当希望将同一消息传递给多个订阅者时,用户目前必须为每个消费者绑定一个专用队列。如果消费者数量很多,这将可能导致效率低下,尤其是在需要持久性和/或复制时。流将允许任意数量的消费者以非破坏性的方式从同一队列中消费相同的消息,从而避免了绑定多个队列的必要性。流消费者还可以从副本中读取,从而将读取负载分散到整个集群中。

  2. 重播(时间旅行)

    由于所有当前的 RabbitMQ 队列类型都具有破坏性消费行为,即消息在消费者完成处理后会从队列中删除,因此无法重新读取已消费的消息。流将允许消费者在日志中的任何位置附加并从该位置读取。

  3. 吞吐量性能

    没有持久性队列类型能够提供与任何现有基于日志的消息传递系统相媲美的吞吐量。流的设计以性能为主要目标。

  4. 大型积压

    大多数 RabbitMQ 队列被设计为收敛到空状态,并且为此进行了优化,因此当给定队列上存在数百万条消息时,性能会变差。流被设计为以高效的方式存储大量数据,并且内存开销最小。

如何使用 RabbitMQ 流

能够指定 可选队列和消费者参数 的 AMQP 0.9.1 客户端库将能够像常规 AMQP 0.9.1 队列一样使用流。

与队列一样,流也必须首先声明。

声明 RabbitMQ 流

要声明流,将 x-queue-type 队列参数设置为 stream(默认值为 classic)。该参数必须由客户端在声明时提供;它不能使用 策略 设置或更改。这是因为策略定义或适用的策略可以动态更改,但队列类型不能。它必须在声明时指定。

以下代码片段展示了如何使用 AMQP 0.9.1 Java 客户端 创建流

ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
"my-stream",
true, // durable
false, false, // not exclusive, not auto-delete
Collections.singletonMap("x-queue-type", "stream")
);

声明一个 x-queue-type 参数设置为 stream 的队列将在每个配置的 RabbitMQ 节点上创建一个具有副本的流。流是仲裁系统,因此强烈建议使用大小不一致的集群。

流仍然是 AMQP 0.9.1 队列,因此它可以在创建后绑定到任何交换机,就像任何其他 RabbitMQ 队列一样。

如果使用 管理 UI 声明,则必须使用队列类型下拉菜单指定 stream 类型。

流支持其他 队列参数,这些参数也可以使用 策略 配置。

  • x-max-length-bytes

设置流的最大大小(以字节为单位)。请参见 保留。默认值:未设置。

  • x-max-age

设置流的最大年龄。请参见 保留。默认值:未设置。

  • x-stream-max-segment-size-bytes

单位:字节。

流被划分为磁盘上的固定大小的段文件。此设置控制这些文件的大小。默认值:(500000000 字节)。

虽然可以通过策略配置此参数,但它 *只* 会在策略在流声明时设置(存在)时应用于该流。如果对匹配但预先存在的流更改此参数,即使队列记录的有效策略可能表明它已更改,它 **也不会被更改**。

因此,最好只通过选项队列参数 x-stream-filter-size-bytes 来配置它。

注意

虽然可以通过策略配置 x-stream-filter-size-bytes,但它 *只* 会在策略在流声明时存在时应用于该流。

以下 Java 示例演示了如何在应用程序代码中在流声明时设置该参数

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-type", "stream");
arguments.put("x-max-length-bytes", 20_000_000_000); // maximum stream size: 20 GB
arguments.put("x-stream-max-segment-size-bytes", 100_000_000); // size of segment files: 100 MB
channel.queueDeclare(
"my-stream",
true, // durable
false, false, // not exclusive, not auto-delete
arguments
);

该值以字节为单位设置。

用于 过滤 的布隆过滤器的大小。该值必须介于 16 和 255 之间。默认值:16。

客户端操作

消费

由于流永远不会删除任何消息,因此任何消费者都可以从日志中的任何位置开始读取/消费。这由 x-stream-offset 消费者参数控制。如果未指定,消费者将从消费者启动后写入日志的下一个偏移量开始读取。支持以下值

  • first - 从日志中第一个可用消息开始
  • last - 这将从最后写入的“块”消息开始读取 *(块是流中使用的存储和传输单元,简单地说,它是一批由几千条到几千条消息组成的消息,具体取决于输入)*
  • next - 与不指定任何偏移量相同
  • 偏移量 - 一个数值,指定要附加到日志的特定偏移量。如果此偏移量不存在,它将分别钳制到日志的开头或结尾。
  • 时间戳 - 一个时间戳值,指定要附加到日志的时间点。它将钳制到最接近的偏移量,如果时间戳超出流的范围,它将分别钳制到日志的开头或结尾。使用 AMQP 0.9.1,使用的时间戳是 POSIX 时间,精度为一秒,即自 1970-01-01 00:00:00 UTC 以来经过的秒数。请注意,消费者可能会收到在指定时间戳之前发布的消息。
  • 间隔 - 一个字符串值,指定相对于当前时间的要附加到日志的时间间隔。使用与 x-max-age 相同的规范(请参见 保留

以下代码片段展示了如何使用 first 偏移量规范

channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", "first"), // "first" offset specification
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });

以下代码片段展示了如何指定特定偏移量以从该偏移量开始消费

channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", 5000), // offset value
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });

以下代码片段展示了如何指定特定时间戳以从该时间戳开始消费

// an hour ago
Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", timestamp), // timestamp offset
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });

其他流操作

以下操作可以像经典队列和仲裁队列一样使用,但一些操作具有一些特定于队列的行为。

流的单个活动消费者功能

流的单个活动消费者是在 RabbitMQ 3.11 及更高版本中提供的功能。它在流上提供了 *独占消费* 和 *消费连续性*。当多个共享相同流和名称的消费者实例启用单个活动消费者时,这些实例中只有一个会在任何时间点处于活动状态,因此会接收消息。其他实例将处于空闲状态。

单个活动消费者功能提供了 2 个好处

  • 消息按顺序处理:一次只有一个消费者。
  • 消费连续性得到维护:如果活动消费者停止或崩溃,组中的消费者将接管。

一篇 博客文章 提供了有关流的单个活动消费者的更多详细信息。

超级流

超级流是一种通过将大型流划分为较小的流来进行横向扩展的方法。它们与单个活动消费者集成,以保留分区内的消息顺序。超级流从 RabbitMQ 3.11 版本开始可用。

超级流是由单个常规流组成的逻辑流。它是一种使用 RabbitMQ 流进行发布和消费的扩展方法:一个大型逻辑流被划分为分区流,从而将存储和流量分散到多个集群节点上。

超级流仍然是一个逻辑实体:应用程序通过客户端库的智能功能将其视为一个“大型”流。超级流的拓扑结构基于AMQP 0.9.1 模型,即交换机、队列和它们之间的绑定。

可以使用任何 AMQP 0.9.1 库或管理插件来创建超级流的拓扑结构,这需要创建一个直接交换机、“分区”流,并将它们绑定在一起。不过,使用rabbitmq-streams add_super_stream命令可能更容易。以下是如何使用它来创建一个包含 3 个分区的invoices超级流

rabbitmq-streams add_super_stream invoices --partitions 3

使用rabbitmq-streams add_super_stream --help了解更多关于该命令的信息。

与单个流相比,超级流增加了复杂性,因此不应将其视为涉及流的所有用例的默认解决方案。仅当您确定已达到单个流的限制时,才考虑使用超级流。

一篇博客文章概述了超级流。

过滤

RabbitMQ 流提供了一个服务器端过滤功能,该功能避免了读取流中的所有消息,仅在客户端进行过滤。当消费应用程序只需要消息子集(例如,来自特定地理区域的消息)时,这有助于节省网络带宽。

流过滤支持流协议、AMQP 0.9.1 和STOMP。示例将使用 AMQP 0.9.1。

为了使过滤功能生效,消息必须使用关联的过滤值进行发布。该值由x-stream-filter-value头指定。

channel.basicPublish(
"", // default exchange
"my-stream",
new AMQP.BasicProperties.Builder()
.headers(Collections.singletonMap(
"x-stream-filter-value", "california" // set filter value
))
.build(),
body
);

如果消费者希望仅接收特定过滤值的消息,则必须使用x-stream-filter参数。

channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-filter", "california"), // set filter
(consumerTag, message) -> {
Map<String, Object> headers = message.getProperties().getHeaders();
// there must be some client-side filter logic
if ("california".equals(headers.get("x-stream-filter-value"))) {
// message processing
// ...
}
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });

如上例所示,也必须有一些客户端过滤逻辑,因为服务器端过滤是概率性的:不匹配过滤值的消息仍然可以发送到消费者。服务器使用布隆过滤器,这是一种空间效率高的概率数据结构,可能会出现误报。尽管如此,过滤仍然节省了一些带宽,这是它的主要目标。

关于过滤的更多说明

  • 可以在同一流中发布带有和不带有过滤值的消息。
  • 当消费者设置过滤器时,不带有过滤值的消息不会发送。将x-stream-match-unfiltered参数设置为true以更改此行为,并接收未过滤的消息。
  • x-stream-filter消费者参数接受字符串,也接受字符串数组,以接收具有不同过滤值的消息。

一篇第一篇博客文章概述了流过滤,而第二篇博客文章介绍了内部机制。

功能比较:常规队列与流

流并非真正意义上的传统队列,因此与 AMQP 0.9.1 队列语义并不十分一致。许多其他队列类型支持的功能不受支持,并且由于队列类型的性质,永远不会支持。

只要使用消费者确认,任何可以使用常规队列的 AMQP 0.9.1 客户端库都可以使用流。

由于流的非破坏性读取语义,许多功能永远不会得到支持。

功能矩阵

功能经典
非持久队列
排他性
每条消息持久化每条消息总是
成员资格更改自动手动
TTL否(但请参阅保留
队列长度限制否(但请参阅保留
延迟行为固有
消息优先级
消费者优先级
死信交换机
符合策略(请参阅保留
响应内存警报否(使用最小 RAM)
中毒消息处理
全局QoS 预取

非持久队列

流始终是持久性的,因为它们假设的用例,它们不能像常规队列那样非持久性

排他性

流始终是持久性的,因为它们假设的用例,它们不能像常规队列那样排他性。它们不适合用作临时队列

延迟模式

流将所有数据直接存储在磁盘上,写入消息后,它不会使用任何内存,直到消息被读取。流本质上是延迟的。

全局 QoS

流不支持全局QoS 预取,其中通道为使用该通道的所有消费者设置单个预取限制。如果尝试从具有全局 QoS 启用的通道的流中消费,将返回通道错误。

使用每个消费者的 QoS 预取,这是许多流行客户端的默认设置。

数据保留

流实现为一个不可变的追加式磁盘日志。这意味着日志将无限期增长,直到磁盘空间用完。为了避免这种不希望出现的情况,可以为每个流设置保留配置,该配置将根据日志总数据大小和/或时间戳丢弃日志中最旧的数据。

有两个参数控制流的保留。它们可以组合使用。这些参数可以在声明时使用队列参数设置,也可以作为可以动态更新的策略设置。

  • max-age:

    有效单位:Y、M、D、h、m、s

    例如:7D 代表一周

  • max-length-bytes:

    最大总大小(以字节为单位)

注意:保留是按段进行评估的,因此还有一个生效的参数,即流的段大小。只要段中至少包含一条消息,流将始终保留至少一个段。当使用代理提供的偏移量跟踪时,每个消费者的偏移量会作为非消息数据持久化到流本身中。

性能特征

由于流在执行任何操作之前将所有数据持久化到磁盘,因此建议使用最快的磁盘。

由于流的磁盘 I/O 密集型性质,它们的吞吐量会随着消息大小的增加而下降。

与仲裁队列一样,流也会受到集群大小的影响。流拥有的副本越多,它的吞吐量通常越低,因为复制数据并达成共识需要进行更多工作。

控制初始副本因子

x-initial-cluster-size队列参数控制初始流集群应跨越多少个 RabbitMQ 节点。

管理流副本

流的副本由操作员显式管理。当将新节点添加到集群时,它不会托管任何流副本,除非操作员将其显式添加到流的副本集中。

当必须停用节点(从集群中永久删除)时,必须将其从当前托管副本的所有流的副本列表中显式删除。

提供两个CLI 命令来执行上述操作,即rabbitmq-streams add_replicarabbitmq-streams delete_replica

rabbitmq-streams add_replica [-p <vhost>] <stream-name> <node>
rabbitmq-streams delete_replica [-p <vhost>] <stream-name> <node>

为了成功添加和删除副本,流协调器必须在集群中可用。

需要注意的是,在执行涉及成员资格更改的维护操作时,不要意外地使流不可用,因为失去了仲裁。

由于流的成员资格没有嵌入到流本身中,因此在当前情况下,添加副本无法完全保证安全。因此,如果任何时候存在不同步的副本,则无法添加另一个副本,并将返回错误。

替换集群节点时,更安全的做法是首先添加一个新节点,等待它同步,然后停用它所替换的节点。

可以使用以下命令查询流的复制状态

rabbitmq-streams stream_status [-p <vhost>] <stream-name>

此外,可以使用以下命令重启流

rabbitmq-streams restart_stream [-p <vhost>] <stream-name>

流行为

每个流都有一个主写入器(领导者)和零个或多个副本。

领导者选举和故障处理

声明新流时,将随机选择托管其副本的节点集,但始终包括声明流的客户端连接到的节点。

哪个副本成为初始领导者,可以通过三种方式控制,即使用x-queue-leader-locator可选队列参数,设置queue-leader-locator策略键,或者在配置文件中定义queue_leader_locator键。以下是可能的值

  • client-local:选择声明流的客户端连接到的节点。这是默认值。
  • balanced:如果总共有不到 1000 个队列(经典队列、仲裁队列和流),则选择托管最少数量的流领导者的节点。如果总共有超过 1000 个队列,则随机选择一个节点。

流需要声明节点的仲裁才能正常运行。当托管流领导者的 RabbitMQ 节点出现故障或停止时,托管该流的副本之一的另一个节点将被选为领导者,并恢复操作。

出现故障并重新加入的副本将与领导者重新同步(“追赶”)。与仲裁队列类似,副本的临时故障不需要从当前选出的领导者那里进行完全重新同步。如果重新加入的副本落后于领导者,则只需传输增量部分。这种“追赶”过程不会影响领导者的可用性。

副本必须显式添加。当添加新副本时,它将从领导者那里同步整个流状态,类似于新添加的仲裁队列副本。

容错和最小在线副本数量

共识系统可以提供关于数据安全性的某些保证。这些保证确实意味着在它们变得相关之前需要满足某些条件,例如需要至少三个集群节点才能提供容错,并且需要超过一半的成员可用才能工作。

各种规模集群的容错特性可以用表格描述

集群节点数量容忍的节点故障数量容忍网络分区
10不适用
20
31
41如果其中一侧存在多数则适用
52
62如果其中一侧存在多数则适用
73
83如果其中一侧存在多数则适用
94

使用 Streams 时的数据安全性

Streams 在多个节点上复制数据,并且只有在数据已复制到一组 Stream 副本时才会发出发布者确认。

Streams 始终将数据存储在磁盘上,但是它们不会显式地将数据从操作系统页面缓存刷新(fsync)到底层存储介质,而是依赖操作系统根据需要进行操作。这意味着服务器的非正常关闭可能会导致在该节点上托管的副本数据丢失。尽管从理论上讲,这会导致确认数据丢失的可能性,但在正常运行期间发生这种情况的可能性很小,并且单个节点上的数据丢失通常会从系统中的其他节点重新复制。

如果需要更高的数据安全性,请考虑使用 quorum 队列,因为只有在至少一组节点都写入将数据刷新到磁盘后才会发出发布者确认。

对于没有使用发布者确认机制确认的消息,不提供任何保证。此类消息可能会在操作系统缓冲区中“中途”丢失或无法到达流领导者。

流可用性

流应该能够容忍少数流副本变得不可用,而不会对可用性产生影响或影响很小。

请注意,根据所使用的分区处理策略,RabbitMQ 可能会在恢复期间自行重启并重置节点,但只要不发生这种情况,此可用性保证应该成立。

例如,具有三个副本的流可以容忍一个节点故障而不会丢失可用性。具有五个副本的流可以容忍两个节点故障,依此类推。

如果一组节点无法恢复(例如,如果 3 个 RabbitMQ 节点中的 2 个永久丢失),则队列将永久不可用,并且很可能需要操作员干预才能恢复。

配置 Streams

有关流协议端口、TLS 和其他配置,请参阅Stream 插件指南。有关所需的流复制端口,请参阅网络指南.

Streams 如何使用资源

Streams 通常比 quorum 队列具有更低的 CPU 和内存占用量。

所有数据都存储在磁盘上,只有未写入的数据存储在内存中。

使用 Streams 时进行偏移量跟踪

使用代理提供的偏移量跟踪功能时(目前仅在使用Stream 插件时可用),偏移量会作为非消息数据持久化到流本身。这意味着,当请求偏移量持久化时,流在磁盘上的大小会因每次偏移量持久化请求而增加少量。

限制

消息编码

Streams 在内部将消息存储为 AMQP 1.0 编码的数据。这意味着在使用 AMQP 0.9.1 发布时会进行转换。尽管 AMQP 1.0 数据模型在很大程度上能够包含 AMQP 0.9.1 的所有数据模型,但有一些限制。如果 AMQP 0.9.1 消息包含具有复杂值(如数组或表格)的标头条目,则这些标头将不会被转换。这是因为标头存储为 AMQP 1.0 消息内的应用程序属性,而这些属性只能包含简单类型的值,如字符串和数字。

UI 指标准确性

管理 UI 显示的消息数量可能会略微超过流中实际的数量。由于流存储的实现方式,偏移量跟踪信息也被计算为消息,这使得消息数量人为地大于实际数量。这在大多数系统中应该不会造成实际影响。