跳到主要内容

RabbitMQ 教程 - 发布/订阅

发布/订阅

(使用 Spring AMQP)

信息

前提条件

本教程假设 RabbitMQ 已安装并在 localhost 上的标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则连接设置可能需要调整。

在哪里获得帮助

如果您在学习本教程时遇到问题,可以通过 GitHub DiscussionsRabbitMQ 社区 Discord 联系我们。

第一个教程中,我们展示了如何使用 start.spring.io 利用 Spring Initializr 创建一个带有 RabbitMQ starter 依赖的项目,以创建 Spring AMQP 应用程序。

之前的教程中,我们创建了一个新的包 tut2 来放置我们的配置、发送者和接收者,并创建了一个带有两个消费者的工作队列。工作队列背后的假设是每个任务都只交付给一个工作者。

在这一部分,我们将实现 fanout 模式,将消息传递给多个消费者。这种模式也称为“发布/订阅”,通过在我们的 Tut3Config 文件中配置一些 bean 来实现。

本质上,发布的消息将被广播到所有接收者。

交换机

在本教程的前几部分,我们向队列发送和接收消息。现在是时候介绍 RabbitMQ 中的完整消息模型了。

让我们快速回顾一下我们在之前的教程中涵盖的内容

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

RabbitMQ 消息模型的核心思想是生产者从不直接向队列发送任何消息。实际上,通常生产者甚至不知道消息是否会被传递到任何队列。

相反,生产者只能将消息发送到交换机。交换机是一个非常简单的东西。一方面,它从生产者接收消息,另一方面,它将消息推送到队列。交换机必须确切地知道如何处理它接收到的消息。它应该附加到特定的队列吗?它应该附加到多个队列吗?还是应该丢弃它。这些规则由交换机类型定义。

有几种可用的交换机类型:directtopicheadersfanout。我们将重点关注最后一个——fanout。让我们配置一个 bean 来描述这种类型的交换机,并将其命名为 tut.fanout

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;


@Profile({"tut3", "pub-sub", "publish-subscribe"})
@Configuration
public class Tut3Config {

@Bean
public FanoutExchange fanout() {
return new FanoutExchange("tut.fanout");
}

@Profile("receiver")
private static class ReceiverConfig {

@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}

@Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
}

@Bean
public Binding binding1(FanoutExchange fanout,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}

@Bean
public Binding binding2(FanoutExchange fanout,
Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2).to(fanout);
}

@Bean
public Tut3Receiver receiver() {
return new Tut3Receiver();
}
}

@Profile("sender")
@Bean
public Tut3Sender sender() {
return new Tut3Sender();
}
}

我们遵循与前两个教程相同的方法。我们为本教程创建了三个配置文件(tut3pub-subpublish-subscribe)。它们都是运行 fanout 配置文件教程的同义词。接下来,我们将 FanoutExchange 配置为 Spring bean。在 Tut3Receiver 类中,我们定义了四个 bean:2 个 AnonymousQueue(AMQP 术语中的非持久、独占、自动删除队列)和 2 个将这些队列绑定到交换机的绑定。

fanout 交换机非常简单。正如您可能从名称中猜到的那样,它只是将它接收到的所有消息广播到它知道的所有队列。这正是我们扇出消息所需要的。

列出交换机

要列出服务器上的交换机,您可以运行非常有用的 rabbitmqctl

sudo rabbitmqctl list_exchanges

在此列表中,将有一些 amq.* 交换机和默认(未命名)交换机。这些是默认创建的,但您目前不太可能需要使用它们。

未命名交换机

在本教程的前几部分中,我们对交换机一无所知,但仍然能够向队列发送消息。这之所以可能,是因为我们使用了默认交换机,我们用空字符串 ("") 来标识它。

回忆一下我们之前是如何发布消息的

   template.convertAndSend(queue.getName(), message)

第一个参数是路由键,RabbitTemplate 默认将消息发送到默认交换机。每个队列都自动绑定到默认交换机,队列名称作为绑定键。这就是为什么我们可以使用队列名称作为路由键,以确保消息最终到达队列中。

现在,我们可以改为发布到我们命名的交换机

@Autowired
private RabbitTemplate template;

@Autowired
private FanoutExchange fanout; // configured in Tut3Config above

template.convertAndSend(fanout.getName(), "", message);

从现在开始,fanout 交换机将消息附加到我们的队列。

临时队列

