RabbitMQ 教程 - 主题
主题
(使用 Pika Python 客户端)
先决条件
本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
在哪里获得帮助
如果您在学习本教程时遇到问题,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
先决条件
与其他 Python 教程一样,我们将使用 Pika RabbitMQ 客户端 1.0.0 版本。
本教程的重点
在之前的教程中,我们改进了日志系统。我们没有使用只能进行虚拟广播的 fanout 交换机,而是使用了 direct 交换机,并获得了选择性接收日志的可能性。
虽然使用 direct 交换机改进了我们的系统,但它仍然存在局限性 - 它无法根据多个标准进行路由。
在我们的日志系统中,我们可能不仅希望订阅基于严重性的日志,还希望订阅基于发出日志的源的日志。您可能从 syslog unix 工具中了解这个概念,该工具根据严重性(info/warn/crit...)和 facility(auth/cron/kern...)路由日志。
这将为我们提供很大的灵活性 - 我们可能只想监听来自 'cron' 的严重错误,但也想监听来自 'kern' 的所有日志。
为了在我们的日志系统中实现这一点,我们需要了解更复杂的 topic 交换机。
主题交换机
发送到 topic 交换机的消息不能具有任意的 routing_key - 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的某些特征。一些有效的路由键示例:stock.usd.nyse、nyse.vmw、quick.orange.rabbit。路由键中可以包含任意数量的单词,最多 255 个字节。
绑定键也必须采用相同的形式。topic 交换机背后的逻辑类似于 direct 交换机 - 使用特定路由键发送的消息将传递到所有绑定了匹配绑定键的队列。但是,绑定键有两个重要的特殊情况
- *(星号)可以代替一个单词。
- #(井号)可以代替零个或多个单词。
用一个例子来解释最容易
在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个单词(两个点)组成的路由键发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:<celerity>.<colour>.<species>。
我们创建了三个绑定:Q1 绑定了绑定键 *.orange.*,Q2 绑定了 *.*.rabbit 和 lazy.#。
这些绑定可以总结为
- Q1 对所有橙色动物感兴趣。
- Q2 想听到关于兔子的一切,以及关于懒惰动物的一切。
路由键设置为 quick.orange.rabbit 的消息将传递到两个队列。消息 lazy.orange.elephant 也将同时发送到这两个队列。另一方面,quick.orange.fox 将仅发送到第一个队列,而 lazy.brown.fox 仅发送到第二个队列。lazy.pink.rabbit 将仅传递到第二个队列一次,即使它匹配两个绑定。quick.brown.fox 不匹配任何绑定,因此将被丢弃。
如果我们违反约定并发送一个或四个单词的消息,例如 orange 或 quick.orange.new.rabbit 会发生什么?好吧,这些消息将不匹配任何绑定,并且会丢失。
另一方面,lazy.orange.new.rabbit,即使它有四个单词,也将匹配最后一个绑定,并将传递到第二个队列。
主题交换机
主题交换机功能强大,可以像其他交换机一样工作。
当队列绑定了
#(井号)绑定键时 - 它将接收所有消息,而不管路由键如何 - 就像fanout交换机一样。当绑定中未使用特殊字符
*(星号)和#(井号)时,主题交换机的行为就像direct交换机一样。
将它们放在一起
我们将在日志系统中使用 topic 交换机。我们将从一个工作假设开始,即日志的路由键将有两个单词:<facility>.<severity>。
代码几乎与之前的教程中的代码相同。
emit_log_topic.py (source)
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(f" [x] Sent {routing_key}:{message}")
connection.close()
receive_logs_topic.py (source)
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body}")
channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
要接收所有日志,请运行
python receive_logs_topic.py "#"
要接收来自 facility kern 的所有日志
python receive_logs_topic.py "kern.*"
或者,如果您只想听到关于 critical 日志的信息
python receive_logs_topic.py "*.critical"
您可以创建多个绑定
python receive_logs_topic.py "kern.*" "*.critical"
要使用路由键 kern.critical 类型发送日志
python emit_log_topic.py "kern.critical" "A critical kernel error"
尽情玩转这些程序。请注意,代码不对路由键或绑定键做任何假设,您可能想要使用两个以上的路由键参数。
继续学习教程 6 以了解 RPC。