跳至主内容

Java 客户端 API 指南

概述

本指南涵盖了 RabbitMQ Java 客户端 及其公共 API。它假定您正在使用 最新主版本号的客户端,并且读者已熟悉 基本概念

本指南的关键章节包括:

另外,还可以单独找到 API 参考(JavaDoc)。

支持时间表

有关支持时间表,请参阅 RabbitMQ Java 库支持页面

JDK 和 Android 版本支持

此库的 5.x 版本系列 要求 JDK 8(编译和运行时)。在 Android 上,这意味着仅支持 Android 7.0 或更高版本

4.x 版本系列 支持 JDK 6 和 7.0 之前的 Android 版本。

许可证

该库是开源的,在 GitHub 上开发,并根据以下三个许可证之一提供:

这意味着用户可以将该库视为根据 以上列表中的任何一个许可证 进行许可。例如,用户可以选择 Apache Public License 2.0 并将此客户端包含在商业产品中。根据 GPLv2 许可的代码库可以选择 GPLv2,依此类推。

概述

客户端 API 公开了 AMQP 0-9-1 协议模型 的关键实体,并提供了额外的抽象以方便使用。

RabbitMQ Java 客户端使用 com.rabbitmq.client 作为其顶级包。关键类和接口包括:

  • Channel:表示一个 AMQP 0-9-1 通道,并提供大多数操作(协议方法)。
  • Connection:表示一个 AMQP 0-9-1 连接。
  • ConnectionFactory:构造 Connection 实例。
  • Consumer:表示一个消息消费者。
  • DefaultConsumer:常用的消费者基类。
  • BasicProperties:消息属性(元数据)。
  • BasicProperties.Builder:用于构建 BasicProperties 的构建器。

协议操作可通过 Channel 接口获得。Connection 用于打开通道、注册连接生命周期事件处理器以及关闭不再需要的连接。Connection 通过 ConnectionFactory 实例化,ConnectionFactory 用于配置各种连接设置,例如 vhost 或用户名。

连接和通道

核心 API 类是 ConnectionChannel,分别代表 AMQP 0-9-1 连接和通道。它们通常在使用前被导入。

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

连接到 RabbitMQ

以下代码使用给定参数(主机名、端口号等)连接到 RabbitMQ 节点。

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);

Connection conn = factory.newConnection();

对于在本地运行的 RabbitMQ 节点,所有这些参数都有合理的默认值。

如果在创建连接之前属性仍未分配,则将使用其默认值。

属性默认值
用户名"guest"
密码"guest"
虚拟主机"/"
主机名"localhost"
端口

5672 用于常规连接,5671 用于 使用 TLS 的连接

使用 URI 连接

或者,也可以使用 URI

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();

对于在本地运行的标准 RabbitMQ 服务器,所有这些参数都有合理的默认值。

可以在 服务器节点日志 中观察到成功的和不成功的客户端连接事件。

请注意,默认情况下,用户 guest 只能从 localhost 连接。这是为了在生产系统中限制使用广为人知的凭据。

应用程序开发人员可以 为连接分配自定义名称。如果设置了该名称,它将在 RabbitMQ 节点日志以及 管理 UI 中提及。

然后,可以使用 Connection 接口打开通道。

Channel channel = conn.createChannel();

现在,可以使用该通道发送和接收消息,具体将在后续章节中介绍。

使用端点列表

连接时可以指定一个端点列表。将使用第一个可达的端点。如果发生 连接故障,使用端点列表可以使应用程序在原始节点宕机时连接到其他节点。

要使用多个端点,请将 Address 对象列表提供给 ConnectionFactory#newConnectionAddress 表示主机名和端口对。

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
, new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);

将尝试连接到 hostname1:portnumber1,如果失败则连接到 hostname2:portnumber2。返回的连接是数组中第一个成功的连接(不会抛出 IOException)。这与在工厂上反复设置主机和端口,每次都调用 factory.newConnection() 直到其中一个成功为止完全等效。

如果还提供了 ExecutorService(通过 factory.newConnection(es, addrArr) 形式),则线程池与(第一个)成功连接相关联。

如果您希望更好地控制要连接的主机,请参阅 服务发现支持

从 RabbitMQ 断开连接

要断开连接,只需关闭通道和连接。

channel.close();
conn.close();

请注意,关闭通道可以被认为是良好的实践,但在此处并非绝对必要——当底层连接关闭时,它将自动完成。

可以在 服务器节点日志 中观察到客户端断开连接事件。

连接和通道的生命周期

客户端 连接 旨在持久化。底层协议的设计和优化是为了长期运行的连接。这意味着每个操作(例如,发布消息)都打开一个新连接是不必要的,并且强烈不推荐这样做,因为它会产生大量网络往返和开销。

通道 也旨在持久化,但由于许多可恢复的协议错误将导致通道关闭,通道的生命周期可能比其连接的生命周期短。为每个操作关闭和打开新通道通常是不必要的,但有时是合适的。如有疑问,请先考虑重用通道。

通道级异常(例如尝试从不存在的队列消费)将导致通道关闭。关闭的通道将不再可用,并且不会再从服务器接收任何事件(例如消息传递)。通道级异常将在 RabbitMQ 中记录,并将启动通道的关闭序列(见下文)。

客户端提供的连接名称

RabbitMQ 节点关于其客户端的信息有限:

  • 它们的 TCP 端点(源 IP 地址和端口)
  • 使用的凭据

仅凭这些信息就可能在识别应用程序和实例时遇到问题,尤其是在凭据可以共享,客户端通过负载均衡器连接,但 Proxy Protocol 无法启用时。

