跳至主要内容

RabbitMQ 教程 - 路由

路由

(使用 Pika Python 客户端)

信息

先决条件

本教程假定 RabbitMQ 已安装并在localhost上运行,使用标准端口 (5672)。如果您使用其他主机、端口或凭据,则需要调整连接设置。

获取帮助的地方

如果您在完成本教程时遇到困难,您可以通过GitHub 讨论RabbitMQ 社区 Discord联系我们。

先决条件

与其他 Python 教程一样,我们将使用Pika RabbitMQ 客户端版本 1.0.0

本教程的重点

之前的教程中,我们构建了一个简单的日志系统。我们能够将日志消息广播给多个接收者。

在本教程中,我们将添加一个功能 - 我们将使其能够仅订阅消息的子集。例如,我们能够将仅关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台中打印所有日志消息。

绑定

在之前的示例中,我们已经创建了绑定。您可能还记得类似的代码

channel.queue_bind(exchange=exchange_name,
queue=queue_name)

绑定是交换机和队列之间的一种关系。这可以简单地理解为:队列对来自该交换机的消息感兴趣。

绑定可以采用额外的routing_key参数。为了避免与basic_publish参数混淆,我们将称之为绑定键。这是我们如何使用键创建绑定的方法

channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')

绑定键的含义取决于交换机类型。我们之前使用的fanout交换机只是忽略了它的值。

直接交换机

我们之前的教程中的日志系统将所有消息广播给所有消费者。我们希望将其扩展到允许根据消息的严重程度筛选消息。例如,我们可能希望将写入日志消息到磁盘的脚本仅接收严重错误,而不浪费磁盘空间来保存警告或信息日志消息。

我们一直使用fanout交换机,它没有给我们太多灵活性 - 它只能进行盲目的广播。

我们将改为使用direct交换机。direct交换机背后的路由算法很简单 - 消息将发送到绑定键与消息的路由键完全匹配的队列。

为了说明这一点,请考虑以下设置

在此设置中,我们可以看到direct交换机X与两个队列绑定。第一个队列使用绑定键orange绑定,第二个队列有两个绑定,一个使用绑定键black,另一个使用green

在这种设置中,发布到交换机的路由键为orange的消息将被路由到队列Q1。路由键为blackgreen的消息将发送到Q2。所有其他消息将被丢弃。

多个绑定

使用相同的绑定键将多个队列绑定在一起是完全合法的。在我们的示例中,我们可以添加XQ1之间的绑定,绑定键为black。在这种情况下,direct交换机将表现得像fanout,并将消息广播到所有匹配的队列。路由键为black的消息将被传递给Q1Q2

发出日志

我们将使用此模型作为我们的日志系统。我们将向direct交换机发送消息,而不是fanout。我们将提供日志严重程度作为路由键。这样,接收脚本将能够选择它想要接收的严重程度。首先,让我们专注于发出日志。

像往常一样,我们需要先创建一个交换机

channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')

我们准备发送消息了

channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)

为了简化问题,我们将假设“严重程度”可以是“info”、“warning”、“error”之一。

订阅

接收消息的工作原理与之前的教程相同,只有一个例外 - 我们将为我们感兴趣的每个严重程度创建一个新的绑定。

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)

将它们整合在一起

emit_log_direct.py (来源)

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='direct_logs', routing_key=severity, body=message)
print(f" [x] Sent {severity}:{message}")
connection.close()

receive_logs_direct.py (来源)

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)

for severity in severities:
channel.queue_bind(
exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body}")


channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

如果您想将“warning”和“error”(而不是“info”)日志消息保存到文件,只需打开一个控制台并输入

python receive_logs_direct.py warning error > logs_from_rabbit.log

如果您想在屏幕上查看所有日志消息,请打开一个新的终端并执行

python receive_logs_direct.py info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,要发出error日志消息,请键入

python emit_log_direct.py error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

继续教程 5,了解如何根据模式收听消息。