UNPKG

13.8 kBJavaScriptView Raw
1/**
2 * @licstart The following is the entire license notice for the
3 * JavaScript code in this page
4 *
5 * Copyright 2022 Mozilla Foundation
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 * @licend The above is the entire license notice for the
20 * JavaScript code in this page
21 */
22"use strict";
23
24Object.defineProperty(exports, "__esModule", {
25 value: true
26});
27exports.MessageHandler = void 0;
28
29var _util = require("./util.js");
30
31const CallbackKind = {
32 UNKNOWN: 0,
33 DATA: 1,
34 ERROR: 2
35};
36const StreamKind = {
37 UNKNOWN: 0,
38 CANCEL: 1,
39 CANCEL_COMPLETE: 2,
40 CLOSE: 3,
41 ENQUEUE: 4,
42 ERROR: 5,
43 PULL: 6,
44 PULL_COMPLETE: 7,
45 START_COMPLETE: 8
46};
47
48function wrapReason(reason) {
49 if (!(reason instanceof Error || typeof reason === "object" && reason !== null)) {
50 (0, _util.unreachable)('wrapReason: Expected "reason" to be a (possibly cloned) Error.');
51 }
52
53 switch (reason.name) {
54 case "AbortException":
55 return new _util.AbortException(reason.message);
56
57 case "MissingPDFException":
58 return new _util.MissingPDFException(reason.message);
59
60 case "PasswordException":
61 return new _util.PasswordException(reason.message, reason.code);
62
63 case "UnexpectedResponseException":
64 return new _util.UnexpectedResponseException(reason.message, reason.status);
65
66 case "UnknownErrorException":
67 return new _util.UnknownErrorException(reason.message, reason.details);
68
69 default:
70 return new _util.UnknownErrorException(reason.message, reason.toString());
71 }
72}
73
74class MessageHandler {
75 constructor(sourceName, targetName, comObj) {
76 this.sourceName = sourceName;
77 this.targetName = targetName;
78 this.comObj = comObj;
79 this.callbackId = 1;
80 this.streamId = 1;
81 this.streamSinks = Object.create(null);
82 this.streamControllers = Object.create(null);
83 this.callbackCapabilities = Object.create(null);
84 this.actionHandler = Object.create(null);
85
86 this._onComObjOnMessage = event => {
87 const data = event.data;
88
89 if (data.targetName !== this.sourceName) {
90 return;
91 }
92
93 if (data.stream) {
94 this._processStreamMessage(data);
95
96 return;
97 }
98
99 if (data.callback) {
100 const callbackId = data.callbackId;
101 const capability = this.callbackCapabilities[callbackId];
102
103 if (!capability) {
104 throw new Error(`Cannot resolve callback ${callbackId}`);
105 }
106
107 delete this.callbackCapabilities[callbackId];
108
109 if (data.callback === CallbackKind.DATA) {
110 capability.resolve(data.data);
111 } else if (data.callback === CallbackKind.ERROR) {
112 capability.reject(wrapReason(data.reason));
113 } else {
114 throw new Error("Unexpected callback case");
115 }
116
117 return;
118 }
119
120 const action = this.actionHandler[data.action];
121
122 if (!action) {
123 throw new Error(`Unknown action from worker: ${data.action}`);
124 }
125
126 if (data.callbackId) {
127 const cbSourceName = this.sourceName;
128 const cbTargetName = data.sourceName;
129 new Promise(function (resolve) {
130 resolve(action(data.data));
131 }).then(function (result) {
132 comObj.postMessage({
133 sourceName: cbSourceName,
134 targetName: cbTargetName,
135 callback: CallbackKind.DATA,
136 callbackId: data.callbackId,
137 data: result
138 });
139 }, function (reason) {
140 comObj.postMessage({
141 sourceName: cbSourceName,
142 targetName: cbTargetName,
143 callback: CallbackKind.ERROR,
144 callbackId: data.callbackId,
145 reason: wrapReason(reason)
146 });
147 });
148 return;
149 }
150
151 if (data.streamId) {
152 this._createStreamSink(data);
153
154 return;
155 }
156
157 action(data.data);
158 };
159
160 comObj.addEventListener("message", this._onComObjOnMessage);
161 }
162
163 on(actionName, handler) {
164 const ah = this.actionHandler;
165
166 if (ah[actionName]) {
167 throw new Error(`There is already an actionName called "${actionName}"`);
168 }
169
170 ah[actionName] = handler;
171 }
172
173 send(actionName, data, transfers) {
174 this.comObj.postMessage({
175 sourceName: this.sourceName,
176 targetName: this.targetName,
177 action: actionName,
178 data
179 }, transfers);
180 }
181
182 sendWithPromise(actionName, data, transfers) {
183 const callbackId = this.callbackId++;
184 const capability = (0, _util.createPromiseCapability)();
185 this.callbackCapabilities[callbackId] = capability;
186
187 try {
188 this.comObj.postMessage({
189 sourceName: this.sourceName,
190 targetName: this.targetName,
191 action: actionName,
192 callbackId,
193 data
194 }, transfers);
195 } catch (ex) {
196 capability.reject(ex);
197 }
198
199 return capability.promise;
200 }
201
202 sendWithStream(actionName, data, queueingStrategy, transfers) {
203 const streamId = this.streamId++,
204 sourceName = this.sourceName,
205 targetName = this.targetName,
206 comObj = this.comObj;
207 return new ReadableStream({
208 start: controller => {
209 const startCapability = (0, _util.createPromiseCapability)();
210 this.streamControllers[streamId] = {
211 controller,
212 startCall: startCapability,
213 pullCall: null,
214 cancelCall: null,
215 isClosed: false
216 };
217 comObj.postMessage({
218 sourceName,
219 targetName,
220 action: actionName,
221 streamId,
222 data,
223 desiredSize: controller.desiredSize
224 }, transfers);
225 return startCapability.promise;
226 },
227 pull: controller => {
228 const pullCapability = (0, _util.createPromiseCapability)();
229 this.streamControllers[streamId].pullCall = pullCapability;
230 comObj.postMessage({
231 sourceName,
232 targetName,
233 stream: StreamKind.PULL,
234 streamId,
235 desiredSize: controller.desiredSize
236 });
237 return pullCapability.promise;
238 },
239 cancel: reason => {
240 (0, _util.assert)(reason instanceof Error, "cancel must have a valid reason");
241 const cancelCapability = (0, _util.createPromiseCapability)();
242 this.streamControllers[streamId].cancelCall = cancelCapability;
243 this.streamControllers[streamId].isClosed = true;
244 comObj.postMessage({
245 sourceName,
246 targetName,
247 stream: StreamKind.CANCEL,
248 streamId,
249 reason: wrapReason(reason)
250 });
251 return cancelCapability.promise;
252 }
253 }, queueingStrategy);
254 }
255
256 _createStreamSink(data) {
257 const streamId = data.streamId,
258 sourceName = this.sourceName,
259 targetName = data.sourceName,
260 comObj = this.comObj;
261 const self = this,
262 action = this.actionHandler[data.action];
263 const streamSink = {
264 enqueue(chunk, size = 1, transfers) {
265 if (this.isCancelled) {
266 return;
267 }
268
269 const lastDesiredSize = this.desiredSize;
270 this.desiredSize -= size;
271
272 if (lastDesiredSize > 0 && this.desiredSize <= 0) {
273 this.sinkCapability = (0, _util.createPromiseCapability)();
274 this.ready = this.sinkCapability.promise;
275 }
276
277 comObj.postMessage({
278 sourceName,
279 targetName,
280 stream: StreamKind.ENQUEUE,
281 streamId,
282 chunk
283 }, transfers);
284 },
285
286 close() {
287 if (this.isCancelled) {
288 return;
289 }
290
291 this.isCancelled = true;
292 comObj.postMessage({
293 sourceName,
294 targetName,
295 stream: StreamKind.CLOSE,
296 streamId
297 });
298 delete self.streamSinks[streamId];
299 },
300
301 error(reason) {
302 (0, _util.assert)(reason instanceof Error, "error must have a valid reason");
303
304 if (this.isCancelled) {
305 return;
306 }
307
308 this.isCancelled = true;
309 comObj.postMessage({
310 sourceName,
311 targetName,
312 stream: StreamKind.ERROR,
313 streamId,
314 reason: wrapReason(reason)
315 });
316 },
317
318 sinkCapability: (0, _util.createPromiseCapability)(),
319 onPull: null,
320 onCancel: null,
321 isCancelled: false,
322 desiredSize: data.desiredSize,
323 ready: null
324 };
325 streamSink.sinkCapability.resolve();
326 streamSink.ready = streamSink.sinkCapability.promise;
327 this.streamSinks[streamId] = streamSink;
328 new Promise(function (resolve) {
329 resolve(action(data.data, streamSink));
330 }).then(function () {
331 comObj.postMessage({
332 sourceName,
333 targetName,
334 stream: StreamKind.START_COMPLETE,
335 streamId,
336 success: true
337 });
338 }, function (reason) {
339 comObj.postMessage({
340 sourceName,
341 targetName,
342 stream: StreamKind.START_COMPLETE,
343 streamId,
344 reason: wrapReason(reason)
345 });
346 });
347 }
348
349 _processStreamMessage(data) {
350 const streamId = data.streamId,
351 sourceName = this.sourceName,
352 targetName = data.sourceName,
353 comObj = this.comObj;
354 const streamController = this.streamControllers[streamId],
355 streamSink = this.streamSinks[streamId];
356
357 switch (data.stream) {
358 case StreamKind.START_COMPLETE:
359 if (data.success) {
360 streamController.startCall.resolve();
361 } else {
362 streamController.startCall.reject(wrapReason(data.reason));
363 }
364
365 break;
366
367 case StreamKind.PULL_COMPLETE:
368 if (data.success) {
369 streamController.pullCall.resolve();
370 } else {
371 streamController.pullCall.reject(wrapReason(data.reason));
372 }
373
374 break;
375
376 case StreamKind.PULL:
377 if (!streamSink) {
378 comObj.postMessage({
379 sourceName,
380 targetName,
381 stream: StreamKind.PULL_COMPLETE,
382 streamId,
383 success: true
384 });
385 break;
386 }
387
388 if (streamSink.desiredSize <= 0 && data.desiredSize > 0) {
389 streamSink.sinkCapability.resolve();
390 }
391
392 streamSink.desiredSize = data.desiredSize;
393 new Promise(function (resolve) {
394 resolve(streamSink.onPull && streamSink.onPull());
395 }).then(function () {
396 comObj.postMessage({
397 sourceName,
398 targetName,
399 stream: StreamKind.PULL_COMPLETE,
400 streamId,
401 success: true
402 });
403 }, function (reason) {
404 comObj.postMessage({
405 sourceName,
406 targetName,
407 stream: StreamKind.PULL_COMPLETE,
408 streamId,
409 reason: wrapReason(reason)
410 });
411 });
412 break;
413
414 case StreamKind.ENQUEUE:
415 (0, _util.assert)(streamController, "enqueue should have stream controller");
416
417 if (streamController.isClosed) {
418 break;
419 }
420
421 streamController.controller.enqueue(data.chunk);
422 break;
423
424 case StreamKind.CLOSE:
425 (0, _util.assert)(streamController, "close should have stream controller");
426
427 if (streamController.isClosed) {
428 break;
429 }
430
431 streamController.isClosed = true;
432 streamController.controller.close();
433
434 this._deleteStreamController(streamController, streamId);
435
436 break;
437
438 case StreamKind.ERROR:
439 (0, _util.assert)(streamController, "error should have stream controller");
440 streamController.controller.error(wrapReason(data.reason));
441
442 this._deleteStreamController(streamController, streamId);
443
444 break;
445
446 case StreamKind.CANCEL_COMPLETE:
447 if (data.success) {
448 streamController.cancelCall.resolve();
449 } else {
450 streamController.cancelCall.reject(wrapReason(data.reason));
451 }
452
453 this._deleteStreamController(streamController, streamId);
454
455 break;
456
457 case StreamKind.CANCEL:
458 if (!streamSink) {
459 break;
460 }
461
462 new Promise(function (resolve) {
463 resolve(streamSink.onCancel && streamSink.onCancel(wrapReason(data.reason)));
464 }).then(function () {
465 comObj.postMessage({
466 sourceName,
467 targetName,
468 stream: StreamKind.CANCEL_COMPLETE,
469 streamId,
470 success: true
471 });
472 }, function (reason) {
473 comObj.postMessage({
474 sourceName,
475 targetName,
476 stream: StreamKind.CANCEL_COMPLETE,
477 streamId,
478 reason: wrapReason(reason)
479 });
480 });
481 streamSink.sinkCapability.reject(wrapReason(data.reason));
482 streamSink.isCancelled = true;
483 delete this.streamSinks[streamId];
484 break;
485
486 default:
487 throw new Error("Unexpected stream case");
488 }
489 }
490
491 async _deleteStreamController(streamController, streamId) {
492 await Promise.allSettled([streamController.startCall && streamController.startCall.promise, streamController.pullCall && streamController.pullCall.promise, streamController.cancelCall && streamController.cancelCall.promise]);
493 delete this.streamControllers[streamId];
494 }
495
496 destroy() {
497 this.comObj.removeEventListener("message", this._onComObjOnMessage);
498 }
499
500}
501
502exports.MessageHandler = MessageHandler;
\No newline at end of file