为了更容易地在 服务器日志管理 UI 中识别客户端,AMQP 0-9-1 客户端连接(包括 RabbitMQ Java 客户端)可以提供一个自定义标识符。如果设置了该标识符,它将在日志条目和管理 UI 中提及。该标识符被称为 客户端提供的连接名称。该名称可用于识别应用程序或应用程序中的特定组件。该名称是可选的;然而,强烈鼓励开发人员提供一个,因为它将大大简化某些运维任务。

RabbitMQ Java 客户端的 ConnectionFactory#newConnection 方法重载 接受一个客户端提供的连接名称。下面是上面使用的修改后的连接示例,它提供了这样一个名称:

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
// provides a custom connection name
Connection conn = factory.newConnection("app:audit component:event-consumer");

使用交换器和队列

客户端应用程序使用 [交换器] 和 队列 进行工作,它们是协议的高级 构建块。这些必须在使用前声明。声明任一类型的对象只是确保同名对象存在,如果不存在则创建它。

继续上一个示例,以下代码声明了一个交换器和一个 服务器命名的队列,然后将它们绑定在一起。

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

这将主动声明以下对象,这两者都可以通过其他参数进行自定义。此处两者都没有特殊参数。

  • 一个持久化的、非自动删除的、类型为“direct”的交换器。
  • 一个非持久化、独占、自动删除的队列,带有生成名称。

上面的函数调用然后将队列与指定路由键的交换器绑定。

请注意,当只有一个客户端希望处理该队列时,这是一种典型的声明队列的方式:它不需要一个众所周知名称,没有其他客户端可以使用它(独占),并且将自动清理(自动删除)。如果多个客户端希望共享一个具有众所周知名称的队列,以下代码将是合适的:

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

这将主动声明:

  • 一个持久化的、非自动删除的、类型为“direct”的交换器。
  • 一个持久化的、非独占的、非自动删除的队列,具有众所周知名称。

许多 Channel API 方法都重载了。这些 exchangeDeclarequeueDeclarequeueBind 的方便的短形式使用了合理的默认值。还有更长形式的参数更多,以便您可以根据需要覆盖这些默认值,在需要时提供完全控制。

这种“短形式,长形式”的模式在客户端 API 的使用中随处可见。

被动声明

队列和交换器可以“被动地”声明。被动声明只是检查具有提供名称的实体是否存在。如果存在,该操作将不执行任何操作。对于队列,成功的被动声明将返回与非被动声明相同的信息,即队列中处于 就绪状态 的消费者和消息数量。

如果实体不存在,该操作将因通道级异常而失败。之后无法再使用该通道。应打开一个新通道。通常使用一次性(临时)通道进行被动声明。

Channel#queueDeclarePassiveChannel#exchangeDeclarePassive 是用于被动声明的方法。以下示例演示了 Channel#queueDeclarePassive

Queue.DeclareOk response = channel.queueDeclarePassive("queue-name");
// returns the number of messages in Ready state in the queue
response.getMessageCount();
// returns the number of consumers the queue has
response.getConsumerCount();

Channel#exchangeDeclarePassive 的返回值不包含任何有用的信息。因此,如果该方法返回并且没有通道异常发生,则意味着该交换器确实存在。

具有可选响应的操作

一些常见操作也提供“不等待”版本,该版本不会等待服务器响应。例如,要声明一个队列并指示服务器不发送任何响应,请使用:

channel.queueDeclareNoWait(queueName, true, false, false, null);

“不等待”版本效率更高,但安全性较低。例如,它们更依赖于 心跳机制 来检测失败的操作。如有疑问,请从标准版本开始。仅在拓扑(队列、绑定)频繁更改的情况下才需要“不等待”版本。

删除实体和清空消息

队列或交换器可以显式删除。

channel.queueDelete("queue-name")

仅当队列为空时,才可能删除队列。

channel.queueDelete("queue-name", false, true)

或者,当它未被使用(没有任何消费者)时。

channel.queueDelete("queue-name", true, false)

队列可以被清空(其中的所有消息都被删除)。

channel.queuePurge("queue-name")

发布消息

要将消息发布到交换器,请使用 Channel.basicPublish,如下所示:

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

为了精细控制,请使用重载变体来指定 mandatory 标志,或发送带有预设消息属性的消息(有关详细信息,请参阅 发布者指南)。

channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);

这会发送一个具有传递模式 2(持久化)、优先级 1 和 content-type“text/plain”的消息。使用 Builder 类来构建具有所需数量属性的消息属性对象,例如:

channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("bob")
.build(),
messageBodyBytes);

此示例发布一条带有自定义标题的消息。

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);

channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build(),
messageBodyBytes);

此示例发布一条带有过期时间的消息。

channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build(),
messageBodyBytes);

这只是一个简短的示例集,并未演示所有支持的属性。

请注意,BasicProperties 是一个外部类 AMQP 的内部类。

如果存在 资源驱动的警报,则对 Channel#basicPublish 的调用最终会阻塞。

通道和并发注意事项(线程安全)

应避免在线程之间共享 Channel 实例。应用程序应每个线程使用一个 Channel,而不是在多个线程之间共享同一个 Channel

虽然通道上的某些操作可以安全地并发调用,但有些则不能,并且会导致线路上不正确的帧交织、重复确认等。

