跳到主要内容
版本:4.1

Stream 插件

概述

流是一种持久化和复制的数据结构,它模拟了一个仅追加日志,具有非破坏性消费者语义。

此功能在所有当前维护的发布系列中均可用。

流可以用作常规 AMQP 0.9.1 队列,或通过专用二进制协议插件和关联的客户端来使用。有关功能矩阵,请参阅流核心插件和流插件比较页面

本页介绍 Stream 插件,该插件允许使用此新的二进制协议与流进行交互。有关概念和操作流的方式的概述,请参阅RabbitMQ 流指南

信息

RabbitMQ 流协议的客户端库适用于多个流行的平台。 下面列出了一些。 标有勾号 (✓) 的是 RabbitMQ 核心团队官方支持的

使用 Stream PerfTest 来模拟工作负载并测量 RabbitMQ 流系统的性能。

启用插件

Stream 插件包含在 RabbitMQ 发行版中。 在客户端成功连接之前,必须使用 rabbitmq-plugins 启用它

rabbitmq-plugins enable rabbitmq_stream

插件配置

TCP 监听器

当未指定任何配置时,Stream 适配器将监听端口 5552 上所有接口,并具有默认用户登录名/密码 guest/guest

端口流监听器将监听的端口可以通过 rabbitmq.conf 更改。

以下是一个最小化的配置文件,它将监听器端口更改为 12345

stream.listeners.tcp.1 = 12345

而将监听器更改为仅监听 localhost(IPv4 和 IPv6)的配置文件如下所示

stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552

TCP 监听器选项

该插件支持 TCP 监听器选项配置。

这些设置使用通用前缀 stream.tcp_listen_options,并控制诸如 TCP 缓冲区大小、入站 TCP 连接队列长度、是否启用 TCP keepalives 等。 有关详细信息,请参阅网络指南

stream.listeners.tcp.1 = 127.0.0.1:5552
stream.listeners.tcp.2 = ::1:5552

stream.tcp_listen_options.backlog = 4096
stream.tcp_listen_options.recbuf = 131072
stream.tcp_listen_options.sndbuf = 131072

stream.tcp_listen_options.keepalive = true
stream.tcp_listen_options.nodelay = true

stream.tcp_listen_options.exit_on_close = true
stream.tcp_listen_options.send_timeout = 120

心跳超时

heartbeat timeout 值定义了 RabbitMQ 和客户端库应将对等 TCP 连接视为不可达(关闭)的时间段。

RabbitMQ 支持的消息传递协议也使用了类似的机制

流协议连接的默认值为 60 秒。

# use a lower heartbeat timeout value
stream.heartbeat = 20

将心跳超时值设置得太低可能会导致误报(对等方被认为不可用,而实际上并非如此),这是由于瞬时网络拥塞、短暂的服务器流控制等等。

在选择超时值时应考虑这一点。

来自用户和客户端库维护人员的多年反馈表明,低于 5 秒的值很可能导致误报,而 1 秒或更低的值非常有可能导致误报。 5 到 20 秒范围内的值对于大多数环境来说是最佳的。

流控制

如果快速发布者无法跟上写入和复制入站消息的速度,则可能会压垮 broker。 因此,每个连接都有允许的最大未确认消息数,然后才会被阻止(initial_credits,默认为 50,000)。 当确认给定数量的消息时(credits_required_for_unblocking,默认为 12,500),连接将被解除阻止。 您可以根据您的工作负载更改这些值

stream.initial_credits = 100000
stream.credits_required_for_unblocking = 25000

这些设置的较高值可以提高发布吞吐量,但会以更高的内存消耗为代价(这可能会导致 broker 崩溃)。 较低的值可以帮助处理大量中等速度的发布连接。

此设置仅适用于发布者,不适用于消费者。

消费者信用流

本节介绍流协议信用流机制,该机制允许消费者控制 broker 如何分发消息。

