配置静态 Shovel
概述
本指南重点介绍静态配置的 Shovel。它假定您熟悉 Shovel 插件背后的关键概念。
与 动态 Shovel 不同,静态 Shovel 使用 高级配置文件进行配置。它们在节点启动时启动,主要用于永久运行的工作负载。静态 Shovel 配置的任何更改都需要重新启动节点,这使其高度不灵活。
大多数用户应优先选择动态 Shovel 而非静态 Shovel,因为动态 Shovel 更灵活且易于自动化。生成动态 Shovel 定义(JSON 文档)通常比生成静态 Shovel 定义(使用 Erlang 术语)更容易。
配置
Shovel 插件的配置必须在 高级配置文件中定义。
它由一个单独的 shovels 子句组成,该子句列出了节点启动时应启动的 Shovel。
{rabbit, [
%% ...
]},
{rabbitmq_shovel, [
{shovels, [
{shovel_one, [
%% shovel_one properties ...
]},
%% ...
]}
]}
下面可以找到一个(特意冗长的)示例配置。
shovels 子句的每个元素都是一个命名的静态 Shovel。列表中的名称必须是唯一的。
Shovel 定义在顶层看起来是这样的
{shovel_name, [
{source, [
%% protocol-specific source configuration goes here
]},
{destination, [
%% protocol-specific destination configuration goes here
]},
%% 'confirm' is the default acknowledgement mode
{ack_mode, confirm},
%% reconnect with a 5 second delay
{reconnect_delay, 5}
]}
其中 shovel_name 是 Shovel 的名称(Erlang 原子)。如果名称不以小写字母开头,或者包含除字母数字字符、下划线 (_) 或 @ 之外的其他字符,则应将其用单引号 (') 括起来。
源和目标
Shovel 将消息从源传输到目标。
source 和 destination 键是强制的,并包含特定于协议的嵌套键。目前 AMQP 0.9.1 和 AMQP 1.0 是支持的两个协议。源和目标不一定使用相同的协议。所有其他属性都是可选的。
source 是一个强制键,对于不同的协议具有不同的键属性。所有协议共有的两个属性是:protocol 和 uris。protocol 支持三个值:amqp091、amqp10 和 local,分别用于 AMQP 0-9-1、AMQP 1.0 和本地 Shovel。
%% for AMQP 0-9-1
{protocol, amqp091}
uris 是 AMQP 连接 URI 的列表。
{uris, [
"amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"
]}
URI 语法已扩展为包含查询部分,以允许配置其他连接参数。请参阅 查询参数参考,其中包含静态 Shovel 可用的参数,例如 TLS 证书和私钥。
通用源键
AMQP 0-9-1、AMQP 1.0 和本地源支持一些键。它们在下表中进行了描述。
| 键 | 描述 |
| reconnect-delay | 在任一端断开连接后,重新连接到代理之前等待的时间(以秒为单位)。默认为 1。 将在失败后延迟五秒钟然后重新连接。值为 |
| ack-mode | 确定 Shovel 应如何确认已消费的消息。有效值为 如果设置为 如果设置为 如果设置为 |
AMQP 0-9-1 源键
AMQP 0-9-1 特定的源键包含在单独的表中。
| 键 | 描述 |
| 声明 | 在 Shovel 开始传输消息之前要由 Shovel 执行的可选 AMQP 0-9-1 操作列表。它们通常用于设置拓扑。 声明遵循 RabbitMQ Erlang 客户端使用的方法和属性名称。 一个最小化的声明示例 将首先声明一个匿名的队列,然后将其绑定到名为 声明列表的每个元素要么是单个引号括起来的原子(如 如果仅使用方法名称,则所有参数都取其默认值(如上例中的 如果提供了元组和属性列表,则列表中的属性显式指定部分或全部参数。 这是另一个示例 将声明一个持久化的、直连的(direct)交换,名为“ |
| queue | 要从中传送消息的源队列的名称,作为 Erlang 二进制值。此属性是强制的。
此队列必须存在。使用上面介绍的资源 另请参阅下面的 预先声明的拓扑部分。 |
| prefetch-count | 在任何一个时间点,通过 Shovel 复制的未确认消息的最大数量。默认为 |
预先声明的拓扑
declarations 属性通常用于设置拓扑。至少,它必须设置源队列。
存在一些部署场景,其中拓扑在启动时从定义文件中自动导入。在这些场景中,我们可以通过将以下行添加到 rabbitmq.conf 文件中来配置插件,使其等待队列可用。
shovel.topology.predeclared = true
使用上述配置,如果静态 Shovel 没有 declarations 属性或为空,插件将等待直到源的 queue 被声明。
AMQP 1.0 源键
AMQP 1.0 源设置与 AMQP 0-9-1 源的不同。
| 键 | 描述 |
| source_address | 这代表 AMQP 1.0 链接的源地址。此键是强制的。 |
| prefetch-count | 此可选键设置将授予接收链接的链接信贷数量。当信贷低于此值十分之一时,信贷将自动续订。默认值为 1000。它采用以下形式: |
本地 Shovel 源键
本地 Shovel 特定的源键包含在单独的表中。
| 键 | 描述 |
| 声明 | 在 Shovel 开始传输消息之前要由 Shovel 执行的可选 AMQP 0-9-1 操作列表。它们通常用于设置拓扑。 声明遵循 RabbitMQ Erlang 客户端使用的方法和属性名称。 一个最小化的声明示例 将首先声明一个匿名的队列,然后将其绑定到名为 声明列表的每个元素要么是单个引号括起来的原子(如 如果仅使用方法名称,则所有参数都取其默认值(如上例中的 如果提供了元组和属性列表,则列表中的属性显式指定部分或全部参数。 这是另一个示例 将声明一个持久化的、直连的(direct)交换,名为“ |
| queue | 要从中传送消息的源队列的名称,作为 Erlang 二进制值。此属性是强制的。
此队列必须存在。使用上面介绍的资源 另请参阅下面的 预先声明的拓扑部分。 |
| prefetch-count | 在任何一个时间点,通过 Shovel 复制的未确认消息的最大数量。默认为 |
预先声明的拓扑
declarations 属性通常用于设置拓扑。至少,它必须设置源队列。
存在一些部署场景,其中拓扑在启动时从定义文件中自动导入。在这些场景中,我们可以通过将以下行添加到 rabbitmq.conf 文件中来配置插件,使其等待队列可用。
shovel.topology.predeclared = true
使用上述配置,如果静态 Shovel 没有 declarations 属性或为空,插件将等待直到源的 queue 被声明。
目标
destination 是一个强制键,对于不同的协议具有不同的键属性。所有协议共有的两个属性是:protocol 和 uris。protocol 支持三个值:amqp091、amqp10 和 local,分别用于 AMQP 0-9-1、AMQP 1.0 和本地 Shovel。
%% for AMQP 0-9-1
{protocol, amqp091}
uris 是 AMQP 连接 URI 的列表。
{uris, [
"amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"
]}
URI 语法已扩展为包含查询部分,以允许配置其他连接参数。请参阅 查询参数参考,其中包含静态 Shovel 可用的参数,例如 TLS 证书和私钥。
通用目标键
| 键 | 描述 |
| reconnect-delay | 在任一端断开连接后,重新连接到代理之前等待的时间(以秒为单位)。默认为 1。 将在失败后延迟五秒钟然后重新连接。值为 |
AMQP 0-9-1 目标键
| 键 | 描述 |
| publish_properties | 此可选键控制 Shovel 设置或覆盖的消息属性。它采用以下形式: 其中列表中的属性在重新发布每条消息之前设置在其基本属性上。 此特定示例会将所有重新发布的设置为持久化。 默认情况下,原始消息属性会保留,但此子句可用于更改或设置任何已知属性。
|
| publish_fields | 此可选键与 其中列表中的属性用于设置重新发布消息时使用的 默认情况下,消息会使用原始交换名称和路由键重新发布。通过指定 消息将被重新发布到一个显式交换名称,并带有显式、固定的路由键。 |
| add_timestamp_header | 此布尔键控制在消息重新发布之前是否会添加自定义标头 此标头的值是消息被 Shovel 化的时间戳(自纪元以来的秒数)。默认情况下,不添加此标头。 |
| add_forward_headers | 当设置为 true 时,Shovel 将添加一些自定义消息标头: |
AMQP 1.0 目标键
| 键 | 描述 |
| target_address | 这代表发送 AMQP 1.0 链接的目标地址。 |
| properties | 此可选键控制重新发布消息时将添加哪些附加属性。它采用以下形式: 可用键包括 |
| application_properties | 此可选键声明在重新发布消息时将添加的任何附加应用程序属性。它采用以下形式: 键和值应为二进制字符串,如下例所示。 |
| add_timestamp_header | 此布尔键控制在消息重新发布之前是否会设置 此值是消息被 Shovel 化时的时间戳(自纪元以来的秒数)。默认情况下,不设置此属性。 |
| add_forward_headers | 当设置为 true 时,Shovel 将添加以下键的应用程序属性: |
本地 Shovel 目标键
| 键 | 描述 |
| add_timestamp_header | 此布尔键控制在消息重新发布之前是否会添加自定义标头 此标头的值是消息被 Shovel 化的时间戳(自纪元以来的秒数)。默认情况下,不添加此标头。 |
| add_forward_headers | 当设置为 true 时,Shovel 将添加一些自定义消息标头: |
示例配置
AMQP 0.9.1 端点之间一个相对完整的静态 Shovel 配置可能如下所示:
{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp091},
{uris, [ "amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost" ]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable
]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[ {exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}
上面的配置定义了一个名为 'my_first_shovel' 的单个 Shovel。
'my_first_shovel' 将连接到其中一个代理(host1 或 host2)作为源,并直接连接到本地代理作为目标。它将在故障后,延迟 5 秒后重新连接到另一个源代理。
当连接到源时,它将声明一个直连的、扇出(fanout)类型的交换,名为 "my_fanout",一个具有每个队列的消息 TTL 的匿名队列,并将该队列绑定到交换。
当连接到目标(本地代理)时,它将声明一个持久化的、直连的交换,名为 "my_direct"。
此 Shovel 将把发送到源上的匿名队列的消息重新发布到本地交换,并使用固定的路由键 "from_shovel"。消息将是持久化的,并且仅在收到本地代理的发布确认后才会确认。
如果任何时候有至少十条未确认的消息,Shovel 消费者将不会获得更多传递。
示例配置(1.0 源 - 0.9.1 目标)
AMQP 1.0 源和 AMQP 0.9.1 目标之间的相对完整的 Shovel 配置可能如下所示:
{rabbitmq_shovel,
[ {shovels, [ {my_first_shovel,
[ {source,
[ {protocol, amqp10},
{uris, [ "amqp://fred:secret@host1.domain/my_vhost",
]},
{source_address, <<"my-source">>},
{prefetch_count, 10}
]},
{destination,
[ {protocol, amqp091},
{uris, ["amqp://"]},
{declarations, [ {'exchange.declare',
[ {exchange, <<"my_direct">>},
{type, <<"direct">>},
durable
]}
]},
{publish_properties, [ {delivery_mode, 2} ]},
{add_forward_headers, true},
{publish_fields, [ {exchange, <<"my_direct">>},
{routing_key, <<"from_shovel">>}
]}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}
示例配置(0.9.1 源 — 1.0 目标)
AMQP 0.9.1 源和 AMQP 1.0 目标之间更广泛的 Shovel 配置可能如下所示:
{rabbitmq_shovel,
[{shovels, [{my_first_shovel,
{source,
[{protocol, amqp091},
{uris, ["amqp://fred:secret@host1.domain/my_vhost",
"amqp://john:secret@host2.domain/my_vhost"]},
{declarations, [{'exchange.declare',
[{exchange, <<"my_fanout">>},
{type, <<"fanout">>},
durable]},
{'queue.declare',
[{arguments,
[{<<"x-message-ttl">>, long, 60000}]}]},
{'queue.bind',
[{exchange, <<"my_fanout">>},
{queue, <<>>}
]}
]},
{queue, <<>>},
{prefetch_count, 10}
]},
{destination,
[{protocol, amqp10},
%% Note: for plain text SASL authentication, use
% {uris, ["amqp://user:pass@host:5672?sasl=plain"]},
%% Note: this relies on default user credentials
%% which has remote access restrictions, see
%% ./access-control to learn more
{uris, ["amqp://host:5672"]},
{properties, [{user_id, <<"my-user">>}]},
{application_properties, [{<<"my-prop">>, <<"my-prop-value">>}]},
{add_forward_headers, true},
{target_address, <<"destination-queue">>}
]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
}]}
]}
}