跳至主内容
版本:4.2

配置静态 Shovel

概述

本指南重点介绍静态配置的 Shovel。它假定您熟悉 Shovel 插件背后的关键概念。

提示

动态 Shovel 是现代 Shovel 类型。如有疑问,请优先选择动态 Shovel。

为了确保某些 Shovel 在集群形成后启动,请将动态 Shovel 与 启动时导入定义结合使用。

动态 Shovel 不同,静态 Shovel 使用 高级配置文件进行配置。它们在节点启动时启动,主要用于永久运行的工作负载。静态 Shovel 配置的任何更改都需要重新启动节点,这使其高度不灵活。

大多数用户应优先选择动态 Shovel 而非静态 Shovel,因为动态 Shovel 更灵活且易于自动化。生成动态 Shovel 定义(JSON 文档)通常比生成静态 Shovel 定义(使用 Erlang 术语)更容易。

配置

Shovel 插件的配置必须在 高级配置文件中定义。

它由一个单独的 shovels 子句组成,该子句列出了节点启动时应启动的 Shovel。

{rabbit, [
%% ...
]},

{rabbitmq_shovel, [
{shovels, [
{shovel_one, [
%% shovel_one properties ...
]},
%% ...
]}
]}

下面可以找到一个(特意冗长的)示例配置

shovels 子句的每个元素都是一个命名的静态 Shovel。列表中的名称必须是唯一的。

Shovel 定义在顶层看起来是这样的

{shovel_name, [
{source, [
%% protocol-specific source configuration goes here
]},
{destination, [
%% protocol-specific destination configuration goes here
]},
%% 'confirm' is the default acknowledgement mode
{ack_mode, confirm},
%% reconnect with a 5 second delay
{reconnect_delay, 5}
]}

