UNPKG

5.07 kBJavaScriptView Raw
1"use strict";
2/*
3 * This source file contains the code for proxying calls in the master thread to calls in the workers
4 * by `.postMessage()`-ing.
5 *
6 * Keep in mind that this code can make or break the program's performance! Need to optimize more…
7 */
8var __importDefault = (this && this.__importDefault) || function (mod) {
9 return (mod && mod.__esModule) ? mod : { "default": mod };
10};
11Object.defineProperty(exports, "__esModule", { value: true });
12exports.createProxyModule = exports.createProxyFunction = void 0;
13const debug_1 = __importDefault(require("debug"));
14const observable_fns_1 = require("observable-fns");
15const common_1 = require("../common");
16const observable_promise_1 = require("../observable-promise");
17const transferable_1 = require("../transferable");
18const messages_1 = require("../types/messages");
19const debugMessages = debug_1.default("threads:master:messages");
20let nextJobUID = 1;
21const dedupe = (array) => Array.from(new Set(array));
22const isJobErrorMessage = (data) => data && data.type === messages_1.WorkerMessageType.error;
23const isJobResultMessage = (data) => data && data.type === messages_1.WorkerMessageType.result;
24const isJobStartMessage = (data) => data && data.type === messages_1.WorkerMessageType.running;
25function createObservableForJob(worker, jobUID) {
26 return new observable_fns_1.Observable(observer => {
27 let asyncType;
28 const messageHandler = ((event) => {
29 debugMessages("Message from worker:", event.data);
30 if (!event.data || event.data.uid !== jobUID)
31 return;
32 if (isJobStartMessage(event.data)) {
33 asyncType = event.data.resultType;
34 }
35 else if (isJobResultMessage(event.data)) {
36 if (asyncType === "promise") {
37 if (typeof event.data.payload !== "undefined") {
38 observer.next(common_1.deserialize(event.data.payload));
39 }
40 observer.complete();
41 worker.removeEventListener("message", messageHandler);
42 }
43 else {
44 if (event.data.payload) {
45 observer.next(common_1.deserialize(event.data.payload));
46 }
47 if (event.data.complete) {
48 observer.complete();
49 worker.removeEventListener("message", messageHandler);
50 }
51 }
52 }
53 else if (isJobErrorMessage(event.data)) {
54 const error = common_1.deserialize(event.data.error);
55 if (asyncType === "promise" || !asyncType) {
56 observer.error(error);
57 }
58 else {
59 observer.error(error);
60 }
61 worker.removeEventListener("message", messageHandler);
62 }
63 });
64 worker.addEventListener("message", messageHandler);
65 return () => {
66 if (asyncType === "observable" || !asyncType) {
67 const cancelMessage = {
68 type: messages_1.MasterMessageType.cancel,
69 uid: jobUID
70 };
71 worker.postMessage(cancelMessage);
72 }
73 worker.removeEventListener("message", messageHandler);
74 };
75 });
76}
77function prepareArguments(rawArgs) {
78 if (rawArgs.length === 0) {
79 // Exit early if possible
80 return {
81 args: [],
82 transferables: []
83 };
84 }
85 const args = [];
86 const transferables = [];
87 for (const arg of rawArgs) {
88 if (transferable_1.isTransferDescriptor(arg)) {
89 args.push(common_1.serialize(arg.send));
90 transferables.push(...arg.transferables);
91 }
92 else {
93 args.push(common_1.serialize(arg));
94 }
95 }
96 return {
97 args,
98 transferables: transferables.length === 0 ? transferables : dedupe(transferables)
99 };
100}
101function createProxyFunction(worker, method) {
102 return ((...rawArgs) => {
103 const uid = nextJobUID++;
104 const { args, transferables } = prepareArguments(rawArgs);
105 const runMessage = {
106 type: messages_1.MasterMessageType.run,
107 uid,
108 method,
109 args
110 };
111 debugMessages("Sending command to run function to worker:", runMessage);
112 try {
113 worker.postMessage(runMessage, transferables);
114 }
115 catch (error) {
116 return observable_promise_1.ObservablePromise.from(Promise.reject(error));
117 }
118 return observable_promise_1.ObservablePromise.from(observable_fns_1.multicast(createObservableForJob(worker, uid)));
119 });
120}
121exports.createProxyFunction = createProxyFunction;
122function createProxyModule(worker, methodNames) {
123 const proxy = {};
124 for (const methodName of methodNames) {
125 proxy[methodName] = createProxyFunction(worker, methodName);
126 }
127 return proxy;
128}
129exports.createProxyModule = createProxyModule;