跳至主内容
版本:4.2

Stream 插件

概述

Stream 是一种持久化、可复制的数据结构,它模拟了一个只有追加操作的日志,并提供非破坏性的消费者语义。

此功能在所有当前维护的发布系列中均可用。

Stream 可以用作常规的 AMQP 0.9.1 队列,或者通过专用的二进制协议插件及相关客户端进行访问。有关功能矩阵,请参阅RabbitMQ 核心和 Stream 插件比较页面

本页介绍 Stream 插件,它允许使用新的二进制协议与 Stream 进行交互。有关概念的概述以及操作 Stream 的方法,请参阅RabbitMQ Stream 指南

另一篇配套指南《Stream 客户端连接》解释了 Stream 协议客户端应如何连接到集群节点以获得最佳的数据局部性和效率(吞吐量、延迟)。

信息

RabbitMQ Stream 协议的客户端库已支持多种流行平台。其中一些列在下面。标有复选标记 (✓) 的库由 RabbitMQ 核心团队正式支持。

使用Stream PerfTest来模拟工作负载并测量 RabbitMQ Stream 系统的性能。

启用插件

Stream 插件包含在 RabbitMQ 分发版中。在客户端成功连接之前,必须使用rabbitmq-plugins启用它。

rabbitmq-plugins enable rabbitmq_stream

插件配置

TCP 监听器

如果未指定配置,Stream 适配器将在所有接口上监听端口 5552,并具有默认用户登录/密码 guest/guest

Stream 监听器可以监听的端口可以通过rabbitmq.conf进行更改。

以下是一个最小化的配置文件,它将监听端口更改为 12345

stream.listeners.tcp.1 = 12345

而一个仅监听 localhost(IPv4 和 IPv6)的配置将如下所示

stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552

TCP 监听器选项

该插件支持 TCP 监听器选项配置。

这些设置使用通用前缀 stream.tcp_listen_options,并控制 TCP 缓冲区大小、入站 TCP 连接队列长度、是否启用TCP keepalive等。有关详细信息,请参阅网络指南

stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552

stream.tcp_listen_options.backlog = 4096
stream.tcp_listen_options.recbuf = 131072
stream.tcp_listen_options.sndbuf = 131072

stream.tcp_listen_options.keepalive = true
stream.tcp_listen_options.nodelay = true

stream.tcp_listen_options.exit_on_close = true
stream.tcp_listen_options.send_timeout = 120

心跳超时

heartbeat timeout 值定义了 RabbitMQ 和客户端库认为对端 TCP 连接在何时变得不可达(断开)。

RabbitMQ 支持的消息传递协议也使用类似的机制

Stream 协议连接的默认值为 60 秒。

# use a lower heartbeat timeout value
stream.heartbeat = 20

将心跳超时值设置得太低,可能由于瞬时网络拥塞、短暂的服务器流控制等原因,导致误报(对端被认为不可用,但实际上并非如此)。

在选择超时值时应考虑这一点。

根据用户和客户端库维护者多年的反馈,低于 5 秒的值很可能导致误报,而 1 秒或更低的值则极有可能导致误报。在 5 到 20 秒之间的值最适合大多数环境。

流控制

如果消息无法及时写入和复制,快速的发布者可能会压垮代理。因此,每个连接都有一个允许的最大未确认消息数,超过该数量后连接将被阻塞(initial_credits,默认为 50,000)。当确认了指定数量的消息后(credits_required_for_unblocking,默认为 12,500),连接将解除阻塞。您可以根据您的工作负载更改这些值。

stream.initial_credits = 100000
stream.credits_required_for_unblocking = 25000

这些设置的值较高可以提高发布吞吐量,但会增加内存消耗(可能导致代理崩溃)。值较低有助于处理大量中等速度发布连接。

此设置仅适用于发布者,不适用于消费者。

消费者信用流

本节介绍 Stream 协议的信用流机制,该机制允许消费者控制代理如何分派消息。