其中 shovel_name 是 Shovel 的名称(Erlang 原子)。如果名称不以小写字母开头,或者包含除字母数字字符、下划线 (_) 或 @ 之外的其他字符,则应将其用单引号 (') 括起来。

源和目标

Shovel 将消息从源传输到目标。

sourcedestination 键是强制的,并包含特定于协议的嵌套键。目前 AMQP 0.9.1 和 AMQP 1.0 是支持的两个协议。源和目标不一定使用相同的协议。所有其他属性都是可选的。

source 是一个强制键,对于不同的协议具有不同的键属性。所有协议共有的两个属性是:protocolurisprotocol 支持三个值:amqp091amqp10local,分别用于 AMQP 0-9-1、AMQP 1.0 和本地 Shovel。

%% for AMQP 0-9-1
{protocol, amqp091}

urisAMQP 连接 URI 的列表。

{uris, [
"amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"
]}

URI 语法已扩展为包含查询部分,以允许配置其他连接参数。请参阅 查询参数参考,其中包含静态 Shovel 可用的参数,例如 TLS 证书和私钥。

通用源键

AMQP 0-9-1、AMQP 1.0 和本地源支持一些键。它们在下表中进行了描述。

通用(协议无关)静态 Shovel 键(属性)
描述
reconnect-delay

在任一端断开连接后,重新连接到代理之前等待的时间(以秒为单位)。默认为 1。

{reconnect_delay, 5}

将在失败后延迟五秒钟然后重新连接。值为 0 表示不重新连接:Shovel 在第一次失败或连接尝试不成功后停止。

ack-mode

确定 Shovel 应如何确认已消费的消息。有效值为 on-confirmon-publishno-ack。默认为 on-confirm

如果设置为 on-confirm(默认),则在消息已被目标确认后,会向源代理确认。这可以处理网络错误和代理故障而不会丢失消息,并且是速度最慢的选项。

如果设置为 on-publish,则在消息已发布到目标(但尚未确认)后,会向源代理确认。在网络或代理故障的情况下,消息可能会丢失。

如果设置为 no-ack,将使用自动消息确认。此选项将提供最高的吞吐量,但安全性不高(在网络或代理故障的情况下会丢失消息)。

AMQP 0-9-1 源键

AMQP 0-9-1 特定的源键包含在单独的表中。

AMQP 0-9-1 源键(属性)
描述
声明

在 Shovel 开始传输消息之前要由 Shovel 执行的可选 AMQP 0-9-1 操作列表。它们通常用于设置拓扑。

  {declarations, [
%% declaration list
]}

声明遵循 RabbitMQ Erlang 客户端使用的方法和属性名称。

一个最小化的声明示例

  {declarations, [
'queue.declare',
{'queue.bind', [
{exchange, <<"my_exchange">>},
{queue, <<>>}
]}
]}

将首先声明一个匿名的队列,然后将其绑定到名为 "my_exchange" 的交换。在 queue.bind 方法中的 <<>> 队列名称表示“使用此通道上最后声明的队列”。

声明列表的每个元素要么是单个引号括起来的原子(如 'queue.declare')表示的 AMQP 0-9-1 方法,要么是第一个元素为方法原子,第二个元素为参数属性列表的元组。

如果仅使用方法名称,则所有参数都取其默认值(如上例中的 'queue.declare' 所示)。

如果提供了元组和属性列表,则列表中的属性显式指定部分或全部参数。

这是另一个示例

{'exchange.declare', [
{exchange, <<"my_exchange">>},
{type, <<"direct">>},
durable
]}

将声明一个持久化的、直连的(direct)交换,名为“my_exchange”。

queue

要从中传送消息的源队列的名称,作为 Erlang 二进制值。此属性是强制的。

{queue, <<"queue.1">>}

queue.1 是要从中传送消息的队列的名称,为二进制字符串。

此队列必须存在。使用上面介绍的资源 declarations 来声明队列或确保其存在。如果值为 <<>>(空二进制字符串),则使用 declarations最近声明的队列。这允许声明和使用匿名队列。

另请参阅下面的 预先声明的拓扑部分。

prefetch-count

在任何一个时间点,通过 Shovel 复制的未确认消息的最大数量。默认为 1000

{prefetch_count, 1000}

预先声明的拓扑

declarations 属性通常用于设置拓扑。至少,它必须设置源队列。

存在一些部署场景,其中拓扑在启动时从定义文件中自动导入。在这些场景中,我们可以通过将以下行添加到 rabbitmq.conf 文件中来配置插件,使其等待队列可用。

shovel.topology.predeclared = true

使用上述配置,如果静态 Shovel 没有 declarations 属性或为空,插件将等待直到源的 queue 被声明。

AMQP 1.0 源键

AMQP 1.0 源设置与 AMQP 0-9-1 源的不同。

AMQP 1.0 源键(属性)
描述
source_address

这代表 AMQP 1.0 链接的源地址。此键是强制的。

{source_address, <<"my-address">>}
prefetch-count

此可选键设置将授予接收链接的链接信贷数量。当信贷低于此值十分之一时,信贷将自动续订。默认值为 1000。它采用以下形式:

  {prefetch_count, 10}

本地 Shovel 源键

本地 Shovel 特定的源键包含在单独的表中。

本地 Shovel 源键(属性)
描述
声明

在 Shovel 开始传输消息之前要由 Shovel 执行的可选 AMQP 0-9-1 操作列表。它们通常用于设置拓扑。

  {declarations, [
%% declaration list
]}

声明遵循 RabbitMQ Erlang 客户端使用的方法和属性名称。

一个最小化的声明示例

  {declarations, [
'queue.declare',
{'queue.bind', [
{exchange, <<"my_exchange">>},
{queue, <<>>}
]}
]}

将首先声明一个匿名的队列,然后将其绑定到名为 "my_exchange" 的交换。在 queue.bind 方法中的 <<>> 队列名称表示“使用此通道上最后声明的队列”。

声明列表的每个元素要么是单个引号括起来的原子(如 'queue.declare')表示的 AMQP 0-9-1 方法,要么是第一个元素为方法原子,第二个元素为参数属性列表的元组。

如果仅使用方法名称,则所有参数都取其默认值(如上例中的 'queue.declare' 所示)。

如果提供了元组和属性列表,则列表中的属性显式指定部分或全部参数。

这是另一个示例

{'exchange.declare', [
{exchange, <<"my_exchange">>},
{type, <<"direct">>},
durable
]}

将声明一个持久化的、直连的(direct)交换,名为“my_exchange”。

queue

要从中传送消息的源队列的名称,作为 Erlang 二进制值。此属性是强制的。

{queue, <<"queue.1">>}

queue.1 是要从中传送消息的队列的名称,为二进制字符串。

此队列必须存在。使用上面介绍的资源 declarations 来声明队列或确保其存在。如果值为 <<>>(空二进制字符串),则使用 declarations最近声明的队列。这允许声明和使用匿名队列。

另请参阅下面的 预先声明的拓扑部分。

prefetch-count

在任何一个时间点,通过 Shovel 复制的未确认消息的最大数量。默认为 1000

{prefetch_count, 1000}

预先声明的拓扑

declarations 属性通常用于设置拓扑。至少,它必须设置源队列。

存在一些部署场景,其中拓扑在启动时从定义文件中自动导入。在这些场景中,我们可以通过将以下行添加到 rabbitmq.conf 文件中来配置插件,使其等待队列可用。

shovel.topology.predeclared = true

使用上述配置,如果静态 Shovel 没有 declarations 属性或为空,插件将等待直到源的 queue 被声明。

目标

destination 是一个强制键,对于不同的协议具有不同的键属性。所有协议共有的两个属性是:protocolurisprotocol 支持三个值:amqp091amqp10local,分别用于 AMQP 0-9-1、AMQP 1.0 和本地 Shovel。

%% for AMQP 0-9-1
{protocol, amqp091}

urisAMQP 连接 URI 的列表。

{uris, [
"amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"
]}

URI 语法已扩展为包含查询部分,以允许配置其他连接参数。请参阅 查询参数参考,其中包含静态 Shovel 可用的参数,例如 TLS 证书和私钥。

通用目标键

通用目标键(属性)
描述
reconnect-delay

在任一端断开连接后,重新连接到代理之前等待的时间(以秒为单位)。默认为 1。

{reconnect_delay, 5}

将在失败后延迟五秒钟然后重新连接。值为 0 表示不重新连接:Shovel 在第一次失败或连接尝试不成功后停止。

AMQP 0-9-1 目标键

AMQP 0-9-1 目标键(属性)
描述
publish_properties

此可选键控制 Shovel 设置或覆盖的消息属性。它采用以下形式:

{publish_properties, [
{delivery_mode, 2}
]}

其中列表中的属性在重新发布每条消息之前设置在其基本属性上。

此特定示例会将所有重新发布的设置为持久化。

{publish_properties, [
{delivery_mode, 2}
]}

默认情况下,原始消息属性会保留,但此子句可用于更改或设置任何已知属性。

  • content_type
  • content_encoding
  • headers
  • delivery_mode
  • priority
  • correlation_id
  • reply_to
  • expiration
  • message_id
  • timestamp
  • type
  • user_id
  • app_id
  • cluster_id
publish_fields

此可选键与 publish_properties 类似,但它控制发布设置而不是消费者可访问的消息属性。它采用以下形式:

{publish_fields, [
{exchange, <<"my_exchange">>},
{routing_key, <<"from_shovel">>}
]}

其中列表中的属性用于设置重新发布消息时使用的 basic.publish 方法的字段

默认情况下,消息会使用原始交换名称和路由键重新发布。通过指定

{publish_fields, [
{exchange, <<"my_exchange">>},
{routing_key, <<"from_shovel">>}
]}

消息将被重新发布到一个显式交换名称,并带有显式、固定的路由键。

add_timestamp_header

此布尔键控制在消息重新发布之前是否会添加自定义标头 x-shovelled-timestamp

{add_timestamp_header, true}

此标头的值是消息被 Shovel 化的时间戳(自纪元以来的秒数)。默认情况下,不添加此标头。

add_forward_headers

当设置为 true 时,Shovel 将添加一些自定义消息标头:shovelled-byshovel-typeshovel-name,以提供有关传输的附加元数据。

{add_forward_headers, true}

AMQP 1.0 目标键

AMQP 1.0 目标键(属性)
描述
target_address

这代表发送 AMQP 1.0 链接的目标地址。

{target_address, <<"some-address">>}
properties

此可选键控制重新发布消息时将添加哪些附加属性。它采用以下形式:

{properties, [
{content_typle, <<"application/json">>}
]}

可用键包括 message_iduser_idtosubjectreply_tocorrelation_idcontent_typecontent_encodingabsolute_expiry_timecreation_time。有关所有可用键和值的详细信息,请参阅 AMQP 1.0 规范(§3.2.4)。

application_properties

此可选键声明在重新发布消息时将添加的任何附加应用程序属性。它采用以下形式:

{application_properties, [
{<<"application-key-1">>, <<"value-1">>},
{<<"application-key-2">>, <<"value-2">>}
]}

键和值应为二进制字符串,如下例所示。

add_timestamp_header

此布尔键控制在消息重新发布之前是否会设置 creation_time 属性。

{add_timestamp_header, true}

此值是消息被 Shovel 化时的时间戳(自纪元以来的秒数)。默认情况下,不设置此属性。

add_forward_headers

当设置为 true 时,Shovel 将添加以下键的应用程序属性:shovelled-byshovel-typeshovel-name,以提供有关传输的附加元数据。

{add_forward_headers, true}

本地 Shovel 目标键

本地 Shovel 目标键(属性)
描述
add_timestamp_header

此布尔键控制在消息重新发布之前是否会添加自定义标头 x-opt-shovelled-timestamp

{add_timestamp_header, true}

此标头的值是消息被 Shovel 化的时间戳(自纪元以来的秒数)。默认情况下,不添加此标头。

add_forward_headers

当设置为 true 时,Shovel 将添加一些自定义消息标头:x-opt-shovelled-byx-opt-shovel-typex-opt-shovel-name,以提供有关传输的附加元数据。

{add_forward_headers, true}

示例配置

AMQP 0.9.1 端点之间一个相对完整的静态 Shovel 配置可能如下所示:

{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp091},
{uris, [ "amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost" ]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable
]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[ {exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}

上面的配置定义了一个名为 'my_first_shovel' 的单个 Shovel。

'my_first_shovel' 将连接到其中一个代理(host1host2)作为源,并直接连接到本地代理作为目标。它将在故障后,延迟 5 秒后重新连接到另一个源代理。

当连接到源时,它将声明一个直连的、扇出(fanout)类型的交换,名为 "my_fanout",一个具有每个队列的消息 TTL 的匿名队列,并将该队列绑定到交换。

当连接到目标(本地代理)时,它将声明一个持久化的、直连的交换,名为 "my_direct"

此 Shovel 将把发送到源上的匿名队列的消息重新发布到本地交换,并使用固定的路由键 "from_shovel"。消息将是持久化的,并且仅在收到本地代理的发布确认后才会确认。

如果任何时候有至少十条未确认的消息,Shovel 消费者将不会获得更多传递。

示例配置(1.0 源 - 0.9.1 目标)

AMQP 1.0 源和 AMQP 0.9.1 目标之间的相对完整的 Shovel 配置可能如下所示:

{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp10},
{uris, [ "amqp://fred:secret@host1.domain/my_vhost",
]},
{source_address, <<"my-source">>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}

示例配置(0.9.1 源 — 1.0 目标)

AMQP 0.9.1 源和 AMQP 1.0 目标之间更广泛的 Shovel 配置可能如下所示:

{rabbitmq_shovel,
[{shovels, [{my_first_shovel,
{source,
[{protocol, amqp091},
{uris, ["amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"]},
{declarations, [{'exchange.declare',
[{exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[{exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[{protocol, amqp10},
%% Note: for plain text SASL authentication, use
% {uris, ["amqp://user:pass@host:5672?sasl=plain"]},
%% Note: this relies on default user credentials
%% which has remote access restrictions, see
%% ./access-control to learn more
{uris, ["amqp://host:5672"]},
{properties, [{user_id, <<"my-user">>}]},
{application_properties, [{<<"my-prop">>, <<"my-prop-value">>}]},
{add_forward_headers, true},
{target_address, <<"destination-queue">>}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
}]}
]}
}
© . This site is unofficial and not affiliated with VMware.