跳至主内容

AMQP 1.0 过滤表达式

·6 分钟阅读

RabbitMQ 4.1 引入了一项令人兴奋的新功能:用于Streams的 AMQP 过滤表达式。

此功能使 RabbitMQ 能够支持多个并发客户端,每个客户端仅消耗特定子集的消息,同时保留消息顺序。此外,它通过仅分派与客户端兴趣匹配的消息,最大限度地减少了 RabbitMQ 和其客户端之间的网络流量。

在这篇博文中,我们将探讨 AMQP 过滤表达式是什么,并通过一个简单的 Java 示例来演示如何使用它们。

规范

正如 原生 AMQP 1.0 博客文章中所述,AMQP 1.0 的优势之一是其可扩展性,由众多扩展规范支持。RabbitMQ 4.1 利用了扩展规范 AMQP Filter Expressions Version 1.0 Working Draft 09

本规范定义了 AMQP 消息过滤表达式的类型定义。过滤表达式是对消息进行评估的谓词,返回 truefalse。如果谓词评估为 true,则代理会将消息分派给消费者。

RabbitMQ 4.1 实现了本规范的一个子集,包括

  • § 4.2.4 properties 过滤器:应用于消息的不可变的 properties 部分。
  • § 4.2.5 application-properties 过滤器:应用于消息的不可变的 application-properties 部分。

示例

想象一下,每条消息都带有指定特定颜色的元数据。不同的消费者可以订阅同一个流,通过过滤消息来接收他们感兴趣的匹配颜色。

Consumers filtering messages from a stream
从流中过滤消息的消费者

第一个消费者接收所有绿色消息。第二个消费者接收所有紫色消息。第三个消费者接收所有蓝色消息。

尝试这个例子。

您可以使用 amqp-filter-expressions 示例应用程序以及 RabbitMQ AMQP 1.0 Java 客户端,按照以下步骤尝试此示例:

  1. 使用以下命令启动 RabbitMQ 服务器
docker run -it --rm --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
rabbitmq:4.1-rc-management
  1. 导航到示例应用程序的根目录并启动客户端
mvn clean compile exec:java

运行示例应用程序后,您应该会在控制台看到以下输出

publisher sent message 0 with color green
publisher sent message 1 with color blue
publisher sent message 2 with color purple
publisher sent message 3 with color purple
publisher sent message 4 with color green
publisher sent message 5 with color green
consumer (filter green) received message 0
consumer (filter green) received message 4
consumer (filter green) received message 5
consumer (filter purple) received message 2
consumer (filter purple) received message 3
consumer (filter blue) received message 1
consumer (filter &s:e) received message 1
consumer (filter &s:e) received message 2
consumer (filter &s:e) received message 3

在此示例中,发布者发送六条消息,在 application-properties 部分为每条消息指定了特定的颜色。

  • 第一个消费者应用 color: green 的 application-properties 过滤器,按消息发布到流的顺序接收所有绿色消息。
  • 类似地,第二个消费者过滤 color: purple,接收所有紫色消息,第三个消费者过滤 color: blue,接收所有蓝色消息。

此外,此示例应用程序包含第四个消费者(上图未显示),其过滤器匹配颜色以字母 e 结尾的消息。(根据规范,过滤器表达式 &s:suffix 匹配以指定后缀结尾的值。)因此,第四个消费者接收蓝色和紫色消息。

AMQP 过滤表达式使多个客户端能够同时从同一个流中消费特定子集的消息,同时保持消息顺序。此功能还可以通过仅分派与每个客户端兴趣匹配的消息来最小化 RabbitMQ 和其客户端之间的网络流量。

流过滤比较

本博客文章中描述的 **AMQP 过滤表达式** 功能不应与 RabbitMQ 3.13 中引入的 **基于布隆过滤器的流过滤** 相混淆。

这两种功能都服务于同一个目的:从流中过滤消息。然而,它们的实现不同,导致特征也不同。

功能AMQP 过滤表达式基于布隆过滤器的流过滤
支持的协议AMQP 1.0主要适用于 RabbitMQ Streams 协议,但也支持 AMQP 1.0、AMQP 0.9.1 和 STOMP。
误报可能:需要在客户端进行额外的每条消息过滤。
支持多个过滤值(发布者)是:发布者可以在 properties 或 application-properties 部分定义多个值。否:发布者每条消息只能指定一个过滤值。
支持多个过滤表达式(消费者)是:消费者可以提供多个过滤表达式,如果**所有**过滤器都匹配,则会传递消息。是:消费者可以指定多个过滤值,如果**任何**过滤器匹配,则会传递消息。
前缀和后缀匹配是:对于字符串值,消费者可以定义这样的表达式:“过滤 subjectemea. 开头的消息”或“过滤 application-properties 部分包含键 color 且值以 e 结尾的消息。”
代理开销使用高效的 Erlang 模式匹配或术语相等操作实现。但是,为了每个消费者,每条消息都会被读入内存(除非与基于布隆过滤器的过滤结合使用)。最小:布隆过滤器成员资格检查使用常数时间。使用 RabbitMQ Streams 协议,sendfile 系统调用 优化了块传输,消息不会进入用户空间。
网络开销较低:只传输匹配消费者过滤器的消息。较高:即使只有一个消息匹配,也会传输整个

通过 AMQP 1.0 消费时,可以同时使用这两种功能。

总结

RabbitMQ 4.1 解决了在单个队列/流上启用多个消费者,同时确保某些消息(例如具有相同主题或 ID 的消息)始终由同一消费者处理,从而保持按顺序处理的 挑战

虽然此功能不适用于 经典队列仲裁队列,但 AMQP 过滤表达式允许消费者在从流中消费消息时过滤消息。由于流是不可变的日志,因此会保持总消息顺序。

© . This site is unofficial and not affiliated with VMware.