UNPKG

2.43 kBJavaScriptView Raw
1'use strict';
2
3var EventEmitter = require('events').EventEmitter;
4var request = require('request');
5var inherits = require('inherits');
6var distributeProbabilities = require('./lib/distribute-flow-probabilities');
7var debug = require('debug')('flowbench:flow');
8
9var Flow = require('./flow');
10var Stats = require('./stats');
11
12module.exports = Experiment;
13
14function Experiment(options) {
15 if (! (this instanceof Experiment)) {
16 return new Experiment(options);
17 }
18
19 options.request = request.defaults(options.requestDefaults);
20 this.options = options;
21 this.stats = Stats(this);
22 this.flows = [];
23 this._running = 0;
24 this._done = 0;
25};
26
27inherits(Experiment, EventEmitter);
28
29
30var E = Experiment.prototype;
31
32E.prepare = function prepare() {
33 distributeProbabilities(this.flows);
34 this.flows.forEach(function(flow) {
35 flow.prepare();
36 });
37};
38
39E.push = function push(fn) {
40 this.flows.push(fn);
41};
42
43E.flow = function flow(options) {
44 var flow = Flow(this, options, this)
45 this.push(flow);
46 return flow;
47}
48
49E.one = function(cb) {
50 var random = Math.random();
51 var sum = 0;
52 var flow;
53 var idx = 0;
54 while(sum < random && idx < this.flows.length) {
55 flow = this.flows[idx];
56 if (flow) {
57 sum += flow.options.probability;
58 }
59 idx ++;
60 }
61 if (! flow) {
62 throw new Error('No flow to select');
63 }
64 flow(cb);
65};
66
67E.launchSome = function() {
68 var self = this;
69 var left = this.options.population - this._done - this._running;
70 left = Math.min(left, this.options.maxConcurrentFlows);
71
72 if (! left) {
73 this.emit('end');
74 } else {
75 for(var i = 0 ; i < left ; i ++) {
76 this._running ++;
77 this.one(callback);
78 }
79 }
80
81
82 function callback(err) {
83 self._running --;
84 self._done ++;
85 if (err) {
86 self.emit('error', err);
87 }
88 self.launchSome();
89 }
90};
91
92E.begin = function(cb) {
93 var self = this;
94
95 if (cb) {
96 var calledback = false;
97 this.once('error', function(err) {
98 if (! calledback) {
99 calledback = true;
100 cb(err);
101 }
102 })
103 this.once('end', function() {
104 if (! calledback) {
105 calledback = true;
106 var waitFor = Number(process.env.WAIT_BEFORE_STATS_MS) || 5e3;
107 setTimeout(function() {
108 cb(null, self.stats.toJSON());
109 }, waitFor);
110 }
111 });
112 }
113
114 debug('beginning experiment, have %d tasks in pipeline', this.flows.length);
115
116 this.prepare();
117 this.launchSome();
118};
\No newline at end of file