// UNPKG file view: 4.34 kB, JavaScript.
'use strict';

// Flag the CommonJS exports object as a transpiled ES module.
Object.defineProperty(exports, '__esModule', {
  value: true
});
// Placeholder; the real default export is assigned at the bottom of the file.
exports.default = void 0;

var _FifoQueue = _interopRequireDefault(require('./FifoQueue'));

var _types = require('./types');

/**
 * Normalises a required module so its default export is always reachable
 * as `.default`.
 *
 * @param {*} obj - The value returned by `require`.
 * @returns {*} `obj` itself when it is truthy and flagged `__esModule`;
 *   otherwise a fresh `{default: obj}` wrapper.
 */
function _interopRequireDefault(obj) {
  return obj && obj.__esModule ? obj : {default: obj};
}

/**
 * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */
/**
 * Distributes tasks over a fixed number of workers.
 *
 * The farm does not talk to workers itself. For every dispatched task it
 * invokes the `_callback` given to the constructor as
 * `_callback(workerId, request, onStart, onEnd, onCustomMessage)` and relies
 * on that callback to eventually call `onEnd(error, result)`.
 *
 * A worker is "locked" from the moment a task is handed to the callback until
 * that task's `onEnd` fires, so each worker id has at most one task in flight.
 */
class Farm {
  _computeWorkerKey;
  _workerSchedulingPolicy;
  // Maps a key produced by `_computeWorkerKey` to the worker that started the
  // first task carrying that key. Prototype-less so any string is a safe key.
  _cacheKeys = Object.create(null);
  // Indexed by worker id; a truthy entry means the worker is busy.
  _locks = [];
  // Monotonic counter used by the 'round-robin' policy.
  _offset = 0;
  _taskQueue;

  /**
   * @param {number} _numOfWorkers - Number of workers; shared tasks are
   *   offered to worker ids `0 .. _numOfWorkers - 1`.
   * @param {Function} _callback - Invoked as
   *   `(workerId, request, onStart, onEnd, onCustomMessage)` for every
   *   dispatched task.
   * @param {object} [options]
   * @param {Function} [options.computeWorkerKey] - Called with
   *   `(method, ...args)`; a non-nullish result binds the task to the worker
   *   that first ran a task with the same key.
   * @param {string} [options.workerSchedulingPolicy='round-robin'] - Either
   *   `'round-robin'` or `'in-order'`.
   * @param {object} [options.taskQueue] - Object exposing
   *   `enqueue(task, workerId?)` and `dequeue(workerId)`; defaults to a new
   *   FifoQueue.
   */
  constructor(_numOfWorkers, _callback, options = {}) {
    this._numOfWorkers = _numOfWorkers;
    this._callback = _callback;
    this._computeWorkerKey = options.computeWorkerKey;
    this._workerSchedulingPolicy =
      options.workerSchedulingPolicy ?? 'round-robin';
    this._taskQueue = options.taskQueue ?? new _FifoQueue.default();
  }

  /**
   * Queues a call of `method` with `args` and dispatches it as soon as a
   * suitable worker is free.
   *
   * @param {string} method - Name forwarded in the request.
   * @param {...*} args - Arguments forwarded in the request.
   * @returns {Promise<*>} Settles with the `(error, result)` pair passed to
   *   the task's `onEnd`. Rejects immediately if scheduling throws (for
   *   example on an unsupported scheduling policy). The promise also carries
   *   `UNSTABLE_onCustomMessage(listener)`, which registers a listener for
   *   custom messages and returns an unsubscribe function.
   */
  doWork(method, ...args) {
    const customMessageListeners = new Set();

    const addCustomMessageListener = listener => {
      customMessageListeners.add(listener);
      return () => {
        customMessageListeners.delete(listener);
      };
    };

    const onCustomMessage = message => {
      customMessageListeners.forEach(listener => listener(message));
    };

    const promise = new Promise(
      // Bind args to this function so it won't reference the parent scope.
      // This prevents a memory leak in v8, because otherwise the function
      // would retain args for the closure.
      ((args, resolve, reject) => {
        const computeWorkerKey = this._computeWorkerKey;
        // Slot 1 is the "already dispatched" flag, flipped by _process.
        const request = [_types.CHILD_MESSAGE_CALL, false, method, args];
        let worker = null;
        let hash = null;

        if (computeWorkerKey) {
          hash = computeWorkerKey.call(this, method, ...args);
          worker = hash == null ? null : this._cacheKeys[hash];
        }

        // Remember which worker picked up this key so later tasks with the
        // same key are routed to it.
        const onStart = worker => {
          if (hash != null) {
            this._cacheKeys[hash] = worker;
          }
        };

        const onEnd = (error, result) => {
          customMessageListeners.clear();

          if (error) {
            reject(error);
          } else {
            resolve(result);
          }
        };

        const task = {
          onCustomMessage,
          onEnd,
          onStart,
          request
        };

        if (worker) {
          // Sticky task: queue it for, and offer it to, its bound worker only.
          this._taskQueue.enqueue(task, worker.getWorkerId());

          this._process(worker.getWorkerId());
        } else {
          this._push(task);
        }
      }).bind(null, args)
    );
    promise.UNSTABLE_onCustomMessage = addCustomMessageListener;
    return promise;
  }

