跳至主内容

RabbitMQ Stream 教程 - 偏移量跟踪

简介

信息

先决条件

本教程假定 RabbitMQ 已安装,正在localhost上运行,并且stream 插件已启用。标准的 stream 端口是 5552。如果您使用不同的主机、端口或凭据,则需要调整连接设置。

使用 Docker

如果您没有安装 RabbitMQ,可以在 Docker 容器中运行它

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672  \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management

等待服务器启动,然后启用 stream 和 stream management 插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 

哪里寻求帮助

如果您在学习本教程时遇到困难,可以通过邮件列表Discord 社区服务器与我们联系。

RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请参见此处

偏移量跟踪

设置

本教程的这部分包括用 Rust 编写两个程序:一个生产者,它发送带有标记消息的波形消息;一个消费者,它接收消息并在收到标记消息时停止。它展示了消费者如何浏览流,甚至可以在前一次执行中从中断处重新开始。

本教程使用 stream Rust 客户端。请确保按照第一个教程中的 设置步骤进行操作。

本教程的可执行版本可以在 RabbitMQ tutorials 仓库 中找到。

请注意,可执行版本已实现了本教程末尾解释的 服务器端偏移量跟踪 部分,在测试此场景时需要考虑这一点。

发送程序名为 offset_tracking_send.rs,接收程序名为 receive_offset_tracking.rs。本教程侧重于客户端库的使用,因此应使用仓库中的最终代码来创建文件的脚手架(例如,导入、主函数等)。

发送

发送程序首先实例化 Environment 并创建流。

let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(2))
.create(stream)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
std::process::exit(1);
}
}
}
}

然后,程序创建一个 Producer 实例并发布 100 条消息。最后一条消息的正文值设置为 marker;这是消费者停止消费的标记。

请注意 `tokio::sync::Notify` 的使用:主例程正在等待它,直到所有消息都通过确认回调得到确认。这确保了在程序关闭之前代理已收到所有消息。

let producer = environment.producer().build(stream).await?;

println!("Publishing {:?} messages", message_count);

for i in 0..message_count {
let msg;
if i < message_count - 1 {
msg = Message::builder().body(format!("hello{}", i)).build();
} else {
msg = Message::builder().body(format!("marker{}", i)).build();
};

let counter = confirmed_messages.clone();
let notifier = notify_on_send.clone();
producer
.send(msg, move |_| {
let inner_counter = counter.clone();
let inner_notifier = notifier.clone();
async move {
if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 {
inner_notifier.notify_one();
}
}
})
.await?;
}

notify_on_send.notified().await;
println!("Messages confirmed: True");
producer.close().await?;

现在,让我们创建接收程序。

接收

接收程序创建一个 Environment 实例,并确保流也已创建。这部分代码与发送程序相同,因此在接下来的代码片段中为简洁起见省略了。

接收程序启动一个附加到流开头的消费者 OffsetSpecification::First。它使用两个变量:first_offsetlast_offset,用于在程序结束时输出第一个和最后一个已接收消息的偏移量。

当收到标记消息时,消费者停止:它将偏移量分配给 last_offset 变量并关闭消费者。

let mut first_offset: Option<u64> = None;
let mut last_offset: Option<u64> = None;
let mut consumer = environment
.consumer()
.offset(OffsetSpecification::First)
.build(stream)
.await
.unwrap();

while let Some(delivery) = consumer.next().await {
let d = delivery.unwrap();

if !first_offset.is_some() {
println!("First message received");
first_offset = Some(d.offset());
}

if String::from_utf8_lossy(d.message().data().unwrap()).contains("marker")
{
last_offset = Some(d.offset());
let handle = consumer.handle();
_ = handle.close().await;
break;
}
}

if first_offset.is_some() {
println!(
"Done consuming first_offset: {:?} last_offset: {:?} ", first_offset.unwrap(), last_offset.unwrap())
}

探索流

要运行这两个示例,请打开两个终端(shell)标签页。

在第一个选项卡中,运行发送程序以发布一系列消息。

 cargo run --bin send_offset_tracking

输出如下:

Publishing 100 messages
Messages confirmed: True

现在,让我们运行接收程序。打开一个新选项卡。请记住,由于 OffsetSpecification::First 偏移量规范,它应该从流的开头开始。

  cargo run --bin receive_offset_tracking

这是输出:

Started consuming
consuming first message
Done consuming first_offset: 0 last_offset: 99
什么是偏移量?

流可以看作是一个包含消息的数组。偏移量是数组中给定消息的索引。

流与队列不同:消费者可以读取和重读相同的消息,并且消息会保留在流中。

让我们尝试使用 OffsetSpecification::Offset 规范来附加到与 0 不同的给定偏移量,从而尝试此功能。创建 Consumer 的环境时,请将 OffsetSpecification 变量从

consumer = environment
.consumer()
.offset(OffsetSpecification::First)
.build(stream)
.await
.unwrap();

更改为

consumer = environment
.consumer()
.offset(OffsetSpecification::Offset(42))
.build(stream)
.await
.unwrap();

偏移量 42 是任意的,它可以是 0 到 99 之间的任何数字。再次运行接收程序。

 cargo run --bin receive_offset_tracking

这是输出:

Started consuming:
First message received.
Done consuming first_offset: 42 last_offset: 99

