Streams 和 Super Streams(分区流)
什么是 Stream
RabbitMQ Streams 是一种持久化的复制数据结构,可以完成与队列相同的任务:它们缓冲来自生产者并由消费者读取的消息。然而,Streams 在两个重要方面与队列不同:消息的存储方式和消费方式。
Streams 模型是一个仅追加日志,可以反复读取直到过期。Streams 始终是持久化的并且是复制的。对这种流行为更技术性的描述是“非破坏性消费者语义”。
要从 RabbitMQ 中的流读取消息,一个或多个消费者会订阅它并根据需要多次读取相同消息。
可以通过 RabbitMQ 客户端库或通过一个专用的二进制协议插件和相关的客户端来使用流中的数据。后一种选择被强烈推荐,因为它提供了对所有流特定功能的访问,并提供了最佳的吞吐量(性能)。
一个配套指南Stream 客户端连接,解释了流协议客户端应如何连接到集群节点以获得最佳数据局部性和效率(吞吐量、延迟)。
除了 Streams,RabbitMQ 还支持称为Super Streams的分区流。本指南稍后将更详细地介绍它们。
现在,您可能想问以下问题
- 那么 Streams 会取代队列吗?
- 我应该停止使用队列吗?
为了回答这些问题,引入 Streams 并不是为了取代队列,而是为了补充它们。Streams 为新的 RabbitMQ 用例提供了许多机会,这些用例在使用 Streams 的用例中进行了描述。
以下信息详细介绍了 Streams 的使用,以及 Streams 的管理和维护操作。
您还应该查看Stream 插件信息,以了解更多关于使用二进制 RabbitMQ Stream 协议的 Streams 的信息,以及RabbitMQ 核心与 Stream 插件比较页面的功能矩阵。
使用 Streams 的用例
Streams 的开发最初是为了涵盖现有队列类型无法提供或提供有缺点(upsides)的 4 种消息传递用例
-
大规模扇出
当希望将同一条消息传递给多个订阅者时,用户目前必须为每个消费者绑定一个专用的队列。如果消费者的数量很大,这可能会变得效率低下,尤其是在需要持久化和/或复制时。Streams 将允许任意数量的消费者以非破坏性的方式从同一个队列中消费相同的消息,从而无需绑定多个队列。Stream 消费者还可以从副本读取,从而将读取负载分散到整个集群。
-
重放(时光倒流)
由于所有当前的 RabbitMQ 队列类型都具有破坏性消费行为,即消息在消费者完成处理后从队列中删除,因此无法重新读取已消费的消息。Streams 将允许消费者连接到日志中的任何点并从中读取。
-
吞吐量性能
没有持久化队列类型能够提供与任何现有基于日志的消息系统相媲美的吞吐量。Streams 的设计将性能作为主要目标。
-
大量积压
大多数 RabbitMQ 队列的设计目标是收敛到空状态,并以此进行优化,当给定队列中有数百万条消息时,性能可能会变差。Streams 被设计为高效地存储大量数据,并且内存开销最小。
如何使用 RabbitMQ Streams
能够指定可选队列和消费者参数的 AMQP 0.9.1 客户端库将能够像常规 AMQP 0.9.1 队列一样使用 Streams。
与队列一样,Streams 也必须先声明。
声明 RabbitMQ Stream
要声明一个流,请将 x-queue-type 队列参数设置为 stream(默认为 classic)。此参数必须由客户端在声明时提供;不能使用策略来设置或更改它。这是因为策略定义或适用的策略可以动态更改,但队列类型不能。它必须在声明时指定。
以下代码片段展示了如何使用AMQP 0.9.1 Java 客户端创建流。
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
"my-stream",
true, // durable
false, false, // not exclusive, not auto-delete
Collections.singletonMap("x-queue-type", "stream")
);
声明一个 x-queue-type 参数设置为 stream 的队列将创建一个在每个配置的 RabbitMQ 节点上都具有副本的流。Streams 是法定系统,因此强烈建议使用非均匀的集群大小。
Stream 仍然是 AMQP 0.9.1 队列,因此在创建后,它可以像任何其他 RabbitMQ 队列一样绑定到任何交换机。
如果使用管理 UI声明,则必须使用队列类型下拉菜单指定 stream 类型。
Streams 支持额外的队列参数,这些参数也可以使用策略进行配置。
x-max-length-bytes
设置流的最大字节大小。请参阅保留。默认值:未设置。
x-max-age
设置流的最大年龄。请参阅保留。默认值:未设置。
x-stream-max-segment-size-bytes
流被分成磁盘上的固定大小的段文件。此设置控制这些段的大小(以字节为单位)。默认值:500000000 字节。
x-stream-filter-size-bytes
用于过滤的 Bloom 过滤器的字节大小。该值必须介于 16 和 255 之间。默认值:16 字节。
虽然 x-stream-max-segment-size-bytes 和 x-stream-filter-size-bytes 参数可以通过策略进行配置,但如果策略在流声明时被设置(存在),它们只会应用于流。如果更改了匹配但预先存在的流的这些参数,它们将不会被更改,即使队列记录的有效策略可能表明它已被更改。
因此,最好仅通过队列参数进行配置。
下面的 Java 示例演示了如何在应用程序代码中在流声明时设置该参数。
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-type", "stream");
// maximum stream size: 20 GB
arguments.put("x-max-length-bytes", 20_000_000_000);
// size of segment files: 100 MB
arguments.put("x-stream-max-segment-size-bytes", 100_000_000);
// size of stream bloom filter: 32
arguments.put("x-stream-filter-size-bytes", 32);
channel.queueDeclare(
"my-stream",
true, // durable
false, false, // not exclusive, not auto-delete
arguments
);
客户端操作
消费
由于 Streams 永远不会删除任何消息,因此任何消费者都可以从日志中的任何点开始读取/消费。这由 x-stream-offset 消费者参数控制。如果未指定,消费者将从消费者启动后写入日志的下一个偏移量开始读取。支持以下值:
first- 从日志中的第一个可用消息开始last- 这将从最后写入的“块”消息开始读取(块是 Streams 中使用的存储和传输单元,简单地说,它是一批消息,由几千条消息组成,具体取决于入口)next- 与不指定任何偏移量相同- Offset - 指定精确偏移量的数值,用于附加到日志。如果此偏移量不存在,则会相应地钳制到日志的开头或结尾。
- Timestamp - 指定附加到日志的时间点的时间戳值。它将钳制到最近的偏移量,如果时间戳超出流的范围,它将分别钳制到日志的开头或结尾。使用 AMQP 0.9.1 时,使用的时间戳是 POSIX 时间,精度为一秒,即自 1970 年 1 月 1 日 UTC 00:00:00 以来的秒数。请注意,消费者可能会收到在指定时间戳之前发布的几条消息。
- Interval - 指定相对于当前时间附加到日志的时间间隔的字符串值。使用与
x-max-age相同的规范(请参阅保留)。
以下代码片段展示了如何使用 first 偏移量规范。
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", "first"), // "first" offset specification
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
以下代码片段展示了如何指定要从中消费的特定偏移量。
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", 5000), // offset value
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
以下代码片段展示了如何指定要从中消费的特定时间戳。
// an hour ago
Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", timestamp), // timestamp offset
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
其他 Stream 操作
以下操作可以以与经典队列和仲裁队列类似的方式使用,但其中一些具有特定的队列行为。
Streams 的单活跃消费者功能
Streams 的单活跃消费者功能是在 RabbitMQ 3.11 及更高版本中提供的。它在流上提供独占消费和消费连续性。当多个共享同一流和名称的消费者实例启用单活跃消费者时,一次只有一个实例处于活动状态并接收消息。其他实例将处于空闲状态。
单活跃消费者功能提供了 2 个好处
- 消息按顺序处理:一次只有一个消费者。
- 保持消费连续性:如果活动消费者停止或崩溃,组中的一个消费者将接管。
一篇博客文章提供了有关 Streams 单活跃消费者的更多详细信息。
Super Streams
Super Streams 是一种通过将大型流分区为更小的流来扩展的方法。它们与单活跃消费者集成,以在分区内保持消息顺序。Super Streams 从 RabbitMQ 3.11 开始可用。
Super Stream 是由单个常规流组成的逻辑流。它是通过 RabbitMQ Streams 扩展发布和消费的方法:一个大型逻辑流被分成分区流,将存储和流量分散到多个集群节点上。
Super Stream 仍然是一个逻辑实体:由于客户端库的智能性,应用程序将其视为一个“大型”流。Super Stream 的拓扑基于AMQP 0.9.1 模型,即交换机、队列以及它们之间的绑定。
可以使用任何 AMQP 0.9.1 库或管理插件来创建 Super Stream 的拓扑,这需要创建直接交换机、“分区”流并将它们绑定在一起。然而,使用 rabbitmq-streams add_super_stream 命令可能会更容易。以下是如何使用它来创建一个具有 3 个分区的 invoices Super Stream。
rabbitmq-streams add_super_stream invoices --partitions 3
使用 rabbitmq-streams add_super_stream --help 来了解有关该命令的更多信息。
与单个流相比,Super Streams 增加了复杂性,因此它们不应被视为涉及流的所有用例的默认解决方案。仅当您确定已达到单个流的限制时,才考虑使用 Super Streams。
一篇博客文章概述了 Super Streams。
功能比较:常规队列与 Streams
Streams 在传统意义上并不是真正的队列,因此与 AMQP 0.9.1 队列语义的对齐程度不高。由于队列类型的性质,许多其他队列类型支持的功能不支持,并且永远不会支持。
可以使用常规队列的 AMQP 0.9.1 客户端库只要使用消费者确认,就可以使用 Streams。
由于其非破坏性读取语义,许多功能将永远不会被 Streams 支持。
功能矩阵
| 功能 | 经典 | Stream |
|---|---|---|
| 非持久化队列 | 是 | 否 |
| 独占性 | 是 | 否 |
| 每条消息持久化 | 每条消息 | 始终 |
| 成员资格变更 | 否 | 手动 |
| TTL | 是 | 否(但请参阅保留) |
| 队列长度限制 | 是 | 否(但请参阅保留) |
| 将消息保留在内存中 | 参见经典队列 | 从不 |
| 消息优先级 | 是 | 否 |
| 消费者优先级 | 是 | 否 |
| 死信交换机 | 是 | 否 |
| 遵循策略 | 是 | 是(参见保留) |
| 响应内存警报 | 是 | 否(使用最少的 RAM) |
| 有毒消息处理 | 否 | 否 |
非持久化队列
Streams 始终是持久化的,这与其假定的用例一致,它们不能像常规队列那样非持久化。
独占性
Streams 始终是持久化的,这与其假定的用例一致,它们不能像常规队列那样独占。它们不应被用作临时队列。
全局 QoS
Streams 不支持全局QoS 预取,即通道为使用该通道的所有消费者设置单个预取限制。如果尝试使用全局 QoS 启用的通道从流中消耗,将返回通道错误。
使用每个消费者的 QoS 预取,这在几个流行的客户端中是默认设置。
数据保留
Streams 实现为不可变的仅追加磁盘日志。这意味着日志将无限期地增长,直到磁盘耗尽。为避免这种不希望出现的情况,可以为每个流设置保留配置,该配置将根据总日志数据大小和/或年龄丢弃日志中最旧的数据。
有两个参数控制流的保留。这些可以组合使用。它们可以在声明时使用队列参数设置,也可以作为策略设置,策略可以动态更新。策略优先于队列参数。
-
max-age:有效单位:Y、M、D、h、m、s
例如
7D表示一周 -
max-length-bytes:最大总字节大小
注意:保留是按段评估的,因此还有一个参数会生效,那就是流的段大小。只要段包含至少一条消息,流将始终保留至少一个段。在使用代理提供的偏移量跟踪时,每个消费者的偏移量作为非消息数据持久化在流本身中。
性能特征
由于 Streams 在执行任何操作之前都会将所有数据持久化到磁盘,因此建议使用尽可能快的磁盘。
由于 Streams 的磁盘 I/O 密集型特性,其吞吐量会随着消息大小的增加而降低。
与仲裁队列类似,Streams 也受集群大小的影响。流的副本越多,其吞吐量通常就越低,因为复制数据和达成共识需要做更多的工作。
控制初始复制因子
x-initial-cluster-size 队列参数控制初始流集群应跨越多少个 RabbitMQ 节点。
管理 Stream 副本
流的副本由操作员显式管理。当集群中添加新节点时,它将不托管任何流副本,除非操作员显式将其添加到流的副本集中。
当节点需要退役(永久从集群中移除)时,必须将其从其当前托管副本的所有流的副本列表中显式移除。
提供了两个CLI 命令来执行上述操作:rabbitmq-streams add_replica 和 rabbitmq-streams delete_replica。
rabbitmq-streams add_replica [-p <vhost>] <stream-name> <node>
rabbitmq-streams delete_replica [-p <vhost>] <stream-name> <node>
要成功添加和删除副本,流协调器必须在集群中可用。
在执行涉及成员更改的维护操作时,需要小心,以免因丢失仲裁而意外导致流不可用。
由于流成员资格未嵌入流本身,因此目前无法完全安全地添加副本。因此,如果任何时候存在不同步的副本,则无法添加另一个副本,并将返回错误。
替换集群节点时,更安全的方法是首先添加一个新节点,等待其同步,然后退役要替换的节点。
可以使用以下命令查询流的复制状态。
rabbitmq-streams stream_status [-p <vhost>] <stream-name>
此外,还可以使用以下命令重启流。
rabbitmq-streams restart_stream [-p <vhost>] <stream-name>
Stream 行为
每个流都有一个主写入者(leader)和零个或多个副本。
Leader 选举和故障处理
当声明一个新流时,将随机选择托管其副本的节点集,但始终包含客户端连接到的声明流的节点。
哪个副本成为初始 leader 由三种方式控制,即使用可选队列参数 x-queue-leader-locator,设置 queue-leader-locator 策略键,或在配置文件中定义 queue_leader_locator 键。以下是可能的值:
client-local:选择客户端连接到的声明流的节点。这是默认值。balanced:如果总共有少于 1000 个队列(经典队列、仲裁队列和流),则选择托管最少流 leader 的节点。如果总共有超过 1000 个队列,则选择随机节点。
流需要声明的节点组成仲裁才能运行。当托管流leader的 RabbitMQ 节点发生故障或停止时,托管该流副本的另一个节点将被选为 leader 并恢复操作。
失败并重新加入的副本将与 leader 同步(“赶上”)。与仲裁队列类似,临时副本故障不需要从当前选定的 leader 进行完全同步。如果重新加入的副本落后于 leader,则仅传输增量。这个“赶上”过程不会影响 leader 的可用性。
副本必须显式添加。当添加新副本时,它将从 leader 同步整个流状态,这与新添加的仲裁队列副本类似。
容错和在线副本的最小数量
共识系统可以在数据安全方面提供某些保证。这些保证意味着在它们变得相关之前需要满足某些条件,例如需要至少三个集群节点来提供容错,并且需要超过一半的成员可用才能正常工作。
可以根据表格描述各种大小集群的故障容忍特性。
| 集群节点数 | 可容忍的节点故障数 | 可容忍网络分区 |
|---|---|---|
| 1 | 0 | 不适用 |
| 2 | 0 | 否 |
| 3 | 1 | 是 |
| 4 | 1 | 是,如果一侧存在多数 |
| 5 | 2 | 是 |
| 6 | 2 | 是,如果一侧存在多数 |
| 7 | 3 | 是 |
| 8 | 3 | 是,如果一侧存在多数 |
| 9 | 4 | 是 |
使用 Streams 时的数据安全
Streams 将数据复制到多个节点,并且只有在数据已复制到流副本的仲裁后才会发出发布者确认。
Streams 始终将数据存储在磁盘上,但是,它们不会显式地将数据从操作系统页面缓存刷新(fsync)到底层存储介质,而是依赖操作系统在需要时执行此操作。这意味着服务器的非正常关闭可能会导致托管在该节点上的副本丢失数据。尽管理论上这打开了确认数据丢失的可能性,但在正常操作期间发生这种情况的几率非常小,并且单个节点上的数据丢失通常只会从系统中的其他节点重新复制。
如果需要更多数据安全,请考虑使用仲裁队列,因为只有在至少有仲裁节点已将数据写入磁盘并刷新到磁盘后,才会发出发布者确认。
对于未使用发布者确认机制确认的消息,不提供任何保证。这些消息可能会“中途”丢失,在操作系统缓冲区中,或未能到达流 leader。
Stream 可用性
流应该能够容忍少量流副本变得不可用,而对可用性的影响很小或没有影响。
请注意,根据使用的分区处理策略,RabbitMQ 在恢复期间可能会自行重新启动并重置节点,但只要不发生这种情况,此可用性保证就应该成立。
例如,一个具有三个副本的流可以容忍一个节点故障而不会丢失可用性。一个具有五个副本的流可以容忍两个,依此类推。
如果无法恢复仲裁节点(例如,如果 2 个 RabbitMQ 节点中的 3 个永久丢失),则队列将永久不可用,并且很可能需要操作员介入才能恢复。
配置 Streams
有关流协议端口、TLS 和其他配置,请参阅Stream 插件指南。有关所需的流复制端口,请参阅网络指南。
Streams 如何使用资源
与仲裁队列相比,Streams 通常具有更低的 CPU 和内存占用。
所有数据都存储在磁盘上,只有未写入的数据存储在内存中。
由于 Streams 是磁盘 I/O 密集型的,它们将大量使用内核页面缓存,这对容器化部署中的内存监控有重要影响。
使用 Streams 进行偏移量跟踪
当使用代理提供的偏移量跟踪功能时(目前仅在使用Stream 插件时可用),偏移量会作为非消息数据存储在流本身中。这意味着,当请求进行偏移量持久化时,流会在磁盘上因每次偏移量持久化请求而略微增长。
发布消息的去重
RabbitMQ Stream 可以根据两个客户端元素检测并过滤掉重复发布的的消息:生产者名称和消息发布 ID。
客户端应用程序选择为给定生产者激活去重。去重的用法取决于所使用的客户端库,本节涵盖了基础知识。有关 API 详细信息,请参阅您正在使用的客户端库的文档。在 RabbitMQ 团队维护的流客户端库中,默认情况下不激活去重。
生产者名称
对于给定的流,生产者名称必须是唯一的。每个给定的流在同一时间应该只有一个同名生产者,因为去重不支持并发发布(发布 ID 可能会与并发生产者实例交织)。生产者名称在应用程序重启之间应该是稳定的,并且对人类读者来说应该是清晰的。像 online-shop-order 或 online-shop-invoice 这样的名称比随机序列(如 3d235e79-047a-46a6-8c80-9d159d3e1b05)更好的名称。
发布 ID
发布 ID 是一个严格递增的序列。发布应用程序必须为每条出站消息递增它。以下是发布 ID 序列的规则:它必须是严格递增的,它可能在序列中有间隙(例如,0、1、2、3、6、7、9、10 等),它不一定从 0 开始。
去重的工作原理
代理会跟踪给定流上给定命名生产者的最高发布 ID(“限制”)。代理将过滤掉任何发布 ID 小于或等于此限制的出站消息。它会发送一个确认,告知生产者消息已被考虑,不应再次发送。
具有大于当前限制值的消息将被存储、确认,并将建立新的限制。
创建命名生产者时,客户端应用程序可以查询代理以获取其最后一个发布 ID。然后,应用程序可以从上次中断的地方恢复发布。
使用命名生产者并激活去重的工作方式的一个很好的心智模型是将其视为生产者逐行读取文件。每一行都是一条消息,发布 ID 就是行号。
示例
让我们看一些示例来理解去重的工作方式。在示例中,发布 e.g. “消息 2”意味着发布一条内容任意的消息,并将 2 用作发布 ID。
发布和重启
客户端应用程序执行以下操作:
- 声明一个具有名称的生产者以激活去重
- 发布消息 1、消息 2、消息 3
- 接收来自代理的每条消息的异步确认
- 发布消息 4
- 崩溃
我们假设代理收到了消息 4 并发送了确认,但应用程序在收到确认之前崩溃了。
应用程序重启
- 声明具有相同名称的发布者
- 查询代理以获取最后一个发布 ID
- 从代理接收发布 ID 4
- “滚动”到消息 4(参见上面的文件类比)
- 重新发布消息 4
- 收到消息 4 的确认(代理过滤掉了消息,但发送了确认)
- 发布消息 5,依此类推。
应用程序可以立即发布消息 5,因为代理返回了它收到的最后一个发布 ID 4。但应用程序选择重新发布消息 4,因为它从未收到其确认。代理无论如何都过滤掉了消息 4 并发送了确认,告知应用程序消息 4 已安全存储,并且可以继续处理下一条消息。
误用去重
客户端应用程序执行以下操作:
- 声明一个具有名称的生产者以激活去重
- 发布消息 1、消息 2、消息 3
- 接收来自代理的每条消息的异步确认
- 发布消息 10
- 接收消息 10 的异步确认
- 发布消息 4
- 接收消息 4 的异步确认
应用程序收到消息 4 的确认,但代理过滤掉了此消息,因为此时限制是 10。代理只跟踪最高的发布 ID,而不是每个单独的发布 ID。消息 4 的发布 ID 小于当前限制(由消息 10 设置为 10),因此消息 4 被过滤掉,即使它之前未被发布或存储。
人们可能会认为代理误导了应用程序,但应用程序误用了去重功能。它使用 10,然后是 4 作为发布 ID,违反了严格递增序列的要求。如果使用上面的文件类比,应用程序滚动到第 10 行,然后又回到了第 4 行。去重不包括这种情况。
深入了解
请参阅您喜欢的流客户端库文档,以获取有关去重的更多信息。
另请参阅有关去重的去重博客文章,其中包含分步示例。
限制
消息编码
Streams 在内部将消息存储为 AMQP 1.0 编码的数据。这意味着在使用 AMQP 0.9.1 发布时会进行转换。尽管 AMQP 1.0 数据模型在很大程度上能够包含 AMQP 0.9.1 的所有数据模型,但存在一些限制。如果 AMQP 0.9.1 消息包含具有复杂值(如数组或表)的头条目,这些头将不会被转换。这是因为头作为 AMQP 1.0 消息内的应用程序属性存储,而这些属性只能包含简单类型(如字符串和数字)的值。
UI 指标准确性
管理 UI 显示的消息计数可能略高于流中的实际计数。由于流存储的实现方式,偏移量跟踪信息也被计为消息,这使得消息计数人为地大于实际计数。在大多数系统中,这应该没有实际区别。