跳到主要内容

在 Cloud Foundry 上使用 RabbitMQ 服务与 Node.JS

·5 分钟阅读
Michael Bridgen

最近我们推出了一个 RabbitMQ service for Cloud Foundry,使其可以轻松地启动消息代理,以供您在 Cloud Foundry 上的应用程序使用。 网上有关于将其与 Ruby on Rails 和使用 Spring 的 Java 应用程序一起使用的教程。 在这里,我们将着眼于将 RabbitMQ 服务与 Node.JS 应用程序一起使用。

我没有假设您事先对 Node.JS、npm 或 RabbitMQ 有太多了解;但是,如果您熟悉所有这些,您将从中获得更多收获。 npm 的手册页及其随附的 开发者信息 是理解 npm 的最佳指南,除非实际使用。 对于 RabbitMQ,我推荐 Michael Klishin 的 关于 AMQP 消息模型的入门读物

这个 GitHub 上的仓库 包含了我们的 Node.JS 示例的源代码。 让我们首先浏览一下 package.json

{
"name":"node-srs-demo",
"author": "Michael Bridgen",
"version":"0.0.2",
"dependencies":{
"amqp":">= 0.1.0",
"sanitizer": "*"
}
}

其中感兴趣的项目是对 “amqp” 的依赖;0.1.0 版本支持 Cloud Foundry 将提供的 URL 语法,并且没有理由不使用最新和最棒的版本。

这是第一段应用程序代码。 请注意,我们将文件命名为 app.js,以便 Cloud Foundry 将其识别为主模块并运行它。

require.paths.unshift('./node_modules');

var http = require('http');
var amqp = require('amqp');
var URL = require('url');
var htmlEscape = require('sanitizer/sanitizer').escape;

function rabbitUrl() {
if (process.env.VCAP_SERVICES) {
conf = JSON.parse(process.env.VCAP_SERVICES);
return conf['rabbitmq-2.4'][0].credentials.url;
}
else {
return "amqp://#";
}
}

var port = process.env.VCAP_APP_PORT || 3000;

首先,我们要确保 Node.JS 知道我们的库在哪里。 在工作目录中使用 npm install 会将它们安装在名为 node_modules 的子目录中,当我们推送到 Cloud Foundry 时,它们将与 app.js 一起复制过去。

我们的服务连接详细信息以 JSON 对象的形式出现在环境中;rabbitUrl 过程会解析该对象并提取 RabbitMQ 服务的 URL。 原则上,我们可以将多个 RabbitMQ 服务实例绑定到应用程序——我们在这里假设我们只想要第一个(并且可能是唯一一个)这样的实例。 那就是 [0] 位。

var messages = [];

function setup() {

var exchange = conn.exchange('cf-demo', {'type': 'fanout', durable: false}, function() {

var queue = conn.queue('', {durable: false, exclusive: true},
function() {
queue.subscribe(function(msg) {
messages.push(htmlEscape(msg.body));
if (messages.length > 10) {
messages.shift();
}
});
queue.bind(exchange.name, '');
});
queue.on('queueBindOk', function() { httpServer(exchange); });
});
}

这是一个我们稍后将调用的过程,用于创建我们在 RabbitMQ 实例中需要的所有内容。

由于客户端中的一切都是异步操作,因此有很多回调。 嵌套取决于我们何时需要结果;具体来说,我们需要队列才能订阅,并且我们需要队列交换机才能绑定队列。 请注意为队列给定的空名称——这表示该队列是匿名的,换句话说,由 RabbitMQ 随机生成名称。

我们可以跳过最后一个回调(启动 HTTP 服务器的回调)的作用域,因为我们知道到那时一切都已完成。

代码的下一部分是我们响应 HTTP 请求的地方

function httpServer(exchange) {
var serv = http.createServer(function(req, res) {
var url = URL.parse(req.url);
if (req.method == 'GET' && url.pathname == '/env') {
printEnv(res);
}
else if (req.method == 'GET' && url.pathname == '/') {
res.statusCode = 200;
openHtml(res);
writeForm(res);
writeMessages(res);
closeHtml(res);
}
else if (req.method == 'POST' && url.pathname == '/') {
chunks = '';
req.on('data', function(chunk) { chunks += chunk; });
req.on('end', function() {
msg = unescapeFormData(chunks.split('=')[1]);
exchange.publish('', {body: msg});
res.statusCode = 303;
res.setHeader('Location', '/');
res.end();
});
}
else {
res.statusCode = 404;
res.end("This is not the page you were looking for.");
}
});
serv.listen(port);
}

RabbitMQ 的部分在中间,我们在那里从前面发布消息到我们的交换机。 Node.JS AMQP 库会愉快地发布一个对象,将其序列化为 JSON 值;我们之前使用的 subscribe 方法假定 JSON 有效负载并将其解析为对象。

app.js 的其余部分只是助手函数,除了启动整个过程的那一行(到目前为止我们只编写了回调!)

var conn = amqp.createConnection({url: rabbitUrl()});
conn.on('ready', setup);

您可以看到整体控制流程很简单

  1. 打开与 RabbitMQ 的连接;完成之后,
  2. 构建一个交换机和一个队列,将队列绑定到交换机并订阅队列;然后
  3. 启动 HTTP 监听器。

使用这段代码,我在详细说明方面犯了错误。 当然,还有很大的抽象空间,例如在交换机-队列-绑定-订阅模式中,我希望这种模式在应用程序中经常出现。

如需有关在 Cloud Foundry 上使用 RabbitMQ 的帮助,请加入 support.cloudfoundry.com 论坛。

© . All rights reserved.