#!/usr/bin/env node "use strict"; var VERSION = '0.0.3' , Lazy = require('lazy') , arg = require('optimist') , argv = arg.usage('Usage: $0 [options] [-import | -export]') .describe('host', 'host') .default('host', 'localhost') .describe('user', 'username') .alias('user', 'u') .default('user', 'guest') .describe('password', 'password') .alias('password', 'p') .default('password', 'guest') .describe('port', 'port') .default('port', 5672) .describe('vhost', 'vhost') .default('vhost', '/') .describe('exchange', 'exchange to work with') .alias('exchange', 'e') .describe('queue', 'queue\'s name to work with') .alias('queue', 'q') .describe('passive', 'set it to true if the queue already exist') .default('passive', true) .boolean('passive') .describe('durable', 'if specified the queue will survive a broker restart') .boolean('durable') .describe('autoDelete', 'if specified the queue will be deleted when there are no more subscriptions') .boolean('autoDelete') .describe('onlyBody', 'if specified export will contain only body of messages') .default('onlyBody', false) .boolean('onlyBody') .describe('export', 'export [filename], export queue\'s content to filename') .describe('import', 'import [filename], export file content into the queue') .describe('count', 'limit the number of message to export/import') .alias('count', 'count') .describe('v', 'verbose mode') .alias('v', 'verbose') .default('v', false) .describe('h', 'produce this help message') .alias('h','help') .argv , AmqpDSL = require('amqp-dsl') , fs = require('fs') , queueOptions = {} , tmpExchange = 'amqp_tool_import_'+(+new Date()); if(argv.h){ return arg.showHelp(console.error); } if(!argv.import && !argv.export){ console.error("Missing command [import | export]"); console.error(" --import [./filename.json] (default: stdin)"); console.error(" --export [./filename.json] (default: stdout)"); return; } if(!argv.queue && !argv.exchange){ console.error("Missing command [--queue | --exchange]"); console.error(" --queue myQueue or --exchange myExchange"); return; } // Toolbox Function.prototype.curry = Function.prototype.curry || function(/* args */){ var args = Array.prototype.slice.call(arguments) , fn = this; return function(){ return fn.apply(this, args.concat(Array.prototype.slice.call(arguments))); } } // Log functions function logError(event, prefix){ return argv.verbose ? function(err){ console.error(prefix || '','::', event, err); } : function(){}; } function logInfo(){ var args = Array.prototype.slice.call(arguments); return argv.verbose ? function(){console.info.apply(null, args);} : function(){}; } // Import/Export function function importQueue(conn, exchange, streamController){ var count = 0 , stream = null; streamController.open(startImport); function startImport(_stream){ stream = _stream; process.on('SIGINT', streamController.close.curry(stream, function(){ process.exit(0); })); // When no more data come from stdin or from the file stream.once('end', stopImport); Lazy(stream) .lines .map(String) .map(importMsg); } function importMsg(msg){ // N33D ES6 : {message, header, deliveryInfo} = JSON.parse(msg); var message; try{ message = JSON.parse(msg); } catch(err){ console.error("JSON PARSE ERROR", msg, err); return; } // only define existing attributes var messageOptions = {}; for (var attr in message[2]) { if (message[2].hasOwnProperty(attr)) { messageOptions[attr] = message[2][attr]; } } exchange.publish(message[2].routingKey, message[0], messageOptions); if(argv.count && ++count == argv.count){ stopImport(); } } function stopImport(){ streamController.close(stream, conn.destroy.bind(conn)); } } function exportQueue(conn, queue, streamController){ var count = 0 , stream = null; streamController.open(startExport); function startExport(_stream){ stream = _stream; process.on('SIGINT', streamController.close.curry(stream, function(){ process.exit(0); })); queue.subscribe({ack:true}, exportMsg); } function exportMsg(message, header, deliveryInfo){ if(argv.onlyBody){ stream.write(JSON.stringify(message)+"\n"); } else { stream.write(JSON.stringify([message, header, deliveryInfo])+"\n"); } queue.shift(); if(argv.count && ++count == argv.count){ stopExport(deliveryInfo.consumerTag); } } function stopExport(consumerTag){ queue.unsubscribe(consumerTag); streamController.close(stream, conn.destroy.bind(conn)); } } // Build queue options ['passive','durable', 'autoDelete'].forEach(function(o){ if(argv[o]){ queueOptions[o]= !!argv[o]; } }); // Connect AmqpDSL.login({ login:argv.user , password:argv.password , host:argv.host , vhost:argv.vhost , port:argv.port }) .on('close', logError('close','RabbitMQ connection closed')) .on('error', logError('error','RabbitMQ')) .queue(argv.queue, queueOptions, logInfo('Connected to queue "'+argv.queue+'"')) .exchange(tmpExchange, {autoDelete:true, durable:false, passive:false}) .connect(function(err, amqp){ var conn = amqp.connection , queue = amqp.queues[argv.queue] , exchange = amqp.exchanges[tmpExchange]; if(err){ throw err; return; } argv.export && logInfo(['Exporting' , ' queue "' , argv.queue , '" to "' , (argv.export === true ? 'stdout' : argv.export) , '"'].join(''))(); argv.import && logInfo(['Importing' , ' from "' , (argv.import === true ? 'stdin' : argv.import) , '" to "' , argv.queue,'"'].join(''))(); // --export argv.export && exportQueue(conn, queue, { open:function(doneFn){ if(argv.export === true){// default value: stdout doneFn(process.stdout); } else { var stream = fs.createWriteStream(argv.export); stream.once('open', doneFn.curry(stream)); } }, close:function(stream, doneFn){ if(argv.export === true){ stream.once('drain', doneFn); } else { stream.once('close', doneFn); stream.destroySoon(); } } }); // --import argv.import && importQueue(conn, exchange, { open:function(doneFn){ // Bind the queue to the temporary exchange queue.bind(exchange.name, '#'); if(argv.import === true){// default value: stdin process.stdin.setEncoding('utf8'); process.stdin.resume(); doneFn(process.stdin); } else {//from file var stream = fs.createReadStream(argv.import); stream.setEncoding('utf8'); doneFn(stream); } }, close:function(stream, doneFn){ // Unbind the queue from the temporary exchange queue.unbind(exchange.name, '#'); // Destroy the temporary exchange exchange.destroy(); if(argv.import != true){// from file stream.destroy(); stream.once('close', doneFn); } else { doneFn(); } } }); });