RabbitMQ Stream 教程 - “Hello World!”
简介
前提条件
本教程假设您已安装 RabbitMQ,并在 localhost
上运行,且已启用 stream 插件。标准的 stream 端口 是 5552。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
使用 Docker
如果您没有安装 RabbitMQ,可以使用 Docker 容器运行它
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management
等待服务器启动,然后启用 stream 和 stream management 插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
如何获取帮助
如果您在学习本教程时遇到问题,可以通过邮件列表或 Discord 社区服务器联系我们。
RabbitMQ Streams 在 RabbitMQ 3.9 中引入。更多信息请点击此处查看。
“Hello World”
(使用 Go Stream 客户端)
在本教程的这一部分,我们将用 Go 编写两个程序:一个生产者,用于发送单条消息;一个消费者,用于接收消息并打印出来。我们将略过 Go 客户端 API 的一些细节,专注于这个非常简单的入门示例。这是 RabbitMQ Streams 的 “Hello World”。
Go stream 客户端库
RabbitMQ 支持多种协议。本教程使用 RabbitMQ stream 协议,这是一种专用于 RabbitMQ streams 的协议。RabbitMQ 有许多不同语言的客户端,请参阅每种语言的 stream 客户端库。我们将使用 RabbitMQ 提供的 Go stream 客户端。
RabbitMQ Go 客户端 1.4 及更高版本通过 go get 分发。
本教程假设您在 Windows 上使用 PowerShell。在 MacOS 和 Linux 上,几乎任何 shell 都可以工作。
设置
首先,让我们验证您的 PATH
中是否包含 Go 工具链
go help
运行该命令应生成帮助消息。
本教程的可执行版本可以在 RabbitMQ tutorials 仓库中找到。
现在让我们创建项目
mkdir go-stream
cd go-stream
go mod init github.com/rabbitmq/rabbitmq-tutorials
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
现在我们已经设置好了 Go 项目,可以编写一些代码了。
发送
我们将消息生产者(发送者)命名为 send.go
,消息消费者(接收者)命名为 receive.go
。生产者将连接到 RabbitMQ,发送一条消息,然后退出。
在 send.go
中,我们需要导入一些包
import (
"bufio"
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"log"
"os"
)
然后我们可以创建到服务器的连接
env, err := stream.NewEnvironment(
stream.NewEnvironmentOptions())
stream Go 客户端的入口点是 Environment
。它用于配置 RabbitMQ stream 发布者、stream 消费者和 stream 本身。
它抽象了套接字连接,并为我们处理协议版本协商和身份验证等。
本教程假设 stream 发布者和消费者连接到本地运行的 RabbitMQ 节点,即 localhost。要连接到不同机器上的节点,只需在 EnvironmentOptions
上指定目标主机名或 IP 地址。
接下来,让我们创建一个生产者。
生产者还将声明一个它将发布消息的 stream,然后发布一条消息
streamName := "hello-go-stream"
env.DeclareStream(streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)
producer, err := env.NewProducer(streamName, stream.NewProducerOptions())
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}
err = producer.Send(amqp.NewMessage([]byte("Hello world")))
if err != nil {
log.Fatalf("Failed to send message: %v", err)
}
stream 声明操作是幂等的:stream 仅在尚不存在时才会被创建。
stream 是一种仅追加日志抽象,允许重复消费消息,直到它们过期。始终定义保留策略是一个好的做法。在上面的示例中,stream 的大小限制为 5 GiB。
消息内容是一个字节数组。应用程序可以使用任何适当的格式(如 JSON、MessagePack 等)来编码他们需要传输的数据。
当以上代码运行完成时,生产者连接和 stream 系统连接将被关闭。这就是我们生产者的全部内容。
每次运行生产者时,它都会向服务器发送一条消息,并且该消息将被追加到 stream 中。
完整的 send.go 文件可以在 GitHub 上找到。
发送不工作!
如果这是您第一次使用 RabbitMQ,并且没有看到 “Sent” 消息,那么您可能会挠头想知道哪里出错了。可能是 Broker 启动时没有足够的可用磁盘空间(默认情况下至少需要 50 MB 可用空间),因此拒绝接受消息。检查 Broker 日志文件,看看是否有 资源警报 记录,如有必要,降低可用磁盘空间阈值。《配置指南》将向您展示如何设置
disk_free_limit
。另一个原因可能是程序在消息到达 Broker 之前就退出了。在某些客户端库中,发送是异步的:函数立即返回,但消息在通过网络传输之前先在 IO 层排队。发送程序要求用户按键以完成该过程:消息有足够的时间到达 Broker。stream 协议提供了一种确认机制,以确保 Broker 接收出站消息,但本教程为了简单起见,未使用此机制。
接收
本教程的另一部分,消费者,将连接到 RabbitMQ 节点并等待消息被推送到它。与本教程中发布单条消息并停止的生产者不同,消费者将持续运行,消费 RabbitMQ 将推送给它的消息,并打印接收到的有效负载。
与 send.go
类似,receive.go
也需要导入一些包
import (
"bufio"
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"log"
"os"
)
在初始设置方面,消费者部分与生产者部分非常相似;我们使用默认连接设置并声明消费者将从中消费的 stream。
env, err := stream.NewEnvironment(stream.NewEnvironmentOptions())
if err != nil {
log.Fatalf("Failed to create environment: %v", err)
}
streamName := "hello-go-stream"
env.DeclareStream(streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)
请注意,消费者部分也声明了 stream。这是为了允许生产者或消费者任何一部分先启动。
我们使用 Consumer
结构体来实例化消费者,并使用 ConsumerOptions
结构体来配置它。我们提供一个 messageHandler
回调来处理已传递的消息。
SetOffset
定义了消费者的起始点。在本例中,消费者从 stream 中可用的第一条消息开始。
messagesHandler := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
fmt.Printf("Stream: %s - Received message: %s\n", consumerContext.Consumer.GetStreamName(),
message.Data)}
consumer, err := env.NewConsumer(streamName, messagesHandler,
stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()))
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
完整的 receive.go 文件可以在 GitHub 上找到。
将它们放在一起
为了运行这两个示例,请打开两个终端(shell)标签页。
我们首先需要拉取依赖项
go get -u
本教程的两个部分可以以任何顺序运行,因为它们都声明了 stream。让我们首先运行消费者,这样当第一个发布者启动时,消费者将打印它
go run receive.go
然后运行生产者
go run send.go
消费者将打印它通过 RabbitMQ 从发布者那里获得的消息。消费者将保持运行,等待新的交付。尝试多次重新运行发布者以观察这一点。
Streams 与队列的不同之处在于,它们是可以重复消费的消息的仅追加日志。当多个消费者从一个 stream 消费时,他们将从第一条可用的消息开始。