在共享通道上并发发布可能会导致线路上不正确的帧交织,从而触发连接级协议异常,并导致代理立即关闭连接。因此,这需要应用程序代码中的显式同步(Channel#basicPublish 必须在关键区域内调用)。在线程之间共享通道也会干扰 发布者确认。最好完全避免在共享通道上进行并发发布,例如通过每个线程使用一个通道。

可以使用通道池来避免在共享通道上进行并发发布:一旦一个线程完成使用一个通道,它就会将其返回给池,使该通道可供另一个线程使用。通道池可以被认为是特定的同步解决方案。建议使用现有的池化库而不是自制解决方案。例如,Spring AMQP 提供了现成的通道池功能。

通道消耗资源,在大多数情况下,应用程序在同一 JVM 进程中很少需要超过数百个开放通道。如果我们假设应用程序为每个通道都有一个线程(因为通道不应并发使用),那么单个 JVM 的数千个线程已经是一个相当大的开销,可能可以避免。此外,少数快速的发布者可以轻松地使网络接口和代理节点饱和:发布涉及的工作量比路由、存储和传递消息要少。

一个需要避免的经典反模式是为每条已发布的的消息打开一个通道。通道应该相当持久,打开一个新通道是一个网络往返,这使得这种模式极其低效。

在一个线程中消费并在另一个线程中共享通道上发布是安全的。

服务器推送的传递(参见下面的部分)与通道保持顺序的保证并发分发。分发机制使用一个 java.util.concurrent.ExecutorService,每个连接一个。可以通过 ConnectionFactory#setSharedExecutor 设置器提供一个自定义执行程序,该执行程序将由单个 ConnectionFactory 创建的所有连接共享。

当使用 手动确认 时,考虑谁执行确认很重要。如果与接收传递的线程不同(例如,Consumer#handleDelivery 将传递处理委托给了另一个线程),则使用 multiple 参数设置为 true 进行确认是不安全的,并且会导致重复确认,从而导致通道级协议异常,关闭该通道。一次确认一条消息是安全的。

通过订阅接收消息(“推送 API”)

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

接收消息的最有效方法是使用 Consumer 接口设置订阅。然后,消息将在到达时自动传递,而不是必须显式请求。

在调用与 Consumer 相关的 API 方法时,单个订阅始终由其消费者标签引用。消费者标签是一个消费者标识符,可以是客户端生成或服务器生成的。为了让 RabbitMQ 生成一个节点范围唯一的标签,请使用一个不带消费者标签参数的 Channel#basicConsume 重载,或将空字符串传递给消费者标签并使用 Channel#basicConsume 返回的值。消费者标签用于取消消费者。

不同的 Consumer 实例必须具有不同的消费者标签。强烈建议避免在连接上使用重复的消费者标签,因为这可能导致 自动连接恢复 问题,并在监控消费者时产生混淆的监控数据。

实现 Consumer 的最简单方法是继承便利类 DefaultConsumer。此子类的对象可以传递给 basicConsume 调用以设置订阅。

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
<i>// (process the message components here ...)</i>
channel.basicAck(deliveryTag, false);
}
});

在此处,由于我们指定了 autoAck = false,因此有必要确认传递给 Consumer 的消息,最方便的做法是在 handleDelivery 方法中完成,如下例所示。

更复杂的 Consumer 需要覆盖更多方法。特别是,当通道和连接关闭时会调用 handleShutdownSignal,并且在对该 Consumer 的任何其他回调调用之前,会将消费者标签传递给 handleConsumeOk

Consumer 还可以实现 handleCancelOkhandleCancel 方法,以分别接收显式和隐式取消的通知。

您可以使用 Channel.basicCancel 显式取消特定的 Consumer

channel.basicCancel(consumerTag);

传递消费者标签。

与发布者一样,为消费者考虑并发风险安全很重要。

回调到 Consumer 的过程是在一个独立于实例化其 Channel 的线程的线程池中进行的。这意味着 Consumer 可以安全地调用 ConnectionChannel 上的阻塞方法,例如 Channel#queueDeclareChannel#basicCancel

每个 Channel 都将按 RabbitMQ 发送的顺序将其所有传递分发到其 Consumer 回调方法。通道之间的传递顺序不保证:这些传递可以并行分发。

对于最常见的用例(一个 Consumer 对应一个 Channel),这意味着 Consumer 不会阻碍其他 Consumer。当一个通道上有多个 Consumer 时,请注意,一个耗时长的 Consumer 可能会阻碍该通道上其他 Consumer 的回调分发。

有关并发和并发风险安全的其他主题,请参阅 并发注意事项(线程安全)部分。

检索单个消息(“拉取 API”)

也可以按需检索单个消息(“拉取 API”,也称为轮询)。这种消费方法效率极低,因为它实际上是在轮询,并且应用程序需要反复请求结果,即使绝大多数请求都没有产生任何结果。因此,强烈不推荐 使用此方法。

要“拉取”消息,请使用 Channel.basicGet 方法。返回值是 GetResponse 的实例,从中可以提取头信息(属性)和消息体。

boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag();
// ...

由于此示例使用了 手动确认(上面的 autoAck = false),您还必须调用 Channel.basicAck 来确认您已成功接收到消息。

// ...
channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
}

处理无法路由的消息

如果消息是以“mandatory”标志集发布的,但无法路由,代理将通过 AMQP.Basic.Return 命令将其返回给发送客户端。

为了收到此类返回的通知,客户端可以实现 ReturnListener 接口并调用 Channel.addReturnListener。如果客户端未为特定通道配置返回侦听器,则关联的已返回消息将被静默丢弃。

channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
...
}
});

例如,当客户端将带有“mandatory”标志的消息发布到类型为“direct”且未绑定到队列的交换器时,将调用返回侦听器。

