流和超级流(分区流)
什么是流
RabbitMQ 流是一种持久化的、复制的数据结构,可以完成与队列相同的任务:它们缓冲来自生产者的消息,这些消息由消费者读取。但是,流在两个重要方面与队列不同:消息的存储和消费方式。
流建模了一个仅追加的消息日志,可以重复读取直到过期。流始终是持久化和复制的。对这种流行为更技术性的描述是“非破坏性消费者语义”。
为了从 RabbitMQ 中的流读取消息,一个或多个消费者订阅它,并根据需要多次读取相同的消息。
流中的数据可以通过 RabbitMQ 客户端库或通过专用的二进制协议插件和相关的客户端来使用。后一种选择是强烈推荐的,因为它提供了对所有流特定功能的访问,并提供了最佳的吞吐量(性能)。
除了流之外,RabbitMQ 还支持称为超级流的分区流。在本指南的后面部分将更详细地介绍它们。
现在,您可能会问以下问题
- 那么,流会取代队列吗?
- 我应该不再使用队列吗?
为了回答这些问题,引入流不是为了取代队列,而是为了补充它们。流为新的 RabbitMQ 用例开辟了许多机会,这些用例在流的用例中进行了描述。
以下信息详细介绍了流的使用,以及流的管理和维护操作。
您还应该查看流插件信息,以了解更多关于使用二进制 RabbitMQ Stream 协议的流的信息,以及流核心和流插件比较页面,以了解功能矩阵。
流的用例
开发流最初是为了涵盖 4 个消息传递用例,现有队列类型要么无法提供,要么提供但有缺点
-
大型扇出
当想要将同一消息传递给多个订阅者时,用户目前必须为每个消费者绑定一个专用队列。如果消费者数量很大,这可能会变得效率低下,尤其是在需要持久性和/或复制时。流将允许任意数量的消费者以非破坏性的方式从同一队列消费相同的消息,从而无需绑定多个队列。流消费者还可以从副本读取,从而允许读取负载分散在整个集群中。
-
重放(时间旅行)
由于所有当前的 RabbitMQ 队列类型都具有破坏性的消费行为,即当消费者完成消息处理后,消息会从队列中删除,因此无法重新读取已消费的消息。流将允许消费者连接到日志中的任何点并从那里读取。
-
吞吐量性能
没有持久队列类型能够提供可以与任何现有基于日志的消息传递系统竞争的吞吐量。流的设计以性能为主要目标。
-
大型积压
大多数 RabbitMQ 队列都设计为收敛到空状态,并为此进行了优化,当给定队列上有数百万条消息时,性能可能会更差。流的设计目的是以高效的方式存储大量数据,并最大限度地减少内存开销。
如何使用 RabbitMQ 流
一个可以指定可选队列和消费者参数的 AMQP 0.9.1 客户端库将能够像使用常规 AMQP 0.9.1 队列一样使用流。
与队列一样,流必须首先声明。
声明 RabbitMQ 流
要声明流,请将 x-queue-type
队列参数设置为 stream
(默认值为 classic
)。此参数必须由客户端在声明时提供;不能使用策略设置或更改它。这是因为策略定义或适用的策略可以动态更改,但队列类型不能。它必须在声明时指定。
以下代码片段展示了如何使用AMQP 0.9.1 Java 客户端创建流
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
"my-stream",
true, // durable
false, false, // not exclusive, not auto-delete
Collections.singletonMap("x-queue-type", "stream")
);
使用 x-queue-type
参数设置为 stream
声明队列将在每个配置的 RabbitMQ 节点上创建一个副本的流。流是仲裁系统,因此强烈建议使用不均匀的集群大小。
流仍然是一个 AMQP 0.9.1 队列,因此它可以在创建后绑定到任何交换机,就像任何其他 RabbitMQ 队列一样。
如果使用管理 UI声明,则必须使用队列类型下拉菜单指定 stream
类型。
x-max-length-bytes
设置流的最大大小(以字节为单位)。请参阅保留。默认值:未设置。
x-max-age
设置流的最大年龄。请参阅保留。默认值:未设置。
x-stream-max-segment-size-bytes
流在磁盘上被划分为固定大小的段文件。此设置控制这些段文件的大小(以字节为单位)。默认值:500000000 字节。
x-stream-filter-size-bytes
用于过滤的 Bloom 过滤器的大小(以字节为单位)。该值必须介于 16 和 255 之间。默认值:16 字节。
虽然可以通过策略配置 x-stream-max-segment-size-bytes
和 x-stream-filter-size-bytes
参数,但只有在策略在流声明时设置(存在)时,它们才会应用于流。如果为匹配但预先存在的流更改了这些参数,则即使队列记录的有效策略可能表明应该更改,它们也不会被更改。
因此,最好仅通过队列参数配置这些参数。
以下 Java 示例演示了如何在应用程序代码中的流声明时设置参数
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-type", "stream");
// maximum stream size: 20 GB
arguments.put("x-max-length-bytes", 20_000_000_000);
// size of segment files: 100 MB
arguments.put("x-stream-max-segment-size-bytes", 100_000_000);
// size of stream bloom filter: 32
arguments.put("x-stream-filter-size-bytes", 32);
channel.queueDeclare(
"my-stream",
true, // durable
false, false, // not exclusive, not auto-delete
arguments
);
客户端操作
消费
由于流永远不会删除任何消息,因此任何消费者都可以从日志中的任何点开始读取/消费。这由 x-stream-offset
消费者参数控制。如果未指定,消费者将从消费者启动后写入日志的下一个偏移量开始读取。支持以下值
first
- 从日志中的第一条可用消息开始last
- 这从最后写入的“消息块”开始读取(块是流中使用的存储和传输单元,简单来说,它是由几个到几千个消息组成的一批消息,具体取决于入口)next
- 与不指定任何偏移量相同- 偏移量 - 一个数值,指定要附加到日志的确切偏移量。如果此偏移量不存在,它将分别钳制到日志的开始或结束位置。
- 时间戳 - 一个时间戳值,指定要附加到日志的时间点。它将钳制到最接近的偏移量,如果时间戳超出流的范围,它将钳制到日志的开始或结束位置。对于 AMQP 0.9.1,使用的时间戳是 POSIX 时间,精度为一秒,即自 1970-01-01 00:00:00 UTC 以来的秒数。请注意,消费者可能会收到在指定时间戳之前发布的消息。
- 间隔 - 一个字符串值,指定相对于当前时间附加日志的时间间隔。使用与
x-max-age
相同的规范(请参阅保留)
以下代码片段展示了如何使用 first
偏移量规范
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", "first"), // "first" offset specification
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
以下代码片段展示了如何指定要从中消费的特定偏移量
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", 5000), // offset value
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
以下代码片段展示了如何指定要从中消费的特定时间戳
// an hour ago
Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", timestamp), // timestamp offset
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
其他流操作
以下操作可以以类似于经典队列和仲裁队列的方式使用,但有些操作具有一些队列特定的行为。
流的单活动消费者功能
流的单活动消费者是 RabbitMQ 3.11 及更高版本中提供的功能。它在流上提供独占消费和消费连续性。当共享同一流和名称的多个消费者实例启用单活动消费者时,这些实例中只有一个实例会处于活动状态,因此会接收消息。其他实例将处于空闲状态。
单活动消费者功能提供 2 个好处
- 消息按顺序处理:一次只有一个消费者。
- 消费连续性得到维护:如果活动消费者停止或崩溃,组中的消费者将接管。
博客文章提供了关于流的单活动消费者的更多详细信息。
超级流
超级流是通过将大型流分区为较小流来扩展的方式。它们与单活动消费者集成,以在分区内保持消息顺序。超级流从 RabbitMQ 3.11 开始可用。
超级流是由单个常规流组成的逻辑流。它是使用 RabbitMQ 流扩展发布和消费的一种方式:一个大型逻辑流被划分为分区流,将存储和流量分散到多个集群节点上。
超级流仍然是一个逻辑实体:由于客户端库的智能性,应用程序将其视为一个“大型”流。超级流的拓扑结构基于AMQP 0.9.1 模型,即交换机、队列以及它们之间的绑定。
可以使用任何 AMQP 0.9.1 库或管理插件创建超级流的拓扑结构,它需要创建一个直连交换机、“分区”流,并将它们绑定在一起。不过,使用 rabbitmq-streams add_super_stream
命令可能会更容易。以下是如何使用它创建具有 3 个分区的 invoices
超级流
rabbitmq-streams add_super_stream invoices --partitions 3
使用 rabbitmq-streams add_super_stream --help
了解有关该命令的更多信息。
与单个流相比,超级流增加了复杂性,因此不应将其视为涉及流的所有用例的默认解决方案。只有当您确定已达到单个流的限制时,才考虑使用超级流。
博客文章提供了超级流的概述。
过滤
RabbitMQ 流提供服务器端过滤功能,避免读取流的所有消息,仅在客户端进行过滤。当消费应用程序只需要消息的子集时,例如来自给定地理区域的消息,这有助于节省网络带宽。
流过滤受到流协议、AMQP 0.9.1 和STOMP的支持。示例将使用 AMQP 0.9.1。
必须发布带有相关过滤器值的消息,过滤功能才能工作。此值使用 x-stream-filter-value
标头指定
channel.basicPublish(
"", // default exchange
"my-stream",
new AMQP.BasicProperties.Builder()
.headers(Collections.singletonMap(
"x-stream-filter-value", "california" // set filter value
))
.build(),
body
);
如果消费者只想接收给定过滤器值的消息,则必须使用 x-stream-filter
参数
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-filter", "california"), // set filter
(consumerTag, message) -> {
Map<String, Object> headers = message.getProperties().getHeaders();
// there must be some client-side filter logic
if ("california".equals(headers.get("x-stream-filter-value"))) {
// message processing
// ...
}
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
如上面的代码片段所示,也必须有一些客户端过滤逻辑,因为服务器端过滤是概率性的:不匹配过滤器值的消息仍然可以发送给消费者。服务器使用Bloom 过滤器,这是一种节省空间的概率数据结构,其中可能存在误报。尽管如此,过滤仍然节省了一些带宽,这是其主要目标。
关于过滤的其他注意事项
- 可以在同一流中发布带有和不带有过滤器值的消息。
- 当消费者设置过滤器时,不发送没有过滤器值的消息。将
x-stream-match-unfiltered
参数设置为true
以更改此行为,并同时接收未过滤的消息。 x-stream-filter
消费者参数接受字符串,但也接受字符串数组,以接收不同过滤器值的消息。
第一篇博客文章提供了流过滤的概述,第二篇博客文章涵盖了内部原理。
功能比较:常规队列与流
流在传统意义上并不是真正的队列,因此与 AMQP 0.9.1 队列语义并不十分一致。其他队列类型支持的许多功能不受支持,并且由于队列类型的性质,永远不会受支持。
可以使用常规队列的 AMQP 0.9.1 客户端库将能够使用流,只要它使用消费者确认即可。
由于流的非破坏性读取语义,许多功能将永远不会被流支持。
功能矩阵
功能 | 经典 | 流 |
---|---|---|
非持久队列 | 是 | 否 |
独占性 | 是 | 否 |
每消息持久性 | 每消息 | 始终 |
成员资格更改 | 否 | 手动 |
TTL | 是 | 否(但请参阅保留) |
队列长度限制 | 是 | 否(但请参阅保留) |
将消息保留在内存中 | 请参阅经典队列 | 从不 |
消息优先级 | 是 | 否 |
消费者优先级 | 是 | 否 |
死信交换机 | 是 | 否 |
遵守策略 | 是 | 是(请参阅保留) |
对内存警报做出反应 | 是 | 否(使用最少的 RAM) |
毒消息处理 | 否 | 否 |
非持久队列
根据其假设的用例,流始终是持久的,它们不能像常规队列一样是非持久的。
独占性
根据其假设的用例,流始终是持久的,它们不能像常规队列一样是独占的。它们不适合用作临时队列。
全局 QoS
流不支持全局QoS 预取,其中通道为使用该通道的所有消费者设置单个预取限制。如果尝试从启用了全局 QoS 的通道消费流,将返回通道错误。
使用每消费者 QoS 预取,这是几个流行的客户端中的默认设置。
数据保留
流被实现为不可变的仅追加磁盘日志。这意味着日志将无限增长,直到磁盘空间耗尽。为了避免这种不良情况,可以为每个流设置保留配置,这将根据总日志数据大小和/或年龄丢弃日志中最旧的数据。
有两个参数控制流的保留。这些参数可以组合使用。这些参数可以在声明时使用队列参数设置,也可以作为可以动态更新的策略设置。策略优先于队列参数。
-
max-age
:有效单位:Y、M、D、h、m、s
例如,
7D
表示一周 -
max-length-bytes
:最大总大小(以字节为单位)
注意:保留是按段评估的,因此还有一个参数会生效,那就是流的段大小。只要段包含至少一条消息,流将始终至少保留一个段。当使用 broker 提供的偏移量跟踪时,每个消费者的偏移量都作为非消息数据持久化在流本身中。
性能特征
由于流在执行任何操作之前会将所有数据持久化到磁盘,因此建议使用尽可能快的磁盘。
由于流的磁盘 I/O 密集型特性,其吞吐量会随着消息大小的增加而降低。
与仲裁队列一样,流也受到集群大小的影响。流拥有的副本越多,其吞吐量通常会越低,因为复制数据和达成共识需要完成更多工作。
控制初始复制因子
x-initial-cluster-size
队列参数控制初始流集群应跨越多少个 rabbit 节点。
管理流副本
流的副本由操作员显式管理。当向集群添加新节点时,除非操作员显式将其添加到流的副本集中,否则它将不托管任何流副本。
当必须解除节点(从集群中永久删除)时,必须从该节点当前托管副本的所有流的副本列表中显式删除它。
提供了两个CLI 命令来执行上述操作:rabbitmq-streams add_replica
和 rabbitmq-streams delete_replica
rabbitmq-streams add_replica [-p <vhost>] <stream-name> <node>
rabbitmq-streams delete_replica [-p <vhost>] <stream-name> <node>
为了成功添加和删除副本,流协调器必须在集群中可用。
需要注意,在执行涉及成员资格更改的维护操作时,不要因丢失仲裁而意外地使流不可用。
由于流成员资格未嵌入在流本身中,因此目前无法完全安全地添加副本。因此,如果任何时候存在不同步的副本,则无法添加另一个副本,并且将返回错误。
在更换集群节点时,更安全的方法是首先添加一个新节点,等待它变为同步,然后解除它替换的节点的委托。
可以使用以下命令查询流的复制状态
rabbitmq-streams stream_status [-p <vhost>] <stream-name>
此外,可以使用以下命令重启流
rabbitmq-streams restart_stream [-p <vhost>] <stream-name>
流行为
每个流都有一个主写入器(领导者)和零个或多个副本。
领导者选举和故障处理
当声明新流时,将随机选择将托管其副本的节点集,但始终包括声明流的客户端连接到的节点。
哪个副本成为初始领导者以三种方式控制,即使用 x-queue-leader-locator
可选队列参数,设置 queue-leader-locator
策略键或通过在配置文件中定义 queue_leader_locator
键。以下是可能的值
client-local
:选择声明流的客户端连接到的节点。这是默认值。balanced
:如果总体队列(经典队列、仲裁队列和流)少于 1000 个,则选择托管最少流领导者的节点。如果总体队列超过 1000 个,则随机选择一个节点。
流需要声明节点的仲裁才能运行。当托管流领导者的 RabbitMQ 节点发生故障或停止时,将选举托管该流副本之一的另一个节点作为领导者并恢复操作。
失败和重新加入的副本将与领导者重新同步(“赶上”)。与仲裁队列队列类似,临时副本故障不需要从当前选出的领导者进行完全重新同步。如果重新加入的副本落后于领导者,则只会传输增量。这种“赶上”过程不会影响领导者的可用性。
必须显式添加副本。当添加新副本时,它将从领导者同步整个流状态,类似于新添加的仲裁队列副本。
容错和在线副本的最小数量
共识系统可以提供关于数据安全性的某些保证。这些保证确实意味着在它们变得相关之前需要满足某些条件,例如需要至少三个集群节点来提供容错,并且需要超过一半的成员可用才能工作。
各种规模的集群的容错特性可以用表格描述
集群节点计数 | 容忍的节点故障数 | 对网络分区具有容错能力 |
---|---|---|
1 | 0 | 不适用 |
2 | 0 | 否 |
3 | 1 | 是 |
4 | 1 | 是,如果大多数存在于一侧 |
5 | 2 | 是 |
6 | 2 | 是,如果大多数存在于一侧 |
7 | 3 | 是 |
8 | 3 | 是,如果大多数存在于一侧 |
9 | 4 | 是 |
使用流时的数据安全
流跨多个节点复制数据,并且仅在数据已复制到流副本的仲裁后才发布发布者确认。
流始终将数据存储在磁盘上,但是,它们不会显式地将数据从操作系统页面缓存刷新 (fsync) 到底层存储介质,而是依赖操作系统在需要时执行此操作。这意味着服务器的非受控关闭可能会导致托管在该节点上的副本的数据丢失。虽然从理论上讲,这开启了已确认数据丢失的可能性,但在正常操作期间发生这种情况的可能性非常小,并且单个节点上的数据丢失通常只会从系统中的其他节点重新复制。
如果需要更高的数据安全性,请考虑改用仲裁队列,因为在至少仲裁数量的节点都已写入和刷新数据到磁盘之前,不会发布发布者确认。
对于尚未使用发布者确认机制确认的消息,不提供任何保证。此类消息可能会在“中间”丢失,在操作系统缓冲区中或无法到达流领导者。
流可用性
流应该能够容忍少数流副本变得不可用,而对可用性几乎没有或没有影响。
请注意,根据使用的分区处理策略,RabbitMQ 可能会在恢复期间重启自身并重置节点,但只要这种情况没有发生,此可用性保证就应该成立。
例如,具有三个副本的流可以容忍一个节点故障而不会丢失可用性。具有五个副本的流可以容忍两个,依此类推。
如果无法恢复仲裁节点(例如,如果 3 个 RabbitMQ 节点中的 2 个永久丢失),则队列将永久不可用,并且很可能需要操作员的介入才能恢复。
配置流
有关流协议端口、TLS 和其他配置,请参阅流插件指南。有关所需的流复制端口,请参阅网络指南。
流如何使用资源
流通常比仲裁队列具有更低的 CPU 和内存占用。
所有数据都存储在磁盘上,只有未写入的数据存储在内存中。
使用流时的偏移量跟踪
当使用 broker 提供的偏移量跟踪功能(目前仅在使用流插件时可用)时,偏移量作为非消息数据持久化在流本身中。这意味着,当请求偏移量持久性时,流将在磁盘上增长少量,每次偏移量持久性请求都会增长。
限制
消息编码
流在内部将其消息存储为 AMQP 1.0 编码的数据。这意味着当使用 AMQP 0.9.1 发布时,会进行转换。虽然 AMQP 1.0 数据模型在很大程度上能够包含 AMQP 0.9.1 的所有数据模型,但仍存在一些限制。如果 AMQP 0.9.1 消息包含具有复杂值(例如数组或表)的标头条目,则这些标头将不会被转换。这是因为标头作为应用程序属性存储在 AMQP 1.0 消息中,而这些属性只能包含简单类型的值,例如字符串和数字。
UI 指标准确性
管理界面可能显示的消息计数略高于流中的实际计数。由于流存储的实现方式,偏移跟踪信息也被计为消息,这使得消息计数人为地偏大。这在大多数系统中应该不会造成实际差异。