跳到主要内容
版本:4.1

通道

概述

本指南涵盖了与通道相关的各种主题,通道是 AMQP 0-9-1 特定的抽象概念。通道不能脱离连接而存在,因此强烈建议先熟悉连接指南

本指南涵盖:

以及其他与连接相关的主题。

基础知识

某些应用程序需要与代理建立多个逻辑连接。但是,同时保持许多 TCP 连接打开是不可取的,因为这样做会消耗系统资源,并使防火墙配置更加困难。AMQP 0-9-1 连接通过通道进行多路复用,通道可以被认为是“共享单个 TCP 连接的轻量级连接”。

客户端执行的每个协议操作都在通道上进行。特定通道上的通信与另一个通道上的通信完全分离,因此每个协议方法也携带一个通道 ID(也称为通道号),这是一个整数,代理和客户端都使用它来确定该方法用于哪个通道。

通道仅在连接的上下文中存在,绝不会单独存在。当连接关闭时,其上的所有通道也会关闭。

对于使用多个线程/进程进行处理的应用程序,通常为每个线程/进程打开一个新通道,而不是在它们之间共享通道。

通道生命周期

打开通道

应用程序在成功打开连接后立即打开通道。

这是一个 Java 客户端示例,它在打开新连接后打开一个具有自动分配通道 ID 的新通道

ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.createConnection();

Channel ch = conn.createChannel();

// ... use the channel to declare topology, publish, consume

在 .NET 客户端中,通道使用 IModel 接口表示,因此 API 中的名称有所不同

var cf = new ConnectionFactory();
var conn = cf.newConnection();

// the .NET client calls channels "models"
var ch = conn.CreateModel();

// ... use the channel to declare topology, publish, consume

与连接非常相似,通道应该是长期存在的。也就是说,不需要为每个操作都打开一个通道,这样做效率非常低,因为打开通道需要一次网络往返。

关闭通道

当不再需要通道时,应将其关闭。关闭通道将使其不可用,并安排回收其资源

Channel ch = conn.createChannel();

// do some work

// close the channel when it is no longer needed

ch.close();

使用 .NET 客户端的相同示例

var ch = await conn.CreateChannelAsync();

// do some work

// close the channel when it is no longer needed

ch.Close();

如上所述,已关闭的通道无法使用。尝试在已关闭的通道上执行操作将导致异常,表明通道已关闭。

当通道的连接关闭时,该通道也会关闭。

如果在消费者确认了通道上的一些交付后立即关闭通道,则确认可能无法在通道终止之前到达其目标队列。在这种情况下,通道上具有待处理确认的消息将在通道关闭后自动重新排队。

这种情况通常适用于生命周期短的通道的工作负载。使用长期存在的通道并以消费者可以处理重新交付的方式设计消费者将缓解上述行为。长期存在的通道通常也具有更好的性能。请注意,重新交付的消息将被明确标记

通道和错误处理

在上面的部分中,通道是由应用程序关闭的。通道还可以通过另一种方式关闭:由于协议异常。

某些场景被认为是协议中可恢复的(“软”)错误。它们使通道关闭,但应用程序可以打开另一个通道并尝试恢复或重试多次。最常见的例子是:

  • 使用不匹配的属性重新声明现有队列或交换机将失败,并出现 406 PRECONDITION_FAILED 错误
  • 访问用户不允许访问的资源将失败,并出现 403 ACCESS_REFUSED 错误
  • 绑定不存在的队列或不存在的交换机将失败,并出现 404 NOT_FOUND 错误
  • 从不存在的队列消费将失败,并出现 404 NOT_FOUND 错误
  • 发布到不存在的交换机将失败,并出现 404 NOT_FOUND 错误
  • 从声明连接以外的连接访问独占队列将失败,并出现 405 RESOURCE_LOCKED

客户端库提供了一种观察和响应通道异常的方法。例如,在 Java 客户端中,有一种注册错误处理程序并访问通道关闭(关闭)原因的方法。

任何尝试在已关闭通道上执行的操作都将失败并出现异常。请注意,当 RabbitMQ 关闭通道时,它会使用异步协议方法通知客户端。换句话说,导致通道异常的操作不会立即失败,但通道关闭事件处理程序会在之后不久触发。

某些客户端库可能使用等待响应的阻塞操作。在这种情况下,它们可能会以不同的方式传达通道异常,例如,使用运行时异常、错误类型或适用于该语言的其他方式。

有关更完整的错误代码列表,请参阅AMQP 0-9-1 参考

资源使用

每个通道在客户端上消耗相对少量的内存。根据客户端库的实现细节,它还可以使用专用的线程池(或类似物),在其中调度消费者操作,因此使用一个或多个线程(或类似物)。

