RabbitMQ 教程 - 主题 (Topics)
主题 (Topics)
(使用 Go RabbitMQ 客户端)
先决条件
本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则连接设置可能需要调整。
在哪里获得帮助
如果您在学习本教程时遇到问题,您可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
在上一个教程中,我们改进了日志系统。我们没有使用只能进行虚拟广播的 fanout 交换机,而是使用了 direct 交换机,并获得了选择性接收日志的可能性。
虽然使用 direct 交换机改进了我们的系统,但它仍然存在局限性 - 它无法根据多个条件进行路由。
在我们的日志系统中,我们可能不仅希望订阅基于严重性的日志,还希望订阅基于发出日志的源的日志。您可能从 syslog unix 工具中了解这个概念,该工具根据严重性(info/warn/crit...)和设施(auth/cron/kern...)路由日志。
这将为我们提供很大的灵活性 - 我们可能只想监听来自 'cron' 的关键错误,但也想监听来自 'kern' 的所有日志。
为了在我们的日志系统中实现这一点,我们需要了解更复杂的 topic 交换机。
主题 (Topic) 交换机
发送到 topic 交换机的消息不能具有任意的 routing_key - 它必须是单词列表,用点分隔。单词可以是任何内容,但通常它们指定与消息相关的某些特征。一些有效的路由键示例:stock.usd.nyse, nyse.vmw, quick.orange.rabbit。路由键中可以包含任意数量的单词,最多 255 字节的限制。
绑定键也必须采用相同的形式。topic 交换机背后的逻辑类似于 direct 交换机 - 使用特定路由键发送的消息将被传递到所有使用匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况
- *(星号) 可以替代正好一个单词。
- #(井号) 可以替代零个或多个单词。
用一个例子来解释这最容易
在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个单词(两个点)组成的路由键发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:<speed>.<colour>.<species>。
我们创建了三个绑定:Q1 绑定了绑定键 *.orange.*,Q2 绑定了 *.*.rabbit 和 lazy.#。
这些绑定可以总结为
- Q1 对所有橙色动物感兴趣。
- Q2 想听到关于兔子的一切,以及关于懒惰动物的一切。
路由键设置为 quick.orange.rabbit 的消息将被传递到两个队列。消息 lazy.orange.elephant 也将发送到这两个队列。另一方面,quick.orange.fox 将仅发送到第一个队列,而 lazy.brown.fox 仅发送到第二个队列。lazy.pink.rabbit 将仅传递到第二个队列一次,即使它匹配两个绑定。quick.brown.fox 不匹配任何绑定,因此将被丢弃。
如果我们违反约定并发送一个或四个单词的消息,例如 orange 或 quick.orange.new.rabbit 会发生什么?好吧,这些消息将不匹配任何绑定,并且会丢失。
另一方面,lazy.orange.new.rabbit,即使它有四个单词,也将匹配最后一个绑定,并将被传递到第二个队列。
主题 (Topic) 交换机
主题 (Topic) 交换机功能强大,可以像其他交换机一样工作。
当队列绑定了
#(井号) 绑定键时 - 它将接收所有消息,无论路由键如何 - 就像fanout交换机一样。当特殊字符
*(星号) 和#(井号) 未在绑定中使用时,主题 (topic) 交换机将像direct交换机一样工作。
整合在一起
我们将在日志系统中使用 topic 交换机。我们将从一个工作假设开始,即日志的路由键将有两个单词:<facility>.<severity>。
代码与上一个教程中的代码几乎相同。
emit_log_topic.go 的代码
package main
import (
        "context"
        "log"
        "os"
        "strings"
        "time"
        amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
        if err != nil {
                log.Panicf("%s: %s", msg, err)
        }
}
func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()
        err = ch.ExchangeDeclare(
                "logs_topic", // name
                "topic",      // type
                true,         // durable
                false,        // auto-deleted
                false,        // internal
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare an exchange")
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        body := bodyFrom(os.Args)
        err = ch.PublishWithContext(ctx,
                "logs_topic",          // exchange
                severityFrom(os.Args), // routing key
                false, // mandatory
                false, // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")
        log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
        var s string
        if (len(args) < 3) || os.Args[2] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[2:], " ")
        }
        return s
}
func severityFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "anonymous.info"
        } else {
                s = os.Args[1]
        }
        return s
}
receive_logs_topic.go 的代码
package main
import (
        "log"
        "os"
        amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
        if err != nil {
                log.Panicf("%s: %s", msg, err)
        }
}
func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()
        err = ch.ExchangeDeclare(
                "logs_topic", // name
                "topic",      // type
                true,         // durable
                false,        // auto-deleted
                false,        // internal
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare an exchange")
        q, err := ch.QueueDeclare(
                "",    // name
                false, // durable
                false, // delete when unused
                true,  // exclusive
                false, // no-wait
                nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")
        if len(os.Args) < 2 {
                log.Printf("Usage: %s [binding_key]...", os.Args[0])
                os.Exit(0)
        }
        for _, s := range os.Args[1:] {
                log.Printf("Binding queue %s to exchange %s with routing key %s",
                        q.Name, "logs_topic", s)
                err = ch.QueueBind(
                        q.Name,       // queue name
                        s,            // routing key
                        "logs_topic", // exchange
                        false,
                        nil)
                failOnError(err, "Failed to bind a queue")
        }
        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto ack
                false,  // exclusive
                false,  // no local
                false,  // no wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
        var forever chan struct{}
        go func() {
                for d := range msgs {
                        log.Printf(" [x] %s", d.Body)
                }
        }()
        log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
        <-forever
}
接收所有日志
go run receive_logs_topic.go "#"
接收来自设施 kern 的所有日志
go run receive_logs_topic.go "kern.*"
或者,如果您只想听到关于 critical 日志的信息
go run receive_logs_topic.go "*.critical"
您可以创建多个绑定
go run receive_logs_topic.go "kern.*" "*.critical"
并发出路由键为 kern.critical 类型的日志
go run emit_log_topic.go "kern.critical" "A critical kernel error"
尽情玩这些程序。请注意,代码没有对路由或绑定键做任何假设,您可能想使用两个以上的路由键参数。
(emit_log_topic.go 和 receive_logs_topic.go 的完整源代码)
接下来,了解如何在教程 6中将往返消息作为远程过程调用执行