跳到主要内容
版本:4.1

跨协议属性转换

概述

RabbitMQ 是一个多协议消息代理,支持

消息可以很容易地通过一种协议发布,并被其他协议消费。 这就需要在不同协议格式之间进行数据格式转换。 在 RabbitMQ 3.13 之前,所有消息在发布到 RabbitMQ 时都会被转换为基于 AMQP 0.9.1 协议的内部格式。 这种方法通常导致不必要的转换和/或数据保真度问题。

在当前的 RabbitMQ 版本中,AMQP、AMQP 0.9.1 和 MQTT 协议的方法已更改,消息将始终以其原始格式存储,并且仅在被与原始协议不同的协议消费才进行转换。 这样做的好处是,当原始协议和消费协议相同时,不会丢失任何协议特定的信息。

此规则的唯一例外是:无论使用哪种协议发布,流在内部都将其消息存储为 AMQP 编码的数据。 因此,例如,当使用 AMQP 0.9.1 发布时,会发生到 AMQP 的转换。

所有转换都已重写并正式化。 本文档旨在捕获已实施的转换规则,以便应用程序开发人员可以回顾本文档,以了解他们的消息在多协议消息传递应用程序中将如何转换。

约定

shortstr:(条件)一个小于 256 字节的字符串,并且仅包含有效的 UTF-8 编码数据,且不包含 NUL(二进制零八位字节)字符。

*:被理解为“任何”(字段/类型等)。

条件可以按书写顺序读取以进行评估。 例如,类型为 utf8 且超过 255 字节的 AMQP message_id 将无法通过第一个 shortstr 条件,然后会落到下一行。 如果没有匹配的行,则数据将不会被转换(因此将无法供消费应用程序使用)。

AMQP 1.0 -> AMQP 0.9.1

如果消息通过 AMQP 0.9.1 消费,并且消息符合以下任一条件,则会发生从 AMQP 到 AMQP 0.9.1 的转换

  • 是使用 AMQP 1.0 发布的,或者
  • 是从流中消费的。
AMQP 部分AMQP 字段AMQP 类型条件AMQP 0.9.1 部分AMQP 0.9.1 字段AMQP 0.9.1 类型注释
headerdurablebooleanpropertiesdelivery_modeubyte2 = 持久,1 = 瞬态
headerpriorityubytepropertiespriorityubyte
headerttlmilliseconds (uint)propertiesexpirationshortstr毫秒转换为字符串值
propertiesmessage_idutf8shortstrpropertiesmessage_idshortstr
propertiesmessage_idutf8> 256 字节propertiesheaders,键:x-message-idlongstr
propertiesmessage_iduuid (16 字节二进制)propertiesmessage_idshortstr转换为文本,例如“urn:uuid:550e8400-e29b-41d4-a716-446655440000
propertiesmessage_idulongpropertiesmessage_idshortstr转换为文本
propertiesmessage_idbinarypropertiesheaders,键:x-message-idbinary
propertiesuser_idbinaryshortstrpropertiesuser_idshortstr
propertiestoaddress
propertiessubjectutf8
propertiesreply_toaddressshortstrpropertiesreply_toshortstr
propertiescorrelation_idutf8shortstrpropertiescorrelation_idshortstr
propertiescorrelation_idutf8> 256 字节propertiesheaders,键:x-correlation-idlongstr
propertiescorrelation_iduuid (16 字节二进制)propertiescorrelation_idshortstr转换为文本表示形式,例如“urn:uuid:550e8400-e29b-41d4-a716-446655440000”
propertiescorrelation_idulongpropertiescorrelation_idshortstr转换为数字的文本表示形式
propertiescorrelation_idbinarypropertiesheaders,键:x-correlation-idbinary
propertiescontent_typesymbolpropertiescontent_typeshortstr
propertiescontent_encodingsymbolpropertiescontent_encodingshortstr
propertiesabsolute_expiry_timetimestamp
propertiescreation_timetimestamppropertiestimestamptimestamp转换为秒
propertiesgroup_idutf8shortstrpropertiesapp_idshortstr
propertiesgroup_sequencesequence-no
propertiesreply_to_group_idutf8
application properties**(参见类型转换)键:shortstrpropertiesheaders*
message annotations*(符号 - x-cc*)键:x-ccpropertiesheaders,键:“CC”longstr
message annotations*(符号 - x-*)*(参见类型转换)键:shortstr & x-*propertiesheaders*通常这意味着 x- headers
datadatasingle data sectionpayloadbinary从数据部分提取的纯二进制
data(多个)**multiple data sectionspayloadAMQP 1.0 编码的二进制properties.type 将设置为 "amqp-1.0"
amqp.value**payloadAMQP 1.0 编码的二进制properties.type 将设置为 "amqp-1.0"
amqp.sequence**payloadAMQP 1.0 编码的二进制properties.type 将设置为 "amqp-1.0"

