all files / lib/offshore/query/ transaction.js

87.32% Statements 62/71
57.58% Branches 19/33
100% Functions 18/18
87.32% Lines 62/71
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116                                                                                                            
var _ = require('lodash');
var async = require('async');
 
module.exports = function(models, cb) {
  var self = this;
  var transactionCnx = {};
  models = _.isArray(models) ? models : [models];
  var errs = [];
  models.forEach(function(model) {
    Iif(!_.isObject(model) || !_.isObject(model.adapter) || !model.adapter.connection) {
      return errs.push('Invalid model : ' + model);
    }
 
    var cnx = model.adapter.connection;
    Iif(!model.offshore.connections[cnx]._adapter.registerTransaction) {
      return errs.push(new Error('Adapter ' + model.offshore.connections[cnx]._adapter.identity + ' has no transactable interface'));
    }
 
    Eif(!transactionCnx[cnx]) {
      transactionCnx[cnx] = {collections: [], adapter: model.offshore.connections[cnx]._adapter, context: model.offshore};
    }
 
    Eif(transactionCnx[cnx].collections.indexOf(model.identity) < 0) {
      transactionCnx[cnx].collections.push(model.identity);
    }
 
  });
 
  Iif(errs.length) {
    return cb(errs[0]);
  }
  var transactionDeferred = new TransactionDeferred(transactionCnx);
  var transactionDictionary = {};
  async.eachSeries(_.keys(transactionCnx), function(cnx, cb) {
    transactionCnx[cnx].adapter.registerTransaction(cnx, transactionCnx[cnx].collections, function(err, id) {
      Iif(err) {
        return cb(err);
      }
      transactionDictionary[cnx] = id;
      transactionCnx[cnx].id = id;
      cb();
    });
  }, function(err) {
    Iif(err) {
      return transactionDeferred.rollback(err);
    }
    var transaction = {};
    _.keys(transactionCnx).forEach(function(cnx) {
      var connection = transactionCnx[cnx];
      connection.collections.forEach(function(col) {
        transaction[col] = connection.context.collections[col]._loadQuery({transaction:  transactionDictionary});
      });
    });
    cb(transaction, function(err, result) {
      if(err) {
        return transactionDeferred.rollback(err);
      }
      transactionDeferred.commit(result);
 
    });
  });
  return transactionDeferred;
};
 
/**
 * Deferred handle for a transaction spanning one or more connections.
 *
 * `connections` is an object keyed by connection name; each entry is read as
 * `{adapter, id, collections}` where `adapter` exposes
 * `commit(id, collections, cb)` and `rollback(id, collections, cb)`.
 *
 * The transaction is finished exactly once through `commit(result)` or
 * `rollback(err)`; the outcome is handed to the callback given to `exec`,
 * whether `exec` is called before or after the transaction finishes.
 *
 * @param {Object} connections  Connection entries keyed by connection name.
 */
var TransactionDeferred = function(connections) {
  this._connections = connections;
  this._commit = null;
  this._rollback = null;
  this._execCallback = null;
  // Explicit state: the payloads above may legitimately be falsy
  // (e.g. `commit(undefined)`), so they cannot double as "already done" flags.
  this._state = 'pending'; // 'pending' | 'committed' | 'rollbacked'
  // Argument list for the exec callback, set once every adapter was notified.
  this._outcome = null;
};

/**
 * Calls `adapter[method](id, collections, cb)` for every connection, in series.
 *
 * @param {String} method  'commit' or 'rollback'.
 * @param {Boolean} keepGoing  When true an adapter failure does not stop the
 *   iteration; the first failure is remembered and reported at the end.
 * @param {Function} done  Called with the first adapter error, if any.
 */
TransactionDeferred.prototype._notifyAdapters = function(method, keepGoing, done) {
  var self = this;
  var firstErr = null;
  async.eachSeries(Object.keys(self._connections), function(cnx, next) {
    var connection = self._connections[cnx];
    // `collections` is already the array of collection identities given to
    // `registerTransaction`; it is passed as is.
    connection.adapter[method](connection.id, connection.collections, function(err) {
      if (err && keepGoing) {
        firstErr = firstErr || err;
        return next();
      }
      next(err);
    });
  }, function(err) {
    done(err || firstErr);
  });
};

/**
 * Schedules the exec callback with the stored outcome on the next tick.
 */
TransactionDeferred.prototype._deliver = function() {
  var callback = this._execCallback;
  var outcome = this._outcome;
  setImmediate(function() {
    callback.apply(null, outcome);
  });
};

/**
 * Records the final outcome and delivers it if `exec` was already called.
 *
 * @param {Array} outcome  Arguments for the exec callback.
 */
TransactionDeferred.prototype._settle = function(outcome) {
  this._outcome = outcome;
  if (this._execCallback) {
    this._deliver();
  }
};

/**
 * Commits the transaction on every connection.
 *
 * The exec callback receives `(null, result)`, or the adapter error when an
 * adapter fails to commit (the remaining connections are then not committed).
 *
 * @param {*} result  Value forwarded to the exec callback on success.
 * @throws {Error} When the transaction was already committed or rolled back.
 */
TransactionDeferred.prototype.commit = function(result) {
  var self = this;
  if (self._state === 'rollbacked') {
    throw new Error('Cannot commit when transaction has been rollbacked');
  }
  if (self._state === 'committed') {
    throw new Error('Transaction already commited');
  }
  self._state = 'committed';
  self._commit = result;

  self._notifyAdapters('commit', false, function(adapterErr) {
    self._settle(adapterErr ? [adapterErr] : [null, self._commit]);
  });
};

/**
 * Rolls the transaction back on every connection.
 *
 * Every connection is asked to roll back even if an earlier one fails. The
 * exec callback receives `err`; when `err` is falsy and an adapter failed to
 * roll back, it receives that adapter error instead.
 *
 * @param {*} err  Reason for the rollback, forwarded to the exec callback.
 * @throws {Error} When the transaction was already committed or rolled back.
 */
TransactionDeferred.prototype.rollback = function(err) {
  var self = this;
  if (self._state === 'committed') {
    throw new Error('Cannot rollback when transaction has been commited');
  }
  if (self._state === 'rollbacked') {
    throw new Error('Transaction already rollbacked');
  }
  self._state = 'rollbacked';
  self._rollback = err;

  self._notifyAdapters('rollback', true, function(adapterErr) {
    var reported = self._rollback;
    if (!reported && adapterErr) {
      reported = adapterErr;
    }
    self._settle([reported]);
  });
};

/**
 * Registers the callback that receives the transaction outcome.
 *
 * @param {Function} cb  Called as `cb(null, result)` after a commit or
 *   `cb(err)` after a rollback or a failed commit. If the transaction has
 *   already finished, `cb` is scheduled right away.
 */
TransactionDeferred.prototype.exec = function(cb) {
  this._execCallback = cb;
  if (this._outcome) {
    this._deliver();
  }
};