介绍发布者确认
在许多消息传递场景中,您绝不能丢失消息。 由于 AMQP 对消息持久性/处理的保证很少,因此传统的做法是使用事务,但这可能慢得令人无法接受。 为了解决这个问题,我们以轻量级发布者确认的形式向 AMQP 引入了一个扩展。
使用 Tx 保证交付
在 RabbitMQ 中,持久消息是指应该在代理重启后幸存的消息。 这里的关键词是应该,因为如果代理在有机会将消息写入磁盘之前关闭,消息仍然可能丢失。 在某些情况下,这还不够,发布者需要知道消息是否被正确处理。 最直接的解决方案是使用事务,即提交每条消息。
发布者会使用类似这样的代码
ch.txSelect();
for (int i = 0; i < MSG_COUNT; ++i) {
ch.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_BASIC,
"nop".getBytes());
ch.txCommit();
}
消费者会做类似这样的事情
QueueingConsumer qc = new QueueingConsumer(ch);
ch.basicConsume(QUEUE_NAME, true, qc);
for (int i = 0; i < MSG_COUNT; ++i) {
qc.nextDelivery();
System.out.printf("Consumed %d\n", i);
}
完整的程序(包括一些计时代码)可在此处获取:here。 发布 10000 条消息需要 4 分多钟。
流式轻量级发布者确认
在这种情况下使用事务存在两个问题。 第一个问题是它们是阻塞的:发布者必须等待代理处理每条消息。 知道除最后一条消息外所有消息都已成功处理,通常是过于强大的保证; 如果发布者知道在代理死机时哪些消息尚未处理,那就足够了。 第二个问题是事务不必要地繁重:每次提交都需要 fsync(),这需要很长时间才能完成。
输入确认:一旦通道进入确认模式,代理将在处理消息时确认消息。 由于这是异步完成的,因此生产者可以流式传输发布,而无需等待代理,并且代理可以有效地批量写入磁盘。
以下是使用确认的上述示例
private volatile SortedSet<Long> unconfirmedSet =
Collections.synchronizedSortedSet(new TreeSet());
...
ch.setConfirmListener(new ConfirmListener() {
public void handleAck(long seqNo, boolean multiple) {
if (multiple) {
unconfirmedSet.headSet(seqNo+1).clear();
} else {
unconfirmedSet.remove(seqNo);
}
}
public void handleNack(long seqNo, boolean multiple) {
// handle the lost messages somehow
}
});
ch.confirmSelect();
for (long i = 0; i < MSG_COUNT; ++i) {
unconfirmedSet.add(ch.getNextPublishSeqNo());
ch.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC,
"nop".getBytes());
}
while (unconfirmedSet.size() > 0)
Thread.sleep(10);
完整代码可在此处获取:here。 在继续之前,值得一提的是,运行此代码大约需要 2 秒。 它比事务代码快 100 多倍。
代码的作用是什么? 它首先声明一个集合,该集合将保存尚未确认的消息的 ID。 然后,它将通道设置为确认模式,并将 AckListener 附加到通道。 当它发布消息时,它会将消息添加到集合中; 同时,AckListener 会在收到确认时从集合中删除消息。 最后,生产者等待所有消息都被确认。 该集合始终保存需要在发生故障时重新传输的消息。
确认的工作原理
确认通过添加 confirm 类来扩展标准 AMQP。 此类仅包含两个方法:confirm.select 和 confirm.select-ok。 此外,basic.ack 方法可以发送给客户端。
confirm.select 方法在通道上启用发布者确认。 请注意,事务通道不能设置为确认模式,确认模式通道不能设置为事务模式。
当发送/接收 confirm.select 方法时,发布者/代理开始对发布进行编号(confirm.select 后的第一个发布为 1)。 一旦通道处于确认模式,发布者应期望收到 basic.ack 方法。 delivery-tag 字段指示已确认消息的编号。
当代理确认消息时,它会承担消息的责任,并通知发布者消息已成功处理; “成功处理”的含义取决于上下文。
基本规则如下
- 无法路由的 mandatory 或 immediate 消息会在 basic.return 之后立即确认;
- 否则,瞬态消息会在入队时立即确认; 并且,
- 持久消息会在持久化到磁盘或在每个队列上被消费时确认。
请注意,要确认持久消息,必须将其写入磁盘或在所有队列上确认。 关于确认,传递到非持久队列的持久消息的行为类似于瞬态消息。 队列删除、队列清除和 basic.reject{requeue=false} 模拟消费者确认。 关于 per-queue ttl,消息过期模拟消费者确认。
如果满足多个这些条件,则只有第一个条件会导致发送确认。 每条发布的消息迟早都会被确认,并且没有消息会被确认多次。 由于 basic.return 在 basic.ack 之前发送,因此一旦发布者收到 basic.ack,它就知道它永远不会再听到该消息的消息。
代理始终可以在 basic.ack 中设置multiple 位。 设置了 multiple 位的 basic.ack 表示所有消息(包括 delivery-tag)都已确认。
关于确认,有一些注意事项。 首先,代理不保证何时确认消息,只保证消息将被确认。 其次,随着未确认消息的堆积,消息处理速度会减慢:代理为每个确认模式发布执行多个 O(log(number-of-unconfirmed-messages)) 操作。 第三,如果发布者和代理之间的连接断开且存在未完成的确认,这并不一定意味着消息丢失,因此重新发布可能会导致重复消息。 最后,如果代理内部发生不好的事情并导致消息丢失,它将 basic.nack 这些消息(因此,ConfirmHandler 中的 handleNack())。
总而言之,确认为客户端提供了一种轻量级的方式来跟踪哪些消息已被代理处理,哪些消息需要在代理关闭或网络故障时重新发布。