类型转换

AMQP 1.0 类型条件AMQP 0.9.1 类型注释
utf8(字符串)longstrRabbitMQ 不支持 shortstr 标头值,因此除非它是基本属性中的字段,否则所有 utf8 输入都将转换为 longstr
binarybinary
longlong
ulonglong溢出风险
ubyteunsignedbyte
shortshort
ushortusignedshort
uintunsignedint
intsignedint
doubledouble
floatfloat
booleanbool
timestamptimestamp(秒)值除以 1000
bytebyte
nullvoid
listarray
maptable
symbollongstr

AMQP 0.9.1 -> AMQP 1.0

AMQP 0.9.1 部分AMQP 0.9.1 字段AMQP 0.9.1 类型条件AMQP 1.0 部分AMQP 1.0 字段AMQP 1.0 类型注释
basic propertiesmessage_idshortstrvalid urn uuidpropertiesmessage_iduuid
basic propertiesmessage_idshortstrpropertiesmessage_idutf8
basic propertiescorrelation_idshortstrvalid urn uuidpropertiescorrelation_iduuid
basic propertiescorrelation_idshortstrpropertiescorrelation_idutf8
basic propertiesuser_idshortstrpropertiesuser_idbinary
basic propertiesexpirationshortstrif convertible to numeric typeheaderttluint
basic propertiestypeshortstrmessage annotationsx-basic-typeutf8
basic propertiesreply_toshortstrpropertiesreply_toutf8
basic propertiesapp_idshortstrpropertiesgroup-idutf8
basic propertiestimestamptimestamp(秒)propertiescreation_timetimestamp转换为毫秒
basic propertiescontent-typeshortstrpropertiescontent-typesymbol
basic propertiescontent-encodingshortstrpropertiescontent-encodingsymbol
basic propertiesdelivery-modeoctetheaderdurableboolean之前在 x-basic-delivery-mode 中
basic propertiespriorityoctetheaderpriorityubyte之前在 x-basic-priority 中
basic.propertiesheadersKey=x-amqp-1.0-propertiesproperties--旧版 1.0 插件标头
basic.propertiesheadersKey=x-amqp-1.0-application-propertiesapplication properties--旧版 1.0 插件标头
basic.propertiesheadersKey=x-amqp-1.0-message-annotationsmessage annotations--旧版 1.0 插件标头
basic.propertiesheadersKey=x-reply-to-topicpropertiesreply_toutf8"/topic/" RK
basic propertiesheaders*Key begins with "x-"message annotationsKey*参见类型转换规则
basic propertiesheaderstableValue not array or tableapplication propertiesKey*参见类型转换规则
basic propertiesheaderstableValue is array or table---不转换
payload-binarydata-data

类型转换

AMQP 0.9.1 类型条件AMQP 1.0 类型注释
longstrshortstrutf8性能/准确性权衡
longstrbinary
longlong
ubyteubyte
shortshort
ushortushort
uintuint
intint
doubledouble
floatfloat
boolboolean
binarybinary
timestamptimestamp从秒转换为毫秒
bytebyte
voidnull
arraylist
tablemap
table.keyx-* headersymbol
table.keyutf8
table.value*根据此表转换