关闭协议

客户端关闭流程概述

AMQP 0-9-1 连接和通道在管理网络故障、内部故障和显式本地关闭方面共享相同的通用方法。

AMQP 0-9-1 连接和通道具有以下生命周期状态:

  • open:对象已准备好使用。

  • closing:对象已收到本地关闭通知,已向任何支持的底层对象发出关闭请求,并正在等待其关闭程序完成。

  • closed:对象已从任何底层对象收到所有关闭完成通知,并因此已关闭自身。

无论导致关闭的原因如何(例如应用程序请求、内部客户端库故障、远程网络请求或网络故障),这些对象最终都会进入关闭状态。

连接和通道对象具有以下与关闭相关的​​方法:

  • addShutdownListener(ShutdownListener listener)

  • removeShutdownListener(ShutdownListener listener),用于管理任何侦听器,当对象转换为 closed 状态时将触发这些侦听器。请注意,向已关闭的对象添加 ShutdownListener 将立即触发该侦听器。

  • getCloseReason(),允许调查对象关闭的原因。

  • isOpen(),用于测试对象是否处于打开状态。

  • close(int closeCode, String closeMessage),用于显式通知对象关闭。

侦听器的简单用法如下:

import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause)
{
...
}
});

有关关闭情况的信息

可以通过显式调用 getCloseReason() 方法,或者使用 ShutdownListener 类中的 service(ShutdownSignalException cause) 方法的 cause 参数来检索包含有关关闭原因所有信息的 ShutdownSignalException

ShutdownSignalException 类提供了分析关闭原因的方法。通过调用 isHardError() 方法,我们可以获取有关它是连接错误还是通道错误的信息,而 getReason() 以 AMQP 方法(AMQP.Channel.CloseAMQP.Connection.Close)的形式返回有关原因的信息(如果原因是库中的某些异常,例如网络通信失败,则 getCause() 可以检索该异常,此时返回 null)。

public void shutdownCompleted(ShutdownSignalException cause)
{
if (cause.isHardError())
{
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication())
{
Method reason = cause.getReason();
...
}
...
} else {
Channel ch = (Channel)cause.getReference();
...
}
}

原子性和 isOpen() 方法的使用

不建议在生产代码中使用通道和连接对象的 isOpen() 方法,因为该方法返回的值取决于关闭原因的存在。以下代码说明了发生竞态条件的可能性:

public void brokenMethod(Channel channel)
{
if (channel.isOpen())
{
// The following code depends on the channel being in open state.
// However there is a possibility of the change in the channel state
// between isOpen() and basicQos(1) call
...
channel.basicQos(1);
}
}

相反,我们应该通常忽略此类检查,只需尝试执行所需的操作。如果在代码执行期间通道或连接已关闭,则会抛出 ShutdownSignalException,指示对象处于无效状态。我们还应该捕获 IOException,它可能由 SocketException(当代理意外关闭连接时)或 ShutdownSignalException(当代理启动干净关闭时)引起。

public void validMethod(Channel channel)
{
try {
...
channel.basicQos(1);
} catch (ShutdownSignalException sse) {
// possibly check if channel was closed
// by the time we started action and reasons for
// closing it
...
} catch (IOException ioe) {
// check why connection was closed
...
}
}

高级连接选项

消费者操作线程池

默认情况下,Consumer 线程(参见下文 接收)会在新的 ExecutorService 线程池中自动分配。如果需要更精细地控制,请在 newConnection() 方法上提供一个 ExecutorService,以便使用该线程池而不是默认线程池。以下是一个示例,其中提供了比通常分配的更大的线程池:

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

ExecutorsExecutorService 类都位于 java.util.concurrent 包中。

当连接关闭时,默认的 ExecutorService 将被 shutdown(),但用户提供的 ExecutorService(如上面的 es)将 不会shutdown()。提供自定义 ExecutorService 的客户端必须确保最终对其调用 shutdown() 方法,否则池的线程可能会阻止 JVM 终止。

相同的执行程序服务可以由多个连接共享,或在重新连接时进行串行重用,但不能在 shutdown() 后使用。

只有当有证据表明 Consumer 回调处理存在严重瓶颈时,才应考虑使用此功能。如果没有执行 Consumer 回调,或者执行很少,则默认分配已绰绰有余。开销最初很小,总分配的线程资源是有限的,即使偶尔会发生消费者活动爆发。

使用 AddressResolver 接口进行服务发现

可以使用 AddressResolver 的实现来更改连接时使用的端点解析算法。

Connection conn = factory.newConnection(addressResolver);

AddressResolver 接口如下:

public interface AddressResolver {

List<Address> getAddresses() throws IOException;

}

端点列表 类似,将首先尝试 Address 列表中返回的第一个地址,如果客户端未能连接到第一个地址,则尝试第二个,依此类推。

如果还提供了 ExecutorService(通过 factory.newConnection(es, addressResolver) 形式),则线程池与(第一个)成功连接相关联。

AddressResolver 是实现自定义服务发现逻辑的理想场所,这在动态基础架构中尤其有用。结合 自动恢复,客户端可以自动连接到在首次启动时甚至不存在的节点。亲和性(Affinity)和负载均衡是自定义 AddressResolver 可能有用的其他场景。

Java 客户端附带以下实现(有关详细信息,请参阅 javadoc):

  • DnsRecordIpAddressResolver:给定主机名,返回其 IP 地址(针对平台 DNS 服务器进行解析)。这对于简单的基于 DNS 的负载均衡或故障转移很有用。

  • DnsSrvRecordAddressResolver:给定服务名称,返回主机名/端口对。搜索实现为 DNS SRV 请求。这在使用服务注册表(如 HashiCorp Consul)时很有用。

