UNPKG

2.31 kBJavaScriptView Raw
1'use strict';
2
3var EventEmitter = require('events').EventEmitter;
4var request = require('request');
5var inherits = require('inherits');
6var distributeProbabilities = require('./lib/distribute-flow-probabilities');
7var debug = require('debug')('flowbench:flow');
8
9var Flow = require('./flow');
10var Stats = require('./stats');
11
12module.exports = Experiment;
13
/**
 * An experiment: a set of registered flows plus the bookkeeping needed to run
 * them. May be called with or without `new`.
 *
 * @param {Object} [options] Experiment settings. `requestDefaults` is handed
 *   to `request.defaults()`; `population` and `maxConcurrentFlows` are read
 *   later by `launchSome()`. A missing value is treated as an empty object.
 *   NOTE: the object is stored by reference and gains a `request` property.
 */
function Experiment(options) {
  if (! (this instanceof Experiment)) {
    return new Experiment(options);
  }

  // Tolerate a missing options bag instead of crashing on the assignment below.
  options = options || {};

  // Deliberately written onto the caller's object: `this.options` is that same
  // object, so anything holding the experiment can reach the configured client.
  options.request = request.defaults(options.requestDefaults);
  this.options = options;
  this.stats = Stats(this);
  this.flows = [];    // registered flows, in insertion order
  this._running = 0;  // flows launched and not yet called back
  this._done = 0;     // flows that have called back
}
26
// Give Experiment the EventEmitter API; the methods below rely on
// `emit`, `once` ('error' and 'end' events).
inherits(Experiment, EventEmitter);


// Shorthand used to attach the instance methods that follow.
var E = Experiment.prototype;
31
/**
 * Appends an entry to the end of this experiment's `flows` list.
 *
 * @param {*} fn The flow to register (stored as given, no validation).
 */
E.push = function push(fn) {
  var registered = this.flows;
  registered[registered.length] = fn;
};
35
/**
 * Builds a flow via `Flow(...)`, registers it with `push`, and returns it.
 *
 * @param {Object} options Passed through to `Flow` unchanged.
 * @returns {*} Whatever `Flow` returned.
 */
E.flow = function flow(options) {
  var experiment = this;
  // The experiment is passed as both the first and the third argument.
  var created = Flow(experiment, options, experiment);
  experiment.push(created);
  return created;
};
41
/**
 * Picks one registered flow at random, weighted by each flow's
 * `options.probability`, and invokes it with `cb`.
 *
 * Walks the flows accumulating their probabilities and stops at the first one
 * whose running total exceeds a `Math.random()` draw. If the total never
 * exceeds the draw, the last registered flow is used.
 * (Presumably the probabilities sum to ~1 after `distributeProbabilities`
 * has run in `begin()` — verify against that helper.)
 *
 * @param {Function} cb Passed straight to the selected flow.
 * @throws {Error} 'No flow to select' when no flow is registered.
 */
E.one = function(cb) {
  var threshold = Math.random();
  var cumulative = 0;
  var selected;

  for (var i = 0; i < this.flows.length; i ++) {
    var candidate = this.flows[i];
    if (! candidate) {
      // Skip empty slots without discarding an earlier valid pick.
      continue;
    }

    selected = candidate;
    cumulative += candidate.options.probability;

    // `<=` (not `<`) so that a draw of exactly 0 still selects a flow.
    // Written negated so a NaN total (missing probability) also stops here.
    if (! (cumulative <= threshold)) {
      break;
    }
  }

  if (! selected) {
    throw new Error('No flow to select');
  }
  selected(cb);
};
59
/**
 * Starts as many flows as the limits allow and emits 'end' once the whole
 * population has completed.
 *
 * Limits, both read from `this.options`:
 *  - `maxConcurrentFlows`: upper bound on flows running at the same time;
 *  - `population`: total number of flows to run (running + done).
 *
 * Each finished flow calls back into this method, which tops the running set
 * up again. A flow error is re-emitted as 'error' and the run continues.
 * 'end' is emitted when a call finds nothing to launch and nothing running.
 */
E.launchSome = function() {
  var self = this;
  var options = this.options;
  var launched = 0;

  // Both limits are re-checked before every launch rather than computed once:
  // a flow that calls back synchronously re-enters this method and changes
  // the counters while this loop is still active.
  // A missing/NaN limit makes the comparison false, so nothing is launched.
  while (this._running < options.maxConcurrentFlows &&
         this._done + this._running < options.population) {
    this._running ++;
    launched ++;
    this.one(callback);
  }

  // Only the call that finds the experiment idle announces the end, so 'end'
  // fires once, after the last flow has called back.
  if (launched === 0 && this._running === 0) {
    this.emit('end');
  }

  function callback(err) {
    self._running --;
    self._done ++;
    if (err) {
      self.emit('error', err);
    }
    self.launchSome();
  }
};
84
/**
 * Starts the experiment: distributes probabilities across the registered
 * flows and launches the first batch.
 *
 * @param {Function} [cb] Optional completion callback, invoked at most once:
 *   - `cb(err)` on the first 'error' event, or
 *   - `cb(null, stats)` after 'end', where `stats` is `this.stats.toJSON()`.
 *   The stats are collected after a delay taken from the
 *   `WAIT_BEFORE_STATS_MS` environment variable (default 5000 ms; `0` is
 *   honoured).
 */
E.begin = function(cb) {
  var self = this;

  if (cb) {
    var calledback = false;

    // Stay subscribed for the whole run: an 'error' event with no listener
    // makes EventEmitter throw, so a one-shot listener would turn the second
    // flow error into an uncaught exception.
    this.on('error', function(err) {
      if (calledback) {
        debug('ignoring error after callback was invoked: %s', err && err.message);
        return;
      }
      calledback = true;
      cb(err);
    });

    this.once('end', function() {
      if (calledback) {
        return;
      }
      calledback = true;

      // Use the configured delay whenever it is a number, including 0;
      // fall back to 5 seconds only when unset, blank or not numeric.
      var rawWait = process.env.WAIT_BEFORE_STATS_MS;
      var parsedWait = Number(rawWait);
      var hasWait = typeof rawWait === 'string' &&
        rawWait.trim() !== '' &&
        ! isNaN(parsedWait);
      var waitFor = hasWait ? parsedWait : 5e3;

      setTimeout(function() {
        cb(null, self.stats.toJSON());
      }, waitFor);
    });
  }

  debug('beginning experiment, have %d tasks in pipeline', this.flows.length);

  distributeProbabilities(this.flows);
  this.launchSome();
};
\No newline at end of file