白话RabbitMQ(四): 建立路由 有更新!

  |   0 评论   |   290 浏览

前言

订阅/发布中我们建立了一个简单的日志系统,从而将log消息广播给一些消费者。这章我们会在此基础上加入一些新的特性-我们将有针对性的进行消息分发,比如,只把错误(error)消息保存到磁盘,与此同时,打印出所有的消息。

绑定

我们在前面的例子中,绑定是这么来做的

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定是建立交换机和队列之间的一种联系:队列会接受交换机中的消息。绑定可以用一个路由键来指明,为了与basic_publish区分开,我们称之为绑定键(binding key):

channel.queueBind(queueName, EXCHANGE_NAME, "black");

绑定键跟路由器类型也有关系,我们之前用的广播路由器,会忽略掉这个值

直达交换机(Direct Exchange)

之前我们用的是广播交换机,会将消息发送给所有的消费者。这里我们希望通过log的严重程度进行过滤,例如只有严重的错误才会写入到磁盘,而warn和info消息就不用了,以此来节省磁盘空间

而广播交换机没法满足这个需求-它只是无脑的发送消息。所以我们会使用直达交换机(Direct Exchange)- 消息会通过所绑定的键来发送给对应的队列,可以看如下这幅图

如上图所示,直达交换机X绑定了两个队列,C1是通过orange来绑定,而C2是通过black和green绑定。因此,发送到路由键orange的消息会发送给队列Q1,发送到路由键black或者green的消息会发送给Q2,其它的消息将被丢弃。

多项绑定


当然,多个队列绑定到一个键上也是合法的,在这种情况下,直达交换机将会将消息发送给所有的队列,就像广播交换机一样,如上图所示,一个键为black的消息将会同时被发送给C1和C2.

我们首先需要创建一个直达路由器

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

并发送消息到这个路由器

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

上面我们是发送给’severity’,简单起见,假设有下列几种日志类型’severity’ ,‘info’, ‘warning’, ‘error’.

订阅消息(Subscribing)

接受消息跟之前一样,但有一点不同,我们提供了一个binding key,

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

整合

将上面的所有代码整合到一起

EmitLogDirect.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    //..
}

ReceiveLogsDirect.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1){
      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
      System.exit(1);
    }

    for(String severity : argv){
      channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

编译

javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java

只保存warning和error的消息到磁盘上

java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

将所有的消息打印到频幕上

java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C

最后,发送error消息

java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

好了,这一章就到这儿,下一章我们将讲述如何基于特定模式进行监听

评论

发表评论

validate