消费者在创建其订阅时提供初始信用额度。 信用额度代表 broker 允许发送给消费者的消息

是一批消息。 这是 RabbitMQ Stream 中使用的存储和传输单元,即消息连续存储在一个块中,并且作为块的一部分传递。 一个块可以由一个到数千个消息组成,具体取决于入口。

因此,如果消费者创建了一个具有 5 个初始信用额度的订阅,则 broker 将发送 5 个消息块。 broker 每次传递一个块时都会减去一个信用额度。 当订阅没有剩余信用额度时,broker 将停止发送消息。 因此,在我们的示例中,broker 将在传递 5 个块后停止为此订阅发送消息。 这通常不是我们想要的,因此消费者可以为其订阅提供信用额度以获取更多消息。

这取决于消费者(即客户端库和/或应用程序)提供信用额度,具体取决于它处理消息的速度。 我们希望消息持续流动,因此一个好的经验法则是创建至少具有 2 个信用额度的订阅,并在每个新的消息块上提供一个信用额度。 这样做应该始终有一些消息在网络上流动,并且消费者应该始终处于忙碌状态,而不是空闲状态。

消费者可以通过此信用流机制选择 broker 向他们传递消息的方式。 这有助于避免消费者不堪重负或空闲。 消费者信用流如何暴露给应用程序取决于客户端库,没有服务器端设置可以更改其行为。

Advertised Host and Port

流协议允许发现流的拓扑,即给定流集的主节点和副本在集群中的位置。 这样,客户端可以选择连接到适当的节点以与流进行交互:主节点用于发布,副本用于消费。 默认情况下,节点返回其主机名和监听器端口,这在大多数情况下可能没问题,但并非总是如此(代理位于集群节点和客户端之间,集群节点和/或客户端在容器中运行等)。

advertised_hostadvertised_port 键允许指定当被询问流的拓扑时 broker 节点返回的信息。 可以根据其基础设施设置这些设置,以便客户端可以连接到集群节点

stream.advertised_host = rabbitmq-1
stream.advertised_port = 12345

Connecting to Streams 博客文章介绍了在某些部署中为什么 advertised_hostadvertised_port 设置是必要的。

最大帧大小

RabbitMQ Stream 协议使用最大帧大小限制。 默认值为 1 MiB,如果需要,可以增加该值

# in bytes
stream.frame_max = 2097152

TLS 支持

要将 TLS 用于流连接,必须在 broker 中配置 TLS。 要启用启用 TLS 的流连接,请使用 stream.listeners.ssl.* 配置键为流添加 TLS 监听器。

该插件将使用核心 RabbitMQ 服务器证书和密钥(就像 AMQP 0-9-1 和 AMQP 1.0 监听器一样)

ssl_options.cacertfile = /path/to/tls/ca_certificate.pem
ssl_options.certfile = /path/to/tls/server_certificate.pem
ssl_options.keyfile = /path/to/tls/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true

stream.listeners.tcp.1 = 5552
# default TLS-enabled port for stream connections
stream.listeners.ssl.1 = 5551

此配置在端口 5552 上创建一个标准 TCP 监听器,并在端口 5551 上创建一个 TLS 监听器。

设置 TLS 监听器后,您可能需要停用所有非 TLS 监听器。 可以像这样配置

stream.listeners.tcp   = none
stream.listeners.ssl.1 = 5551

就像普通连接一样,可以配置 advertised TLS 主机和端口。 使用 TLS 时,插件返回以下元数据

  • hostname:如果设置,则为 advertised_host,如果未设置 advertised_host,则为主机名
  • port:当前的 TLS 端口

可以通过一起或单独设置 advertised_tls_hostadvertised_tls_port 配置条目来覆盖此行为

stream.advertised_host = private-rabbitmq-1
stream.advertised_port = 12345
stream.advertised_tls_host = public-rabbitmq-1
stream.advertised_tls_port = 12344
© . All rights reserved.