const amqp = require('amqplib');
const uuid = require('uuid');
const co = require('co');
function delay(ms) {
return new Promise(resolve => {
setTimeout(resolve, ms);
});
}
const blankQueueError = new Error('Queue name is blank');
const blankTaskError = new Error('Task body is blank');
const invalidJsonError = new Error('Invalid JSON in task body');
/**
* Return an instance of Kewpie
* @constructor
* @module Kewpie
* @param {kewpieOpts} [passedOpts] - A set of options to override the defaults
* @returns {Kewpie}
*/
function Kewpie(passedOpts = {}) {
const defaultOpts = {
deadLetterExchange: 'deadletters',
deadLetterQueue: 'deadletters',
exchange: 'kewpie',
maxPriority: 10,
defaultExpiration: 1000 * 60 * 60, // 1 hour
maxConnectionAttempts: 10,
delayMS: 500,
enableDelayed: false
};
const opts = Object.assign({}, defaultOpts, passedOpts);
const {
delayMS,
maxConnectionAttempts,
defaultExpiration,
maxPriority,
deadLetterExchange,
deadLetterQueue,
exchange,
enableDelayed
} = opts;
if (enableDelayed) console.warn('Delayed sending requires the delayed message exchange plugin to RabbitMQ which is currently EXPERIMENTAL. See https://github.com/rabbitmq/rabbitmq-delayed-message-exchange#plugin-status for more details');
const queueOpts = {
maxPriority,
durable: true,
deadLetterExchange
};
let channel;
let connection;
let connectionAttempts = 0;
/**
* Connect to the RabbitMQ server and set up the queues and exchanges
* @module Kewpie/connect
* @param {string} rabbitUrl - The URL to connect to a rabbitMQ server, eg: amqp://localhost:15672
* @param {string[]} queues - The queues to instantiate for later publishing or subscription
* @returns {Promise}
*/
function connect(rabbitUrl, queues) {
return co(function *() {
connection = yield amqp.connect(rabbitUrl);
const ch = yield connection.createConfirmChannel();
yield setup(ch, queues);
channel = ch;
})
.catch(reconnect(rabbitUrl, queues));
}
/**
* Set up the queues and exchanges
* @param {Object} ch - A channel returned from amqplib
* @param {string[]} queues - The queues to instantiate for later publishing or subscription
* @returns {Promise}
*/
function setup(ch, queues) {
return co.wrap(function *() {
const exchangeType = enableDelayed ? 'x-delayed-message' : 'topic';
const exchangeArgs = {};
if (enableDelayed) exchangeArgs['x-delayed-type'] = 'topic';
yield ch.assertExchange(
exchange,
exchangeType,
{
arguments: {
'x-delayed-type': 'topic'
},
durable: true
}
);
yield Promise.all(queues.map(co.wrap(function *(queue) {
yield ch.assertQueue(queue, queueOpts);
yield ch.bindQueue(queue, exchange, queue);
})));
yield ch.assertExchange(deadLetterExchange, 'topic', { durable: true });
yield ch.assertQueue(deadLetterQueue, { durable: true });
yield ch.bindQueue(deadLetterQueue, deadLetterExchange, '#');
})();
}
/**
* If the connection to RabbitMQ fails, wait a little bit then try again
* @param {string} rabbitUrl - The URL to connect to a rabbitMQ server, eg: amqp://localhost:15672
* @param {string[]} queues - The queues to instantiate for later publishing or subscription
* @returns {Promise}
*/
function reconnect(rabbitUrl, queues) {
return e => {
connectionAttempts++;
if (connectionAttempts > maxConnectionAttempts) {
throw e;
} else {
return co(function *() {
yield delay(delayMS);
return connect(rabbitUrl, queues);
});
}
};
}
/**
* Publish a message/task to a queue
* @module Kewpie/publish
* @param {string} queue - The name of the queue you intend the message to reach. This will be used as the message's routing key
* @param {Object} task - Any `JSON.stringify`able Object. This will be serialised and sent as the message body
* @param {Object} [opts] - A set of opts to override defaults
* @param {number} opts.priority - The priority of the message (defaults to 0)
* @param {number} opts.expiration - The expiration time of the message in MS
* @param {number} opts.delay - The number of MS the exchange should wait before publishing the message to a queue for consumption
* @returns {Promise}
*/
function publish(queue, task, opts = {}) {
return co(function *() {
if (!queue) throw blankQueueError;
if (!task) throw blankTaskError;
if (!channel) {
yield delay(delayMS);
return publish(queue, task, opts);
}
const innerOpts = {
priority: opts.priority || 0,
persistent: true,
expiration: opts.expiration || defaultExpiration,
headers: {}
};
if (opts.expiration === null) delete innerOpts.expiration;
if (opts.delay) innerOpts.headers['x-delay'] = opts.delay;
let buf;
try {
buf = new Buffer(JSON.stringify(task));
} catch (e) {
return Promise.reject(invalidJsonError);
}
return new Promise((resolve, reject) => {
channel.publish(exchange, queue, buf, innerOpts, (err) => {
if (err) return reject(err);
return resolve(task);
});
});
});
}
/**
* Unsubscribe a subscriber/handler from a queue
* @module Kewpie/unsubscribe
* @param {string} tag - The consumerTag of the subscriber
* @returns {Promise}
*/
function unsubscribe(tag) {
return channel.cancel(tag);
}
/**
* Subscribe a handler to a queue
* @module Kewpie/subscribe
* @param {string} queue - The queue you wish to subscribe to
* @param {function} handler - The queue you wish to subscribe to
* @returns {Consumer}
*/
function subscribe(queue, handler, maxConcurrent = 1) {
return co(function *() {
if (!channel) {
yield delay(delayMS);
return subscribe(queue, handler, maxConcurrent);
}
const consumerTag = uuid.v4();
yield channel.assertQueue(queue, queueOpts);
yield channel.prefetch(maxConcurrent);
channel.consume(queue, msg => {
try {
handler(JSON.parse(msg.content.toString()))
.then(() => {
channel.ack(msg);
})
.catch((opts = {}) => {
opts.requeue = opts.requeue || false;
channel.nack(msg, false, opts.requeue);
});
} catch (e) {
// The only time this should be reached is when JSON.parse fails, so never requeue this kind of failure
channel.nack(msg, false, false);
}
}, { consumerTag });
return { consumerTag };
});
}
/**
* Close the connection to RabbitMQ
* @module Kewpie/close
* @returns {Promise}
*/
function close() {
return connection.close();
}
return {
publish,
subscribe,
unsubscribe,
connect,
close,
errors: {
blankQueueError,
blankTaskError,
invalidJsonError
},
opts
};
}
module.exports = Kewpie;