Stream 插件
概述
Stream 是一种持久化、可复制的数据结构,它模拟了一个只有追加操作的日志,并提供非破坏性的消费者语义。
此功能在所有当前维护的发布系列中均可用。
Stream 可以用作常规的 AMQP 0.9.1 队列,或者通过专用的二进制协议插件及相关客户端进行访问。有关功能矩阵,请参阅RabbitMQ 核心和 Stream 插件比较页面。
本页介绍 Stream 插件,它允许使用新的二进制协议与 Stream 进行交互。有关概念的概述以及操作 Stream 的方法,请参阅RabbitMQ Stream 指南。
另一篇配套指南《Stream 客户端连接》解释了 Stream 协议客户端应如何连接到集群节点以获得最佳的数据局部性和效率(吞吐量、延迟)。
RabbitMQ Stream 协议的客户端库已支持多种流行平台。其中一些列在下面。标有复选标记 (✓) 的库由 RabbitMQ 核心团队正式支持。
- ✓ RabbitMQ Java Stream 客户端
- ✓ RabbitMQ Golang Stream 客户端
- ✓ RabbitMQ .NET Stream 客户端
- ✓ RabbitMQ Rust Stream 客户端
- ✓ RabbitMQ Python Stream 客户端 (rstream)
- RabbitMQ Python Stream 客户端 (rbfly)
- RabbitMQ NodeJS Stream 客户端
- RabbitMQ C++ Stream 客户端
- RabbitMQ C Stream 客户端
- RabbitMQ Elixir Stream 客户端
- RabbitMQ Erlang Stream 客户端 (lake)
使用Stream PerfTest来模拟工作负载并测量 RabbitMQ Stream 系统的性能。
启用插件
Stream 插件包含在 RabbitMQ 分发版中。在客户端成功连接之前,必须使用rabbitmq-plugins启用它。
rabbitmq-plugins enable rabbitmq_stream
插件配置
TCP 监听器
如果未指定配置,Stream 适配器将在所有接口上监听端口 5552,并具有默认用户登录/密码 guest/guest。
Stream 监听器可以监听的端口可以通过rabbitmq.conf进行更改。
以下是一个最小化的配置文件,它将监听端口更改为 12345
stream.listeners.tcp.1 = 12345
而一个仅监听 localhost(IPv4 和 IPv6)的配置将如下所示
stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552
TCP 监听器选项
该插件支持 TCP 监听器选项配置。
这些设置使用通用前缀 stream.tcp_listen_options,并控制 TCP 缓冲区大小、入站 TCP 连接队列长度、是否启用TCP keepalive等。有关详细信息,请参阅网络指南。
stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552
stream.tcp_listen_options.backlog = 4096
stream.tcp_listen_options.recbuf = 131072
stream.tcp_listen_options.sndbuf = 131072
stream.tcp_listen_options.keepalive = true
stream.tcp_listen_options.nodelay = true
stream.tcp_listen_options.exit_on_close = true
stream.tcp_listen_options.send_timeout = 120
心跳超时
heartbeat timeout 值定义了 RabbitMQ 和客户端库认为对端 TCP 连接在何时变得不可达(断开)。
RabbitMQ 支持的消息传递协议也使用类似的机制。
Stream 协议连接的默认值为 60 秒。
# use a lower heartbeat timeout value
stream.heartbeat = 20
将心跳超时值设置得太低,可能由于瞬时网络拥塞、短暂的服务器流控制等原因,导致误报(对端被认为不可用,但实际上并非如此)。
在选择超时值时应考虑这一点。
根据用户和客户端库维护者多年的反馈,低于 5 秒的值很可能导致误报,而 1 秒或更低的值则极有可能导致误报。在 5 到 20 秒之间的值最适合大多数环境。
流控制
如果消息无法及时写入和复制,快速的发布者可能会压垮代理。因此,每个连接都有一个允许的最大未确认消息数,超过该数量后连接将被阻塞(initial_credits,默认为 50,000)。当确认了指定数量的消息后(credits_required_for_unblocking,默认为 12,500),连接将解除阻塞。您可以根据您的工作负载更改这些值。
stream.initial_credits = 100000
stream.credits_required_for_unblocking = 25000
这些设置的值较高可以提高发布吞吐量,但会增加内存消耗(可能导致代理崩溃)。值较低有助于处理大量中等速度发布连接。
此设置仅适用于发布者,不适用于消费者。
消费者信用流
本节介绍 Stream 协议的信用流机制,该机制允许消费者控制代理如何分派消息。
消费者在创建其订阅时提供初始信用数。一个信用代表代理可以发送给消费者的消息块。
块是一批消息。这是 RabbitMQ Stream 中使用的存储和传输单元,即消息连续存储在块中,并且它们是作为块的一部分分派的。一个块可以包含一到数千条消息,具体取决于进入的数据量。
因此,如果一个消费者创建了一个包含 5 个初始信用的订阅,代理将发送 5 个消息块。每次分派一个块时,代理会扣除一个信用。当订阅没有剩余信用时,代理会停止发送消息。因此,在我们示例中,代理将在分派 5 个块后停止为该订阅发送消息。这通常不是我们想要的,所以消费者可以向其订阅提供信用以获取更多消息。
由消费者(即客户端库和/或应用程序)根据处理消息的速度来提供信用。我们希望消息能够连续流动,所以一个经验法则是,用至少 2 个信用创建订阅,并在每个新消息块上提供信用。这样做应该总会有一些消息在网络上传输,并且消费者应该一直忙碌,而不是空闲。
消费者可以通过此信用流机制选择代理如何向它们分派消息。这有助于避免消费者过载或空闲。消费者信用流如何暴露给应用程序取决于客户端库,服务器端没有设置可以更改其行为。
广告主机和端口
Stream 协议允许发现 Stream 的拓扑结构,即特定 Stream 集群中领导者和副本的位置。这样,客户端可以选择连接到适当的节点来与 Stream 交互:发布时连接到领导者节点,消费时连接到副本节点。默认情况下,节点返回其主机名和监听端口,这在大多数情况下都可以,但并非总是如此(例如,代理位于集群节点和客户端之间,集群节点和/或客户端运行在容器中等)。
advertised_host 和 advertised_port 键允许指定当被询问 Stream 拓扑结构时,代理节点返回哪些信息。可以根据其基础架构设置这些设置,以便客户端能够连接到集群节点。
stream.advertised_host = rabbitmq-1
stream.advertised_port = 12345
Stream 客户端连接指南 涵盖了为什么 advertised_host 和 advertised_port 设置在某些部署中是必需的。
配置条目具有TLS 对应项(advertised_tls_host 和 advertised_tls_port)。
最大帧大小
RabbitMQ Stream 协议使用最大帧大小限制。默认值为 1 MiB,如有必要可以增加该值。
# in bytes
stream.frame_max = 2097152
TLS 支持
要为 Stream 连接使用 TLS,必须在代理中配置 TLS。要启用支持 TLS 的 Stream 连接,请使用 stream.listeners.ssl.* 配置键添加一个 TLS 监听器。
该插件将使用核心 RabbitMQ 服务器证书和密钥(就像 AMQP 0.9.1 和 AMQP 1.0 监听器一样)
ssl_options.cacertfile = /path/to/tls/ca_certificate.pem
ssl_options.certfile = /path/to/tls/server_certificate.pem
ssl_options.keyfile = /path/to/tls/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
stream.listeners.tcp.1 = 5552
# default TLS-enabled port for stream connections
stream.listeners.ssl.1 = 5551
此配置将在端口 5552 上创建一个标准 TCP 监听器,并在端口 5551 上创建一个 TLS 监听器。
当设置了 TLS 监听器时,您可能希望禁用所有非 TLS 监听器。这可以这样配置:
stream.listeners.tcp = none
stream.listeners.ssl.1 = 5551
与普通连接一样,可以配置广告 TLS 主机和端口。使用 TLS 时,插件将返回以下元数据:
- hostname:如果设置了
advertised_host,则使用它;如果未设置advertised_host,则使用主机名。 - port:当前的 TLS 端口。
可以通过同时设置或单独设置 advertised_tls_host 和 advertised_tls_port 配置条目来覆盖此行为。
stream.advertised_host = private-rabbitmq-1
stream.advertised_port = 12345
stream.advertised_tls_host = public-rabbitmq-1
stream.advertised_tls_port = 12344