白话RabbitMQ(二): 任务队列 有更新!

  |   1 评论   |   731 浏览

前言

在第一篇中我们描述了如何最简单的RabbitMQ操作,如何发送、接受消息。在今天这篇文章中我们将描述如何创建一个任务队列,来将高耗时的任务分发到多个消费者,从而提高处理效率。

任务队列最主要的功能就是解耦高耗时的操作,否则程序会一直等在那里,浪费大量资源。反之我们会把这个操作交给队列,让它延后再做。我们将任务封装成一个消息发送给队列,后台的任务进程会得到这个任务并执行它,而且可以配置多个任务进程,进一步加大吞吐率。

特别是对于网络请求,一次短短的HTTP请求是要求迅速响应的,不可能让它一直停顿在高耗时操作上。

准备工作

在第一章中我们发送了“Hello World!”。现在来完成更复杂一点的,因为这里并没有真正的高耗时操作,比如缩放图像或输出一个pdf。因此我们只是用Thread.sleep()来假装我们很繁忙,而且会用"."来表示需要停顿的秒数,比如一个叫Hello…的任务将停顿3秒钟。

我们简单的更改下Send.java,称之为 NewTask.java.

String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

然后是工具类

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

当然,我们的Recv.java也需要进行一些改造,它需要对每一个"."停顿1秒,Work.java如下

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
    }
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}    

编译上面这些代码

javac -cp $CP NewTask.java Worker.java

轮询调度

任务队列的一个最大优点是可以并行工作,能够非常容易的水平扩张。

首先,让我们同时运行两个工作线程,他们能够同时从队列获取消息。我们也需要同时开启3个console:1个生产者,2个消费者

消费者C1

# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

消费者C2

# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

让我们运行生产者

# shell 3
java -cp $CP NewTask
# => First message.
java -cp $CP NewTask
# => Second message..
java -cp $CP NewTask
# => Third message...
java -cp $CP NewTask
# => Fourth message....
java -cp $CP NewTask
# => Fifth message.....    

让我们看看消费者们
消费者C1

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'   

消费者C2

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message..   

RabbitMQ默认有序的将会发送消息给下一个消费者,所以每一个消费者都会得到相同数量的消息,这种方式就叫做轮询调度(round-robin),你可以尝试下更多的消费者

消息确认

一个任务可能非常耗时,如果消费者在做一个高耗时任务时挂掉了,我们将会丢失所有发送到这个消费者上的消息。这是非常不可取的,所以我们希望能够明确的知道消息是否消费成功,如果一个消费挂了,我们能够知道,并且将消息发送给下一个消费者。

为了确保消息不丢失,RabbitMQ支持消息确认。收到消息后消费者会给RabbitMQ服务器发送一个ack(我已经收到消息了),RabbitMQ就会在服务上删除这个消息了。

如果一个消费者挂了(连接关闭,channel关闭,或者是TCP连接丢失)而没有发送ack,RabbitMQ就会知道消息并没有消费成功,于是乎消息会被放到消息队列重新消费。如果此时还有其它消费者的话,消息会发送给其它消费者来消费,确保消息不会丢失

消息并没有超时时间这个概念,消息只会在消费者挂掉了时候重发,即使是一个非常非常耗时的的消费者也不会发生重发

手动消息确认(Manual message acknowledgments)默认是打开的,虽然我们之前关闭了它:autoAck=true。让我们先将它设置为false

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

这样一来,即使你使用CTRL+C强制杀死了一个消费者,消费者所丢失的消息也将会被重发,会被另一个消费者所接受并消费。

忘记应答

很容易犯忘记应答的错误,但会导致非常严重的后果。Messages会被重发,RabbitMQ会消耗越来越多的内存因为unacked的消息无法释放(甚至更严重,RabbitMQ内部维护了一个最大打开线程数,如果太多的消息没有应答,RabbitMQ甚至会整个崩溃掉)

你可以用Rabbitmqctl查看未被应答的消息数

sudo rabbitmqctl list_queues name messages_ready      
 messages_unacknowledged

windows下:

rabbitmqctl.bat list_queues name messages_ready     
messages_unacknowledged

消息持久化

我们现在知道了可以通过应答来保证消息不丢失,但万一RabbitMQ挂了呢?还是可能会导致消息丢失。因此我们可以通过持久化的机制,包括将队列以及队列中的消息持久化的方式,来保证即便RabbitMQ挂了,当它重启的时候,队列以及消息也能够恢复

首先做队列的持久化,声明队列为durable

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

但很可惜的是,这种声明方式并不适用与上面的方法,因为我们已经将“Hello”定义为一个非持久化的队列了,是不能再将他改为持久化的,如果这样做,将会直接返回一个error信息。所以,我们需要重新再定义一个队列

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

在保证队列的持久化后需要保证消息的持久化-将消息设置为PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes());  

公平分发

但这样还是存在问题:假设有如下的情形,一个消费者非常耗时,而一个消费者非常快,由于消息都是公平的发送,所以它们都是接收到相同数量的消息,会导致一个消费者非常忙碌,而另外一个消费者非常空闲,而RabbitMQ无法得知这一点。

为了解决这个缺陷我们引入了basicQos方法以及prefetchCount =1的设置。这会告诉RabbitMQ一次只给消费者一个消息:如果这个消息未确认,将不会发送新的消息,从而它会将消息发送给其它并不那么忙的消费者

int prefetchCount = 1;
channel.basicQos(prefetchCount);

留意queue size

如果所有的消费者都非常忙,队列可能会很快被填满,所以你需要留意这一点,要么增加更多的消费者,或者采取其它的策略。

整合

NewTask.java

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }      
  //...
}

Worker.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
      if (ch == '.') {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}

使用消息确认和prefetchCount你就能设置一个持久化队列了,同时,使用durable和persist,,即使RabbitMQ挂掉了,重启后也能够重发消息

评论

  • 吴宏伟 回复»

    您好
    我是电子工业出版社的吴宏伟,你这个网站专为Rabbitmq而开。
    目前,我们有意向找可以写一本Rabbitmq图书的作者。目前市面只有两本书,都是我社出版的,卖得都挺火。
    所以,想和你接触一下。

    谢谢 !
    我的微信号:wuwei83744810

发表评论

validate