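AMQP 1.0 Client Libraries
This page documents the usage of the AMQP 1.0 client libraries for RabbitMQ 4.0 and later.
The RabbitMQ team supports the following libraries:
- RabbitMQ AMQP 1.0 **Java** client
- RabbitMQ AMQP 1.0 **.NET** client
- RabbitMQ AMQP 1.0 Go client
- RabbitMQ AMQP 1.0 Python client
- RabbitMQ AMQP 1.0 JavaScript client
Application developers will find here how to use the libraries for the most common use cases. For other information such as licensing, downloading, dependency management, advanced and specific usage, and configuration, see the README page in the repository of the respective library.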
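Overview
The RabbitMQ team maintains a set of AMQP 1.0 client libraries designed and optimized for RabbitMQ. They offer a simple, safe, yet powerful API on top of AMQP 1.0. Applications can publish and consume messages with these libraries, and manage the server topology in a way that is consistent across programming languages. The libraries also offer advanced features such as automatic connection and topology recovery, and connection affinity with queues.
RabbitMQ is compatible with any AMQP 1.0-compliant client library. Using the RabbitMQ AMQP 1.0 client libraries with RabbitMQ is not mandatory, but applications are strongly encouraged to do so for the best experience.
Safety
The RabbitMQ AMQP 1.0 client libraries are safe by default: they always create durable entities and always publish persistent messages.
Guarantees
The RabbitMQ AMQP 1.0 client libraries provide at-least-once guarantees.
The broker always confirms that it has properly handled published messages. Publishers achieve this by using the `unsettled` sender settle mode and the `first` receiver settle mode.
Consumers must always signal the result of message processing to the broker. Consumers use the same settings as publishers when they are created (`first` receiver settle mode and `unsettled` sender settle mode).
Client API
This section covers how to use the RabbitMQ AMQP 1.0 client libraries to connect to a cluster, and to publish and consume messages.
Connecting
The libraries provide an entry point to a node or a cluster of nodes. Its name is the "environment". The environment allows creating connections. It can contain infrastructure-related configuration settings shared between connections (for example, pools of threads for Java). Here is how to create the environment: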
- Java
- C#
- Python
- Go
- JavaScript
import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
// ...
// create the environment instance
Environment environment = new AmqpEnvironmentBuilder()
.build();
// ...
// close the environment when the application stops
environment.close();
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;
// ...
// create the environment instance
IEnvironment environment = await AmqpEnvironment.CreateAsync(
ConnectionSettingBuilder.Create().Build());
// ...
// close the environment when the application stops
await environment.CloseAsync();
from rabbitmq_amqp_python_client import Environment
# ...
# create the environment instance
environment = Environment("amqp://guest:guest@localhost:5672/")
# ...
# close the environment when the application stops
environment.close()
import (
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
// ...
// create the environment instance for a single node
env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)
// in case you have multiple endpoints you can use the following:
// clusterEnv := rmq.NewClusterEnvironment([]rmq.Endpoint{
// {Address: "amqp://server1", Options: &rmq.AmqpConnOptions{}},
// {Address: "amqp://server2", Options: &rmq.AmqpConnOptions{}},
// })
// ...
// close the connections
env.CloseConnections(context.Background())
const rabbit = require("rabbitmq-amqp-js-client");
// create the environment instance for a single node
const environment = rabbit.createEnvironment({
host: "localhost",
port: 5672,
username: rabbitUser,
password: rabbitPassword,
});
// close the environment when the application stops
await environment.close();
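There is usually one environment instance per application process. The application must close the environment when it exits to release its resources.
Applications open connections from the environment. They must specify appropriate settings to connect to the cluster nodes (URI, credentials).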
- Java
- C#
- Python
- Go
- JavaScript
// open a connection from the environment
Connection connection = environment.connectionBuilder()
.uri("amqp://admin:admin@localhost:5672/%2f")
.build();
// ...
// close the connection when it is no longer necessary
connection.close();
// open a connection from the environment setting
IConnection connection = await environment.CreateConnectionAsync();
//open a connection from the environment with different settings
ConnectionSettingBuilder otherSettingBuilder = ConnectionSettingBuilder.Create()
.ContainerId("my_containerId")
.Host("localhost");
IConnection connection = await environment.CreateConnectionAsync(otherSettingBuilder.Build());
// ...
// close the connection when it is no longer necessary
await connection.CloseAsync();
# open a connection from the environment setting
connection = environment.connection()
# close the connection when it is no longer necessary
connection.close()
// open a connection from the environment setting
connection, err := env.NewConnection(context.Background())
// close the connection when it is no longer necessary
connection.Close(context.Background())
// open a connection from the environment setting
const connection = await environment.createConnection();
// close the connection when it is no longer necessary
await connection.close();
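Libraries use the ANONYMOUS SASL authentication mechanism by default. Connections are expected to be long-lived objects; applications should avoid connection churn. Connections must be closed when they are no longer needed.
Publishing
A publisher must be created to publish messages. The target a publisher publishes messages to is usually set on creation, but it can also be set on a per-message basis.
Here is how to declare a publisher with the target set at creation time: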
- Java
- C#
- Python
- Go
- JavaScript
Publisher publisher = connection.publisherBuilder()
.exchange("foo").key("bar")
.build();
// ...
// close the publisher when it is no longer necessary
publisher.close();
// The publisher can use exchange (optionally with a key) or queue to publish messages.
IPublisher publisher = await connection.PublisherBuilder().Exchange("foo").Key("bar")
.BuildAsync();
// ...
// close the publisher when it is no longer necessary
await publisher.CloseAsync();
publisher.Dispose();
# The publisher can use exchange (optionally with a key) or queue to publish messages.
# You can use the AddressHelper utility class to get the address from the exchange name and the key
exchange_address = AddressHelper.exchange_address("foo", "bar")
publisher = connection.publisher(exchange_address)
# close the publisher when it is no longer necessary
publisher.close()
// you can use ExchangeAddress or QueueAddress to publish to an exchange or a queue
publisher, err := connection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
    Exchange: "foo",
    Key:      "bar",
}, nil)
// close the publisher when it is no longer necessary
publisher.Close(context.Background())
// The publisher can use exchange (optionally with a key) or queue to publish messages.
const publisher = await connection.createPublisher({
exchange: { name: "foo", routingKey: "bar" },
});
// close the publisher when it is no longer necessary
publisher.close();
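In the previous example, every message published with the publisher goes to the `foo` exchange with the `bar` routing key.
RabbitMQ uses the AMQ 0.9.1 model comprising exchanges, queues, and bindings.
Messages are created from the publisher instance. They follow the AMQP 1.0 message format. It is possible to define the body (as an array of bytes), standard properties, and application properties.
When a message is published, the broker indicates how it dealt with it in an asynchronous callback. The client application takes appropriate measures depending on the status ("outcome" in AMQP terms) the broker returned for the message (for example, storing the message somewhere else if it has not been `accepted`).
The following snippet shows how to create a message, publish it, and deal with the response from the broker: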
- Java
- C#
- Python
- Go
- JavaScript
// create the message
Message message = publisher
.message("hello".getBytes(StandardCharsets.UTF_8))
.messageId(1L);
// publish the message and deal with broker feedback
publisher.publish(message, context -> {
// asynchronous feedback from the broker
if (context.status() == Publisher.Status.ACCEPTED) {
// the broker accepted (confirmed) the message
} else {
// deal with possible failure
}
});
// create the message
var message = new AmqpMessage("Hello");
// publish the message and deal with broker feedback
// Awaiting PublishAsync waits for the broker's outcome; to increase throughput,
// collect the tasks in a `List<Task<PublishResult>>` and await them together
PublishResult pr = await publisher.PublishAsync(message);
switch (pr.Outcome.State)
{
case OutcomeState.Accepted:
// the broker accepted (confirmed) the message
break;
case OutcomeState.Released:
// the broker could not route the message anywhere
break;
case OutcomeState.Rejected:
// at least one queue rejected the message
break;
}
# create the message
# body is a byte array. You can use the Converter utility class
message = Message(body=Converter.string_to_bytes("Hello"))
# publish the message and deal with broker feedback
# The result is synchronous
status = publisher.publish(message)
match status.remote_state:
    case OutcomeState.ACCEPTED:
        # the broker accepted (confirmed) the message
        pass
    case OutcomeState.RELEASED:
        # the broker could not route the message anywhere
        pass
    case OutcomeState.REJECTED:
        # at least one queue rejected the message
        pass
// create the message
message := rmq.NewMessage([]byte("Hello"))
// publish the message and deal with broker feedback
publishResult, err := publisher.Publish(context.Background(), message)
if err != nil {
    // there is an error
}
switch publishResult.Outcome.(type) {
case *rmq.StateAccepted:
// the broker accepted (confirmed) the message
break
case *rmq.StateReleased:
// the broker could not route the message anywhere
break
case *rmq.StateRejected:
// at least one queue rejected the message
stateType := publishResult.Outcome.(*rmq.StateRejected)
if stateType.Error != nil {
// in case there is error
}
break
default:
// these statuses are not supported; the default case is kept for AMQP 1.0 compatibility
// see: https://rabbitmq.org.cn/docs/amqp#outcomes
}
// create the message
const message = rabbit.createAmqpMessage({ body: "Hello" });
// publish the message and deal with broker feedback
const publishResult = await publisher.publish(message);
switch (publishResult.outcome) {
case rabbit.OutcomeState.ACCEPTED:
// the broker accepted (confirmed) the message
break;
case rabbit.OutcomeState.RELEASED:
// the broker could not route the message anywhere
break;
case rabbit.OutcomeState.REJECTED:
// at least one queue rejected the message
break;
default:
break;
}
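The advice above (store a message somewhere else when the broker does not accept it) can be sketched independently of any client library. The following Python sketch is illustrative only: the `Outcome` enum, `handle_outcome`, and the in-memory `fallback_store` are hypothetical names and are not part of the RabbitMQ libraries.

```python
from enum import Enum


class Outcome(Enum):
    """Hypothetical stand-in for the outcome a broker returns for a message."""

    ACCEPTED = "accepted"
    RELEASED = "released"
    REJECTED = "rejected"


def handle_outcome(body: bytes, outcome: Outcome, fallback_store: list) -> bool:
    """Return True if the broker confirmed the message.

    Any outcome other than ACCEPTED keeps the message in fallback_store
    so the application can retry or inspect it later.
    """
    if outcome is Outcome.ACCEPTED:
        return True
    fallback_store.append((body, outcome))
    return False


fallback_store: list = []
confirmed = handle_outcome(b"hello", Outcome.ACCEPTED, fallback_store)
unroutable = handle_outcome(b"lost", Outcome.RELEASED, fallback_store)
```

A real application would likely replace the list with durable storage; the list is used here only to keep the sketch self-contained.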
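The publisher example above sends messages to a given exchange with a given routing key, but this is not the only target a publisher supports. Here are the supported non-null targets: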
- Java
- C#
- Python
- Go
- JavaScript
// publish to an exchange with a routing key
Publisher publisher1 = connection.publisherBuilder()
.exchange("foo").key("bar") // /exchanges/foo/bar
.build();
// publish to an exchange without a routing key
Publisher publisher2 = connection.publisherBuilder()
.exchange("foo") // /exchanges/foo
.build();
// publish to a queue
Publisher publisher3 = connection.publisherBuilder()
.queue("some-queue") // /queues/some-queue
.build();
// publish to an exchange with a routing key
IPublisher publisher1 = await connection.PublisherBuilder()
    .Exchange("foo")
    .Key("bar") // /exchanges/foo/bar
    .BuildAsync();
// publish to an exchange without a routing key
IPublisher publisher2 = await connection.PublisherBuilder()
    .Exchange("foo") // /exchanges/foo
    .BuildAsync();
// publish to a queue
IPublisher publisher3 = await connection.PublisherBuilder()
    .Queue("some-queue") // /queues/some-queue
    .BuildAsync();
# publish to an exchange with a routing key
# You can use the AddressHelper utility class to get the address from the exchange name and the key
exchange_address = AddressHelper.exchange_address("foo", "bar")
publisher = connection.publisher(exchange_address)
# publish to an exchange without a routing key
exchange_address = AddressHelper.exchange_address("foo")
publisher = connection.publisher(exchange_address)
# publish to a queue
queue_address = AddressHelper.queue_address("some-queue")
publisher = connection.publisher(queue_address)
// publish to an exchange with a routing key
publisher, err := connection.NewPublisher(context.Background(), &ExchangeAddress{
Exchange: "foo",
Key: "bar",
}, nil)
// publish to an exchange without a routing key
publisher, err := connection.NewPublisher(context.Background(), &ExchangeAddress{
Exchange: "foo",
}, nil)
// publish to a queue
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{
Queue: "some-queue"},
nil)
// publish to an exchange with a routing key
const publisher = await connection.createPublisher({
exchange: { name: "foo", routingKey: "bar" },
});
// publish to an exchange without a routing key
const publisher = await connection.createPublisher({
exchange: { name: "foo" },
});
// publish to a queue
const publisher = await connection.createPublisher({
queue: { name: "some-queue" },
});
Libraries translate the API calls into the address format v2.
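To make the translation concrete, here is a minimal Python sketch that builds the address strings shown in the Java comments above (`/exchanges/foo/bar`, `/exchanges/foo`, `/queues/some-queue`). The function names are hypothetical and are not the libraries' own helpers; the sketch also assumes that names are percent-encoded so that characters such as `/` cannot be mistaken for path separators.

```python
from typing import Optional
from urllib.parse import quote


def _encode(segment: str) -> str:
    # Percent-encode everything except RFC 3986 unreserved characters,
    # so that "/" inside a name does not look like a path separator.
    return quote(segment, safe="")


def exchange_address(exchange: str, routing_key: Optional[str] = None) -> str:
    """Build an exchange target, with or without a routing key."""
    path = "/exchanges/" + _encode(exchange)
    if routing_key:
        path += "/" + _encode(routing_key)
    return path


def queue_address(queue: str) -> str:
    """Build a queue target."""
    return "/queues/" + _encode(queue)


with_key = exchange_address("foo", "bar")
without_key = exchange_address("foo")
to_queue = queue_address("some-queue")
```

In practice the libraries build these strings themselves, so applications normally do not need code like this.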
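It is also possible to define the target on a per-message basis. The publisher must be defined without any target, and each message defines its target in the `to` field of the properties section. Libraries provide helpers in the message creation API to define the message target, which avoids dealing with the address format.
The following snippet shows how to create a publisher without a target and define messages with different target types: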
- Java
- C#
- Python
- Go
- JavaScript
// no target defined on publisher creation
Publisher publisher = connection.publisherBuilder()
.build();
// publish to an exchange with a routing key
Message message1 = publisher.message()
.toAddress().exchange("foo").key("bar")
.message();
// publish to an exchange without a routing key
Message message2 = publisher.message()
.toAddress().exchange("foo")
.message();
// publish to a queue
Message message3 = publisher.message()
.toAddress().queue("my-queue")
.message();
// no target defined on publisher creation
IPublisher publisher = await connection.PublisherBuilder()
.BuildAsync();
// publish to an exchange with a routing key
IMessage message1 = new AmqpMessage("Hello!").ToAddress()
    .Exchange("foo")
    .Key("bar")
    .Build();
await publisher.PublishAsync(message1);
// publish to a queue
IMessage message2 = new AmqpMessage("Hello!").ToAddress()
    .Queue("foo")
    .Build();
await publisher.PublishAsync(message2);
# no target defined on publisher creation
publisher = connection.publisher()
# publish to an exchange with a routing key
# You can use the AddressHelper.message_to_address_helper
# utility class to set the destination in the message
message = Message(body=Converter.string_to_bytes("Hello!"))
exchange_address = AddressHelper.exchange_address("foo", "bar")
message = AddressHelper.message_to_address_helper(message, exchange_address)
publisher.publish(message)
# publish to a queue
message = Message(body=Converter.string_to_bytes("Hello!"))
queue_address = AddressHelper.queue_address("some-queue")
message = AddressHelper.message_to_address_helper(message, queue_address)
publisher.publish(message)
// no target defined on publisher creation (nil)
publisher, err := connection.NewPublisher(context.Background(), nil,
nil)
// create message with an exchange with a routing key
msg, err = NewMessageWithAddress([]byte("hello"), &ExchangeAddress{
Exchange: "foo",
Key: "bar",
})
// create a message with a queue
msg, err = NewMessageWithAddress([]byte("hello"), &QueueAddress{
Queue: "some-queue"})
// use the publish
publishResult, err = publisher.Publish(context.Background(), msg)
// ...
// no target defined on publisher creation
const publisher = await connection.createPublisher();
// create message with an exchange with a routing key
const message = rabbit.createAmqpMessage({
body: "Hello",
destination: { exchange: { name: "exchange", routingKey: "key" } },
});
// create a message with a queue
const message = rabbit.createAmqpMessage({
body: "Hello",
destination: { queue: { name: "queue" } },
});
// use the publish
const publishResult = await publisher.publish(message);
//...
Support for Streams
If a message is destined for a stream, it is possible to set its filter value with the `x-stream-filter-value` message annotation.
- Java
- C#
- Python
- Go
- JavaScript
Message message = publisher.message(body)
.annotation("x-stream-filter-value", "invoices"); // set filter value
publisher.publish(message, context -> {
// confirm callback
});
var message = new AmqpMessage(body);
message.Annotation("x-stream-filter-value", "invoices");// set filter value
PublishResult pr = await publisher.PublishAsync(message);
publisher.publish(
    Message(
        body=Converter.string_to_bytes("myBody"),
        annotations={"x-stream-filter-value": "invoices"},  # set filter value
    )
)
message := amqp.NewMessage(body)
message.Annotations = amqp.Annotations{
"x-stream-filter-value": "invoices",// set filter value
}
publishResult, err := publisher.Publish(context.Background(), message)
const message = rabbit.createAmqpMessage({
body: "Hello",
annotations: { "x-stream-filter-value": "invoices" },
})
const publishResult = await publisher.publish(message);
//...
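Consuming
Consumer Creation
Creating a consumer consists of specifying the queue to consume from and the callback that processes messages.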
- Java
- C#
- Python
- Go
- JavaScript
Consumer consumer = connection.consumerBuilder()
.queue("some-queue")
.messageHandler((context, message) -> {
byte[] body = message.body();
// ...
context.accept(); // settle the message
})
.build(); // do not forget to build the instance!
IConsumer consumer = await connection.ConsumerBuilder()
.Queue("some-queue")
.MessageHandler(async (context, message) =>
{
// deal with the message
await context.AcceptAsync();// settle the message
}
).BuildAndStartAsync();
class MyMessageHandler(AMQPMessagingHandler):
    def __init__(self):
        super().__init__()

    def on_message(self, event: Event):
        # ...
        # event.message.body is a byte array. If you need a string
        # you can use the Converter utility class, like:
        # my_body_string=Converter.bytes_to_string(event.message.body)
        self.delivery_context.accept(event)  # settle the message

queue_address = AddressHelper.queue_address("some-queue")
consumer = connection.consumer(queue_address, message_handler=MyMessageHandler())
consumer.run()
// create the consumer
consumer, err := connection.NewConsumer(context.Background(), "some-queue", nil)
// receive the message
deliveryContext, err := consumer.Receive(context.Background())
// ....
deliveryContext.Accept(context.Background()) // settle the message
// create the consumer
const consumer = await connection.createConsumer({
queue: { name: "some-queue" },
messageHandler: (context, msg) => {
console.log(`MessageId: ${msg.message_id}; Payload: ${msg.body}`);
context.accept(); // settle the message
},
});
consumer.start();
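Once the application is done processing a message, it must settle it. This indicates to the broker the result of the processing and what it should do with the message (for example, delete it). Applications must settle messages, otherwise they will run out of credits and the broker will stop dispatching messages to them.
The next section covers the semantics of message settlement.
Message Processing Result (Outcome)
Libraries allow applications to settle messages in different ways. They use terms that are as explicit as possible in the context of messaging applications. Each term maps to a given outcome in the AMQP specification.
- `accept`: the application successfully processed the message and it can be deleted from the queue (`accepted` outcome).
- `discard`: the application cannot process the message because it is invalid; the broker can drop it or dead-letter it if that is configured (`rejected` outcome).
- `requeue`: the application did not process the message; the broker can requeue it and deliver it to the same or a different consumer (`released` outcome).
`discard` and `requeue` have an optional message annotation parameter to combine with the existing annotations held in the message header section. Such message annotations can be used to provide details on the reason for the `discard` or `requeue`. Application-specific annotation keys must start with the `x-opt-` prefix, whereas annotation keys that the broker understands start with only `x-`. When the message annotation parameter is provided, both `discard` and `requeue` use the `modified` outcome.
Only quorum queues support modifying message annotations with the `modified` outcome.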
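The key-prefix rule above can be checked before annotations are handed to `discard` or `requeue`. The following Python sketch is one possible check, not a library feature: the function name and the sample keys are hypothetical.

```python
def check_application_annotations(annotations: dict) -> dict:
    """Return the annotations unchanged if every key starts with "x-opt-".

    Raises ValueError otherwise, because keys that start with only "x-"
    are reserved for annotations the broker understands.
    """
    invalid = [key for key in annotations if not key.startswith("x-opt-")]
    if invalid:
        raise ValueError(
            f"application annotation keys must start with 'x-opt-': {invalid}"
        )
    return annotations


# Hypothetical application-specific annotation: accepted by the check.
ok = check_application_annotations({"x-opt-requeue-reason": "db-timeout"})

# Hypothetical key in the broker's "x-" namespace: rejected by the check.
try:
    check_application_annotations({"x-some-broker-key": 1})
    rejected = False
except ValueError:
    rejected = True
```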
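Consumer Graceful Shutdown
A consumer settles a message by accepting it, discarding it, or requeuing it.
Unsettled messages are requeued when a consumer is closed. This can lead to duplicate processing of messages.
Here is an example:
- A consumer executes a database operation for a given message.
- The consumer is closed before it accepts (settles) the message.
- The message is requeued.
- Another consumer gets the message and executes the database operation again.
It is difficult to avoid duplicate messages completely, which is why processing should be idempotent. The consumer API provides a way to avoid duplicate messages when a consumer is closed. It consists of pausing the delivery of messages, getting the number of unsettled messages to make sure it reaches 0 at some point, and then closing the consumer. This ensures the consumer has finally quiesced and all the received messages have been processed.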
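As a rough illustration of idempotent processing, the Python sketch below skips a message whose identifier has already been handled. It is a sketch under assumptions: the class and method names are hypothetical, every message is assumed to carry a unique message ID set by the publisher, and the set of seen IDs lives in memory (a real application would probably persist it together with the database operation).

```python
class IdempotentProcessor:
    """Apply each message at most once, keyed by its message ID."""

    def __init__(self):
        self._seen = set()   # IDs of messages already applied
        self.applied = []    # stand-in for the database operation

    def process(self, message_id, body) -> bool:
        """Return True if the message was applied, False if it was a duplicate."""
        if message_id in self._seen:
            return False
        self.applied.append(body)
        self._seen.add(message_id)
        return True


processor = IdempotentProcessor()
first = processor.process(1, b"update row 42")
# The same message is delivered again after a requeue.
second = processor.process(1, b"update row 42")
```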
Here is an example of a consumer graceful shutdown:
- Java
- C#
- Python
- Go
- JavaScript
// pause the delivery of messages
consumer.pause();
// ensure the number of unsettled messages reaches 0
long unsettledMessageCount = consumer.unsettledMessageCount();
// close the consumer
consumer.close();
// pause the delivery of messages
consumer.Pause();
// ensure the number of unsettled messages reaches 0
long unsettledMessageCount = consumer.UnsettledMessageCount();
// close the consumer
await consumer.CloseAsync();
# CURRENTLY NOT IMPLEMENTED
// CURRENTLY NOT IMPLEMENTED
// CURRENTLY NOT IMPLEMENTED
An application can still close a consumer without pausing it, at the risk of processing the same messages several times.
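The snippets above read the unsettled message count once; waiting for it to reach 0 usually means polling. The Python sketch below shows one way to do that with a timeout. `FakeConsumer` is a hypothetical stand-in used only to make the sketch runnable; its method names mirror the Java snippet and are not the Python library's API.

```python
import time


class FakeConsumer:
    """Hypothetical stand-in for a library consumer, for illustration only."""

    def __init__(self, unsettled: int):
        self._unsettled = unsettled
        self.paused = False
        self.closed = False

    def pause(self) -> None:
        self.paused = True

    def unsettled_message_count(self) -> int:
        # Simulate the application settling one in-flight message per poll.
        current = self._unsettled
        if self._unsettled > 0:
            self._unsettled -= 1
        return current

    def close(self) -> None:
        self.closed = True


def shutdown_gracefully(consumer, timeout: float = 5.0, poll_interval: float = 0.01) -> bool:
    """Pause delivery, wait for unsettled messages to drain, then close.

    Returns True if the count reached 0 before the timeout expired.
    """
    consumer.pause()
    deadline = time.monotonic() + timeout
    drained = False
    while time.monotonic() < deadline:
        if consumer.unsettled_message_count() == 0:
            drained = True
            break
        time.sleep(poll_interval)
    consumer.close()
    return drained


consumer = FakeConsumer(unsettled=3)
drained = shutdown_gracefully(consumer)
```

The timeout is a design choice: without it, a handler that never settles a message would block shutdown forever.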
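Support for Streams
Libraries have out-of-the-box support for streams in consumer configuration.
It is possible to set where to attach to when consuming from a stream: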
- Java
- C#
- Python
- Go
- JavaScript
Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
.offset(ConsumerBuilder.StreamOffsetSpecification.FIRST)
.builder()
.messageHandler((context, message) -> {
// message processing
})
.build();
IConsumer consumer = await connection.ConsumerBuilder()
.Queue("some-stream")
.Stream()
.Offset(StreamOffsetSpecification.First)
.Builder()
.MessageHandler( async (context, message) => {
// message processing
})
.BuildAndStartAsync();
class MyMessageHandler(AMQPMessagingHandler):
    def __init__(self):
        super().__init__()

    def on_message(self, event: Event):
        # accepting
        self.delivery_context.accept(event)
        # deal with the message

stream_address = AddressHelper.queue_address("some-stream")
consumer = consumer_connection.consumer(
    stream_address,
    message_handler=MyMessageHandler(),
    # can be first, last, next or an offset long
    # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
    stream_filter_options=StreamOptions(offset_specification=OffsetSpecification.first),
)
consumer, err := connection.NewConsumer(context.Background(), qName,
&StreamConsumerOptions{
Offset: &OffsetFirst{},
})
const consumer = await connection.createConsumer({
stream: {
name: "some-stream",
offset: Offset.first(),
},
messageHandler: (context, message) => {
// message processing
},
});
Stream filtering configuration is supported as well:
- Java
- C#
- Python
- Go
- JavaScript
Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
.filterValues("invoices", "orders")
.filterMatchUnfiltered(true)
.builder()
.messageHandler((ctx, msg) -> {
String filterValue = (String) msg.annotation("x-stream-filter-value");
// there must be some client-side filter logic
if ("invoices".equals(filterValue) || "orders".equals(filterValue)) {
// message processing
}
ctx.accept();
})
.build();
IConsumer consumer = await connection.ConsumerBuilder()
.Queue("some-stream")
.Stream()
.FilterValues(["invoices", "orders"])
.FilterMatchUnfiltered(true)
.Builder()
.MessageHandler(async (context, message) => {
string filterValue = (string)message.Annotation("x-stream-filter-value");
if (filterValue.Equals("invoices")|| filterValue.Equals("orders"))
{
// message processing
}
context.Accept();
}
).BuildAndStartAsync();
class MyMessageHandler(AMQPMessagingHandler):
    def __init__(self):
        super().__init__()

    def on_message(self, event: Event):
        filterValue = event.message.annotations["x-stream-filter-value"]
        if filterValue == "invoices" or filterValue == "orders":
            # message processing
            pass
        self.delivery_context.accept(event)

stream_address = AddressHelper.queue_address("some-stream")
consumer = consumer_connection.consumer(
    stream_address,
    message_handler=MyMessageHandler(),
    stream_filter_options=StreamOptions(stream_filters=["invoices", "orders"], match_unfiltered=True),
)
consumer, err := connection.NewConsumer(context.Background(), qName, &
StreamConsumerOptions{
Offset: &OffsetFirst{},
Filters: []string{"invoices", "orders"},
})
deliveryContext, err := consumer.Receive(context.Background())
// ..
var filterValue string
filterValue = deliveryContext.Message().Annotations["x-stream-filter-value"].(string)
if filterValue == "orders" || filterValue == "invoices" {
//
}
err = deliveryContext.Accept(context.Background())
const consumer = await connection.createConsumer({
stream: {
name: "some-stream",
offset: Offset.first(),
matchUnfiltered: true,
filterValues: ["invoices", "orders"],
},
messageHandler: (context, message) => {
const annotations = message.message_annotations;
if (
annotations &&
["invoices", "orders"].includes(annotations["x-stream-filter-value"])
) {
// message processing
}
context.accept();
},
});
consumer.start();
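The snippets above all repeat the filter check inside the message handler, as the Java comment requires ("there must be some client-side filter logic"). That check can be isolated in a small predicate. The Python sketch below is illustrative: `should_process` and `WANTED` are hypothetical names, and the annotations are assumed to arrive as a plain dictionary that may be missing or empty.

```python
from typing import Optional

# Filter values this consumer is interested in (same values as the snippets above).
WANTED = frozenset({"invoices", "orders"})


def should_process(annotations: Optional[dict], wanted=WANTED) -> bool:
    """Return True only if the message carries one of the wanted filter values.

    Messages without annotations, or without the filter annotation, are
    skipped; they can still be delivered when unfiltered matching is on.
    """
    if not annotations:
        return False
    return annotations.get("x-stream-filter-value") in wanted


matches = should_process({"x-stream-filter-value": "invoices"})
other_value = should_process({"x-stream-filter-value": "shipping"})
no_annotations = should_process(None)
```

As in the snippets above, the handler would still settle (accept) a message that the predicate skips.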
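When working with streams, consider also using the native stream protocol with the stream client library for your preferred programming language.
Topology Management
Applications can manage the RabbitMQ AMQ 0.9.1 model: declaring and deleting exchanges, queues, and bindings.
To do so, they need to get the management API from a connection: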
- Java
- C#
- Python
- Go
- JavaScript
Management management = connection.management();
// ...
// close the management instance when it is no longer needed
management.close();
IManagement management = connection.Management();
// ...
// close the management instance when it is no longer needed
await management.CloseAsync();
management = connection.management()
# ...
# close the management instance when it is no longer needed
management.close()
management := connection.Management()
// ...
// close the management instance when it is no longer needed
management.Close(context.Background())
const management = connection.management();
// ...
// close the management instance when it is no longer needed
management.close();
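The management API should be closed as soon as it is no longer needed. An application usually creates the topology it needs when it starts, so the management object can be closed after this step.
Exchanges
Here is how to create an exchange of a built-in type: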
- Java
- C#
- Python
- Go
- JavaScript
management.exchange()
.name("my-exchange")
.type(Management.ExchangeType.FANOUT) // enum for built-in type
.declare();
IExchangeSpecification exchangeSpec = management
.Exchange(exchangeName)
.Type(ExchangeType.TOPIC);
await exchangeSpec.DeclareAsync();
exchange_name = "my-exchange"
management.declare_exchange(ExchangeSpecification(name=exchange_name, exchange_type=ExchangeType.topic))
// there are structs for Fanout, Direct, etc.
exchangeName := "my-exchange"
exchangeInfo, err := management.DeclareExchange(context.TODO(), &TopicExchangeSpecification{
    Name: exchangeName,
})
const exchange = await management.declareExchange("my-exchange", {
type: "topic",
});
It is also possible to specify the exchange type as a string (for non-built-in exchange types):
- Java
- C#
- Python
- Go
- JavaScript
management.exchange()
.name("my-exchange")
.type("x-delayed-message") // non-built-in type
.autoDelete(false)
.argument("x-delayed-type", "direct")
.declare();
await _management.Exchange("myExchange")
.Type("x-delayed-message")
.Argument("x-delayed-type", "direct")
.DeclareAsync();
exchange_arguments = {}
exchange_arguments["x-delayed-type"] = "direct"
exchange_info = management.declare_exchange(
ExchangeCustomSpecification(
name="myExchange",
exchange_type="x-delayed-message",
arguments=exchange_arguments,
)
)
_, err := management.DeclareExchange(context.TODO(), &CustomExchangeSpecification{
Name: "myExchange",
ExchangeTypeName: "x-delayed-message",
Arguments: map[string]any{
"x-delayed-type": "direct",
},
})
const exchange = await management.declareExchange("my-exchange", {
type: "x-delayed-message",
auto_delete: true,
durable: false,
arguments: {
"x-delayed-type": "direct",
},
});
Here is how to delete an exchange:
- Java
- C#
- Python
- Go
- JavaScript
management.exchangeDelete("my-exchange");
await management.Exchange("my-exchange").DeleteAsync();
exchange_name = "my-exchange"
management.delete_exchange(exchange_name)
exchangeName := "my-exchange"
management.DeleteExchange(context.TODO(), exchangeName)
const result = await management.deleteExchange("my-exchange");
Queues
Here is how to create a queue:
- Java
- C#
- Python
- Go
- JavaScript
management.queue()
.name("my-queue")
.exclusive(true)
.autoDelete(false)
.declare();
IQueueSpecification queueSpec = management
.Queue("myqueue")
.Exclusive(true)
.AutoDelete(false);
await queueSpec.DeclareAsync();
queue_name = "myqueue"
management.declare_queue(ClassicQueueSpecification(name=queue_name))
queueName := "myqueue"
queueInfo, err := management.DeclareQueue(context.TODO(), &ClassicQueueSpecification{
    Name: queueName})
const queue = await management.declareQueue("myqueue", {
exclusive: true,
autoDelete: false,
});
The management API supports queue arguments explicitly:
- Java
- C#
- Python
- Go
- JavaScript
management.queue()
.name("my-queue")
.type(Management.QueueType.CLASSIC)
.messageTtl(Duration.ofMinutes(10))
.maxLengthBytes(ByteCapacity.MB(100))
.declare();
IQueueSpecification queueSpec = management
.Queue("my-queue")
.Type(QueueType.CLASSIC)
.MessageTtl(TimeSpan.FromMinutes(10))
.MaxLengthBytes(ByteCapacity.Mb(100));
await queueSpec.DeclareAsync();
management.declare_queue(ClassicQueueSpecification(name="my-queue", message_ttl=timedelta(minutes=10), max_len_bytes=100000000))
queueInfo, err := management.DeclareQueue(context.TODO(), &ClassicQueueSpecification{
Name: "my-queue",
MaxPriority: 32,
MaxLengthBytes: CapacityGB(1),
IsAutoDelete: false,
})
const queue = await management.declareQueue("myqueue", {
type: "classic",
arguments: {
// for a list, see: https://rabbitmq.org.cn/docs/queues#optional-arguments
"x-max-priority": 10,
"x-message-ttl": 60 * 1000,
},
});
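The management API also distinguishes between arguments shared by all queue types and arguments valid only for a given type. Here is an example that creates a quorum queue: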
- Java
- C#
- Python
- Go
- JavaScript
management
.queue()
.name("my-quorum-queue")
.quorum() // set queue type to 'quorum'
.quorumInitialGroupSize(3) // specific to quorum queues
.deliveryLimit(3) // specific to quorum queues
.queue()
.declare();
IQueueSpecification queueSpec = management
.Queue("my-quorum-queue")
.Quorum() // set queue type to 'quorum'
.QuorumInitialGroupSize(3) // specific to quorum queues
.DeliveryLimit(3) // specific to quorum queues
.Queue();
await queueSpec.DeclareAsync();
queue_name = "my-quorum-queue"
management.declare_queue(QuorumQueueSpecification(name=queue_name, quorum_initial_group_size=3, deliver_limit=3))
queueInfo, err := management.DeclareQueue(context.TODO(), &QuorumQueueSpecification{
Name: "my-quorum-queue",
QuorumInitialGroupSize: 3,
DeliveryLimit: 3,
})
const queue = await management.declareQueue("myqueue", {
type: "quorum",
deliveryLimit: 3,
initialGroupSize: 3,
});
It is possible to query information about a queue:
- Java
- C#
- Python
- Go
- JavaScript
Management.QueueInfo info = management.queueInfo("my-queue");
long messageCount = info.messageCount();
int consumerCount = info.consumerCount();
String leaderNode = info.leader();
IQueueInfo queueInfo = await management.GetQueueInfoAsync("my-queue");
ulong messageCount = queueInfo.MessageCount();
uint consumerCount = queueInfo.ConsumerCount();
string leader = queueInfo.Leader();
queue_name = "my-queue"
queue_info = management.queue_info(name=queue_name)
message_count = queue_info.message_count
consumer_count = queue_info.consumer_count
leader = queue_info.leader
queueInfo, err := management.QueueInfo(context.TODO(), "my-queue")
messageCount := queueInfo.MessageCount()
consumerCount := queueInfo.ConsumerCount()
leader := queueInfo.Leader()
const result = await management.getQueueInfo("my-queue");
const messages = result.getInfo.messageCount;
const consumers = result.getInfo.consumerCount;
const leader = result.getInfo.leader;
This API can also be used to check whether a queue exists.
Here is how to delete a queue:
- Java
- C#
- Python
- Go
- JavaScript
management.queueDelete("my-queue");
await management.Queue("myqueue").DeleteAsync();
management.delete_queue(name="myqueue")
management.DeleteQueue(context.TODO(), "myqueue")
const result = await management.deleteQueue(queueName);
Bindings
The management API supports binding a queue to an exchange:
- Java
- C#
- Python
- Go
- JavaScript
management.binding()
.sourceExchange("my-exchange")
.destinationQueue("my-queue")
.key("foo")
.bind();
IBindingSpecification bindingSpec = management.Binding()
.SourceExchange("my-exchange")
.DestinationQueue("my-queue")
.Key("foo");
await bindingSpec.BindAsync();
bind_name = management.bind(
BindingSpecification(
source_exchange="my-exchange",
destination_queue="my-queue",
binding_key="foo",
)
)
// ExchangeToQueueBindingSpecification implements BindingSpecification interface
bindingPath, err := management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
SourceExchange: "my-exchange",
DestinationQueue: "my-queue",
BindingKey: "foo",
})
const queue = await management.declareQueue("my-queue");
const exchange = await management.declareExchange("my-exchange");
await management.bind(routingKey, { source: exchange, destination: queue });
Exchange-to-exchange binding is also supported:
- Java
- C#
- Python
- Go
- JavaScript
management.binding()
.sourceExchange("my-exchange")
.destinationExchange("my-other-exchange")
.key("foo")
.bind();
IBindingSpecification bindingSpec = management.Binding()
.SourceExchange("my-exchange")
.DestinationExchange("my-other-exchange")
.Key("foo");
await bindingSpec.BindAsync();
binding_exchange_queue_path = management.bind(
ExchangeToExchangeBindingSpecification(
source_exchange="my-exchange",
destination_exchange="my-other-exchange",
)
)
// ExchangeToExchangeBindingSpecification implements BindingSpecification interface
bindingPath, err := management.Bind(context.TODO(), &rmq.ExchangeToExchangeBindingSpecification{
SourceExchange: "my-exchange",
DestinationExchange: "my-other-exchange",
})
const exchange1 = await management.declareExchange("my-exchange");
const exchange2 = await management.declareExchange("my-other-exchange");
await management.bind(bindingKey, {
source: exchange1,
destination: exchange2,
});
Unbinding entities is also possible:
- Java
- C#
- Python
- Go
- JavaScript
management.unbind()
.sourceExchange("my-exchange")
.destinationQueue("my-queue")
.key("foo")
.unbind();
IBindingSpecification bindingSpec = management.Binding()
.SourceExchange("my-exchange")
.DestinationQueue("my-queue")
.Key("foo");
await bindingSpec.UnbindAsync();
management.unbind(BindingSpecification(
source_exchange="my-exchange",
destination_queue="my-queue",
))
// bindingPath is the bind result
err = management.Unbind(context.TODO(), bindingPath)
await management.unbind(bindingKey, { source: exchange, destination: queue });
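Advanced Usage
Lifecycle Listeners
Applications can react to state changes of some API components by adding listeners. An application can add a listener to a connection to stop publishing messages while the connection is recovering after a failure. The application can then resume publishing when the connection has recovered and is open again.
Here is how to set a listener on a connection: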
- Java
- C#
- Python
- Go
- JavaScript
Connection connection = environment.connectionBuilder()
.listeners(context -> { // set one or several listeners
context.previousState(); // the previous state
context.currentState(); // the current (new) state
context.failureCause(); // the cause of the failure (in case of failure)
context.resource(); // the connection
}).build();
connection.ChangeState += (
sender, // the sender instance ( in this case the connection)
fromState, // the previous state
toState, // the current (new) state
e // the cause of the failure (in case of failure)
) =>
{
};
# CURRENTLY NOT IMPLEMENTED
stateChanged := make(chan *rmq.StateChanged, 1)
go func(ch chan *rmq.StateChanged) {
for statusChanged := range ch {
// statusChanged.From from status
// statusChanged.To to status
// StateClosed has the func GetError() in case of error
}
}(stateChanged)
connection.NotifyStatusChange(stateChanged)
// CURRENTLY NOT IMPLEMENTED
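One way an application might use such a listener to pause and resume publishing is a small gate that publishing code waits on. The Python sketch below is library-independent: `PublishGate` is a hypothetical name, and the state names (`OPENING`, `OPEN`, `RECOVERING`) are assumptions made for the illustration rather than values taken from a specific client library.

```python
import threading


class PublishGate:
    """Let publishers proceed only while the connection is open."""

    def __init__(self):
        self._open = threading.Event()

    def on_state_change(self, previous_state: str, current_state: str) -> None:
        # Meant to be called from a connection lifecycle listener.
        if current_state == "OPEN":
            self._open.set()
        else:
            self._open.clear()

    def wait_until_open(self, timeout: float) -> bool:
        """Block up to `timeout` seconds; return True if publishing may proceed."""
        return self._open.wait(timeout)


gate = PublishGate()
gate.on_state_change("OPENING", "OPEN")
open_before = gate.wait_until_open(timeout=0)
gate.on_state_change("OPEN", "RECOVERING")
open_during = gate.wait_until_open(timeout=0)
gate.on_state_change("RECOVERING", "OPEN")
open_after = gate.wait_until_open(timeout=0)
```

A publishing loop would call `wait_until_open` before each publish and back off when it returns False.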
It is also possible to set listeners on publisher instances:
- Java
- C#
- Python
- Go
- JavaScript
Publisher publisher = connection.publisherBuilder()
.listeners(context -> {
// ...
})
.exchange("foo").key("bar")
.build();
publisher.ChangeState += (sender, fromState, toState, e) =>
{
};
# CURRENTLY NOT IMPLEMENTED FOR PYTHON
// CURRENTLY NOT IMPLEMENTED because of https://github.com/Azure/go-amqp/issues/99
// CURRENTLY NOT IMPLEMENTED
And on consumer instances as well:
- Java
- C#
- Python
- Go
- JavaScript
Consumer consumer = connection.consumerBuilder()
.listeners(context -> {
// ...
})
.queue("my-queue")
.build();
consumer.ChangeState += (sender, fromState, toState, e) =>
{
};
# CURRENTLY NOT IMPLEMENTED FOR PYTHON
// CURRENTLY NOT IMPLEMENTED because of https://github.com/Azure/go-amqp/issues/99
// CURRENTLY NOT IMPLEMENTED
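Automatic Connection Recovery
Automatic connection recovery is activated by default: client libraries automatically restore a connection after it closes unexpectedly (for example, after a network glitch or a node restart). Automatic topology recovery is also activated as soon as connection recovery is: client libraries recreate AMQP entities, as well as publishers and consumers, for the recovered connection. Developers have less to worry about regarding network stability and node restarts, because client libraries take care of them.
The client tries to reconnect every 5 seconds until it succeeds. This behavior can be changed by customizing the back-off delay policy: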
- Java
- C#
- Python
- Go
- JavaScript
Connection connection = environment.connectionBuilder()
.recovery()
.backOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(2)))
.connectionBuilder().build();
class MyBackOffDelay : IBackOffDelayPolicy {
...
}
await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create().RecoveryConfiguration(
RecoveryConfiguration.Create()
.BackOffDelayPolicy(new MyBackOffDelay())).Build());
environment = Environment(
"amqp://guest:guest@localhost:5672/",
recovery_configuration=RecoveryConfiguration(back_off_reconnect_interval=timedelta(seconds=2)),
)
// to the BackOffReconnectInterval the client adds a random 500 ms
env := NewEnvironment("amqp://", &AmqpConnOptions{
RecoveryConfiguration: &RecoveryConfiguration{
ActiveRecovery: true,
BackOffReconnectInterval: 5 * time.Second,
MaxReconnectAttempts: 5,
},
})
// arguments are passed directly to RHEA library
// https://github.com/amqp/rhea?tab=readme-ov-file#container
//
// TOPOLOGY RECOVERY IS NOT TESTED
await environment.createConnection({
reconnect: 2000,
initialReconnectDelay: 2000,
reconnectLimit: 10,
});
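Conceptually, a back-off delay policy maps a reconnection attempt number to the time to wait before that attempt. The Python sketch below shows a fixed policy (like the default 5-second behavior described above) and a capped exponential one. It is a library-independent illustration; `fixed` and `exponential` are hypothetical helper names, not the libraries' policy classes.

```python
from typing import Callable

# A policy takes the zero-based attempt number and returns a delay in seconds.
BackOffPolicy = Callable[[int], float]


def fixed(delay_seconds: float) -> BackOffPolicy:
    """Always wait the same amount of time between attempts."""
    def policy(attempt: int) -> float:
        return delay_seconds
    return policy


def exponential(initial_seconds: float, max_seconds: float) -> BackOffPolicy:
    """Double the delay on each attempt, up to max_seconds."""
    def policy(attempt: int) -> float:
        return min(initial_seconds * (2 ** attempt), max_seconds)
    return policy


every_five_seconds = fixed(5.0)
capped = exponential(1.0, 30.0)
delays = [capped(attempt) for attempt in range(6)]
```

A capped exponential policy avoids hammering a node that is slow to come back while still bounding the worst-case wait.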
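It is also possible to deactivate topology recovery if it is not appropriate for a given application. The application would usually register a connection lifecycle listener to know when the connection is recovered and restore its own state accordingly.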
- Java
- C#
- Python
- Go
- JavaScript
Connection connection = environment.connectionBuilder()
.recovery()
.topology(false) // deactivate topology recovery
.connectionBuilder()
.listeners(context -> {
// set listener that restores application state when connection is recovered
})
.build();
await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create().RecoveryConfiguration(
RecoveryConfiguration.Create()
.Topology(false)) // deactivate topology recovery
.Build());
# CURRENTLY NOT IMPLEMENTED
// CURRENTLY NOT IMPLEMENTED
// CURRENTLY NOT IMPLEMENTED
It is also possible to deactivate recovery altogether:
- Java
- C#
- Python
- Go
- JavaScript
Connection connection = environment.connectionBuilder()
.recovery()
.activated(false) // deactivate recovery
.connectionBuilder().build();
await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create().RecoveryConfiguration(
RecoveryConfiguration.Create().
Activated(false)).// deactivate recovery
Build());
environment = Environment(
"amqp://guest:guest@localhost:5672/",
recovery_configuration=RecoveryConfiguration(active_recovery=False),
)
env := NewEnvironment("amqp://", &AmqpConnOptions{
RecoveryConfiguration: &RecoveryConfiguration{
ActiveRecovery: false,
},
})
await environment.createConnection({ reconnect: false });