每个通道还在客户端连接到的节点上消耗相对少量的内存,以及一些 Erlang 进程。由于一个节点通常服务于多个通道连接,因此过度通道使用或通道泄漏的影响将主要反映在 RabbitMQ 节点的指标中,而不是客户端的指标中。

考虑到这两个因素,强烈建议限制每个连接使用的通道数量。作为指导,大多数应用程序可以为每个连接使用个位数的通道。那些具有特别高的并发率的应用程序(通常此类应用程序是消费者)可以从每个线程/进程/协程一个通道开始,并在指标表明原始模型不再可持续时切换到通道池,例如,因为它消耗了太多内存。

请参阅监控、指标和诊断部分,了解如何检查通道、连接上的通道数量、通道 churn 率等等。

每个连接的最大通道数

可以在连接时由客户端和服务器协商同时在一个连接上打开的最大通道数。该值对于 RabbitMQ 和客户端库都是可配置的。

在服务器端,该限制由 channel_max 控制

# no more 100 channels can be opened on a connection at the same time
channel_max = 100

如果超出配置的限制,连接将被关闭并出现致命错误

2019-02-11 16:04:06.296 [error] <0.887.0> Error on AMQP connection <0.887.0> (127.0.0.1:49956 -> 127.0.0.1:5672, vhost: '/', user: 'guest', state: running), channel 23:
operation none caused a connection exception not_allowed: "number of channels opened (22) has reached the negotiated channel_max (22)"

可以配置客户端以允许每个连接的通道数少于服务器。对于 RabbitMQ Java 客户端ConnectionFactory#setRequestedChannelMax 是控制限制的方法

ConnectionFactory cf = new ConnectionFactory();
// Ask for up to 32 channels per connection. Will have an effect as long as the server is configured
// to use a higher limit, otherwise the server's limit will be used.
cf.setRequestedChannelMax(32);

对于 RabbitMQ .NET 客户端,请使用 ConnectionFactory#RequestedChannelMax 属性

var cf = new ConnectionFactory();
// Ask for up to 32 channels per connection. Will have an effect as long as the server is configured
// to use a higher limit, otherwise the server's limit will be used.
cf.RequestedChannelMax = 32;

将使用两者中的较低值:客户端无法配置为允许的通道数多于服务器配置的最大值。尝试这样做的客户端将在日志中遇到如下错误

2019-02-11 16:03:16.543 [error] <0.882.0> closing AMQP connection <0.882.0> (127.0.0.1:49911 -> 127.0.0.1:5672):
failed to negotiate connection parameters: negotiated channel_max = 2047 is higher than the maximum allowed value (32)

每个节点的最大通道数

可以使用配置参数 channel_max_per_node 配置允许在集群中每个节点上打开的最大通道数

# no more than 500 channels can be opened on each node at the same time
channel_max_per_node = 500

监控、指标和诊断

由于通道数量会影响节点资源的使用,因此当前打开的通道数量和通道打开/关闭速率是系统的重要指标,应进行监控。监控它们将有助于检测许多常见问题

  • 通道泄漏
  • 高通道 churn

这两个问题最终都会导致节点资源耗尽

单个通道指标,例如未确认消息的数量或 basic.get 操作速率,可以帮助识别应用程序行为中的异常和低效率。

内存使用

监控系统和操作员可能都需要检查通道在一个节点上消耗了多少内存,一个节点上的通道总数,然后确定每个连接上有多少通道。

通道数量显示在管理 UI 的概览选项卡上,连接数量也是如此。通过将通道数量除以连接数量,操作员可以确定每个连接的平均通道数。

要了解节点上通道使用了多少内存,请使用rabbitmq-diagnostics memory_breakdown

rabbitmq-diagnostics memory_breakdown -q --unit mb
# => [elided for brevity]
# ...
# => connection_channels: 3.596 mb (2.27%)
# ...
# => [elided for brevity]

有关详细信息,请参阅RabbitMQ 内存使用分析指南

通道泄漏

通道泄漏是指应用程序重复打开通道而不关闭它们,或者至少仅关闭其中一部分的情况。

通道泄漏最终会耗尽节点(或多个目标节点)的 RAM 和 CPU 资源。

相关指标

管理 UI 的概览选项卡列出了当前用户有权访问的所有虚拟主机中的通道总数

mgmt-ui-global-channel-count.png

要检查连接上的当前通道数以及每个连接的通道限制,请导航到“连接”选项卡,并启用相关的列(如果未显示)。

Per connection channel count in management UI

概览和单个节点页面提供了自RabbitMQ 3.7.9 以来的通道 churn 率图表。如果通道打开操作的速率始终高于通道关闭操作的速率,则表明其中一个应用程序中存在通道泄漏

Channel count growth in management UI

要找出哪个连接泄漏通道,请按照本指南中演示的方式检查每个连接的通道计数。

高通道 Churn