心跳超时

有关心跳以及如何在 Java 客户端中配置心跳的更多信息,请参阅 心跳指南

自定义线程工厂

Google App Engine (GAE) 等环境 限制直接创建线程。要在此类环境中使用 RabbitMQ Java 客户端,必须配置一个自定义 ThreadFactory,该工厂使用适当的方法来实例化线程,例如 GAE 的 ThreadManager

以下是 Google App Engine 的一个示例。

import com.google.appengine.api.ThreadManager;

ConnectionFactory cf = new ConnectionFactory();
cf.setThreadFactory(ThreadManager.backgroundThreadFactory());

使用 Netty 进行网络 I/O

Java 客户端的 5.27.0 版本带来了对 Netty 的支持,用于网络 I/O。Netty 不一定比阻塞 I/O 快,但它提供了对资源(例如线程)的更多控制,并提供了高级网络选项,如 使用 OpenSSL 的 TLS原生传输(epoll、io_uring、kqueue)。

在默认的阻塞 I/O 模式下,每个连接都使用一个线程从网络套接字读取。使用 Netty,您可以控制从/到网络读取和写入的线程数。

如果您的 Java 进程使用许多连接(几十个或几百个),请使用 Netty。您应该使用比默认阻塞模式更少的线程。通过适当设置线程数,您不应遇到性能下降,尤其是在连接不太繁忙的情况下。

Netty 通过 ConnectionFactory#netty() 辅助方法激活和配置。Netty 的 EventLoopGroup 是对对线程数挑剔的应用程序最重要的设置。以下是如何使用 4 个线程进行设置的示例:

int nbThreads = 4;
IoHandlerFactory ioHandlerFactory = NioIoHandler.newFactory();
EventLoopGroup eventLoopGroup = new MultiThreadIoEventLoopGroup(
nbThreads, ioHandlerFactory
);
connectionFactory.netty().eventLoopGroup(eventLoopGroup);
// ...
// dispose the event loop group after closing all connections
eventLoopGroup.shutdownGracefully();

请注意,事件循环组必须在连接关闭其连接后进行处理。如果没有设置事件循环组,每个连接将使用自己的、单线程的事件循环组(并将负责关闭它)。这远非最佳选择,这就是为什么在使用 Netty 时强烈建议设置 EventLoopGroup

Netty 使用其自己的 SslContext API 进行 TLS 配置(而不是 JDK 的 SSLContext),因此当 Netty 激活时,ConnectionFactory#useSslProtocol() 方法无效。而是使用 ConnectionFactory.netty().sslContext(SslContext),以及 Netty 的 SslContextBuilder 类。以下是一个示例:

X509Certificate caCertificate = ...;
connectionFactory.netty()
.sslContext(SslContextBuilder
.forClient() // mandatory, do not forget to call
.trustManager(caCertificate) // pass in certificate directly
.build());

从网络故障中自动恢复

连接恢复

客户端和 RabbitMQ 节点之间的网络连接可能会失败。RabbitMQ Java 客户端支持连接和拓扑(队列、交换器、绑定和消费者)的自动恢复。

许多应用程序的自动恢复过程遵循以下步骤:

  • 重新连接
  • 恢复连接侦听器
  • 重新打开通道
  • 恢复通道侦听器
  • 恢复通道 basic.qos 设置、发布者确认和事务设置

拓扑恢复包括以下操作,对每个通道执行:

  • 重新声明交换器(预定义交换器除外)
  • 重新声明队列
  • 恢复所有绑定
  • 恢复所有消费者

从 Java 客户端 4.0.0 版本开始,自动恢复默认启用(拓扑恢复也一样)。

拓扑恢复依赖于每个连接的实体(队列、交换器、绑定、消费者)缓存。例如,当一个队列在连接上声明时,它将被添加到缓存中。当它被删除或计划删除时(例如因为它被 自动删除),它将被移除。这种模型存在一些局限性,将在下面介绍。

要禁用或启用自动连接恢复,请使用 factory.setAutomaticRecoveryEnabled(boolean) 方法。以下代码片段显示了如何显式启用自动恢复(例如,对于 4.0.0 之前的 Java 客户端):

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
factory.setAutomaticRecoveryEnabled(true);
// connection that will recover automatically
Connection conn = factory.newConnection();

如果恢复由于异常而失败(例如,RabbitMQ 节点仍不可达),它将在固定的时间间隔后重试(默认为 5 秒)。该间隔可以配置。

ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.setNetworkRecoveryInterval(10000);

当提供地址列表时,列表会被随机排序,并逐一尝试所有地址。

ConnectionFactory factory = new ConnectionFactory();

Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};
factory.newConnection(addresses);

何时触发连接恢复?

如果启用了自动连接恢复,它将由以下事件触发:

  • 连接的 I/O 循环中抛出 I/O 异常。
  • 套接字读取操作超时。
  • 检测到丢失的服务器 心跳
  • 连接的 I/O 循环中抛出任何其他意外异常。

以先发生的为准。

如果到 RabbitMQ 节点的初始连接失败,自动连接恢复将不会启动。应用程序开发人员负责重试此类连接,记录失败的尝试,实施重试次数限制等。这是一个非常基本的示例:

ConnectionFactory factory = new ConnectionFactory();
// configure various connection settings

try {
Connection conn = factory.newConnection();
} catch (java.net.ConnectException e) {
Thread.sleep(5000);
// apply retry logic
}

