跳至主内容

AMQP 1.0 客户端库

本页记录了 RabbitMQ 4.0 及以上版本AMQP 1.0 客户端库的用法。

RabbitMQ 团队支持以下库

应用程序开发人员将在此处找到最常见用例的库使用方法。有关许可、下载、依赖管理、高级和特定用法及配置等其他信息,请参阅相应库存储库中的 README 页面。

概述

RabbitMQ 团队维护了一组 专为 RabbitMQ 设计和优化 的 AMQP 1.0 客户端库。它们在 AMQP 1.0 之上提供了简单、安全且功能强大的 API。应用程序可以使用这些库发布和消费消息,并以跨编程语言一致的方式管理服务器拓扑。这些库还提供高级功能,如自动连接和拓扑恢复,以及与队列的连接亲和性。

注意

RabbitMQ 与任何符合 AMQP-1.0 的客户端库兼容。使用 RabbitMQ AMQP 1.0 客户端库与 RabbitMQ 并非强制性,但强烈建议应用程序这样做以获得最佳体验。

安全性

RabbitMQ AMQP 1.0 客户端库默认是安全的,它们始终创建 持久实体 并始终发布持久化消息。

保证

RabbitMQ AMQP 1.0 客户端库提供至少一次的保证。

Broker 始终 确认 已正确处理发布的 P消息。发布者通过使用 `unsettled` 发送者结算模式 和 `first` 接收者结算模式 来实现此目的。

消费者必须始终 向 broker 发送 消息处理结果。消费者在创建时使用与发布者相同的设置(`first` 接收者结算模式 和 `unsettled` 发送者结算模式)。

客户端 API

本节介绍如何使用 RabbitMQ AMQP 1.0 客户端库连接到集群,以及发布和消费消息。

连接

库提供了一个节点或节点集群的入口点。它的名称是“环境”。环境允许创建连接。它可以包含在连接之间共享的基础设施相关配置设置(例如,Java 的线程池)。以下是如何创建环境:

创建环境
import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;

// ...

// create the environment instance
Environment environment = new AmqpEnvironmentBuilder()
.build();
// ...
// close the environment when the application stops
environment.close();

通常一个应用程序进程只有一个环境实例。应用程序退出时必须关闭环境以释放其资源。

应用程序从环境中打开连接。它们必须指定适当的设置来连接到集群节点(URI、凭据)。

打开连接
// open a connection from the environment
Connection connection = environment.connectionBuilder()
.uri("amqp://admin:admin@localhost:5672/%2f")
.build();
// ...
// close the connection when it is no longer necessary
connection.close();

库默认使用 ANONYMOUS SASL 身份验证机制。连接应为长期存在的对象,应用程序应避免频繁断开和重连。不再需要时必须关闭它们。

发布

必须创建一个发布者来发布消息。发布者将发布消息的目标通常在创建时设置,但也可以按每条消息的基础设置。

以下是如何声明一个在创建时设置目标的发布者:

创建发布者
Publisher publisher = connection.publisherBuilder()
.exchange("foo").key("bar")
.build();
// ...
// close the publisher when it is no longer necessary
publisher.close();

在前面的示例中,使用该发布者发布的每条消息都将发送到具有 `bar` 路由键的 `foo` 交换机。

信息

RabbitMQ 使用包含交换机、队列和绑定 的 AMQ 0.9.1 模型

消息从发布者实例创建。它们遵循 AMQP 1.0 消息格式。可以定义消息体(作为字节数组)、标准属性和应用程序属性。

当消息发布时,Broker 会在异步回调中指示它如何处理该消息。客户端应用程序会根据 Broker 返回的消息状态(AMQP 中的“结果”)采取适当措施(例如,如果消息未被 accepted,则将其存储在其他地方)。

以下代码片段展示了如何创建消息、发布它,并处理来自 Broker 的响应:

发布消息
// create the message
Message message = publisher
.message("hello".getBytes(StandardCharsets.UTF_8))
.messageId(1L);

// publish the message and deal with broker feedback
publisher.publish(message, context -> {
// asynchronous feedback from the broker
if (context.status() == Publisher.Status.ACCEPTED) {
// the broker accepted (confirmed) the message
} else {
// deal with possible failure
}
});

上面的发布者示例将消息发送到指定的交换机和路由键,但这并非发布者支持的唯一目标。以下是支持的非空目标:

创建具有不同目标设置的发布者
// publish to an exchange with a routing key
Publisher publisher1 = connection.publisherBuilder()
.exchange("foo").key("bar") // /exchanges/foo/bar
.build();

// publish to an exchange without a routing key
Publisher publisher2 = connection.publisherBuilder()
.exchange("foo") // /exchanges/foo
.build();

// publish to a queue
Publisher publisher3 = connection.publisherBuilder()
.queue("some-queue") // /queues/some-queue
.build();
信息

