跳到主要内容
版本: 4.1

Khepri 的日常操作

即使元数据存储不存储消息,它的行为也会影响使用它的 RabbitMQ 的日常行为和技术操作,至少只要使用此集群的应用程序想要进行身份验证并声明或删除诸如队列或流之类的资源。

正如在集群注意事项中已经提到的,Khepri 是一个基于 Raft 的系统,就像任何基于 Raft 的系统一样,必须有法定数量的集群成员在线且可用,元数据存储才能接受更新(写入/删除或集群成员更改)。

一致性模型和可见性保证

在模式修改方面,例如队列或流声明,或绑定声明,Khepri 与 Mnesia 有一个重要的区别。这些更改在许多工作负载中不会被注意到,但可能会影响某些工作负载,特别是某些集成测试。

示例场景

考虑两种场景,A 和 B。

只有一个客户端。客户端执行以下步骤

  1. 它声明一个队列 Q。
  2. 它将 Q 绑定到一个交换机 X。
  3. 它向交换机 X 发布一条消息 M。
  4. 它期望消息被路由到队列 Q。
  5. 它消费该消息。
场景 A

在这种场景中,行为上应该没有可观察到的差异。客户端的期望将得到满足。

有两个客户端,客户端一和客户端二,连接到节点 R1 和 R3,并使用相同的虚拟主机。节点 R2 没有客户端连接。

  1. 客户端一声明一个队列 Q。
  2. 它将 Q 绑定到一个交换机 X。
  3. 它收到队列和绑定声明确认。
  4. 它通知客户端二,或者客户端二隐式地发现它已经完成了上述步骤(例如,在集成测试中)。
  5. 客户端二向 X 发布一条消息 M。
  6. 客户端一和客户端二都期望消息被路由到 Q。
场景 B

在这种场景中,在步骤三,当所有集群节点都提交了更新时,Mnesia 将返回。然而,Khepri 将在大多数节点(包括处理客户端一操作的节点)返回时返回。

这可能包括节点 R1 和 R2,但不包括节点 R3,这意味着客户端二连接到节点 R3 在上述示例中发布的消息不保证被路由

一旦所有模式更改传播到节点 R3,客户端二后续在节点 R3 上发布的消息将被保证被路由。

这是基于 Raft 的系统的权衡,它假设大多数节点接受的写入可以被认为是成功的。

变通策略

为了满足场景 B 中客户端二的期望,Khepri 可以在路由消息时执行一致性查询(涉及大多数副本)绑定,但这将对某些协议(例如 MQTT)和交换机/目的地类型(任何类似于 AMQP 0-9-1 中的主题交换机的东西)的吞吐量产生重大影响。

依赖于共享拓扑的多个连接的应用程序有几种应对策略。

如果应用程序使用两个或多个连接到不同的节点,它可以启动时声明其拓扑,然后在继续其他操作之前注入一个短暂的暂停(1-2 秒)。

依赖于动态拓扑的应用程序可以切换到使用“静态”的交换机和绑定集。

不需要使用共享拓扑的应用程序组件可以各自配置自己的队列/流/绑定。

使用多个连接到不同节点的测试套件可以选择仅使用一个连接或连接到同一节点,或者注入暂停,或者等待表明拓扑已就位的特定条件。

集群少数派侧的客户端资源声明

定义消息如何路由的拓扑存储在元数据存储中。声明资源的方式保持不变,而与活动的元数据存储后端无关。

但是,在 Mnesia 中即使集群的大多数节点宕机或无法访问时也可以声明队列,而使用 Khepri 进行相同的操作将会超时。这样,客户端可以适当地对问题做出反应,而不是寄希望于在 Mnesia 之上实现的各种网络分区恢复策略。

这是一个 PerfTest 工具尝试在一个 5 个节点中只有 1 个节点正在运行的集群中声明它需要的交换机和队列的示例

./scripts/PerfTest

# => id: test-161339-979, starting consumer #0
# => id: test-161339-979, starting consumer #0, channel #0
# => Main thread caught exception: java.io.IOException
# => 16:14:10.638 [com.rabbitmq.perf.PerfTest.main()] ERROR com.rabbitmq.perf.PerfTest - Main thread caught exception
# => (...)

同时,RabbitMQ 节点记录了以下消息

[error] <0.1373.0> Error on AMQP connection <0.1373.0> (127.0.0.1:55165 -> 127.0.0.1:5672 - perf-test-consumer-0, vhost: '/', user: 'guest', state: running), channel 1:
[error] <0.1373.0> operation exchange.declare caused a connection exception internal_error: "failed to declare exchange 'direct' in vhost '/' because the operation timed out"
© . All rights reserved.