当应用程序通过 Connection.Close 方法关闭连接时,不会启动连接恢复。

通道级异常不会触发任何类型的恢复,因为它们通常表示应用程序中的语义问题(例如,尝试从不存在的队列消费)。

恢复侦听器

可以向可恢复的连接和通道注册一个或多个恢复侦听器。当连接恢复启用时,ConnectionFactory#newConnectionConnection#createChannel 返回的连接会实现 com.rabbitmq.client.Recoverable,提供两个名称相当描述性的方法:

  • addRecoveryListener
  • removeRecoveryListener

请注意,目前需要将连接和通道转换为 Recoverable 才能使用这些方法。

对发布的影响

当连接断开时使用 Channel.basicPublish 发布的消息将丢失。客户端不会在连接恢复后将它们排队以供传递。为了确保已发布的消息能够到达 RabbitMQ,应用程序需要使用 发布者确认 并考虑连接故障。

拓扑恢复

拓扑恢复涉及交换器、队列、绑定和消费者的恢复。当启用自动恢复时,它默认启用。在客户端的现代版本中,拓扑恢复默认启用。

如果需要,可以显式禁用拓扑恢复。

ConnectionFactory factory = new ConnectionFactory();

Connection conn = factory.newConnection();
// enable automatic recovery (e.g. Java client prior 4.0.0)
factory.setAutomaticRecoveryEnabled(true);
// disable topology recovery
factory.setTopologyRecoveryEnabled(false);

故障检测和恢复限制

自动连接恢复有许多限制和有意设计的功能,应用程序开发人员需要了解。

拓扑恢复依赖于每个连接的实体(队列、交换器、绑定、消费者)缓存。例如,当一个队列在连接上声明时,它将被添加到缓存中。当它被删除或计划删除时(例如因为它被 自动删除),它将被移除。这使得在不同通道上声明和删除实体成为可能,而不会产生意外结果。这也意味着消费者标签(通道特定的标识符)在所有使用自动连接恢复的连接的通道之间必须是唯一的。

当连接断开或丢失时,需要时间来检测。因此,存在一个时间窗口,在此期间库和应用程序都不知道连接实际已失败。在此期间发布的任何消息都将被序列化并像往常一样写入 TCP 套接字。只能通过 发布者确认 来保证消息传递到代理:AMQP 0-9-1 中的发布在设计上是完全异步的。

当启用自动恢复的连接检测到套接字或 I/O 操作错误时,恢复将在配置的延迟后开始(默认为 5 秒)。这种设计假设即使许多网络故障是暂时的且通常是短暂的,它们也不会立即消失。具有延迟还可以避免服务器端资源清理(例如 独占或自动删除队列 的删除)与在同一资源上新打开的连接上执行的操作之间的固有竞态条件。

默认情况下,连接恢复尝试将以相同的间隔继续,直到成功打开新连接为止。可以通过将 RecoveryDelayHandler 实现实例提供给 ConnectionFactory#setRecoveryDelayHandler 来动态设置恢复延迟。使用动态计算延迟间隔的实现应避免过低的(例如低于 2 秒)值。

当连接处于恢复状态时,在同一通道上尝试的任何发布都将被拒绝并抛出异常。客户端目前不执行任何内部缓冲此类出站消息。应用程序开发人员有责任跟踪这些消息并在恢复成功后重新发布它们。 发布者确认 是一个协议扩展,应由无法承受消息丢失的发布者使用。

当通道因通道级异常而关闭时,连接恢复不会启动。此类异常通常表示应用程序级别的问​​题。库无法就这种情况做出明智的决定。

即使连接恢复启动后,已关闭的通道也不会被恢复。这包括显式关闭的通道以及上述通道级异常的情况。

手动确认和自动恢复

当使用手动确认时,可能会在消息传递和确认之间发生到 RabbitMQ 节点的网络连接失败。连接恢复后,RabbitMQ 将重置所有通道上的传递标签。

这意味着带有旧传递标签的 basic.ackbasic.nackbasic.reject 将导致通道异常。为避免这种情况,RabbitMQ Java 客户端会跟踪并更新传递标签,使其在恢复之间单调递增。

Channel.basicAckChannel.basicNackChannel.basicReject 然后会将调整后的传递标签转换为 RabbitMQ 使用的标签。

带有陈旧传递标签的确认将不会被发送。使用手动确认和自动恢复的应用程序必须能够处理重新传递。

通道生命周期和拓扑恢复

自动连接恢复旨在对应用程序开发人员尽可能透明,这就是为什么即使发生多次连接故障和恢复,Channel 实例也保持不变。从技术上讲,当自动恢复打开时,Channel 实例充当代理或装饰器:它们将 AMQP 业务委托给实际的 AMQP 通道实现,并在其周围实现一些恢复机制。这就是为什么您不应该在通道创建了某些资源(队列、交换器、绑定)后关闭它,否则这些资源的拓扑恢复将失败,因为通道已被关闭。相反,应将创建通道保持打开状态,直到应用程序生命周期结束。

未处理的异常

与连接、通道、恢复和消费者生命周期相关的未处理异常会委托给异常处理程序。异常处理程序是实现 ExceptionHandler 接口的任何对象。默认情况下,使用 DefaultExceptionHandler 的实例。它会将异常详细信息打印到标准输出。

可以通过 ConnectionFactory#setExceptionHandler 来覆盖处理程序。它将用于工厂创建的所有连接。

ConnectionFactory factory = new ConnectionFactory();
cf.setExceptionHandler(customHandler);

应使用异常处理程序进行异常日志记录。