库将 API 调用转换为 地址格式 v2

也可以按每条消息的基础定义目标。发布者必须定义为不带目标,并且每条消息在其属性部分的 `to` 字段中定义其目标。库在消息创建 API 中提供了辅助方法来定义消息目标,从而避免处理 地址格式

以下代码片段展示了如何创建没有目标的发布者,并定义具有不同目标类型的消息:

在消息中设置目标
// no target defined on publisher creation
Publisher publisher = connection.publisherBuilder()
.build();

// publish to an exchange with a routing key
Message message1 = publisher.message()
.toAddress().exchange("foo").key("bar")
.message();

// publish to an exchange without a routing key
Message message2 = publisher.message()
.toAddress().exchange("foo")
.message();

// publish to a queue
Message message3 = publisher.message()
.toAddress().queue("my-queue")
.message();

对 Streams 的支持

如果消息要发送到 ,则可以设置其 过滤器值,使用 x-stream-filter-value 消息注解。

在消息注解中设置流过滤器值
Message message = publisher.message(body)
.annotation("x-stream-filter-value", "invoices"); // set filter value
publisher.publish(message, context -> {
// confirm callback
});

消费

消费者创建

创建消费者包括指定要从中消费的队列以及处理消息的回调。

创建消费者
Consumer consumer = connection.consumerBuilder()
.queue("some-queue")
.messageHandler((context, message) -> {
byte[] body = message.body();
// ...
context.accept(); // settle the message
})
.build(); // do not forget to build the instance!

一旦应用程序完成处理消息,就必须对其进行结算。这会向 Broker 指示处理结果以及它应该如何处理该消息(例如,删除该消息)。应用程序必须结算消息,否则它们将用完 信用额度,Broker 将停止向它们分派消息。

下一节将介绍消息结算的语义。

消息处理结果(Outcome)

库允许应用程序以不同方式结算消息。它们在消息应用程序的上下文中尽可能使用明确的术语。每个术语都映射到 AMQP 规范中的 特定结果

  • accept:应用程序成功处理了消息,可以将其从队列中删除(accepted 结果)。
  • discard:应用程序无法处理消息,因为它无效,Broker 可以将其丢弃或(如果配置了) 死信
  • requeue:应用程序未处理消息,Broker 可以将其重新入队并将其分发给同一或不同的消费者(released 结果)。

discardrequeue 有一个可选的消息注解参数,用于与消息头部分中现有的注解结合。此类消息注解可用于提供有关 discardrequeue 原因的详细信息。特定于应用程序的注解键必须以 x-opt- 前缀开头,而 Broker 理解的注解键仅以 x- 开头。discardrequeue 都使用 `modified` 结果和消息注解参数。

只有仲裁队列支持使用 `modified` 结果 修改消息注解

消费者优雅关闭

消费者通过接受、丢弃或重新排队来结算消息。

未结算的消息在消费者关闭时会被重新排队。这可能导致消息重复处理。

以下是一个例子:

  • 消费者为给定消息执行数据库操作。
  • 消费者在接受(结算)消息之前关闭。
  • 消息被重新排队。
  • 另一个消费者获取消息并再次执行数据库操作。

完全避免重复消息是困难的,这就是为什么处理应该是幂等的。消费者 API 提供了一种在消费者关闭时避免重复消息的方法。它包括暂停消息传递,获取未结算消息的数量以确保其最终达到 0,然后关闭消费者。这确保了消费者最终已静止并且所有接收到的消息都已处理。

以下是一个消费者优雅关闭的示例:

优雅关闭消费者
// pause the delivery of messages
consumer.pause();
// ensure the number of unsettled messages reaches 0
long unsettledMessageCount = consumer.unsettledMessageCount();
// close the consumer
consumer.close();

应用程序仍然可以不暂停而关闭消费者,但有重复处理同一消息的风险。

对 Streams 的支持

库在消费者配置中对 提供开箱即用的支持。

消费 流时,可以设置附加到何处

附加到流的开头
Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
.offset(ConsumerBuilder.StreamOffsetSpecification.FIRST)
.builder()
.messageHandler((context, message) -> {
// message processing
})
.build();

还支持 流过滤 配置。

配置流过滤
Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
.filterValues("invoices", "orders")
.filterMatchUnfiltered(true)
.builder()
.messageHandler((ctx, msg) -> {
String filterValue = (String) msg.annotation("x-stream-filter-value");
// there must be some client-side filter logic
if ("invoices".equals(filterValue) || "orders".equals(filterValue)) {
// message processing
}
ctx.accept();
})
.build();

处理流时,还可以考虑使用您首选编程语言的流客户端库的 原生流协议

拓扑管理

应用程序可以管理 RabbitMQ 的 AMQ 0.9.1 模型:声明和删除交换机、队列和绑定。

为此,它们需要从连接中获取管理 API。