MQTT 5.0 -> AMQP 1.0

MQTT 5.0 部分MQTT 字段MQTT 5.0 类型条件AMQP 1.0 部分AMQP 1.0 字段AMQP 1.0 类型注释
Fixed HeaderDupBits设置为 header first-acquirer 没有意义,因为 DUP 标志仅适用于从客户端到服务器(从服务器到客户端的消费由队列发送的 Redelivered 标志确定)
Fixed HeaderQoSBitsheaderdurableboolean如果 QoS > 0,则 durable 为 true
Fixed HeaderRetainBits
Variable HeaderPayload Format IndicatorBits参见“Payload”行,条件为“Payload Format Indicator set”
Variable HeaderMessage Expiry Intervaluintheaderttlmilliseconds秒转换为毫秒
Variable HeaderTopic Aliasushort
Variable HeaderResponse Topicutf8propertiesreply_toutf8将 MQTT 主题转换为 AMQP 路由键 (RK)。 将 reply-to 地址设置为 "/exchange/" X "/" RK。
Variable HeaderCorrelation Databinaryurn:uuidpropertiescorrelation_iduuid
Variable HeaderCorrelation Databinarypropertiescorrelation_idbinary
Variable HeaderUser Propertyutf8键以 "x-" 开头且键为 ASCIImessage annotation值:utf8键的类型为 symbol
键不以 "x-" 开头application properties值:utf8键的类型为 utf8
Variable HeaderSubscription Identifieruint
Variable HeaderContent Typeutf8valid ASCIIpropertiescontent_typesymbolMQTT 内容类型是 UTF-8,而 AMQP 1.0 内容类型仅为 ASCII
PayloadPayload Format Indicator unsetdatadata
PayloadPayload Format Indicator setamqp.valueutf8如果设置了 Payload Format Indicator,则将 MQTT payload 转换为字符串(即单个 AMQP 值部分),因为 AMQP 字符串是 UTF-8 编码的

类型转换

MQTT 5.0 类型条件AMQP 1.0 类型注释
Bitsboolean仅转换选定的标志
ushortushort
uintuint
utf8utf8
binary datautf8utf8
binary databinary
utf8 string pairsmap删除重复键,但保持顺序

MQTT 5.0 -> AMQP 0.9.1

MQTT 5.0 部分MQTT 5.0 字段MQTT 5.0 类型条件AMQP 0.9.1 部分AMQP 0.9.1 字段AMQP 0.9.1 类型注释
Fixed HeaderDupBits设置为 basic.deliver 字段 redelivered 没有意义,因为 DUP 标志仅适用于从客户端到服务器(从服务器到客户端的消费由队列发送的 Redelivered 标志确定)
Fixed HeaderQoSBitspropertiesdelivery_modeQoS 0 映射到 delivery_mode 1,QoS 1 映射到 delivery_mode 2
Fixed HeaderRetainBits
Variable HeaderPayload Format IndicatorBitsAMQP 0.9.1 content_encoding 指的是 MIME 内容编码,其有效值在 https://www.iana.org/assignments/http-parameters/http-parameters.xml 中列出。 因此,AMQP 0.9.1 content_encoding 是不同的东西。 它最好映射到 content_type 的 text/<subtype>; charset="utf-8,但未定义 <subtype>。 此外,MQTT 5.0 属性 Content Type 已经定义。
Variable HeaderMessage Expiry Intervaluintpropertiesexpirationshortstr秒转换为毫秒
Variable HeaderTopic Aliasushort
Variable HeaderResponse Topicutf8propertiesheaders,键:x-reply-to-topiclongstrAMQP 0.9.1 属性 reply_to 指的是队列名称,而不是主题名称
Variable HeaderCorrelation Databinaryshortstrpropertiescorrelation_idshortstr
propertiesheaders,键:x-correlation-idlongstr
Variable HeaderUser Propertyutf8 string pairname is shortstrpropertiesheaderslongstrRabbitMQ 不能将 shortstr 作为例如标头值。
Variable HeaderSubscription Identifiervariable byte integer
Variable HeaderContent Typeutf8shortstrpropertiescontent_typeshortstr
Payloadpayload