消费者在创建其订阅时提供初始信用数。一个信用代表代理可以发送给消费者的消息

是一批消息。这是 RabbitMQ Stream 中使用的存储和传输单元,即消息连续存储在块中,并且它们是作为块的一部分分派的。一个块可以包含一到数千条消息,具体取决于进入的数据量。

因此,如果一个消费者创建了一个包含 5 个初始信用的订阅,代理将发送 5 个消息块。每次分派一个块时,代理会扣除一个信用。当订阅没有剩余信用时,代理会停止发送消息。因此,在我们示例中,代理将在分派 5 个块后停止为该订阅发送消息。这通常不是我们想要的,所以消费者可以向其订阅提供信用以获取更多消息。

由消费者(即客户端库和/或应用程序)根据处理消息的速度来提供信用。我们希望消息能够连续流动,所以一个经验法则是,用至少 2 个信用创建订阅,并在每个新消息块上提供信用。这样做应该总会有一些消息在网络上传输,并且消费者应该一直忙碌,而不是空闲。

消费者可以通过此信用流机制选择代理如何向它们分派消息。这有助于避免消费者过载或空闲。消费者信用流如何暴露给应用程序取决于客户端库,服务器端没有设置可以更改其行为。

广告主机和端口

Stream 协议允许发现 Stream 的拓扑结构,即特定 Stream 集群中领导者和副本的位置。这样,客户端可以选择连接到适当的节点来与 Stream 交互:发布时连接到领导者节点,消费时连接到副本节点。默认情况下,节点返回其主机名和监听端口,这在大多数情况下都可以,但并非总是如此(例如,代理位于集群节点和客户端之间,集群节点和/或客户端运行在容器中等)。

advertised_hostadvertised_port 键允许指定当被询问 Stream 拓扑结构时,代理节点返回哪些信息。可以根据其基础架构设置这些设置,以便客户端能够连接到集群节点。

stream.advertised_host = rabbitmq-1
stream.advertised_port = 12345

Stream 客户端连接指南 涵盖了为什么 advertised_hostadvertised_port 设置在某些部署中是必需的。

配置条目具有TLS 对应项advertised_tls_hostadvertised_tls_port)。

最大帧大小

RabbitMQ Stream 协议使用最大帧大小限制。默认值为 1 MiB,如有必要可以增加该值。

# in bytes
stream.frame_max = 2097152

TLS 支持

要为 Stream 连接使用 TLS,必须在代理中配置 TLS。要启用支持 TLS 的 Stream 连接,请使用 stream.listeners.ssl.* 配置键添加一个 TLS 监听器。

该插件将使用核心 RabbitMQ 服务器证书和密钥(就像 AMQP 0.9.1 和 AMQP 1.0 监听器一样)

ssl_options.cacertfile = /path/to/tls/ca_certificate.pem
ssl_options.certfile = /path/to/tls/server_certificate.pem
ssl_options.keyfile = /path/to/tls/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true

stream.listeners.tcp.1 = 5552
# default TLS-enabled port for stream connections
stream.listeners.ssl.1 = 5551

此配置将在端口 5552 上创建一个标准 TCP 监听器,并在端口 5551 上创建一个 TLS 监听器。

当设置了 TLS 监听器时,您可能希望禁用所有非 TLS 监听器。这可以这样配置:

stream.listeners.tcp   = none
stream.listeners.ssl.1 = 5551

普通连接一样,可以配置广告 TLS 主机和端口。使用 TLS 时,插件将返回以下元数据:

  • hostname:如果设置了 advertised_host,则使用它;如果未设置 advertised_host,则使用主机名。
  • port:当前的 TLS 端口。

可以通过同时设置或单独设置 advertised_tls_hostadvertised_tls_port 配置条目来覆盖此行为。

stream.advertised_host = private-rabbitmq-1
stream.advertised_port = 12345
stream.advertised_tls_host = public-rabbitmq-1
stream.advertised_tls_port = 12344
© . This site is unofficial and not affiliated with VMware.