从环境中获取管理对象
Management management = connection.management();
// ...
// close the management instance when it is no longer needed
management.close();

管理 API 在不再需要时应立即关闭。应用程序通常在启动时创建其所需的拓扑,因此可以在此步骤之后关闭管理对象。

交换器

以下是如何创建内置类型的 交换机

创建内置类型的交换机
management.exchange()
.name("my-exchange")
.type(Management.ExchangeType.FANOUT) // enum for built-in type
.declare();

也可以指定交换机类型为字符串(用于非内置类型交换机)。

创建非内置类型的交换机
management.exchange()
.name("my-exchange")
.type("x-delayed-message") // non-built-in type
.autoDelete(false)
.argument("x-delayed-type", "direct")
.declare();

以下是如何删除交换机:

删除交换机
management.exchangeDelete("my-exchange");

队列

以下是如何使用 默认队列类型 创建 队列

创建经典队列
management.queue()
.name("my-queue")
.exclusive(true)
.autoDelete(false)
.declare();

管理 API 显式支持 队列参数

创建带参数的队列
management.queue()
.name("my-queue")
.type(Management.QueueType.CLASSIC)
.messageTtl(Duration.ofMinutes(10))
.maxLengthBytes(ByteCapacity.MB(100))
.declare();

管理 API 还区分了所有队列类型共享的参数和仅对给定类型有效的参数。以下是创建 仲裁队列 的示例:

创建仲裁队列
management
.queue()
.name("my-quorum-queue")
.quorum() // set queue type to 'quorum'
.quorumInitialGroupSize(3) // specific to quorum queues
.deliveryLimit(3) // specific to quorum queues
.queue()
.declare();

可以查询有关队列的信息。

获取队列信息
Management.QueueInfo info = management.queueInfo("my-queue");
long messageCount = info.messageCount();
int consumerCount = info.consumerCount();
String leaderNode = info.leader();

此 API 也可用于检查队列是否存在。

以下是如何删除队列:

删除队列
management.queueDelete("my-queue");

绑定

管理 API 支持将 队列绑定 到交换机。

将队列绑定到交换机
management.binding()
.sourceExchange("my-exchange")
.destinationQueue("my-queue")
.key("foo")
.bind();

还支持 交换机到交换机的绑定

将交换机绑定到另一个交换机
management.binding()
.sourceExchange("my-exchange")
.destinationExchange("my-other-exchange")
.key("foo")
.bind();

也可以解绑实体。

删除交换机和队列之间的绑定
management.unbind()
.sourceExchange("my-exchange")
.destinationQueue("my-queue")
.key("foo")
.unbind();

高级用法

生命周期监听器

应用程序可以通过添加监听器来响应某些 API 组件的状态更改。应用程序可以向连接添加监听器,以便在连接恢复后停止发布消息。然后,当连接恢复并再次打开时,应用程序可以恢复发布。

以下是如何在连接上设置监听器:

在连接上设置监听器
Connection connection = environment.connectionBuilder()
.listeners(context -> { // set one or several listeners
context.previousState(); // the previous state
context.currentState(); // the current (new) state
context.failureCause(); // the cause of the failure (in case of failure)
context.resource(); // the connection
}).build();

也可以在发布者实例上设置监听器。

在发布者上设置监听器
Publisher publisher = connection.publisherBuilder()
.listeners(context -> {
// ...
})
.exchange("foo").key("bar")
.build();

在消费者实例上也可以。

在消费者上设置监听器
Consumer consumer = connection.consumerBuilder()
.listeners(context -> {
// ...
})
.queue("my-queue")
.build();

自动连接恢复

自动连接恢复默认激活:客户端库将在连接意外关闭后(例如,网络故障、节点重启等)自动恢复连接。自动拓扑恢复也在连接恢复后立即激活:客户端库将重新创建 AMQP 实体,以及为恢复的连接创建发布者和消费者。开发人员需要担心网络稳定性和节点重启的问题会少很多,因为客户端库会负责处理。

客户端每 5 秒尝试重新连接一次,直到成功。可以通过自定义退避延迟策略来更改此行为。

为连接恢复设置退避策略
Connection connection = environment.connectionBuilder()
.recovery()
.backOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(2)))
.connectionBuilder().build();

如果拓扑恢复不适用于特定应用程序,也可以将其禁用。应用程序通常会注册一个连接 生命周期监听器 来了解连接何时恢复,并相应地恢复其自身状态。

禁用拓扑恢复
Connection connection = environment.connectionBuilder()
.recovery()
.topology(false) // deactivate topology recovery
.connectionBuilder()
.listeners(context -> {
// set listener that restores application state when connection is recovered
})
.build();

也可以完全禁用恢复。

禁用恢复
Connection connection = environment.connectionBuilder()
.recovery()
.activated(false) // deactivate recovery
.connectionBuilder().build();
© . This site is unofficial and not affiliated with VMware.