指标和监控

客户端收集活动连接的运行时指标(例如,已发布消息的数量)。指标收集是一项可选功能,应在 ConnectionFactory 级别使用 setMetricsCollector(metricsCollector) 方法进行设置。此方法需要一个 MetricsCollector 实例,该实例在客户端代码的多个地方被调用。

客户端开箱即用地支持 MicrometerDropwizard MetricsOpenTelemetry

以下是收集的指标:

  • 打开的连接数
  • 打开的通道数
  • 已发布的消息数
  • 已确认的消息数
  • 已否定确认(nack-ed)的出站消息数
  • 已退回的无法路由的出站消息数
  • 出站消息失败数
  • 已消耗的消息数
  • 已确认的消息数
  • 已拒绝的消息数

Micrometer 和 Dropwizard Metrics 都提供计数,以及与消息相关的指标的平均速率、过去五分钟速率等。它们还支持用于监控和报告的常用工具(JMX、Graphite、Ganglia、Datadog 等)。有关详细信息,请参阅下面的专用部分。

开发人员在启用指标收集时应牢记几点:

  • 使用 Micrometer 或 Dropwizard Metrics 时,不要忘记将适当的依赖项(在 Maven、Gradle 中,或作为 JAR 文件)添加到 JVM 类路径中。这些是可选依赖项,不会随 Java 客户端自动拉取。您可能还需要添加其他依赖项,具体取决于使用的报告后端。
  • 指标收集是可扩展的。鼓励实现自定义 MetricsCollector 以满足特定需求。
  • MetricsCollectorConnectionFactory 级别设置,但可以在不同实例之间共享。
  • 指标收集不支持事务。例如,如果事务中发送了确认,然后事务被回滚,则确认会在客户端指标中计数(但显然不会由代理计数)。请注意,确认实际上已发送到代理,然后被事务回滚取消,因此客户端指标在发送确认方面是正确的。总之,不要将客户端指标用于关键业务逻辑,它们不能保证完美准确。它们旨在用于简化对运行系统的理解并提高运维效率。

Micrometer 支持

必须首先启用指标收集。

使用 Micrometer 如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // get Micrometer's Counter object

Micrometer 支持 几种报告后端:Netflix Atlas、Prometheus、Datadog、Influx、JMX 等。

您通常会将 MeterRegistry 的实例传递给 MicrometerMetricsCollector。以下是 JMX 的示例:

JmxMeterRegistry registry = new JmxMeterRegistry();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector(registry);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setMetricsCollector(metrics);

Dropwizard Metrics 支持

使用 Dropwizard 启用指标收集,如下所示:

ConnectionFactory connectionFactory = new ConnectionFactory();
StandardMetricsCollector metrics = new StandardMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // get Metrics' Meter object

Dropwizard Metrics 支持 几种报告后端:控制台、JMX、HTTP、Graphite、Ganglia 等。

您通常会将 MetricsRegistry 的实例传递给 StandardMetricsCollector。以下是 JMX 的示例:

MetricRegistry registry = new MetricRegistry();
StandardMetricsCollector metrics = new StandardMetricsCollector(registry);

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setMetricsCollector(metrics);

JmxReporter reporter = JmxReporter
.forRegistry(registry)
.inDomain("com.rabbitmq.client.jmx")
.build();
reporter.start();

Google App Engine 上的 RabbitMQ Java 客户端

在 Google App Engine (GAE) 上使用 RabbitMQ Java 客户端需要使用自定义线程工厂,该工厂使用 GAE 的 ThreadManager 来实例化线程(参见上文)。此外,必须设置较低的心跳间隔(4-5 秒),以避免在 GAE 上遇到低 InputStream 读取超时。

ConnectionFactory factory = new ConnectionFactory();
cf.setRequestedHeartbeat(5);

注意事项和限制

为了实现拓扑恢复,RabbitMQ Java 客户端维护了一个已声明队列、交换器和绑定的缓存。该缓存是按连接的。某些 RabbitMQ 功能使得客户端无法观察到某些拓扑更改,例如当队列因 TTL 而被删除时。RabbitMQ Java 客户端尝试在最常见的情况下使缓存条目失效:

  • 当队列被删除时。
  • 当交换器被删除时。
  • 当绑定被删除时。
  • 当消费者在自动删除的队列上被取消时。
  • 当队列或交换器与自动删除的交换器解绑时。

然而,客户端无法跟踪单个连接之外的这些拓扑更改。依赖于自动删除队列或交换器以及队列 TTL(注意:不是消息 TTL!)并使用 自动连接恢复 的应用程序,应显式删除已知未使用或已删除的实体,以清空客户端的拓扑缓存。这得益于 RabbitMQ 3.3.x 中 Channel#queueDeleteChannel#exchangeDeleteChannel#queueUnbindChannel#exchangeUnbind 的幂等性(删除不存在的实体不会导致异常)。

RPC(请求/回复)模式:一个示例

作为一种编程便利,Java 客户端 API 提供了一个 RpcClient 类,它使用一个临时的回复队列,通过 AMQP 0-9-1 提供简单的 RPC 风格的通信 功能。

该类不对 RPC 参数和返回值施加任何特定格式。它只是提供了一种机制,用于将消息发送到给定的交换器,并带有特定的路由键,然后在回复队列上等待响应。

import com.rabbitmq.client.RpcClient;

RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

(此类如何使用 AMQP 0-9-1 的实现细节如下:请求消息以 basic.correlation_id 字段设置为此 RpcClient 实例唯一的值发送,并以 basic.reply_to 设置为回复队列的名称。)

