RabbitMQ 教程 - 工作队列
工作队列
(使用 Objective-C 客户端)
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
在 第一个教程 中,我们编写了向命名队列发送和接收消息的方法。在本教程中,我们将创建一个“工作队列”,用于在多个工作进程之间分发耗时任务。
工作队列(又称:任务队列)背后的主要思想是避免立即执行一个资源密集型任务并等待其完成。相反,我们安排任务稍后执行。我们将一个任务封装成一条消息并将其发送到队列。一个在后台运行的工作进程会接收任务并最终执行工作。当您运行多个工作进程时,任务将在它们之间共享。
这个概念对于 Web 应用程序尤其有用,在这些应用程序中,不可能在短暂的 HTTP 请求窗口内处理一个复杂的任务。
准备工作
在本教程的 上一部分,我们发送了一条包含“Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有实际的真实世界任务,比如需要调整大小的图片或需要渲染的 pdf 文件,所以我们通过假装自己很忙来模拟它——通过使用 sleep。我们将字符串中的点数视为其复杂性;每个点代表一秒钟的“工作”。例如,由 Hello... 描述的模拟任务将需要三秒钟。
我们将稍微修改 我们之前示例 中的 send 方法,以便允许将任意字符串作为方法参数发送。此方法将把任务调度到我们的工作队列,所以让我们将其重命名为 newTask。除了新参数外,实现保持不变。
- (void)newTask:(NSString *)msg {
NSLog(@"Attempting to connect to local RabbitMQ broker");
RMQConnection *conn = [[RMQConnection alloc] initWithDelegate:[RMQConnectionDelegateLogger new]];
[conn start];
id<RMQChannel> ch = [conn createChannel];
RMQQueue *q = [ch queue:@"hello"];
NSData *msgData = [msg dataUsingEncoding:NSUTF8StringEncoding];
[ch.defaultExchange publish:msgData routingKey:q.name];
NSLog(@"Sent %@", msg);
[conn close];
}
我们旧的 receive 方法需要一些较大的改动:它需要为消息体中的每个点模拟一秒钟的工作。为了帮助我们理解发生了什么,让每个工作进程都有一个名称,并且每个工作进程都需要从队列中取出消息并执行任务,所以让我们称之为 workerNamed:
[q subscribe:^(RMQMessage * _Nonnull message) {
NSString *messageText = [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding];
NSLog(@"%@: Received %@", name, messageText);
// imitate some work
unsigned int sleepTime = (unsigned int)[messageText componentsSeparatedByString:@"."].count - 1;
NSLog(@"%@: Sleeping for %u seconds", name, sleepTime);
sleep(sleepTime);
}];
请注意,我们模拟的任务是执行时间的。
像在教程一中一样,从 viewDidLoad 运行它们
- (void)viewDidLoad {
[super viewDidLoad];
[self newTask:@"Hello World..."];
[self workerNamed:@"Flopsy"];
}
日志输出应该表明 Flopsy 正在睡眠三秒钟。
轮询分发
使用任务队列的一个优点是能够轻松地并行化工作。如果我们正在积累工作积压,我们可以添加更多的工作进程,从而轻松扩展。
让我们尝试同时运行两个 workerNamed: 方法。它们都会从队列中获取消息,但具体是如何获取的呢?让我们来看看。
更改 viewDidLoad 以发送更多消息并启动两个工作进程
- (void)viewDidLoad {
[super viewDidLoad];
[self workerNamed:@"Jack"];
[self workerNamed:@"Jill"];
[self newTask:@"Hello World..."];
[self newTask:@"Just one this time."];
[self newTask:@"Five....."];
[self newTask:@"None"];
[self newTask:@"Two..dots"];
}
让我们看看分发给我们的工作进程的内容
# => Jack: Waiting for messages
# => Jill: Waiting for messages
# => Sent Hello World...
# => Jack: Received Hello World...
# => Jack: Sleeping for 3 seconds
# => Sent Just one this time.
# => Jill: Received Just one this time.
# => Jill: Sleeping for 1 seconds
# => Sent Five.....
# => Sent None
# => Sent Two..dots
# => Jill: Received Five.....
# => Jill: Sleeping for 5 seconds
# => Jack: Received None
# => Jack: Sleeping for 0 seconds
# => Jack: Received Two..dots
# => Jack: Sleeping for 2 seconds
默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询。尝试使用三个或更多工作进程进行此操作。
消息确认
完成一项任务可能需要几秒钟,您可能会想,如果一个消费者开始一项长时间任务然后终止了怎么办?使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除。在这种情况下,如果您终止一个工作者,它正在处理的消息就会丢失。分派给该特定工作者但尚未处理的消息也会丢失。
但是我们不想丢失任何任务。如果一个工作进程死亡,我们希望将任务分发给另一个工作进程。
为了确保消息永远不会丢失,RabbitMQ 支持消息确认。ack(确认)由消费者发送回来,告知 RabbitMQ 特定消息已收到、已处理,并且 RabbitMQ 可以删除它。
如果消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失)而未发送 ack,RabbitMQ 将理解消息未完全处理,并会将其重新排队。如果当时有其他消费者在线,它将迅速将其重新分发给另一个消费者。这样,您可以确信没有消息会丢失,即使工作进程偶尔会死亡。
强制执行消费者交付确认的超时(默认为 30 分钟)。这有助于检测从未确认交付的错误(卡住)的消费者。您可以按照交付确认超时中的说明增加此超时。
消息确认在客户端中默认是关闭的,但在 AMQ 协议中不是(AMQBasicConsumeNoAck 选项由 subscribe: 自动发送)。现在是时候通过显式设置 AMQBasicConsumeNoOptions 并在完成任务后从工作进程发送适当的确认来打开确认了。
RMQBasicConsumeOptions manualAck = RMQBasicConsumeNoOptions;
[q subscribe:manualAck handler:^(RMQMessage * _Nonnull message) {
NSString *messageText = [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding];
NSLog(@"%@: Received %@", name, messageText);
// imitate some work
unsigned int sleepTime = (unsigned int)[messageText componentsSeparatedByString:@"."].count - 1;
NSLog(@"%@: Sleeping for %u seconds", name, sleepTime);
sleep(sleepTime);
[ch ack:message.deliveryTag];
}];
使用此代码,我们可以确信即使工作进程在处理消息时崩溃,也不会丢失任何内容。工作进程崩溃后不久,所有未确认的消息将被重新传递。
确认必须在接收到交付的同一通道上发送。尝试使用不同通道进行确认将导致通道级协议异常。请参阅确认文档指南以了解更多信息。
遗忘的确认
忘记
ack是一个常见的错误。这是一个简单的错误,但后果严重。当您的客户端退出时,消息将被重新传递(这看起来可能像是随机的重新传递),但 RabbitMQ 将消耗越来越多的内存,因为它无法释放任何未确认的消息。为了调试此类错误,您可以使用
rabbitmqctl打印messages_unacknowledged字段sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在 Windows 上,删除 sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性
我们已经学习了如何确保即使工作进程死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。需要两件事来确保消息不会丢失:我们需要将队列和消息都标记为持久。
首先,我们需要确保队列能够承受 RabbitMQ 节点重启。为此,我们需要将其声明为持久
RMQQueue *q = [ch queue:@"hello" options:AMQQueueDeclareDurable];
虽然此命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为 hello 的队列,该队列不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并且会向尝试这样做的任何程序返回错误。但是有一个快速的解决方法——让我们声明一个不同的队列名称,例如 task_queue。
RMQQueue *q = [ch queue:@"task_queue" options:AMQQueueDeclareDurable];
此 options:AMQQueueDeclareDurable 更改需要应用于生产者和消费者代码。
此时,我们确信即使 RabbitMQ 重启,task_queue 队列也不会丢失。现在我们需要将我们的消息标记为持久化。
- 通过使用
persistent选项。
[ch.defaultExchange publish:msgData routingKey:q.name persistent:YES];
关于消息持久化的注意事项
将消息标记为持久并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但在 RabbitMQ 接受消息但尚未保存的短时间窗口内仍然存在风险。此外,RabbitMQ 不会为每条消息执行
fsync(2)——它可能只保存在缓存中而未真正写入磁盘。持久性保证不强,但对于我们简单的任务队列来说已绰绰有余。如果您需要更强的保证,可以使用发布者确认。
公平分派
您可能已经注意到调度仍然不能完全按我们想要的方式工作。例如,在有两个工作进程的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作进程将持续忙碌,而另一个工作进程几乎不做任何工作。好吧,RabbitMQ 对此一无所知,它仍然会均匀地分发消息。
这是因为 RabbitMQ 在消息进入队列时就会分发消息。它不会查看消费者未确认消息的数量。它只是盲目地将第 n 条消息分发给第 n 个消费者。
为了解决这个问题,我们可以使用 basicQos:global: 方法,并将预取值设置为 @1。这告诉 RabbitMQ 一次不要向一个工作进程发送超过一条消息。换句话说,在工作进程处理并确认上一条消息之前,不要向其分发新消息。相反,它会将消息分发给下一个未忙碌的工作进程。
[ch basicQos:@1 global:NO];
关于队列大小的说明
如果所有工作进程都忙碌,您的队列可能会填满。您需要密切关注这一点,并可能添加更多工作进程,或采取其他策略。
总而言之
我们的 newTask: 方法的最终代码
- (void)newTask:(NSString *)msg {
RMQConnection *conn = [[RMQConnection alloc] initWithDelegate:[RMQConnectionDelegateLogger new]];
[conn start];
id<RMQChannel> ch = [conn createChannel];
RMQQueue *q = [ch queue:@"task_queue" options:RMQQueueDeclareDurable];
NSData *msgData = [msg dataUsingEncoding:NSUTF8StringEncoding];
[ch.defaultExchange publish:msgData routingKey:q.name persistent:YES];
NSLog(@"Sent %@", msg);
[conn close];
}
以及我们的 workerNamed:
- (void)workerNamed:(NSString *)name {
RMQConnection *conn = [[RMQConnection alloc] initWithDelegate:[RMQConnectionDelegateLogger new]];
[conn start];
id<RMQChannel> ch = [conn createChannel];
RMQQueue *q = [ch queue:@"task_queue" options:RMQQueueDeclareDurable];
[ch basicQos:@1 global:NO];
NSLog(@"%@: Waiting for messages", name);
RMQBasicConsumeOptions manualAck = RMQBasicConsumeNoOptions;
[q subscribe:manualAck handler:^(RMQMessage * _Nonnull message) {
NSString *messageText = [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding];
NSLog(@"%@: Received %@", name, messageText);
// imitate some work
unsigned int sleepTime = (unsigned int)[messageText componentsSeparatedByString:@"."].count - 1;
NSLog(@"%@: Sleeping for %u seconds", name, sleepTime);
sleep(sleepTime);
[ch ack:message.deliveryTag];
}];
}
通过使用消息确认和预取,您可以设置一个工作队列。持久化选项让任务即使在 RabbitMQ 重启后也能存活。
现在我们可以继续学习 教程 3,了解如何将同一条消息传递给多个消费者。