  /**
   * Hands the next queued task for `workerId` to the callback, unless that
   * worker is locked or the queue has nothing for it.
   *
   * @param {number} workerId
   * @returns {Farm} `this`.
   * @throws {Error} If the queue returns a task that was already dispatched.
   */
  _process(workerId) {
    if (this._isLocked(workerId)) {
      return this;
    }

    const task = this._taskQueue.dequeue(workerId);

    if (!task) {
      return this;
    }

    if (task.request[1]) {
      throw new Error('Queue implementation returned processed task');
    }

    // Reference the task's onEnd outside so the task object is not retained
    // by the closure below, and other properties of the task object, such as
    // task.request, can be garbage collected.
    let taskOnEnd = task.onEnd;

    const onEnd = (error, result) => {
      if (taskOnEnd) {
        taskOnEnd(error, result);
      }

      taskOnEnd = null;

      this._unlock(workerId);

      // The worker is free again: pull its next task, if any.
      this._process(workerId);
    };

    task.request[1] = true;

    this._lock(workerId);

    this._callback(
      workerId,
      task.request,
      task.onStart,
      onEnd,
      task.onCustomMessage
    );

    return this;
  }

  /**
   * Adds a task that is not bound to a worker to the shared queue and offers
   * work to each worker in turn until this task has been dispatched.
   *
   * @param {object} task
   * @returns {Farm} `this`.
   * @throws {TypeError} If the scheduling policy is unsupported; the task is
   *   not enqueued in that case.
   */
  _push(task) {
    // Resolve the starting worker first: if the policy is unsupported this
    // throws before the task is queued, so no orphaned task is left behind.
    const offset = this._getNextWorkerOffset();

    this._taskQueue.enqueue(task);

    for (let i = 0; i < this._numOfWorkers; i++) {
      this._process((offset + i) % this._numOfWorkers);

      if (task.request[1]) {
        break;
      }
    }

    return this;
  }

  /**
   * @returns {number} The worker index at which `_push` starts offering work:
   *   always 0 for 'in-order', an incrementing counter for 'round-robin'.
   * @throws {TypeError} If the scheduling policy is neither of those. Without
   *   this, the offset would be `undefined`, the computed worker id `NaN`,
   *   and the task would never be dispatched.
   */
  _getNextWorkerOffset() {
    switch (this._workerSchedulingPolicy) {
      case 'in-order':
        return 0;

      case 'round-robin':
        return this._offset++;

      default:
        throw new TypeError(
          `Unsupported workerSchedulingPolicy: ${String(
            this._workerSchedulingPolicy
          )}`
        );
    }
  }

  /** Marks `workerId` as busy. */
  _lock(workerId) {
    this._locks[workerId] = true;
  }

  /** Marks `workerId` as free. */
  _unlock(workerId) {
    this._locks[workerId] = false;
  }

  /**
   * @returns {boolean|undefined} Truthy while `workerId` is busy; `undefined`
   *   for a worker that has never been locked.
   */
  _isLocked(workerId) {
    return this._locks[workerId];
  }
}

exports.default = Farm;