还有一个方法可以附加到流的末尾,以便在创建消费者时只看到新消息。这就是 OffsetSpecification::Next 偏移量规范。让我们尝试一下。

consumer = environment
.consumer()
.offset(OffsetSpecification::Next)
.build(stream)
.await
.unwrap();

运行接收程序。

 cargo run --bin receive_offset_tracking

这次消费者没有收到任何消息。

Started consuming

它正在等待流中的新消息。通过再次运行发送程序来发布一些消息。回到第一个选项卡。

 cargo run --bin send_offset_tracking

等待程序退出,然后切换回接收程序选项卡。消费者收到了新消息。

Started consuming
First message received.
Done consuming first_offset: 100 last_offset: 199

接收程序因为发送程序将其放在流末尾的新标记消息而停止。

本节展示了如何“浏览”流:从开头、从任何偏移量,甚至对于新消息。下一节将介绍如何利用服务器端偏移量跟踪,以便从消费者前一次执行的中断处恢复。

服务器端偏移量跟踪

RabbitMQ Streams 提供服务器端偏移量跟踪,用于存储流中给定消费者的进度。如果消费者因任何原因停止(崩溃、升级等),它将能够从先前停止的位置重新连接,以避免处理相同的消息。

RabbitMQ Streams 提供了偏移量跟踪的 API,但也可以使用其他解决方案来存储正在消耗的应用程序的进度。这可能取决于用例,但关系型数据库也是一个不错的解决方案。

让我们修改接收程序以存储已处理消息的偏移量。已更新的行用注释标出。

let mut first_offset: Option<u64> = None;
let mut last_offset: Option<u64> = None;
let mut consumer = environment
.consumer()
// The consumer needs a name to use Server-Side Offset Tracking
.name("consumer-1")
.offset(OffsetSpecification::First)
.build(stream)
.await
.unwrap();

println!("Started consuming");

// We can query if a stored offset exists
let mut stored_offset: u64 = consumer.query_offset().await.unwrap_or_else(|_| 0);

if stored_offset > 0 {
stored_offset += 1;
}
consumer = environment
.consumer()
// The consumer needs a name to use Server-Side Offset Tracking
.name("consumer-1")
.offset(OffsetSpecification::Offset(stored_offset))
.build(stream)
.await
.unwrap();

let mut received_messages: i64 = -1;
while let Some(delivery) = consumer.next().await {
let d = delivery.unwrap();

if !first_offset.is_some() {
first_offset = Some(d.offset());
}
consumer = environment
.consumer()
// The consumer needs a name to use Server-Side Offset Tracking
.name("consumer-1")
.offset(OffsetSpecification::Offset(stored_offset))
.build(stream)
.await
.unwrap();

let mut received_messages: i64 = -1;
while let Some(delivery) = consumer.next().await {
let d = delivery.unwrap();

if !first_offset.is_some() {
println!("First message received");
first_offset = Some(d.offset());
}
received_messages = received_messages + 1;
if received_messages % 10 == 0
|| String::from_utf8_lossy(d.message().data().unwrap()).contains("marker")
{
// We store the offset in the server
let _ = consumer
.store_offset(d.offset())
.await
.unwrap_or_else(|e| println!("Err: {}", e));
if String::from_utf8_lossy(d.message().data().unwrap()).contains("marker") {
last_offset = Some(d.offset());
let handle = consumer.handle();
_ = handle.close().await;
break;
}
}
}
}

if first_offset.is_some() {
println!(
"Done consuming first_offset: {:?} last_offset: {:?} ", first_offset.unwrap(), last_offset.unwrap())
}

让我们运行接收程序。

 cargo run --bin receive_offset_tracking

这是输出:

Started consuming: Press control +C to close
First message received.
Done consuming, first offset 0, last offset 99.

这没什么令人惊讶的:消费者从流的开头获取了消息,并在到达标记消息时停止。

让我们再次启动它。

cargo run --bin receive_offset_tracking

这是输出:

Started consuming
First message received.
Done consuming first_offset: 100 last_offset: 199

最相关的实现是:

  • 消费者必须有一个名称。它是存储和检索最后一个存储的偏移量值的关键。
  • 偏移量每 10 条消息存储一次。对于偏移量存储频率而言,这是一个异常低的值,但对于本教程来说没问题。实际世界中的值通常是几百或几千。
  • 偏移量在关闭消费者之前存储,就在收到标记消息之后。

消费者正好从上次中断的地方恢复:第一次运行时最后一个偏移量是 99,第二次运行时第一个偏移量是 100。请注意,OffsetSpecification::First 偏移量规范被忽略了:存储的偏移量优先于偏移量规范参数。消费者在第一次运行时存储了偏移量跟踪信息,因此客户端库使用它在第二次运行时从正确的位置恢复消耗。

本教程关于 RabbitMQ Streams 中的消费语义的内容到此结束。它涵盖了消费者如何附加到流中的任何位置。消费应用程序很可能需要跟踪它们在流中达到的点。它们可以使用本教程中演示的内置服务器端偏移量跟踪功能。它们也可以自由使用任何其他数据存储解决方案来完成此任务。

有关偏移量跟踪的更多信息,请参阅 RabbitMQ 博客

© . This site is unofficial and not affiliated with VMware.