跳到主要内容

RabbitMQ 教程 - 主题

主题

(使用 Pika Python 客户端)

信息

先决条件

本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

在哪里获得帮助

如果您在学习本教程时遇到问题,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

先决条件

与其他 Python 教程一样,我们将使用 Pika RabbitMQ 客户端 1.0.0 版本

本教程的重点

之前的教程中,我们改进了日志系统。我们没有使用只能进行虚拟广播的 fanout 交换机,而是使用了 direct 交换机,并获得了选择性接收日志的可能性。

虽然使用 direct 交换机改进了我们的系统,但它仍然存在局限性 - 它无法根据多个标准进行路由。

在我们的日志系统中,我们可能不仅希望订阅基于严重性的日志,还希望订阅基于发出日志的源的日志。您可能从 syslog unix 工具中了解这个概念,该工具根据严重性(info/warn/crit...)和 facility(auth/cron/kern...)路由日志。

这将为我们提供很大的灵活性 - 我们可能只想监听来自 'cron' 的严重错误,但也想监听来自 'kern' 的所有日志。

为了在我们的日志系统中实现这一点,我们需要了解更复杂的 topic 交换机。

主题交换机

发送到 topic 交换机的消息不能具有任意的 routing_key - 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的某些特征。一些有效的路由键示例:stock.usd.nysenyse.vmwquick.orange.rabbit。路由键中可以包含任意数量的单词,最多 255 个字节。

绑定键也必须采用相同的形式。topic 交换机背后的逻辑类似于 direct 交换机 - 使用特定路由键发送的消息将传递到所有绑定了匹配绑定键的队列。但是,绑定键有两个重要的特殊情况

  • *(星号)可以代替一个单词。
  • #(井号)可以代替零个或多个单词。

用一个例子来解释最容易

在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个单词(两个点)组成的路由键发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:<celerity>.<colour>.<species>

我们创建了三个绑定:Q1 绑定了绑定键 *.orange.*,Q2 绑定了 *.*.rabbitlazy.#

这些绑定可以总结为

  • Q1 对所有橙色动物感兴趣。
  • Q2 想听到关于兔子的一切,以及关于懒惰动物的一切。

路由键设置为 quick.orange.rabbit 的消息将传递到两个队列。消息 lazy.orange.elephant 也将同时发送到这两个队列。另一方面,quick.orange.fox 将仅发送到第一个队列,而 lazy.brown.fox 仅发送到第二个队列。lazy.pink.rabbit 将仅传递到第二个队列一次,即使它匹配两个绑定。quick.brown.fox 不匹配任何绑定,因此将被丢弃。

如果我们违反约定并发送一个或四个单词的消息,例如 orangequick.orange.new.rabbit 会发生什么?好吧,这些消息将不匹配任何绑定,并且会丢失。

另一方面,lazy.orange.new.rabbit,即使它有四个单词,也将匹配最后一个绑定,并将传递到第二个队列。

主题交换机

主题交换机功能强大,可以像其他交换机一样工作。

当队列绑定了 #(井号)绑定键时 - 它将接收所有消息,而不管路由键如何 - 就像 fanout 交换机一样。

当绑定中未使用特殊字符 *(星号)和 #(井号)时,主题交换机的行为就像 direct 交换机一样。

将它们放在一起

我们将在日志系统中使用 topic 交换机。我们将从一个工作假设开始,即日志的路由键将有两个单词:<facility>.<severity>

代码几乎与之前的教程中的代码相同。

emit_log_topic.py (source)

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs', routing_key=routing_key, body=message)
print(f" [x] Sent {routing_key}:{message}")
connection.close()

receive_logs_topic.py (source)

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)

for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body}")


channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

要接收所有日志,请运行

python receive_logs_topic.py "#"

要接收来自 facility kern 的所有日志

python receive_logs_topic.py "kern.*"

或者,如果您只想听到关于 critical 日志的信息

python receive_logs_topic.py "*.critical"

您可以创建多个绑定

python receive_logs_topic.py "kern.*" "*.critical"

要使用路由键 kern.critical 类型发送日志

python emit_log_topic.py "kern.critical" "A critical kernel error"

尽情玩转这些程序。请注意,代码不对路由键或绑定键做任何假设,您可能想要使用两个以上的路由键参数。

继续学习教程 6 以了解 RPC

© . All rights reserved.