UNPKG

4.8 kBJavaScriptView Raw
1'use strict';
2
3Object.defineProperty(exports, '__esModule', {
4 value: true
5});
6exports.default = void 0;
7
8var _FifoQueue = _interopRequireDefault(require('./FifoQueue'));
9
10var _types = require('./types');
11
/**
 * Normalizes the result of a `require` call so it can always be read through
 * `.default`.
 *
 * A truthy value whose `__esModule` property is truthy is returned unchanged;
 * anything else (including `null`/`undefined`) is wrapped as `{default: obj}`.
 *
 * @param {*} obj The required module value.
 * @returns {*} `obj` itself, or a new wrapper object exposing it as `default`.
 */
function _interopRequireDefault(obj) {
  const isEsModule = Boolean(obj) && Boolean(obj.__esModule);

  if (isEsModule) {
    return obj;
  }

  return {default: obj};
}
15
/**
 * Sets `obj[key]` to `value` and returns `obj`.
 *
 * When `key` is not yet reachable on `obj` (own or inherited), a plain
 * assignment is used. When it already is, the property is (re)defined directly
 * on `obj` as an enumerable, configurable, writable data property, which
 * bypasses any inherited setter and overrides existing attributes.
 *
 * @param {object} obj Target object.
 * @param {string|symbol} key Property name.
 * @param {*} value Value to store.
 * @returns {object} The same `obj`.
 */
function _defineProperty(obj, key, value) {
  const alreadyPresent = key in obj;

  if (!alreadyPresent) {
    obj[key] = value;
    return obj;
  }

  const descriptor = {
    configurable: true,
    enumerable: true,
    value,
    writable: true
  };
  Object.defineProperty(obj, key, descriptor);
  return obj;
}
29
/**
 * Schedules method calls onto a fixed set of worker ids.
 *
 * The farm never talks to a worker directly: each dispatch is handed to the
 * `_callback` given to the constructor. At most one task is in flight per
 * worker id (tracked in `_locks`); everything else waits in `_taskQueue`.
 */
class Farm {
  /**
   * @param {number} _numOfWorkers Number of worker ids; tasks are dispatched
   *   to ids `0 .. _numOfWorkers - 1`.
   * @param {Function} _callback Called as
   *   `(workerId, request, onStart, onEnd, onCustomMessage)` to run one task.
   * @param {object} [options]
   * @param {Function} [options.computeWorkerKey] Called with the farm as
   *   `this` and `(method, ...args)`; a non-null result is used as a key that
   *   pins later calls with the same key to the worker reported via `onStart`.
   * @param {string} [options.workerSchedulingPolicy] `'round-robin'`
   *   (default) or `'in-order'`.
   * @param {object} [options.taskQueue] Object providing
   *   `enqueue(task, workerId?)` and `dequeue(workerId)`; defaults to a new
   *   `FifoQueue`.
   */
  constructor(_numOfWorkers, _callback, options = {}) {
    const schedulingPolicy = options.workerSchedulingPolicy;
    const taskQueue = options.taskQueue;

    this._numOfWorkers = _numOfWorkers;
    this._callback = _callback;
    this._computeWorkerKey = options.computeWorkerKey;
    this._workerSchedulingPolicy =
      schedulingPolicy != null ? schedulingPolicy : 'round-robin';
    // Key returned by computeWorkerKey -> worker object passed to onStart.
    this._cacheKeys = Object.create(null);
    // Indexed by worker id; true while that worker has a task in flight.
    this._locks = [];
    // Monotonic counter used by the round-robin policy.
    this._offset = 0;
    this._taskQueue = taskQueue != null ? taskQueue : new _FifoQueue.default();
  }

