流过滤内部机制
之前的帖子介绍了 RabbitMQ 3.13 中的一项令人兴奋的新功能——流过滤。在本帖中,我们将介绍流过滤的内部机制。了解其设计和实现将有助于您以最适合您的用例的方式来配置和使用流过滤。
概念
流过滤的目的是在代理端提供一个高效的第一层过滤,而无需代理解析消息。这样,只需要消息流子集的消费者就不需要获取所有数据并自行处理所有过滤。这可以大大减少传输到消费者的数据量。
通过过滤,可以为每条消息关联一个过滤值。它可以是地理信息,例如每条消息来自哪个地区,如下图所示
因此,我们的流有 1 条 AMER(绿色)消息,1 条 APAC(深蓝色)消息,2 条 EMEA(紫色)消息,然后是 2 条 AMER 消息。
消息发布
发布者可以将每条传出的消息与其过滤值相关联
在上图中,发布者发布了 1 条 AMER(绿色)消息和 2 条 EMEA(紫色)消息,这些消息将被添加到流中。
消息消费
当消费者订阅时,它可以指定一个或多个过滤值,代理将只分发具有此或这些过滤值(值)的消息。我们很快就会看到实际情况有所不同,但这足以理解概念。
在下图中,顶部的消费者指定它只想要 AMER(绿色)消息,代理只分发这些消息。中间的消费者也是如此,只想要 EMEA 消息,底部的消费者只想要 APAC 消息。
概念部分到此结束,现在让我们来了解实现细节。
流的结构
我们需要了解流的结构才能理解流过滤的内部原理。流是一个目录,其中包含段文件。每个段文件都有一个关联的索引文件(用于知道在段文件的给定偏移量处附加消费者,等等)。拥有多个“小”段文件比拥有一个用于整个流的大型单体文件要好:例如,删除“旧”段文件以截断流比删除大文件的开头更有效和安全。
段文件由包含消息的块组成。一个块中的消息数量取决于入口速率(高入口速率意味着一个块中有很多消息,低入口速率意味着一个块中消息较少)。一个块中的消息数量从几个(甚至 1 个)到几千个不等。
块有什么用?块是流中的工作单元:它们用于复制,并且对于我们的主题来说,更重要的是用于消费者交付。代理会逐个将块发送给消费者,使用 sendfile 系统调用(将整个块从文件系统发送到网络套接字,而无需将数据复制到用户空间)。
下图说明了流的结构
有了这些,我们来看看代理如何知道是否应该分发一个块。
代理端的过滤
想象一下,我们有一个只想要 AMER(绿色)消息的消费者。当代理即将分发一个块时,它需要知道该块是否包含 AMER 消息。如果包含,它可以将块发送给消费者;如果不包含,代理可以跳过该块,继续处理下一个块,并重新迭代。
每个块都有一个可以包含布隆过滤器的头部,它告诉代理该块是否包含具有给定过滤值的消息。布隆过滤器是一种空间效率高的概率数据结构,用于测试一个元素是否是某个集合的成员。在我们的示例中,集合包含 AMER、EMEA 和 APAC,而元素是 AMER。
下图说明了我们 3 个块的代理端过滤过程
如上图所示,过滤器可能会返回假阳性,即不包含具有预期过滤值(值)的消息的块。这是正常的,因为布隆过滤器是概率性的。但是,它们不会返回假阴性:如果过滤器说没有 AMER(绿色)消息,我们可以确定这是真的。我们必须接受这种不确定性:我们有时可能会分发一些不必要的块,但这总比分发所有块要好。
可以确定的是,消费者可能会收到它不想要的消息:看看我们左边的第一个块,它包含消费者想要的 AMER(绿色)消息,但也包含 EMEA(紫色)和 APAC(深蓝色)消息。这就是为什么客户端也必须进行过滤。
客户端过滤
代理在分发消息时处理第一层过滤,但由于分发单位是块,消费者仍可能收到它不想要的消息。因此,客户端也必须进行一些过滤,这显然必须与订阅时设置的过滤值(值)一致。
下图说明了一个只想要 AMER(绿色)消息的消费者,它必须进行最后一步过滤
让我们看看这如何转化为应用程序代码。
API 示例
过滤不是侵入性的,可以作为横切关注点来处理,从而最大限度地减少对应用程序代码的影响。以下是如何在使用 stream Java 客户端(filterValue(Function<Message,String>) 方法)声明生产者时设置从消息中提取过滤值的逻辑
Producer producer = environment.producerBuilder()
.stream("invoices")
.filterValue(msg -> msg.getApplicationProperties().get("region").toString())
.build();
在消费端,stream Java 客户端提供了 filter().values(String... filterValues) 方法来设置过滤值(值),以及 filter().postFilter(Predicate<Message> filter) 方法来设置客户端过滤逻辑。声明消费者时必须调用这两个方法
Consumer consumer = environment.consumerBuilder()
.stream("invoices")
.filter()
.values("AMER")
.postFilter(msg -> "AMER".equals(msg.getApplicationProperties().get("region")))
.builder()
.messageHandler((ctx, msg) -> {
// message processing code
})
.build();
如您所见,过滤不会改变发布和消费代码,只会改变生产者和消费者的声明。
现在让我们看看如何根据用例最适当地配置流过滤。
流过滤配置
关于流过滤的第一篇博文提供了一些数据(与无过滤相比,过滤可节省约 80% 的带宽)。流过滤的优势很大程度上取决于用例:入口速率、过滤值的基数和分布,以及过滤大小。过滤越大越好(错误率越小)。可以为块中使用的过滤的大小设置一个 16 到 255 字节之间的值,默认值为 16 字节。
stream Java 客户端在创建流时提供了 filterSize(int) 方法来设置过滤大小(它在内部设置 stream-filter-size-bytes 参数)。
environment.streamCreator()
.stream("invoices")
.filterSize(32)
.create()
如何估算过滤的大小?网上有很多布隆过滤器计算器。参数是哈希函数数量(RabbitMQ 流过滤为 2)、预期元素数量、错误率和大小。您通常对元素数量有所了解,因此需要在错误率和过滤大小之间找到一个权衡。
以下是一些示例
- 10 个值,16 字节 => 2% 的错误率
- 30 个值,16 字节 => 14% 的错误率
- 200 个值,128 字节 => 10% 的错误率
那么,过滤越大越好?不完全是:尽管布隆过滤器在存储方面非常高效,因为它不存储元素,只存储元素是否在集合中,但过滤大小是预先分配的。如果您将过滤大小设置为 255,并且每个块至少包含一条具有过滤值的消息,则每个块头部都会分配 255 字节。如果块包含许多大消息,这是可以接受的,因为与块大小相比,过滤大小可以忽略不计。但对于像单消息块、10 字节消息和 10 字节过滤值这样的退化情况,过滤大小会大于实际数据。
您必须根据自己的用例进行实验,以估算过滤大小对流大小的影响。关于流过滤的第一篇博文提供了一个使用 Stream PerfTest 估算流大小的技巧(读取整个流而不进行过滤,然后查看 rabbitmq_stream_read_bytes_total 指标)。
奖励:AMQP 上的流过滤
虽然访问流的首选方式是流协议,但其他协议也支持,例如 AMQP。流过滤也支持任何 AMQP 客户端库
- 声明:将
x-queue-type参数设置为stream,并使用x-stream-filter-size-bytes在声明流时设置过滤大小。 - 发布:使用
x-stream-filter-value头来设置传出消息的过滤值。 - 消费:使用
x-stream-filter消费者参数设置预期的过滤值(字符串或字符串数组),并可选择使用x-stream-match-unfiltered消费者参数来接收没有过滤值(值)的消息(默认为false)。客户端过滤仍然是必需的!
总结
这篇博文详细介绍了 RabbitMQ 3.13 中的流过滤。它补充了第一篇博文,该博文介绍了其用法并进行了演示。
流过滤易于使用和受益,但了解其内部原理有助于优化其使用,尤其是在棘手的用例中。请记住,客户端过滤是必需的,并且必须与配置的过滤值(值)一致。这通常很容易实现。还可以根据特定用例以最合适的方式设置过滤大小。
