发送者选择的分发
RabbitMQ 2.4.0 引入了一个扩展,允许发布者在 CC 和 BCC 消息头中指定多个路由键。BCC 消息头在传递之前从消息中删除。直接和主题交换机是唯一使用路由键的标准交换机类型,因此此功能的路由逻辑仅适用于这些交换机类型。
我为什么要这样做?
1. 自定义路由逻辑
当路由规则过于复杂而无法用标准交换机表达时,您通常会求助于外部或自定义交换机。CC/BCC 消息头允许对等方通过使用匹配的路由填充这些消息头来实现潜在的复杂路由规则。
假设 RabbitMQ 代理接收 Java Log4J 消息,并且我们对在非工作时间到达的 SEVERE 级别的消息感兴趣。这假设一个 AMQP Log4J 处理程序将日志消息转发到 RabbitMQ 交换机,以及一个客户端(可能连接到寻呼机)从队列中检索它们。让我们假设队列名为“out-of-hours-emergencies”,并由寻呼机客户端声明。
问题是如何有选择地路由满足这些标准(严重性和时间)的消息。Java 日志记录 API 具有足够的复杂性,可以在消息到达代理之前在日志处理程序中执行一些选择性处理和过滤,因此在简单的情况下,问题有可能在上游从代理解决。对于此示例的目的,我们希望在代理中集中管理所有日志生产者的路由。
日志处理程序可以使用有关日志事件的信息来装饰 AMQP 消息,方法是将信息放置在消息头中。然后可以根据这些消息头以及内置的 amq.headers 交换机路由消息。因此,如果事件严重性出现在消息头中,则有可能在不求助于其他功能的情况下满足第一个约束。我们的第二个要求(仅在非工作时间收到的消息)无法以相同的方式通过内置交换机满足。内置交换机类型只能根据消息的内容而不是消息到达的时间执行路由。即使消息包含时间戳,内置交换机也无法通过不等式进行匹配。
我们可以通过依靠一个智能消费者来解决这个问题,该消费者在重新发布收到的消息之前填充 BCC 消息头。在我们的示例中,相关的标准将是“out-of-hours-emergencies”,因此智能消费者在重新发布非工作时间到达的严重日志消息之前,将其添加到 BCC 消息头。它可以利用其可支配的任何信息来做出该决定,包括日期、时间、消息内容或来自其他来源的信息。任何数量的标准都可以以相同的方式有选择地添加到 BCC 消息头。具有相同名称的队列将从我们的智能消费者那里接收所有重新发布的消息,这些消息在 BCC 消息头中包含此字符串。此时,寻呼机客户端从“out-of-hours-emergencies”队列中检索消息并呼叫操作员。
此技术可以路由以特定于域的格式编码的消息。具有格式知识的智能对等方可以解压缩消息,使用相关字段填充 BCC 消息头并重新发布。智能对等方的作用类似于 外部交换机。
2. 机密路由
这在路由键是生产者和消费者预先同意的安全令牌的情况下很有用。通配符使主题交换机在这种情况下毫无用处。使用设置为“topsecret.eyesonly”的路由键发布的消息可以被任何使用通配符“#”绑定的消费者获取。
生产者可以通过使用选定的接收者的路由键填充 BCC 消息头,将消息发送给任意消费者子集。接收者将无法了解其他接收者的身份,因为 BCC 消息头在传递之前会从消息中删除。
路由信息仍然可能以其他方式泄露,例如管理和监控插件或 rabbitmqctl 管理实用程序。这些将需要适当的保护。
AMQP 不是已经可以做到这一点了吗?
虽然无法删除消息头,但仅使用标准 AMQP 功能也可以获得一些类似的效果。
- 生产者可以发送多条消息,每条消息都具有不同的路由键。这浪费了网络带宽和代理资源,因为代理无法优化重复消息的存储。
- 生产者可以声明一个临时交换机,为每个预期的接收者创建一个临时绑定。这是一个很大的工作量,每次接收者集合更改时都需要重复。
我该如何使用这个?
请确保使用 RabbitMQ 2.4.0 或更高版本。可以使用任何 AMQP 客户端。将 CC 或 BCC 消息头设置为路由键列表。消息头值必须是 AMQP 数组类型,即使它只包含单个值。消息将根据 CC 和 BCC 消息头中的组合路由键以及 basic.publish 方法(在本示例中为“routingkey1”、“routingkey2”和“routingkey3”)路由到所有目标。
Java 示例代码
BasicProperties props = new BasicProperties();
Map<String, Object> headers = new HashMap<String, Object>();
List<String> ccList = new ArrayList<String>();
ccList.add("routingkey2");
ccList.add("routingkey3");
headers.put("CC", ccList);
props.setHeaders(headers);
channel.basicPublish(exchange, "routingkey1", props, payload);
互操作性影响是什么?
任何 AMQP 客户端都可以使用此功能。生产者只需要能够设置消息中的消息头。
使用任何 RabbitMQ 特定的扩展使得将 RabbitMQ 换成不同的 AMQP 代理变得更加困难 - 发送者选择的分发也不例外。
如果您的应用程序已经使用了名为 CC 或 BCC 的消息头,那么您应该使用不同的键或联系 RabbitMQ 团队以获得帮助。