all files / lib/offshore/utils/ stream.js

61.76% Statements 21/34
31.25% Branches 5/16
80% Functions 4/5
72.41% Lines 21/29
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80                                                                                                                     
/**
 * Streams
 *
 * A Streaming API with support for Transformations
 */
 
var util = require('util');
var Stream = require('stream');
var Transformations = require('./transformations');
var _ = require('lodash');
 
var ModelStream = module.exports = function(transformation) {
 
  // Use specified, or otherwise default, JSON transformation
  this.transformation = transformation || Transformations.json;
 
  // Reset write index
  this.index = 0;
 
  // Make stream writable
  this.writable = true;
};
 
util.inherits(ModelStream, Stream);
 
/**
 * Write to stream
 *
 * Extracts args to write and emits them as data events
 *
 * @param {Object} model
 * @param {Function} cb
 */
 
ModelStream.prototype.write = function(model, cb) {
  var self = this;
 
  // Run transformation on this item
  this.transformation.write(model, this.index, function writeToStream(err, transformedModel) {
 
    // Increment index for next time
    self.index++;
 
    // Write transformed model to stream
    self.emit('data', _.clone(transformedModel));
 
    // Inform that we're finished
    Iif (cb) return cb(err);
  });
 
};
 
/**
 * End Stream
 */
 
ModelStream.prototype.end = function(err, cb) {
  var self = this;
 
  Eif (err) {
    this.emit('error', err.message);
    Iif (cb) return cb(err);
    return;
  }
 
  this.transformation.end(function(err, suffix) {
 
    if (err) {
      self.emit('error', err);
      if (cb) return cb(err);
      return;
    }
 
    // Emit suffix if specified
    if (suffix) self.emit('data', suffix);
    self.emit('end');
    if (cb) return cb();
  });
};