创建该类的实例后,您可以使用以下任何方法通过它发送 RPC 请求:

byte[] primitiveCall(byte[] message);
String stringCall(String message)
Map mapCall(Map message)
Map mapCall(Object[] keyValuePairs)

primitiveCall 方法将原始字节数组作为请求和响应体进行传输。stringCall 方法是 primitiveCall 的一个薄层便利包装器,它将消息体视为默认字符编码中的 String 实例。

mapCall 变体更复杂一些:它们将包含普通 Java 值的 java.util.Map 编码为 AMQP 0-9-1 二进制表表示,并以相同方式解码响应。(请注意,这里的值类型有一些限制——有关详细信息,请参阅 javadoc。)

所有编组/解组的便利方法都使用 primitiveCall 作为传输机制,只是在其之上提供了一个包装层。

TLS 支持

可以通过 TLS 加密客户端和代理之间的通信。客户端和服务器身份验证(又称对等验证)也得到支持。以下是使用 Java 客户端加密的最简单、最不安全的用法:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5671);

// Only suitable for development.
// This code will not perform peer certificate chain verification and prone
// to man-in-the-middle attacks.
// See the main TLS guide to learn about peer verification and how to enable it.
factory.useSslProtocol();

请注意,在上述示例中,客户端不强制执行任何服务器身份验证(对等证书链验证),因为使用了默认的“信任所有证书”TrustManager。这对于本地开发很方便,但容易受到中间人攻击,因此不推荐在生产中使用

要了解有关 RabbitMQ 中 TLS 支持的更多信息,请参阅 TLS 指南。如果您只想配置 Java 客户端(特别是对等验证和信任管理器部分),请阅读 TLS 指南的相应部分

请注意,当 Netty 用于网络 I/O 时,它需要使用自己的 SslContext API。有关更多详细信息,请参阅 Netty 部分。

OAuth 2 支持

该客户端可以针对 OAuth 2 服务器(如 UAA)进行身份验证。必须在服务器端启用 OAuth 2 插件,并将其配置为使用与客户端相同的 OAuth 2 服务器。

获取 OAuth 2 令牌

Java 客户端提供了 OAuth2ClientCredentialsGrantCredentialsProvider 类,用于通过 OAuth 2 Client Credentials 流程 获取 JWT 令牌。客户端将在打开连接时在密码字段中发送 JWT 令牌。然后,代理将验证 JWT 令牌的签名、有效性和权限,然后授权连接并授予对所请求虚拟主机的访问权限。

优先使用 OAuth2ClientCredentialsGrantCredentialsProviderBuilder 来创建 OAuth2ClientCredentialsGrantCredentialsProvider 实例,然后将其设置在 ConnectionFactory 上。以下代码片段展示了如何为 OAuth 2 插件的示例设置 配置和创建 OAuth 2 凭据提供程序实例。

import com.rabbitmq.client.impl.OAuth2ClientCredentialsGrantCredentialsProvider.
OAuth2ClientCredentialsGrantCredentialsProviderBuilder;
...
CredentialsProvider credentialsProvider =
new OAuth2ClientCredentialsGrantCredentialsProviderBuilder()
.tokenEndpointUri("https://:8080/uaa/oauth/token/")
.clientId("rabbit_client").clientSecret("rabbit_secret")
.grantType("password")
.parameter("username", "rabbit_super")
.parameter("password", "rabbit_super")
.build();

connectionFactory.setCredentialsProvider(credentialsProvider);

在生产环境中,请确保为令牌端点 URI 使用 HTTPS,并在必要时为 HTTPS 请求配置 SSLContext(以验证和信任 OAuth 2 服务器的身份)。以下代码片段通过使用 OAuth2ClientCredentialsGrantCredentialsProviderBuilder 中的 tls().sslContext() 方法来实现这一点。

SSLContext sslContext = ... // create and initialise SSLContext

CredentialsProvider credentialsProvider =
new OAuth2ClientCredentialsGrantCredentialsProviderBuilder()
.tokenEndpointUri("https://:8080/uaa/oauth/token/")
.clientId("rabbit_client").clientSecret("rabbit_secret")
.grantType("password")
.parameter("username", "rabbit_super")
.parameter("password", "rabbit_super")
.tls() // configure TLS
.sslContext(sslContext) // set SSLContext
.builder() // back to main configuration
.build();

请查阅 Javadoc 以了解所有可用选项。

刷新令牌

令牌会过期,代理将拒绝带有过期令牌的连接上的操作。为了避免这种情况,可以在令牌过期前调用 CredentialsProvider#refresh() 并将新令牌发送到服务器。从应用程序的角度来看,这很麻烦,因此 Java 客户端提供了 DefaultCredentialsRefreshService 来提供帮助。此实用程序跟踪使用的令牌,在它们过期之前刷新它们,并将新令牌发送到它负责的连接。

以下代码片段展示了如何创建 DefaultCredentialsRefreshService 实例并将其设置在 ConnectionFactory 上。

import com.rabbitmq.client.impl.DefaultCredentialsRefreshService.
DefaultCredentialsRefreshServiceBuilder;
...
CredentialsRefreshService refreshService =
new DefaultCredentialsRefreshServiceBuilder().build();
cf.setCredentialsRefreshService(refreshService);

DefaultCredentialsRefreshService 会在令牌有效时间达到 80% 时安排刷新,例如,如果令牌在 60 分钟后过期,它将在 48 分钟后刷新。这是默认行为,有关更多信息,请参阅 Javadoc

© . This site is unofficial and not affiliated with VMware.