  /**
   * Queues a call of `method` with `args` and dispatches it as soon as a
   * worker is free.
   *
   * @param {string} method Name placed in the request sent to the worker.
   * @param {...*} args Arguments placed in the request.
   * @returns {Promise} Settles with the `(error, result)` reported through
   *   `onEnd`. The promise also carries `UNSTABLE_onCustomMessage(listener)`,
   *   which registers a listener for custom messages of this task and returns
   *   a function that unregisters it.
   */
  doWork(method, ...args) {
    const customMessageListeners = new Set();

    const addCustomMessageListener = listener => {
      customMessageListeners.add(listener);
      return () => {
        customMessageListeners.delete(listener);
      };
    };

    const onCustomMessage = message => {
      customMessageListeners.forEach(listener => listener(message));
    };

    const promise = new Promise(
      // Bind args to this function so it won't refer to the parent scope.
      // This prevents a memory leak in v8, because otherwise the function will
      // retain args for the closure.
      ((args, resolve, reject) => {
        const computeWorkerKey = this._computeWorkerKey;
        // Layout: [message type, processed flag, method name, arguments].
        const request = [_types.CHILD_MESSAGE_CALL, false, method, args];
        let worker = null;
        let hash = null;

        if (computeWorkerKey) {
          hash = computeWorkerKey.call(this, method, ...args);
          worker = hash == null ? null : this._cacheKeys[hash];
        }

        // Remember which worker picked up this key so later calls with the
        // same key are routed to it.
        const onStart = worker => {
          if (hash != null) {
            this._cacheKeys[hash] = worker;
          }
        };

        const onEnd = (error, result) => {
          customMessageListeners.clear();

          if (error) {
            reject(error);
          } else {
            resolve(result);
          }
        };

        const task = {
          onCustomMessage,
          onEnd,
          onStart,
          request
        };

        if (worker) {
          // A worker is already associated with this key: queue for it only.
          this._taskQueue.enqueue(task, worker.getWorkerId());

          this._process(worker.getWorkerId());
        } else {
          this._push(task);
        }
      }).bind(null, args)
    );
    promise.UNSTABLE_onCustomMessage = addCustomMessageListener;
    return promise;
  }

  /**
   * Dispatches the next queued task to `workerId` if that worker is idle.
   *
   * If the dispatch callback throws synchronously, the task is settled with
   * that error and the worker is unlocked before the error is rethrown, so a
   * failed dispatch can neither leave the worker locked nor leave the task's
   * promise pending.
   *
   * @param {number} workerId
   * @returns {Farm} `this`.
   * @throws {Error} If the queue hands back a task already marked processed,
   *   or whatever the dispatch callback throws.
   */
  _process(workerId) {
    if (this._isLocked(workerId)) {
      return this;
    }

    const task = this._taskQueue.dequeue(workerId);

    if (!task) {
      return this;
    }

    if (task.request[1]) {
      throw new Error('Queue implementation returned processed task');
    }

    // Reference the task object outside so it won't be retained by onEnd,
    // and other properties of the task object, such as task.request can be
    // garbage collected.
    const taskOnEnd = task.onEnd;
    // Set once onEnd has run for this dispatch.
    let ended = false;
    // Set when the dispatch threw before onEnd ran and was cleaned up here.
    let aborted = false;

    const onEnd = (error, result) => {
      if (aborted) {
        // The dispatch already failed and released the worker; a late call
        // must not unlock a worker that may be running another task.
        return;
      }

      ended = true;
      taskOnEnd(error, result);

      this._unlock(workerId);

      this._process(workerId);
    };

    task.request[1] = true;

    this._lock(workerId);

    try {
      this._callback(
        workerId,
        task.request,
        task.onStart,
        onEnd,
        task.onCustomMessage
      );
    } catch (error) {
      if (!ended) {
        aborted = true;
        taskOnEnd(error);

        this._unlock(workerId);
      }

      throw error;
    }

    return this;
  }

  /**
   * Adds `task` to the shared queue and offers it to each worker in turn,
   * starting at the offset chosen by the scheduling policy, until one of them
   * picks it up (i.e. its processed flag becomes true).
   *
   * @param {object} task Task object built by `doWork`.
   * @returns {Farm} `this`.
   */
  _push(task) {
    // Resolve the offset first: if the policy is invalid this throws before
    // the task is queued, so a rejected task is never left behind.
    const offset = this._getNextWorkerOffset();

    this._taskQueue.enqueue(task);

    for (let i = 0; i < this._numOfWorkers; i++) {
      this._process((offset + i) % this._numOfWorkers);

      if (task.request[1]) {
        break;
      }
    }

    return this;
  }

  /**
   * @returns {number} Index of the first worker to try: always 0 for
   *   `'in-order'`, an incrementing counter for `'round-robin'`.
   * @throws {Error} If the configured policy is neither of the two.
   */
  _getNextWorkerOffset() {
    switch (this._workerSchedulingPolicy) {
      case 'in-order':
        return 0;

      case 'round-robin':
        return this._offset++;

      default:
        throw new Error(
          `Unsupported worker scheduling policy: ${String(
            this._workerSchedulingPolicy
          )}`
        );
    }
  }

  /** Marks `workerId` as busy. */
  _lock(workerId) {
    this._locks[workerId] = true;
  }

  /** Marks `workerId` as idle. */
  _unlock(workerId) {
    this._locks[workerId] = false;
  }

  /**
   * @returns {boolean|undefined} `true` while busy, `false` after an unlock,
   *   `undefined` for a worker id that was never locked.
   */
  _isLocked(workerId) {
    return this._locks[workerId];
  }
}
206
207exports.default = Farm;