当系统的新打开通道速率持续很高,并且其关闭通道速率也持续很高时,就称该系统具有高通道 churn。这通常意味着应用程序使用生命周期短的通道,或者通道经常由于通道级异常而关闭。

虽然对于某些工作负载来说,这是系统的自然状态,但在可能的情况下,应使用长期存在的通道。

管理 UI 提供了通道 churn 率的图表。下图是一个图表,显示了相当低的通道 churn,在给定的时间段内,通道打开和关闭的数量几乎相同

Node channel churn in management UI

虽然连接和断开连接速率是特定于系统的,但持续高于 100/秒的速率可能表明一个或多个应用程序的连接管理不佳,通常值得调查。

High channel churn in management UI

请注意,某些客户端和运行时(尤其是 PHP)不使用长期存在的连接,除非使用专用代理,否则预计它们会出现高连接 churn 率。

在管理 UI 中检查通道及其状态

要在管理 UI 中检查通道,请导航到“通道”选项卡,并根据需要添加或删除列

High channel churn in management UI

使用 CLI 工具检查通道及其状态

rabbitmqctl list_connectionsrabbitmqctl list_channels 是用于检查每个连接的通道计数和通道详细信息(例如消费者数量、未确认消息prefetch 等)的主要命令。

rabbitmqctl list_connections name channels -q
# => name channels
# => 127.0.0.1:52956 -> 127.0.0.1:5672 10
# => 127.0.0.1:52964 -> 127.0.0.1:5672 33

最右边的列包含连接上的通道计数。

可以禁止显示表头

rabbitmqctl list_connections name channels -q --no-table-headers
# => 127.0.0.1:52956 -> 127.0.0.1:5672 10
# => 127.0.0.1:52964 -> 127.0.0.1:5672 33

要检查单个通道,请使用 rabbitmqctl list_channels

rabbitmqctl list_channels -q
# => pid user consumer_count messages_unacknowledged
# => <rabbit@mercurio.3.815.0> guest 0 0
# => <rabbit@mercurio.3.820.0> guest 0 0
# => <rabbit@mercurio.3.824.0> guest 0 0
# => <rabbit@mercurio.3.828.0> guest 0 0
# => <rabbit@mercurio.3.832.0> guest 0 0
# => <rabbit@mercurio.3.839.0> guest 0 0
# => <rabbit@mercurio.3.840.0> guest 0 0

可以禁止显示表头

rabbitmqctl list_channels -q --no-table-headers
# => <rabbit@mercurio.3.815.0> guest 0 0
# => <rabbit@mercurio.3.820.0> guest 0 0
# => <rabbit@mercurio.3.824.0> guest 0 0
# => <rabbit@mercurio.3.828.0> guest 0 0
# => <rabbit@mercurio.3.832.0> guest 0 0
# => <rabbit@mercurio.3.839.0> guest 0 0
# => <rabbit@mercurio.3.840.0> guest 0 0

可以显示不同的列集

rabbitmqctl list_channels -q --no-table-headers vhost connection number  prefetch_count messages_unconfirmed
# => / <rabbit@mercurio.3.799.0> 1 0 0
# => / <rabbit@mercurio.3.802.0> 1 0 0
# => / <rabbit@mercurio.3.799.0> 2 0 0
# => / <rabbit@mercurio.3.799.0> 3 0 0
# => / <rabbit@mercurio.3.802.0> 2 0 0
# => / <rabbit@mercurio.3.802.0> 3 0 0
# => / <rabbit@mercurio.3.799.0> 4 0 0
# => / <rabbit@mercurio.3.802.0> 4 0 0
# => / <rabbit@mercurio.3.799.0> 5 0 0
# => / <rabbit@mercurio.3.799.0> 6 0 0
rabbitmqctl list_channels -s vhost connection number confirm
# => / <rabbit@mercurio.3.799.0> 1 false
# => / <rabbit@mercurio.3.802.0> 1 false
# => / <rabbit@mercurio.3.799.0> 2 false
# => / <rabbit@mercurio.3.799.0> 3 false
# => / <rabbit@mercurio.3.802.0> 2 false
# => / <rabbit@mercurio.3.802.0> 3 false
# => / <rabbit@mercurio.3.799.0> 4 false
# => / <rabbit@mercurio.3.802.0> 4 false
# => / <rabbit@mercurio.3.799.0> 5 false

发布者流控制

发布消息的通道可能会超过系统的其他部分,最可能是繁忙的队列和执行复制的队列。当发生这种情况时,流控制将应用于发布通道,进而应用于连接。仅消费消息的通道和连接不受影响。

对于使用自动确认模式的较慢的消费者,当写入 TCP 套接字时,连接和通道很可能遇到流控制。

监控系统可以收集处于流状态的连接数量的指标。经常遇到流控制的应用程序可能会考虑使用单独的连接来发布和消费,以避免流控制对非发布操作(例如队列管理)的影响。

© . All rights reserved.