跳转到主要内容

RabbitMQ 教程 - 远程过程调用 (RPC)

远程过程调用 (RPC)

(使用 Pika Python 客户端)

信息

先决条件

本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

在哪里获取帮助

如果您在学习本教程时遇到问题,可以通过GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

先决条件

与其他 Python 教程一样,我们将使用 Pika RabbitMQ 客户端 1.0.0 版本

本教程的重点

第二个教程中,我们学习了如何使用工作队列在多个工作者之间分配耗时的任务。

但是,如果我们需要在远程计算机上运行一个函数并等待结果呢?嗯,那又是另一回事了。这种模式通常被称为远程过程调用RPC

在本教程中,我们将使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有任何值得分配的耗时任务,我们将创建一个虚拟 RPC 服务,该服务返回斐波那契数。

客户端接口

为了说明 RPC 服务如何使用,我们将创建一个简单的客户端类。它将公开一个名为 call 的方法,该方法发送 RPC 请求并阻塞直到收到答案

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print(f"fib(4) is {result}")

关于 RPC 的说明

尽管 RPC 在计算中是一种非常常见的模式,但它经常受到批评。当程序员不清楚函数调用是本地调用还是慢速 RPC 时,问题就出现了。这样的混淆会导致系统不可预测,并给调试增加不必要的复杂性。误用的 RPC 不但不能简化软件,反而会导致难以维护的意大利面条式代码。

考虑到这一点,请考虑以下建议

  • 确保清楚地知道哪个函数调用是本地的,哪个是远程的。
  • 记录您的系统。明确组件之间的依赖关系。
  • 处理错误情况。当 RPC 服务器长时间宕机时,客户端应该如何反应?

如有疑问,请避免使用 RPC。如果可以,您应该使用异步管道 - 而不是类似 RPC 的阻塞,结果会异步推送到下一个计算阶段。

回调队列

总的来说,通过 RabbitMQ 进行 RPC 很简单。客户端发送请求消息,服务器回复响应消息。为了接收响应,客户端需要随请求一起发送“回调”队列地址。让我们试试看

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)

# ... and some code to read a response message from the callback_queue ...

消息属性

AMQP 0-9-1 协议预定义了一组与消息相关的 14 个属性。大多数属性很少使用,以下属性除外

  • delivery_mode:将消息标记为持久性(值为 2)或瞬态(任何其他值)。您可能还记得第二个教程中的这个属性。
  • content_type:用于描述编码的 mime 类型。例如,对于常用的 JSON 编码,最好将此属性设置为:application/json
  • reply_to:通常用于命名回调队列。
  • correlation_id:用于将 RPC 响应与请求关联起来。

关联 ID

为每个 RPC 请求创建一个回调队列效率低下。更好的方法是为每个客户端创建一个单独的回调队列。

这引发了一个新问题,在队列中收到响应后,不清楚响应属于哪个请求。这时就需要使用 correlation_id 属性。我们将为每个请求将其设置为唯一值。稍后,当我们在回调队列中收到消息时,我们将查看此属性,并根据该属性将响应与请求匹配。如果我们看到未知的 correlation_id 值,我们可以安全地丢弃该消息 - 它不属于我们的请求。

您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是抛出错误?这是因为服务器端可能存在竞争条件。尽管不太可能,但 RPC 服务器可能在向我们发送答案后,但在为请求发送确认消息之前就崩溃了。如果发生这种情况,重新启动的 RPC 服务器将再次处理该请求。这就是为什么在客户端我们必须优雅地处理重复响应,并且 RPC 最好是幂等的。

总结

我们的 RPC 将像这样工作

  • 当客户端启动时,它会创建一个独占回调队列。
  • 对于 RPC 请求,客户端发送一条消息,其中包含两个属性:reply_to,设置为回调队列;correlation_id,设置为每个请求的唯一值。
  • 请求被发送到 rpc_queue 队列。
  • RPC 工作者(又名:服务器)正在等待该队列上的请求。当请求出现时,它会完成工作并将结果消息发送回客户端,使用来自 reply_to 字段的队列。
  • 客户端等待回调队列上的数据。当消息出现时,它会检查 correlation_id 属性。如果它与请求中的值匹配,则将响应返回给应用程序。

将它们放在一起

rpc_server.py (源代码)

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
n = int(body)

print(f" [.] fib({n})")
response = fib(n)

ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

服务器代码相当简单明了

  • 像往常一样,我们首先建立连接并声明队列 rpc_queue
  • 我们声明我们的斐波那契函数。它仅接受有效的正整数输入。(不要期望这个函数能处理大数字,它可能是最慢的递归实现)。
  • 我们为 basic_consume 声明一个回调 on_request,它是 RPC 服务器的核心。当收到请求时,它会被执行。它完成工作并将响应发送回。
  • 我们可能想要运行多个服务器进程。为了将负载平均分配到多个服务器上,我们需要设置 prefetch_count 设置。

rpc_client.py (源代码)

#!/usr/bin/env python
import pika
import uuid


class FibonacciRpcClient(object):

def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))

self.channel = self.connection.channel()

result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue

self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)

self.response = None
self.corr_id = None

def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body

def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events(time_limit=None)
return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(f" [.] Got {response}")

客户端代码稍微复杂一些

  • 我们建立连接、通道并为回复声明一个独占 callback_queue
  • 我们订阅 callback_queue,以便我们可以接收 RPC 响应。
  • 在每个响应上执行的 on_response 回调函数执行一个非常简单的工作,对于每个响应消息,它都会检查 correlation_id 是否是我们正在寻找的那个。如果是,它会将响应保存在 self.response 中并中断消费循环。
  • 接下来,我们定义我们的主 call 方法 - 它执行实际的 RPC 请求。
  • call 方法中,我们生成一个唯一的 correlation_id 号码并保存它 - on_response 回调函数将使用此值来捕获适当的响应。
  • 同样在 call 方法中,我们发布请求消息,其中包含两个属性:reply_tocorrelation_id
  • 最后,我们等待直到正确的响应到达,并将响应返回给用户。

我们的 RPC 服务现在已准备就绪。我们可以启动服务器

python rpc_server.py
# => [x] Awaiting RPC requests

要请求斐波那契数,请运行客户端

python rpc_client.py
# => [x] Requesting fib(30)

所提出的设计不是 RPC 服务的唯一可能实现,但它具有一些重要的优势

  • 如果 RPC 服务器太慢,您只需运行另一个服务器即可扩展。尝试在新控制台中运行第二个 rpc_server.py
  • 在客户端,RPC 只需要发送和接收一条消息。不需要像 queue_declare 这样的同步调用。因此,RPC 客户端对于单个 RPC 请求只需要一个网络往返。

我们的代码仍然非常简单,并没有尝试解决更复杂(但重要)的问题,例如

  • 如果没有服务器运行,客户端应该如何反应?
  • 客户端是否应该为 RPC 设置某种超时?
  • 如果服务器发生故障并引发异常,是否应该将其转发给客户端?
  • 在处理之前防止无效的传入消息(例如,检查边界)。

如果您想进行实验,您可能会发现 管理 UI 对于查看队列很有用。

© . All rights reserved.