流过滤
流过滤是 RabbitMQ 3.13 中的一项新功能。当消费应用程序只需要流中的一部分消息时,它可以节省代理和消费应用程序之间的带宽。
继续阅读以了解流过滤的工作原理并查看其实际应用。
流过滤概念
想象一下,您有一个包含来自世界各地数据的流,以及一个只需要处理该数据子集的应用程序,比如特定区域的消息。应用程序可以读取整个流,然后过滤出它感兴趣的消息进行处理。这样做是可行的,但这意味着整个流的内容都将通过网络传输。
流过滤在代理端提供了一个初级的、高效的过滤机制,无需代理解析消息。在某些用例中,它可以显著减少网络传输的数据量。让我们来探索一下这个令人兴奋的新功能的语义。
发布端
流过滤基于过滤值:发布应用程序可以为每条消息关联一个字符串值。过滤值可以是任何内容,但为了使过滤正常工作,它们应满足某些标准。一组跨消息定义的共享值是一个不错的选择:地理位置(例如,国家、地区)、存储文档信息的流中的文档类型(例如,工资单、发票、订单)、产品类别(例如,书籍、行李、玩具)。
消息如何与过滤值关联取决于客户端库。下面是使用流 Java 客户端的示例
Producer producer = environment.producerBuilder()
.stream("invoices")
.filterValue(msg -> msg.getApplicationProperties().get("region").toString())
.build();
在这个示例中,应用程序开发人员提供了一些逻辑来从消息的应用程序属性中提取过滤值。使用过滤非常简单:无需更改实际的消息发布代码,只需在创建 `Producer` 时提供过滤值逻辑即可。
现在让我们看看它对消费者是如何工作的。
消费者端
以下是一个 Java 代码片段,用于声明一个仅对来自 `emea` 区域的消息感兴趣的消费者
Consumer consumer = environment.consumerBuilder()
.stream("invoices")
.filter()
.values("emea")
.postFilter(msg -> "emea".equals(msg.getApplicationProperties().get("region")))
.builder()
.messageHandler((ctx, msg) -> {
// message processing code
})
.build();
过滤在两个地方进行配置
filter().values(String... filterValues)告诉代理我们对与这些值关联的消息感兴趣(我们可以指定多个值,而不仅仅是一个)filter().postFilter(Predicate<Message> filter)提供了一些客户端逻辑,用于过滤掉不与预期过滤值关联的消息
为什么需要这种客户端过滤逻辑?代理端过滤逻辑使用的是布隆过滤器
布隆过滤器是一种节省空间的概率性数据结构,用于测试一个元素是否属于一个集合。
布隆过滤器在存储和速度方面非常高效,但它是概率性的:它可能会返回误报。因此,代理可能会发送它认为匹配预期过滤值但实际上不匹配的消息。这就是为什么需要一些客户端过滤逻辑的原因。
这是需要注意的一点,但与流过滤带来的好处相比,这是一个小小的权衡。
一篇后续博客文章涵盖了流过滤的内部原理,供有兴趣了解技术细节的人阅读。您还可以查看流 Java 客户端关于过滤的文档以获取更多信息。它涵盖了例如,消息不总是必须与过滤值关联,并且消费者可以选择接收具有给定过滤值和没有过滤值的消息(使用 `filter().matchUnfiltered()`)。
试用一下
让我们看看流过滤的实际效果。启动一个 RabbitMQ 3.13+ 节点
docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13
启用流插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
下载Stream PerfTest(它需要 Java 11 或更高版本才能运行)
cd /tmp
wget -O stream-perf-test.jar \
https://github.com/rabbitmq/rabbitmq-java-tools-binaries-dev/releases/download/v-stream-perf-test-latest/stream-perf-test-latest.jar
让我们发布消息 10 秒
java -jar stream-perf-test.jar --producers 1 --consumers 0 --rate 100 --filter-value-set 1..50 --size 10000 --time 10
消息长度为 10 KB,`--filter-value-set 1..50` 表示每条消息都关联了一个介于 `"1"` 和 `"50"` 之间的随机过滤值。
让我们消费所有消息(不带任何过滤)
java -jar stream-perf-test.jar --producers 0 --consumers 1 --offset first --prometheus
输出应该在几秒钟后停止,当消费者到达流的末尾时。不要停止应用程序,而是打开另一个终端标签页,并查询 Stream PerfTest 指标,看看它读取了多少数据
curl --silent localhost:8080/metrics | grep rabbitmq_stream_read_bytes_total
您应该会看到类似以下内容
# HELP rabbitmq_stream_read_bytes_total
# TYPE rabbitmq_stream_read_bytes_total counter
rabbitmq_stream_read_bytes_total 1.0046894E7
大约是 10 MB。客户端必须传输整个流。
现在停止 Stream PerfTest(`Ctrl-C`),然后重新启动它,这次启用过滤
java -jar stream-perf-test.jar --producers 0 --consumers 1 --offset first --prometheus --filter-values 5
这里我们要求只获取过滤值为 `"5"` 的消息(`--filter-values 5`)。再次等待输出停止,然后查看读取的字节数
curl --silent localhost:8080/metrics | grep rabbitmq_stream_read_bytes_total
您应该会看到类似这样的结果
# HELP rabbitmq_stream_read_bytes_total
# TYPE rabbitmq_stream_read_bytes_total counter
rabbitmq_stream_read_bytes_total 1957641.0
不到 2 MB。节省了 8 MB 的带宽,大约 80%,不错!
当然,这有点人为:Stream PerfTest 不是一个真正的应用程序,它不太可能像真实应用程序那样分发消息和过滤值。但即便如此,它也让我们对流过滤的带宽节省有了大致的了解。
总结
我们快速回顾了 RabbitMQ 3.13 中的流过滤。它可以在消息从代理分发到消费应用程序时节省带宽。并非所有用例都能从流过滤中受益,但对于那些能够受益的用例,其带宽方面的优势非常引人注目。
请继续关注一篇后续博客文章,它将涵盖流过滤的内部细节,以帮助您以最优化方式使用和配置它。
