UNPKG

4.62 kBJavaScriptView Raw
1'use strict';
2
3Object.defineProperty(exports, '__esModule', {
4 value: true
5});
6exports.default = void 0;
7
8var _FifoQueue = _interopRequireDefault(require('./FifoQueue'));
9
10var _types = require('./types');
11
/**
 * Normalizes the result of a `require` call so that callers can always read
 * the module's default export from `.default`.
 *
 * @param {*} obj - The value returned by `require`.
 * @returns {*} `obj` itself when it is truthy and carries a truthy
 *   `__esModule` flag; otherwise a new object `{default: obj}`.
 */
function _interopRequireDefault(obj) {
  const isEsModule = Boolean(obj) && Boolean(obj.__esModule);

  if (isEsModule) {
    return obj;
  }

  return {default: obj};
}
15
16/**
17 * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
18 *
19 * This source code is licensed under the MIT license found in the
20 * LICENSE file in the root directory of this source tree.
21 */
/**
 * Distributes tasks over a fixed number of workers.
 *
 * Tasks wait in a task queue until a worker is free. A worker is "locked"
 * while it runs a task and is unlocked again when the task reports its end,
 * at which point the next queued task for that worker (if any) is started.
 */
class Farm {
  _computeWorkerKey;
  _workerSchedulingPolicy;
  // Maps a key returned by `computeWorkerKey` to the worker that started the
  // first task with that key, so later tasks with the same key reuse it.
  _cacheKeys = Object.create(null);
  // `_locks[workerId]` is `true` while that worker is running a task.
  _locks = [];
  // Number of tasks pushed so far under the 'round-robin' policy.
  _offset = 0;
  _taskQueue;

  /**
   * @param {number} _numOfWorkers - Number of workers; worker ids are
   *   `0 .. _numOfWorkers - 1`.
   * @param {Function} _callback - Invoked as
   *   `(workerId, request, onStart, onEnd, onCustomMessage)` to hand a task to
   *   a worker.
   * @param {object} [options]
   * @param {Function} [options.computeWorkerKey] - Called with
   *   `(method, ...args)`; a non-nullish result binds the call to a worker.
   * @param {string} [options.workerSchedulingPolicy] - `'round-robin'`
   *   (default) or `'in-order'`.
   * @param {object} [options.taskQueue] - Object exposing
   *   `enqueue(task, workerId?)` and `dequeue(workerId)`; defaults to a new
   *   FIFO queue.
   */
  constructor(_numOfWorkers, _callback, options = {}) {
    const {computeWorkerKey, workerSchedulingPolicy, taskQueue} = options;

    this._numOfWorkers = _numOfWorkers;
    this._callback = _callback;
    this._computeWorkerKey = computeWorkerKey;
    this._workerSchedulingPolicy =
      workerSchedulingPolicy == null ? 'round-robin' : workerSchedulingPolicy;
    this._taskQueue =
      taskQueue == null ? new _FifoQueue.default() : taskQueue;
  }

  /**
   * Schedules a call of `method` with `args` on one of the workers.
   *
   * @param {string} method - Name of the method to call.
   * @param {...*} args - Arguments forwarded with the call.
   * @returns {Promise<*>} Settles with the result or error passed to the
   *   task's `onEnd`. The promise also carries `UNSTABLE_onCustomMessage`,
   *   which registers a listener for custom messages and returns a function
   *   that removes the listener again.
   */
  doWork(method, ...args) {
    const customMessageListeners = new Set();

    const addCustomMessageListener = listener => {
      customMessageListeners.add(listener);
      return () => {
        customMessageListeners.delete(listener);
      };
    };

    const onCustomMessage = message => {
      customMessageListeners.forEach(listener => listener(message));
    };

    const promise = new Promise(
      // Bind args to this function so it won't reference to the parent scope.
      // This prevents a memory leak in v8, because otherwise the function will
      // retain args for the closure.
      ((args, resolve, reject) => {
        const computeWorkerKey = this._computeWorkerKey;
        // request[1] is the "already processed" flag; `_process` flips it to
        // true when the task is handed to a worker.
        const request = [_types.CHILD_MESSAGE_CALL, false, method, args];
        let worker = null;
        let hash = null;

        if (computeWorkerKey) {
          hash = computeWorkerKey.call(this, method, ...args);
          worker = hash == null ? null : this._cacheKeys[hash];
        }

        // Remember which worker picked up this key so that later calls with
        // the same key are routed to it.
        const onStart = startedWorker => {
          if (hash != null) {
            this._cacheKeys[hash] = startedWorker;
          }
        };

        const onEnd = (error, result) => {
          customMessageListeners.clear();

          if (error) {
            reject(error);
          } else {
            resolve(result);
          }
        };

        const task = {
          onCustomMessage,
          onEnd,
          onStart,
          request
        };

        if (worker) {
          // A worker is already bound to this key: queue for it specifically.
          this._taskQueue.enqueue(task, worker.getWorkerId());

          this._process(worker.getWorkerId());
        } else {
          this._push(task);
        }
      }).bind(null, args)
    );
    promise.UNSTABLE_onCustomMessage = addCustomMessageListener;
    return promise;
  }