如您所记,之前我们使用的是具有特定名称的队列(记住 hello)。能够命名队列对我们至关重要——我们需要将工作者指向同一个队列。当您想在生产者和消费者之间共享队列时,为队列命名非常重要。

但这不适用于我们的 fanout 示例。我们想听到所有消息,而不仅仅是其中的一部分。我们还只对当前流动的消息感兴趣,而不是旧消息。为了解决这个问题,我们需要两件事。

首先,每当我们连接到 Rabbit 时,我们需要一个全新的空队列。为此,我们可以创建一个具有随机名称的队列,或者——更好的是——让服务器为我们选择一个随机队列名称。

其次,一旦我们断开消费者连接,队列应自动删除。为了使用 Spring AMQP 客户端做到这一点,我们定义了一个 AnonymousQueue,它创建一个具有生成名称的非持久、独占、自动删除队列

@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}

@Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
}

此时,我们的队列具有随机队列名称。例如,它可能看起来像 spring.gen-1Rx9HOqvTAaHeeZrQWu8Pg

绑定

我们已经创建了一个 fanout 交换机和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的这种关系称为绑定。在上面的 Tut3Config 中,您可以看到我们有两个绑定,每个 AnonymousQueue 一个。

@Bean
public Binding binding1(FanoutExchange fanout,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}

列出绑定

您可以使用以下命令列出现有绑定,您猜对了,

rabbitmqctl list_bindings

将它们放在一起

发送消息的生产者程序与之前的教程看起来没有太大区别。最重要的变化是,我们现在想将消息发布到我们的 fanout 交换机,而不是未命名的交换机。发送时我们需要提供 routingKey,但对于 fanout 交换机,其值将被忽略。以下是 tut3.Sender.java 程序的代码

package org.springframework.amqp.tutorials.tut3;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.concurrent.atomic.AtomicInteger;

public class Tut3Sender {

@Autowired
private RabbitTemplate template;

@Autowired
private FanoutExchange fanout;

AtomicInteger dots = new AtomicInteger(0);

AtomicInteger count = new AtomicInteger(0);

@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
StringBuilder builder = new StringBuilder("Hello");
if (dots.getAndIncrement() == 3) {
dots.set(1);
}
for (int i = 0; i < dots.get(); i++) {
builder.append('.');
}
builder.append(count.incrementAndGet());
String message = builder.toString();
template.convertAndSend(fanout.getName(), "", message);
System.out.println(" [x] Sent '" + message + "'");
}

}

Tut3Sender.java 源代码

如您所见,我们利用 Tut3Config 文件中的 bean,并将 RabbitTemplate 与我们配置的 FanoutExchange 一起自动装配。此步骤是必要的,因为禁止发布到不存在的交换机。

如果没有队列绑定到交换机,消息将会丢失,但这对于我们来说没关系;如果没有消费者正在侦听,我们可以安全地丢弃消息。

Tut3Receiver.java 的代码

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

public class Tut3Receiver {

@RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receive1(String in) throws InterruptedException {
receive(in, 1);
}

@RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receive2(String in) throws InterruptedException {
receive(in, 2);
}

public void receive(String in, int receiver) throws InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
System.out.println("instance " + receiver + " [x] Received '" + in + "'");
doWork(in);
watch.stop();
System.out.println("instance " + receiver + " [x] Done in "
+ watch.getTotalTimeSeconds() + "s");
}

private void doWork(String in) throws InterruptedException {
for (char ch : in.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
}

}

Tut3Receiver.java 源代码

像以前一样编译,我们就可以执行 fanout 发送者和接收者了。

./mvnw clean package

当然,要执行本教程,请执行以下操作

# shell 1
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=pub-sub,receiver \
--tutorial.client.duration=60000
# shell 2
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=pub-sub,sender \
--tutorial.client.duration=60000

使用 rabbitmqctl list_bindings,您可以验证代码是否实际上按我们的意愿创建了绑定和队列。在运行两个 ReceiveLogs.java 程序的情况下,您应该看到类似以下内容

sudo rabbitmqctl list_bindings
tut.fanout exchange 8b289c9c-a1eb-4a3a-b6a9-163c4fdcb6c2 queue []
tut.fanout exchange d7e7d193-65b1-4128-a532-466a5256fd31 queue []

结果的解释很简单:来自交换机 logs 的数据会转到两个具有服务器分配名称的队列。这正是我们的意图。

要了解如何监听消息的子集,请继续学习教程 4

© . All rights reserved.