Puka - 重新思考 AMQP 客户端
我从根本上不同意我们当前 AMQP 客户端库公开的 API。
它们之所以不完善是有原因的:从一开始我们就刻意避免在 API 方面进行创新。 我们的客户端库的目的是公开通用的 AMQP,而不是任何一种消息传递的观点。 但是,在我看来,尝试将 AMQP 直接映射到客户端库 API 是错误的,会导致过度复杂化和难以使用的抽象。
没有共同点:盲目遵循 AMQP 模型的客户端库将是复杂的; 易于使用的客户端库必须有自己的主张。
1. 通道
客户端库遵循协议的主要问题是由 AMQP 通道的性质引起的。 通道通常被解释为匹配操作系统线程的抽象 - 您可以有很多这样的通道,并且每个通道都是同步的。
这一切都很好,但是 AMQP 通道不仅仅限于线程 - 它远不止于此:错误范围、事务范围、排序保证和确认范围。
程序员可以决定在单个线程中使用多个通道,或者相反:多个线程可能需要在一个通道上工作。
第一种情况的示例:在两个队列之间转发消息(一个通道用于发布,一个用于消费)。 第二种情况:在多个工作线程之间拆分来自一个通道的工作(以便在工作线程之间共享 basic.qos 配额)。
不可避免地,客户端库的作者必须决定通道和线程之间的关系。 如果您来自 .NET/Java 背景,这听起来可能很乏味 - 这些框架对线程处理有自己的看法。 但是在第三方库中假设任何关于线程模型的做法在某些语言中是非常糟糕的做法,例如 C 和 Python。
对于处理多个连接的问题,我们可以重复几乎相同的讨论。 例如,单个线程可能需要与两个连接通信。
每个客户端库作者都必须回答以下两个问题
- 是否可以同时在多个通道上运行多个同步方法?
- 是否可以从单个线程运行多个连接?
两个问题 - 四种可能的选择
阻塞多个通道 | 从单个线程处理多个连接 | |
---|---|---|
否 | 否 | 简单的阻塞客户端 (pyamqplib) |
否 | 是 | 半异步客户端 (pika 0.5.2) |
是 | 否 | 线程客户端 (rabbitmq-java, rabbitmq-dotnet) |
是 | 是 | 完全异步客户端 (puka) |
2. 错误处理
下一个问题是错误处理。 使用某些客户端库,实际上不可能捕获 AMQP 错误并在不重启整个程序的情况下从中恢复。 这通常是由于用户不理解通道作为错误范围的性质造成的。 但是这些库并没有使处理错误变得容易:您收到一个通道错误,然后呢? 例如,执行 basic.publish 可能会在理论上随时终止您的通道。
3. 同步发布
最后一个有缺陷的地方是缺乏对同步发布的支持。 在 RabbitMQ 扩展 AMQP 以支持“确认”之前,实际上不可能确保消息已传递到 Broker。 唯一的解决方案是使用事务,这会极大地减慢发布速度。 现在,有了“确认”,这是可能的,但相当困难 - 除了编写回调之外,用户还需要维护库线程和用户线程之间的锁,这需要理解库线程模型。
诞生
出于这种挫败感,一个新的实验性 Python 客户端诞生了:Puka。
Puka 尝试为底层 AMQP 协议提供简单的 API 和合理的错误处理。 Puka 的主要特点
- 单线程。 它不对底层线程模型做任何假设; 如果需要,用户可以在 Puka 之上编写一个薄的线程层。
- 可以混合使用同步和异步编程风格。
- AMQP 错误是可预测且可恢复的。
- Basic.publish 可以是同步的或异步的,随您所愿。
Puka 的反向特性
- AMQP 通道不对用户公开。
- 移除了对某些 AMQP 功能的支持,最值得注意的是心跳。
代码片段
作为预告,这里有一些代码片段。
逐个声明 1000 个队列
for i in range(1000):
promise = client.queue_declare(queue='a%04i' % i)
client.wait(promise)
并行声明 1000 个队列
promises = [client.queue_declare(queue='a%04i' % i) for i in range(1000)]
for promise in promises:
client.wait(promise)
异步发布
client.basic_publish(exchange='', routing_key='test',
body="Hello world!")
同步发布
promise = client.basic_publish(exchange='', routing_key='test',
body="Hello world!")
client.wait(promise)
AMQP 错误不会影响程序的其他部分(发布、消费等)。 例如,如果 'test' 队列已被声明为 'durable',并且您尝试在没有适当标志的情况下重新声明它,您将收到错误
> promise = client.queue_declare(queue='test')
> client.wait(promise)
Traceback (most recent call last):
[...]
puka.spec_exceptions.PreconditionFailed: {'class_id': 50, 'method_id': 10,
'reply_code': 406, 'reply_text': "PRECONDITION_FAILED - parameters for queue
'test' in vhost '/' not equivalent"}
在 Puka 中,您可以简单地捕获此异常并继续
try:
promise = client.queue_declare(queue='test')
client.wait(promise)
except puka.PreconditionFailed:
# Oh, sorry. Forgot it was durable.
promise = client.queue_declare(queue='test', durable=True)
client.wait(promise)
您可以查看 用于 RabbitMQ 教程的 Puka 代码 和 Puka 示例 以及 测试。
总结
总而言之,Puka 提供了更简单的 API、灵活的编程模型、适当的错误处理,并且不对线程处理做任何决定。 它使再次使用 AMQP 变得有趣。