RabbitMQ 是一个多协议消息代理,支持
消息可以很容易地通过一种协议发布,并被其他协议消费。 这就需要在不同协议格式之间进行数据格式转换。 在 RabbitMQ 3.13 之前,所有消息在发布到 RabbitMQ 时都会被转换为基于 AMQP 0.9.1 协议的内部格式。 这种方法通常导致不必要的转换和/或数据保真度问题。
在当前的 RabbitMQ 版本中,AMQP、AMQP 0.9.1 和 MQTT 协议的方法已更改,消息将始终以其原始格式存储,并且仅在被与原始协议不同的协议消费时才进行转换。 这样做的好处是,当原始协议和消费协议相同时,不会丢失任何协议特定的信息。
此规则的唯一例外是流:无论使用哪种协议发布,流在内部都将其消息存储为 AMQP 编码的数据。 因此,例如,当使用 AMQP 0.9.1 发布时,会发生到 AMQP 的转换。
所有转换都已重写并正式化。 本文档旨在捕获已实施的转换规则,以便应用程序开发人员可以回顾本文档,以了解他们的消息在多协议消息传递应用程序中将如何转换。
shortstr
:(条件)一个小于 256 字节的字符串,并且仅包含有效的 UTF-8 编码数据,且不包含 NUL
(二进制零八位字节)字符。
*
:被理解为“任何”(字段/类型等)。
条件可以按书写顺序读取以进行评估。 例如,类型为 utf8
且超过 255 字节的 AMQP message_id
将无法通过第一个 shortstr
条件,然后会落到下一行。 如果没有匹配的行,则数据将不会被转换(因此将无法供消费应用程序使用)。
AMQP 1.0 -> AMQP 0.9.1
如果消息通过 AMQP 0.9.1 消费,并且消息符合以下任一条件,则会发生从 AMQP 到 AMQP 0.9.1 的转换
- 是使用 AMQP 1.0 发布的,或者
- 是从流中消费的。
AMQP 部分 | AMQP 字段 | AMQP 类型 | 条件 | AMQP 0.9.1 部分 | AMQP 0.9.1 字段 | AMQP 0.9.1 类型 | 注释 |
---|
header | durable | boolean | | properties | delivery_mode | ubyte | 2 = 持久,1 = 瞬态 |
header | priority | ubyte | | properties | priority | ubyte | |
header | ttl | milliseconds (uint) | | properties | expiration | shortstr | 毫秒转换为字符串值 |
properties | message_id | utf8 | shortstr | properties | message_id | shortstr | |
properties | message_id | utf8 | > 256 字节 | properties | headers,键:x-message-id | longstr | |
properties | message_id | uuid (16 字节二进制) | | properties | message_id | shortstr | 转换为文本,例如“urn:uuid:550e8400-e29b-41d4-a716-446655440000” |
properties | message_id | ulong | | properties | message_id | shortstr | 转换为文本 |
properties | message_id | binary | | properties | headers,键:x-message-id | binary | |
properties | user_id | binary | shortstr | properties | user_id | shortstr | |
properties | to | address | | | | | |
properties | subject | utf8 | | | | | |
properties | reply_to | address | shortstr | properties | reply_to | shortstr | |
properties | correlation_id | utf8 | shortstr | properties | correlation_id | shortstr | |
properties | correlation_id | utf8 | > 256 字节 | properties | headers,键:x-correlation-id | longstr | |
properties | correlation_id | uuid (16 字节二进制) | | properties | correlation_id | shortstr | 转换为文本表示形式,例如“urn:uuid:550e8400-e29b-41d4-a716-446655440000” |
properties | correlation_id | ulong | | properties | correlation_id | shortstr | 转换为数字的文本表示形式 |
properties | correlation_id | binary | | properties | headers,键:x-correlation-id | binary | |
properties | content_type | symbol | | properties | content_type | shortstr | |
properties | content_encoding | symbol | | properties | content_encoding | shortstr | |
properties | absolute_expiry_time | timestamp | | | | | |
properties | creation_time | timestamp | | properties | timestamp | timestamp | 转换为秒 |
properties | group_id | utf8 | shortstr | properties | app_id | shortstr | |
properties | group_sequence | sequence-no | | | | | |
properties | reply_to_group_id | utf8 | | | | | |
application properties | * | *(参见类型转换) | 键:shortstr | properties | headers | * | |
message annotations | *(符号 - x-cc*) | | 键:x-cc | properties | headers,键:“CC” | longstr | |
message annotations | *(符号 - x-*) | *(参见类型转换) | 键:shortstr & x-* | properties | headers | * | 通常这意味着 x- headers |
data | | data | single data section | payload | | binary | 从数据部分提取的纯二进制 |
data(多个) | * | * | multiple data sections | payload | | AMQP 1.0 编码的二进制 | properties.type 将设置为 "amqp-1.0" |
amqp.value | * | * | | payload | | AMQP 1.0 编码的二进制 | properties.type 将设置为 "amqp-1.0" |
amqp.sequence | * | * | | payload | | AMQP 1.0 编码的二进制 | properties.type 将设置为 "amqp-1.0" |
类型转换
AMQP 1.0 类型 | 条件 | AMQP 0.9.1 类型 | 注释 |
---|
utf8(字符串) | | longstr | RabbitMQ 不支持 shortstr 标头值,因此除非它是基本属性中的字段,否则所有 utf8 输入都将转换为 longstr |
binary | | binary | |
long | | long | |
ulong | | long | 溢出风险 |
ubyte | | unsignedbyte | |
short | | short | |
ushort | | usignedshort | |
uint | | unsignedint | |
int | | signedint | |
double | | double | |
float | | float | |
boolean | | bool | |
timestamp | | timestamp(秒) | 值除以 1000 |
byte | | byte | |
null | | void | |
list | | array | |
map | | table | |
symbol | | longstr | |
AMQP 0.9.1 -> AMQP 1.0
AMQP 0.9.1 部分 | AMQP 0.9.1 字段 | AMQP 0.9.1 类型 | 条件 | AMQP 1.0 部分 | AMQP 1.0 字段 | AMQP 1.0 类型 | 注释 |
---|
basic properties | message_id | shortstr | valid urn uuid | properties | message_id | uuid | |
basic properties | message_id | shortstr | | properties | message_id | utf8 | |
basic properties | correlation_id | shortstr | valid urn uuid | properties | correlation_id | uuid | |
basic properties | correlation_id | shortstr | | properties | correlation_id | utf8 | |
basic properties | user_id | shortstr | | properties | user_id | binary | |
basic properties | expiration | shortstr | if convertible to numeric type | header | ttl | uint | |
basic properties | type | shortstr | | message annotations | x-basic-type | utf8 | |
basic properties | reply_to | shortstr | | properties | reply_to | utf8 | |
basic properties | app_id | shortstr | | properties | group-id | utf8 | |
basic properties | timestamp | timestamp(秒) | | properties | creation_time | timestamp | 转换为毫秒 |
basic properties | content-type | shortstr | | properties | content-type | symbol | |
basic properties | content-encoding | shortstr | | properties | content-encoding | symbol | |
basic properties | delivery-mode | octet | | header | durable | boolean | 之前在 x-basic-delivery-mode 中 |
basic properties | priority | octet | | header | priority | ubyte | 之前在 x-basic-priority 中 |
basic.properties | headers | | Key=x-amqp-1.0-properties | properties | - | - | 旧版 1.0 插件标头 |
basic.properties | headers | | Key=x-amqp-1.0-application-properties | application properties | - | - | 旧版 1.0 插件标头 |
basic.properties | headers | | Key=x-amqp-1.0-message-annotations | message annotations | - | - | 旧版 1.0 插件标头 |
basic.properties | headers | | Key=x-reply-to-topic | properties | reply_to | utf8 | "/topic/" RK |
basic properties | headers | * | Key begins with "x-" | message annotations | Key | * | 参见类型转换规则 |
basic properties | headers | table | Value not array or table | application properties | Key | * | 参见类型转换规则 |
basic properties | headers | table | Value is array or table | - | - | - | 不转换 |
payload | - | binary | | data | - | data | |
类型转换
AMQP 0.9.1 类型 | 条件 | AMQP 1.0 类型 | 注释 |
---|
longstr | shortstr | utf8 | 性能/准确性权衡 |
longstr | | binary | |
long | | long | |
ubyte | | ubyte | |
short | | short | |
ushort | | ushort | |
uint | | uint | |
int | | int | |
double | | double | |
float | | float | |
bool | | boolean | |
binary | | binary | |
timestamp | | timestamp | 从秒转换为毫秒 |
byte | | byte | |
void | | null | |
array | | list | |
table | | map | |
table.key | x-* header | symbol | |
table.key | | utf8 | |
table.value | | * | 根据此表转换 |
MQTT 5.0 -> AMQP 1.0
MQTT 5.0 部分 | MQTT 字段 | MQTT 5.0 类型 | 条件 | AMQP 1.0 部分 | AMQP 1.0 字段 | AMQP 1.0 类型 | 注释 |
---|
Fixed Header | Dup | Bits | | | | | 设置为 header first-acquirer 没有意义,因为 DUP 标志仅适用于从客户端到服务器(从服务器到客户端的消费由队列发送的 Redelivered 标志确定) |
Fixed Header | QoS | Bits | | header | durable | boolean | 如果 QoS > 0,则 durable 为 true |
Fixed Header | Retain | Bits | | | | | |
Variable Header | Payload Format Indicator | Bits | | | | | 参见“Payload”行,条件为“Payload Format Indicator set” |
Variable Header | Message Expiry Interval | uint | | header | ttl | milliseconds | 秒转换为毫秒 |
Variable Header | Topic Alias | ushort | | | | | |
Variable Header | Response Topic | utf8 | | properties | reply_to | utf8 | 将 MQTT 主题转换为 AMQP 路由键 (RK)。 将 reply-to 地址设置为 "/exchange/" X "/" RK。 |
Variable Header | Correlation Data | binary | urn:uuid | properties | correlation_id | uuid | |
Variable Header | Correlation Data | binary | | properties | correlation_id | binary | |
Variable Header | User Property | utf8 | 键以 "x-" 开头且键为 ASCII | message annotation | | 值:utf8 | 键的类型为 symbol |
| | | 键不以 "x-" 开头 | application properties | | 值:utf8 | 键的类型为 utf8 |
Variable Header | Subscription Identifier | uint | | | | | |
Variable Header | Content Type | utf8 | valid ASCII | properties | content_type | symbol | MQTT 内容类型是 UTF-8,而 AMQP 1.0 内容类型仅为 ASCII |
Payload | | | Payload Format Indicator unset | data | | data | |
Payload | | | Payload Format Indicator set | amqp.value | | utf8 | 如果设置了 Payload Format Indicator,则将 MQTT payload 转换为字符串(即单个 AMQP 值部分),因为 AMQP 字符串是 UTF-8 编码的 |
类型转换
MQTT 5.0 类型 | 条件 | AMQP 1.0 类型 | 注释 |
---|
Bits | | boolean | 仅转换选定的标志 |
ushort | | ushort | |
uint | | uint | |
utf8 | | utf8 | |
binary data | utf8 | utf8 | |
binary data | | binary | |
utf8 string pairs | | map | 删除重复键,但保持顺序 |
MQTT 5.0 -> AMQP 0.9.1
MQTT 5.0 部分 | MQTT 5.0 字段 | MQTT 5.0 类型 | 条件 | AMQP 0.9.1 部分 | AMQP 0.9.1 字段 | AMQP 0.9.1 类型 | 注释 |
---|
Fixed Header | Dup | Bits | | | | | 设置为 basic.deliver 字段 redelivered 没有意义,因为 DUP 标志仅适用于从客户端到服务器(从服务器到客户端的消费由队列发送的 Redelivered 标志确定) |
Fixed Header | QoS | Bits | | properties | delivery_mode | | QoS 0 映射到 delivery_mode 1,QoS 1 映射到 delivery_mode 2 |
Fixed Header | Retain | Bits | | | | | |
Variable Header | Payload Format Indicator | Bits | | | | | AMQP 0.9.1 content_encoding 指的是 MIME 内容编码,其有效值在 https://www.iana.org/assignments/http-parameters/http-parameters.xml 中列出。 因此,AMQP 0.9.1 content_encoding 是不同的东西。 它最好映射到 content_type 的 text/<subtype>; charset="utf-8 ,但未定义 <subtype> 。 此外,MQTT 5.0 属性 Content Type 已经定义。 |
Variable Header | Message Expiry Interval | uint | | properties | expiration | shortstr | 秒转换为毫秒 |
Variable Header | Topic Alias | ushort | | | | | |
Variable Header | Response Topic | utf8 | | properties | headers,键:x-reply-to-topic | longstr | AMQP 0.9.1 属性 reply_to 指的是队列名称,而不是主题名称 |
Variable Header | Correlation Data | binary | shortstr | properties | correlation_id | shortstr | |
| | | | properties | headers,键:x-correlation-id | longstr | |
Variable Header | User Property | utf8 string pair | name is shortstr | properties | headers | longstr | RabbitMQ 不能将 shortstr 作为例如标头值。 |
Variable Header | Subscription Identifier | variable byte integer | | | | | |
Variable Header | Content Type | utf8 | shortstr | properties | content_type | shortstr | |
Payload | | | | payload | | | |
类型转换
MQTT 5.0 类型 | 条件 | AMQP 0.9.1 类型 | 注释 |
---|
Bits | | Bits | |
ushort | | ushort | |
uint | | uint | |
utf8 | < 256 字节 且 目标不是标头 | shortstr | RabbitMQ 不能将 shortstr 作为例如标头值。 |
utf8 | >= 256 字节 | longstr | |
binary | < 256 字节的 UTF-8 编码数据 | shortstr | |
binary | 无效的 UTF-8 或 >= 256 字节 | longstr | |
utf8 pairs | | table | “重复字段是非法的”在 AMQP 0.9.1 字段表中。 RabbitMQ 按键对字段进行排序。 |
AMQP 1.0 -> MQTT 5.0
AMQP 1.0 部分 | AMQP 1.0 字段 | AMQP 1.0 类型 | 条件 | MQTT 5.0 部分 | MQTT 5.0 字段 | MQTT 5.0 类型 | 注释 |
---|
header | durable | boolean | | Fixed Header | QoS | Bits | durable=true 映射到 QoS 1,durable=false 映射到 QoS 0 |
properties | message_id | * | | | | | |
properties | user_id | binary | | | | | |
properties | to | address | | | | | |
properties | subject | utf8 | | | | | |
properties | reply_to | address | "/exchange/" X "/" RK,其中 X 匹配 mqtt.exchange | Variable Header | Response Topic | utf8 | 将 AMQP 主题转换为 MQTT 主题 |
properties | correlation_id | * | | Variable Header | Correlation Data | binary | 转换为二进制 |
properties | content_type | symbol | | Variable Header | Content Type | utf8 | |
properties | content_encoding | symbol | | | | | |
properties | absolute_expiry_time | timestamp | | | | | |
properties | creation_time | timestamp | | | | | |
properties | group_id | utf8 | | | | | |
properties | group_sequence | sequence-no | | | | | |
properties | reply_to_group_id | utf8 | | | | | |
application properties | * | * | 不是数组、列表、map 或二进制 | User Properties | | utf8 | 值转换为字符串表示形式 |
message annotations | * | * | 仅限 x- 标头,排除具有数组、列表、map 或二进制值的标头 | User Properties | | utf8 | |
data | | data | 一个或多个数据部分 | Payload | | binary | |
data | * | * | 单个 AMQP 二进制值部分 | Payload | | binary | |
data | | amqp.value(utf8) | 可转换为 UTF-8 的单个 AMQP 值部分 | Payload | | binary | Payload-Format-Indicator 设置为 1,以便 MQTT 5.0 客户端可以解析消息的文本表示形式,从而理解消息。 |
data | | * | 其他单个 AMQP 值部分或 amqp-sequence 部分 | Payload | | binary | 使用 AMQP 类型系统进行编码,并包含 Content Type message/vnd.rabbitmq.amqp 。 此 Content Type 未注册。 |
AMQP 0.9.1 -> MQTT 5.0
AMQP 0.9.1 部分 | AMQP 0.9.1 字段 | AMQP 0.9.1 类型 | 条件 | MQTT 5.0 部分 | MQTT 5.0 字段 | MQTT 5.0 类型 | 注释 |
---|
basic properties | message_id | shortstr | | | | | |
basic properties | correlation_id | shortstr | | Variable Header | Correlation Data | binary | |
basic properties | user_id | shortstr | | | | | |
basic properties | expiration | shortstr | | Variable Header | Message Expiry Interval | uint | 从毫秒转换为秒 |
basic properties | type | shortstr | | | | | |
basic properties | reply_to | shortstr | | | | | |
basic properties | app_id | shortstr | | | | | |
basic properties | timestamp | timestamp(秒) | | | | | |
basic properties | content_type | shortstr | | Variable Header | Content Type | utf8 | |
basic properties | content_encoding | shortstr | | | | | |
basic properties | delivery_mode | octet | | Fixed Header | QoS | Bits | delivery-mode 2 映射到 QoS 1,delivery-mode 1 映射到 QoS 0 |
basic properties | priority | octet | | | | | |
basic.properties | headers | longstr | Key=x-reply-to-topic | Variable Header | Response Topic | utf8 | 将 AMQP 主题转换为 MQTT 主题 |
basic.properties | headers | longstr | Key=x-correlation-id | Variable Header | Correlation Data | binary | |
basic.properties | headers | * | 不是数组、表 | User properties | | utf8 | 简单值转换为其字符串表示形式 |
basic properties | headers | table | | | | | |
payload | | binary | | Payload | | | |