RabbitMQ 教程 - “Hello World!”
简介
先决条件
本教程假设 RabbitMQ 已安装并在 localhost
上运行,使用标准端口 (5672)。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
在哪里获得帮助
如果您在学习本教程时遇到问题,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
RabbitMQ 是一个消息代理:它接受并转发消息。您可以将其视为邮局:当您将要寄出的邮件放入邮筒时,您可以确信邮递员最终会将邮件送到您的收件人手中。在这个类比中,RabbitMQ 是一个邮筒、一个邮局和一个邮递员。
RabbitMQ 和邮局之间的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块——消息。
RabbitMQ 以及一般的消息传递,使用一些术语。
-
生产 仅仅意味着发送。发送消息的程序是生产者
-
队列 是 RabbitMQ 中邮筒的名称。尽管消息流经 RabbitMQ 和您的应用程序,但它们只能存储在队列 内部。队列 仅受主机内存和磁盘限制的约束,它本质上是一个大型消息缓冲区。
许多生产者 可以发送消息到一个队列,许多消费者 可以尝试从一个队列 接收数据。
这是我们表示队列的方式
-
消费 与接收具有相似的含义。消费者 是一个主要等待接收消息的程序
请注意,生产者、消费者和代理不必驻留在同一主机上;实际上,在大多数应用程序中,它们都不是。一个应用程序也可以同时是生产者和消费者。
“Hello World”
(使用 Spring AMQP)
在本教程的这一部分中,我们将使用 spring-amqp 库编写两个程序;一个生产者,它发送一条消息,以及一个消费者,它接收消息并打印出来。我们将忽略 Spring AMQP API 中的一些细节,专注于这个非常简单的事情,以便开始入门。这是消息传递的“Hello World”。
在下面的图中,“P”是我们的生产者,“C”是我们的消费者。中间的方框是一个队列 - RabbitMQ 代表消费者维护的消息缓冲区。
Spring AMQP 框架
RabbitMQ 支持多种协议。本教程使用 AMQP 0-9-1,这是一种开放的、通用的消息传递协议。在 许多不同的语言中,有许多 RabbitMQ 的客户端。
我们将使用 Spring Boot 来引导和配置我们的 Spring AMQP 项目。我们选择 Maven 来构建项目,但我们也可以使用 Gradle。
该项目的源代码 在线提供,但您也可以从头开始学习教程。
如果您选择后者,请打开 Spring Initializr 并提供:group id(例如 org.springframework.amqp.tutorials
),artifact id(例如 rabbitmq-amqp-tutorials
)。搜索 RabbitMQ 依赖项并选择 RabbitMQ 依赖项。
生成项目并将生成的项目解压缩到您选择的位置。现在可以将其导入您喜欢的 IDE。或者,您可以从您喜欢的编辑器中处理它。
配置项目
Spring Boot 提供了许多功能,但我们在这里只重点介绍一些。首先,Spring Boot 应用程序可以选择通过 application.properties
或 application.yml
文件提供其属性(还有更多选项,但这将使我们入门)。您会在生成的项目中找到一个空的 application.properties
文件。将 application.properties 重命名为 application.yml
文件,并包含以下属性
spring:
profiles:
active: usage_message
logging:
level:
org: ERROR
tutorial:
client:
duration: 10000
创建一个新的包 tut1
,我们可以在其中放置教程代码。我们现在创建一个 Java 配置文件 Tut1Config.java
,以以下方式描述我们的 Spring bean
package org.springframework.amqp.tutorials.tut1;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Profile({"tut1","hello-world"})
@Configuration
public class Tut1Config {
@Bean
public Queue hello() {
return new Queue("hello");
}
@Profile("receiver")
@Bean
public Tut1Receiver receiver() {
return new Tut1Receiver();
}
@Profile("sender")
@Bean
public Tut1Sender sender() {
return new Tut1Sender();
}
}
请注意,我们将第一个教程配置文件定义为 tut1
(包名称)或 hello-world
。我们使用 @Configuration
注解来让 Spring 知道这是一个 Java 配置,在其中我们创建了队列(“hello”)的定义,并定义了我们的 Sender
和 Receiver
bean。
我们现在将通过 Boot Application 运行我们所有的教程,只需传入我们正在使用的配置文件即可。为了启用此功能,我们将使用以下内容修改生成的 RabbitAmqpTutorialsApplication
类
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class RabbitAmqpTutorialsApplication {
@Profile("usage_message")
@Bean
public CommandLineRunner usage() {
return args -> {
System.out.println("This app uses Spring Profiles to
control its behavior.\n");
System.out.println("Sample usage: java -jar
rabbit-tutorials.jar
--spring.profiles.active=hello-world,sender");
};
}
@Profile("!usage_message")
@Bean
public CommandLineRunner tutorial() {
return new RabbitAmqpTutorialsRunner();
}
public static void main(String[] args) throws Exception {
SpringApplication.run(RabbitAmqpTutorialsApplication.class, args);
}
}
并添加 RabbitAmqpTutorialsRunner
类,如下所示
package org.springframework.amqp.tutorials.tut1;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ConfigurableApplicationContext;
public class RabbitAmqpTutorialsRunner implements CommandLineRunner {
@Value("${tutorial.client.duration:0}")
private int duration;
@Autowired
private ConfigurableApplicationContext ctx;
@Override
public void run(String... arg0) throws Exception {
System.out.println("Ready ... running for " + duration + "ms");
Thread.sleep(duration);
ctx.close();
}
}
发送
现在,很少有代码需要放入 sender 和 receiver 类中。我们将其称为 Tut1Receiver
和 Tut1Sender
。sender 利用我们的配置和 RabbitTemplate
发送消息。
// Sender
package org.springframework.amqp.tutorials.tut1;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
public class Tut1Sender {
@Autowired
private RabbitTemplate template;
@Autowired
private Queue queue;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
String message = "Hello World!";
this.template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}
}
您会注意到 Spring AMQP 删除了样板代码,只留下您需要关心的消息传递逻辑。我们自动装配在 Tut1Config
类中的 bean 定义中配置的队列,并且像许多 spring 连接抽象一样,我们使用 RabbitTemplate
包装了样板 RabbitMQ 客户端类,该模板可以自动装配到 sender 中。剩下的就是创建一个消息并调用模板的 convertAndSend
方法,传入我们定义的 bean 中的队列名称和我们刚刚创建的消息。
发送不起作用!
如果这是您第一次使用 RabbitMQ 并且没有看到“Sent”消息,那么您可能会挠头想知道哪里可能出错了。可能是 broker 启动时没有足够的可用磁盘空间(默认情况下至少需要 50 MB 可用空间),因此拒绝接受消息。检查 broker 日志文件 以查看是否记录了 资源警报,并在必要时降低可用磁盘空间阈值。配置指南 将向您展示如何设置
disk_free_limit
。
接收
receiver 同样简单。我们使用 @RabbitListener
注解注解我们的 receiver 类,并传入队列的名称。然后我们使用 @RabbitHandler
注解注解我们的 receive
方法,传入已推送到队列的 payload。
package org.springframework.amqp.tutorials.tut1;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@RabbitListener(queues = "hello")
public class Tut1Receiver {
@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}
}
将它们放在一起
我们现在必须构建 JAR 文件
./mvnw clean package
应用程序使用 Spring Profiles 来控制它正在运行哪个教程,以及它是 sender 还是 receiver。要运行 receiver,请执行以下命令
# consumer
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=hello-world,receiver
打开另一个 shell 来运行 sender
# sender
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=hello-world,sender
列出队列
您可能希望查看 RabbitMQ 有哪些队列以及其中有多少消息。您可以使用
rabbitmqctl
工具(以特权用户身份)执行此操作sudo rabbitmqctl list_queues
在 Windows 上,省略 sudo
rabbitmqctl.bat list_queues
是时候继续学习 第 2 部分 并构建一个简单的工作队列了。