  /**
   * Starts the next queued task for `workerId`, unless that worker is locked
   * or has nothing queued.
   *
   * If the dispatch callback throws synchronously before the task has ended,
   * the task is failed with that error and the worker is released, so later
   * tasks for the same worker are not blocked behind a lock that would never
   * be cleared.
   *
   * @param {number} workerId
   * @returns {Farm} `this`.
   * @throws {Error} If the queue hands back a task that was already processed.
   */
  _process(workerId) {
    if (this._isLocked(workerId)) {
      return this;
    }

    const task = this._taskQueue.dequeue(workerId);

    if (!task) {
      return this;
    }

    if (task.request[1]) {
      throw new Error('Queue implementation returned processed task');
    }

    // Reference the task object outside so it won't be retained by onEnd,
    // and other properties of the task object, such as task.request can be
    // garbage collected.
    let taskOnEnd = task.onEnd;
    let finished = false;

    const onEnd = (error, result) => {
      // A task ends exactly once. Ignoring repeated calls keeps a late
      // duplicate from unlocking a worker that is already running the next
      // task.
      if (finished) {
        return;
      }

      finished = true;

      if (taskOnEnd) {
        taskOnEnd(error, result);
      }

      taskOnEnd = null;

      this._unlock(workerId);

      this._process(workerId);
    };

    task.request[1] = true;

    this._lock(workerId);

    try {
      this._callback(
        workerId,
        task.request,
        task.onStart,
        onEnd,
        task.onCustomMessage
      );
    } catch (error) {
      if (finished) {
        // The task already ended; the failure comes from whatever ran after
        // it (e.g. starting the next task), so let it propagate unchanged.
        throw error;
      }

      // Dispatch failed: report the failure to the task and free the worker.
      onEnd(error instanceof Error ? error : new Error(String(error)));
    }

    return this;
  }

  /**
   * Queues a task that is not bound to a specific worker and offers it to the
   * workers, starting at the offset chosen by the scheduling policy, until
   * one of them picks it up.
   *
   * @param {object} task
   * @returns {Farm} `this`.
   * @throws {Error} If the configured scheduling policy is not recognised; in
   *   that case the task is not enqueued.
   */
  _push(task) {
    // Resolve the offset first so an invalid policy fails before the task is
    // queued.
    const offset = this._getNextWorkerOffset();

    this._taskQueue.enqueue(task);

    for (let i = 0; i < this._numOfWorkers; i++) {
      this._process((offset + i) % this._numOfWorkers);

      // Stop as soon as some worker has taken this task.
      if (task.request[1]) {
        break;
      }
    }

    return this;
  }

  /**
   * @returns {number} Index of the first worker to try for the next pushed
   *   task: always 0 for 'in-order', an increasing counter for 'round-robin'.
   * @throws {Error} If the scheduling policy is neither of the two.
   */
  _getNextWorkerOffset() {
    const policy = this._workerSchedulingPolicy;

    switch (policy) {
      case 'in-order':
        return 0;

      case 'round-robin':
        return this._offset++;

      default:
        throw new Error(
          `Unexpected workerSchedulingPolicy: ${String(policy)}`
        );
    }
  }

  /** Marks the worker as busy. */
  _lock(workerId) {
    this._locks[workerId] = true;
  }

  /** Marks the worker as free. */
  _unlock(workerId) {
    this._locks[workerId] = false;
  }

  /**
   * @returns {boolean} `true` while the worker is running a task; `false`
   *   otherwise, including for workers that were never locked.
   */
  _isLocked(workerId) {
    return this._locks[workerId] === true;
  }
}
196
197exports.default = Farm;