跳至主要内容

RabbitMQ 教程 - 远程过程调用 (RPC)

远程过程调用 (RPC)

(使用 Pika Python 客户端)

信息

先决条件

本教程假设 RabbitMQ 已安装并运行在 localhost 上的标准端口 (5672) 上。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

获取帮助

如果您在完成本教程时遇到问题,可以通过 GitHub 讨论RabbitMQ 社区 Discord 与我们联系。

先决条件

与其他 Python 教程一样,我们将使用 Pika RabbitMQ 客户端 版本 1.0.0

本教程的重点

第二篇教程中,我们学习了如何使用工作队列在多个工作进程之间分配耗时的任务。

但是,如果我们需要在远程计算机上运行一个函数并等待结果呢?这是一个完全不同的问题。这种模式通常称为远程过程调用RPC

在本教程中,我们将使用 RabbitMQ 来构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有需要分配的耗时任务,我们将创建一个虚拟 RPC 服务来返回斐波那契数列。

客户端接口

为了说明如何使用 RPC 服务,我们将创建一个简单的客户端类。它将公开一个名为 call 的方法,该方法将发送 RPC 请求并阻塞,直到收到答案。

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print(f"fib(4) is {result}")

关于 RPC 的说明

虽然 RPC 在计算中是一种非常常见的模式,但它经常受到批评。问题出现在程序员不知道一个函数调用是本地的还是缓慢的 RPC 时。这种混淆会导致系统不可预测,并增加调试的复杂性。错误使用 RPC 不会简化软件,反而会导致难以维护的意大利面条代码。

考虑到这一点,请参考以下建议

  • 确保清楚地区分本地函数调用和远程函数调用。
  • 记录您的系统。使组件之间的依赖关系清晰可见。
  • 处理错误情况。如果 RPC 服务器长时间处于关闭状态,客户端应该如何反应?

如有疑问,请避免使用 RPC。如果可以,您应该使用异步管道 - 与 RPC 类似的阻塞不同,结果会异步推送到下一个计算阶段。

回调队列

总的来说,通过 RabbitMQ 进行 RPC 很容易。客户端发送一个请求消息,服务器用一个响应消息进行回复。为了接收响应,客户端需要与请求一起发送一个“回调”队列地址。让我们尝试一下

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)

# ... and some code to read a response message from the callback_queue ...

消息属性

AMQP 0-9-1 协议预定义了一组 14 个与消息相关的属性。大多数属性很少使用,以下属性除外

  • delivery_mode: 将消息标记为持久性(值为 2)或瞬时性(任何其他值)。您可能还记得 第二篇教程中的此属性。
  • content_type: 用于描述编码的 MIME 类型。例如,对于常用的 JSON 编码,最佳做法是将此属性设置为:application/json
  • reply_to: 通常用于命名回调队列。
  • correlation_id: 用于将 RPC 响应与请求关联。

关联 ID

在上面介绍的方法中,我们建议为每个 RPC 请求创建一个回调队列。这样做效率很低,但幸运的是,还有更好的方法 - 让我们为每个客户端创建一个回调队列。

这带来了一个新的问题,在该队列中收到响应后,不清楚该响应属于哪个请求。这时,correlation_id 属性就派上用场了。我们将它设置为每个请求的唯一值。稍后,当我们在回调队列中收到消息时,我们将查看此属性,并根据它将响应与请求匹配。如果我们看到一个未知的 correlation_id 值,我们可以安全地丢弃该消息 - 它不属于我们的请求。

您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是报错?这是因为服务器端可能存在竞争条件。虽然不太可能,但 RPC 服务器可能在向我们发送答案后但尚未发送请求的确认消息之前就崩溃了。如果发生这种情况,重新启动的 RPC 服务器将再次处理该请求。这就是为什么我们在客户端必须优雅地处理重复响应,而且 RPC 理想情况下应该是幂等的。

总结

我们的 RPC 将按如下方式工作

  • 当客户端启动时,它会创建一个匿名专用的回调队列。
  • 对于 RPC 请求,客户端会发送一条带有两个属性的消息:reply_to,设置为回调队列,以及 correlation_id,设置为每个请求的唯一值。
  • 请求被发送到 rpc_queue 队列。
  • RPC 工作进程(又名:服务器)正在等待该队列上的请求。当出现请求时,它会执行任务并将结果消息发送回客户端,使用 reply_to 字段中的队列。
  • 客户端等待回调队列上的数据。当出现消息时,它会检查 correlation_id 属性。如果它与请求中的值匹配,它会将响应返回到应用程序。

综合示例

rpc_server.py (源代码)

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
n = int(body)

print(f" [.] fib({n})")
response = fib(n)

ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

服务器代码相当简单

  • 像往常一样,我们首先建立连接并声明 rpc_queue 队列。
  • 我们声明了我们的斐波那契函数。它假设只有有效的正整数输入。(不要指望它对大数有效,它可能是最慢的递归实现。)
  • 我们为 basic_consume 声明了一个回调函数 on_request,这是 RPC 服务器的核心。它在收到请求时执行。它会执行任务并将响应发送回去。
  • 我们可能希望运行多个服务器进程。为了将负载均匀地分散到多个服务器上,我们需要设置 prefetch_count 设置。

rpc_client.py (源代码)

#!/usr/bin/env python
import pika
import uuid


class FibonacciRpcClient(object):

def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))

self.channel = self.connection.channel()

result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue

self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)

self.response = None
self.corr_id = None

def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body

def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events(time_limit=None)
return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(f" [.] Got {response}")

客户端代码稍微复杂一些

  • 我们建立连接、通道并声明一个专用的 callback_queue 用于回复。
  • 我们订阅了 callback_queue,以便我们可以接收 RPC 响应。
  • on_response 回调函数在每个响应上执行,它执行一个非常简单的任务,它会检查每个响应消息的 correlation_id 是否是我们正在寻找的。如果是,它会在 self.response 中保存响应并中断消费循环。
  • 接下来,我们定义了主要的 call 方法 - 它执行实际的 RPC 请求。
  • call 方法中,我们生成一个唯一的 correlation_id 编号并保存它 - on_response 回调函数将使用该值来捕获相应的响应。
  • 同样在 call 方法中,我们发布请求消息,并带有两个属性:reply_tocorrelation_id
  • 最后,我们等待适当的响应到达,并将响应返回给用户。

我们的 RPC 服务现在已经准备就绪。我们可以启动服务器

python rpc_server.py
# => [x] Awaiting RPC requests

要请求斐波那契数列,请运行客户端

python rpc_client.py
# => [x] Requesting fib(30)

所展示的设计并不是 RPC 服务的唯一实现方式,但它具有一些重要的优点

  • 如果 RPC 服务器太慢,您可以通过运行另一个服务器来进行扩展。尝试在新控制台中运行第二个 rpc_server.py
  • 在客户端,RPC 只需要发送和接收一条消息。不需要像 queue_declare 这样的同步调用。因此,RPC 客户端只需要一个网络往返就可以完成一个 RPC 请求。

我们的代码仍然很简单,没有试图解决更复杂(但重要)的问题,比如

  • 如果没有任何服务器运行,客户端应该如何反应?
  • 客户端是否应该为 RPC 设置某种超时机制?
  • 如果服务器出现故障并抛出异常,是否应该将其转发给客户端?
  • 在处理之前,保护免受无效的传入消息(例如,检查边界)。

如果您想进行实验,您可能会发现 管理 UI 有助于查看队列。