类型转换

MQTT 5.0 类型条件AMQP 0.9.1 类型注释
BitsBits
ushortushort
uintuint
utf8< 256 字节 且 目标不是标头shortstrRabbitMQ 不能将 shortstr 作为例如标头值。
utf8>= 256 字节longstr
binary< 256 字节的 UTF-8 编码数据shortstr
binary无效的 UTF-8 或 >= 256 字节longstr
utf8 pairstable“重复字段是非法的”在 AMQP 0.9.1 字段表中。 RabbitMQ 按键对字段进行排序。

AMQP 1.0 -> MQTT 5.0

AMQP 1.0 部分AMQP 1.0 字段AMQP 1.0 类型条件MQTT 5.0 部分MQTT 5.0 字段MQTT 5.0 类型注释
headerdurablebooleanFixed HeaderQoSBitsdurable=true 映射到 QoS 1,durable=false 映射到 QoS 0
propertiesmessage_id*
propertiesuser_idbinary
propertiestoaddress
propertiessubjectutf8
propertiesreply_toaddress"/exchange/" X "/" RK,其中 X 匹配 mqtt.exchangeVariable HeaderResponse Topicutf8将 AMQP 主题转换为 MQTT 主题
propertiescorrelation_id*Variable HeaderCorrelation Databinary转换为二进制
propertiescontent_typesymbolVariable HeaderContent Typeutf8
propertiescontent_encodingsymbol
propertiesabsolute_expiry_timetimestamp
propertiescreation_timetimestamp
propertiesgroup_idutf8
propertiesgroup_sequencesequence-no
propertiesreply_to_group_idutf8
application properties**不是数组、列表、map 或二进制User Propertiesutf8值转换为字符串表示形式
message annotations**仅限 x- 标头,排除具有数组、列表、map 或二进制值的标头User Propertiesutf8
datadata一个或多个数据部分Payloadbinary
data**单个 AMQP 二进制值部分Payloadbinary
dataamqp.value(utf8)可转换为 UTF-8 的单个 AMQP 值部分PayloadbinaryPayload-Format-Indicator 设置为 1,以便 MQTT 5.0 客户端可以解析消息的文本表示形式,从而理解消息。
data*其他单个 AMQP 值部分或 amqp-sequence 部分Payloadbinary使用 AMQP 类型系统进行编码,并包含 Content Type message/vnd.rabbitmq.amqp。 此 Content Type 未注册。

AMQP 0.9.1 -> MQTT 5.0

AMQP 0.9.1 部分AMQP 0.9.1 字段AMQP 0.9.1 类型条件MQTT 5.0 部分MQTT 5.0 字段MQTT 5.0 类型注释
basic propertiesmessage_idshortstr
basic propertiescorrelation_idshortstrVariable HeaderCorrelation Databinary
basic propertiesuser_idshortstr
basic propertiesexpirationshortstrVariable HeaderMessage Expiry Intervaluint从毫秒转换为秒
basic propertiestypeshortstr
basic propertiesreply_toshortstr
basic propertiesapp_idshortstr
basic propertiestimestamptimestamp(秒)
basic propertiescontent_typeshortstrVariable HeaderContent Typeutf8
basic propertiescontent_encodingshortstr
basic propertiesdelivery_modeoctetFixed HeaderQoSBitsdelivery-mode 2 映射到 QoS 1,delivery-mode 1 映射到 QoS 0
basic propertiespriorityoctet
basic.propertiesheaderslongstrKey=x-reply-to-topicVariable HeaderResponse Topicutf8将 AMQP 主题转换为 MQTT 主题
basic.propertiesheaderslongstrKey=x-correlation-idVariable HeaderCorrelation Databinary
basic.propertiesheaders*不是数组、表User propertiesutf8简单值转换为其字符串表示形式
basic propertiesheaderstable
payloadbinaryPayload
© . All rights reserved.