// Retrieved from UNPKG (raw JavaScript view, 4.48 kB).
"use strict";
/**
 * Down-level helper that drives a generator function as if it were an
 * `async` function: each `yield`ed value is awaited and its settled result is
 * fed back into the generator.
 *
 * An already-installed `this.__awaiter` is reused when one exists.
 *
 * @param {*} thisArg `this` value the generator function is invoked with.
 * @param {ArrayLike<*>} [_arguments] Arguments forwarded to the generator function.
 * @param {PromiseConstructor} [P] Promise implementation to use; defaults to the global `Promise`.
 * @param {GeneratorFunction} generator Generator function holding the body to run.
 * @returns {Promise<*>} Resolves with the generator's return value; rejects with
 * any error thrown by the generator or by a yielded promise it does not catch.
 */
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    // Wrap a plain value in a P instance so `.then` is always available.
    function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
    return new (P || (P = Promise))(function (resolve, reject) {
        // Resume the generator with the fulfilled value of the last yielded promise.
        function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
        // Re-throw a rejection inside the generator so its own try/catch can handle it.
        function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
        // Finish when the generator is done; otherwise wait for the yielded value.
        function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
        // `generator` is rebound from the generator function to its iterator here.
        step((generator = generator.apply(thisArg, _arguments || [])).next());
    });
};
// Flag this CommonJS module with the `__esModule` marker.
Object.defineProperty(exports, "__esModule", { value: true });
// `iterall` provides the `$$asyncIterator` key used by PubSubAsyncIterator.
const iterall_1 = require("iterall");
/**
 * A class for digesting PubSubEngine events via the new AsyncIterator interface.
 * This implementation is a generic version of the one located at
 * https://github.com/apollographql/graphql-subscriptions/blob/master/src/event-emitter-to-async-iterator.ts
 * @class
 *
 * @constructor
 *
 * @property pullQueue @type {Function[]}
 * A queue of resolve functions waiting for an incoming event which has not yet arrived.
 * This queue expands as next() calls are made without PubSubEngine events occurring in between.
 *
 * @property pushQueue @type {any[]}
 * A queue of PubSubEngine events waiting for next() calls to be made.
 * This queue expands as PubSubEngine events arrive without next() calls occurring in between.
 * Events that arrive after listening has stopped are discarded instead of queued.
 *
 * @property eventsArray @type {string[]}
 * An array of PubSubEngine event names which this PubSubAsyncIterator should watch.
 *
 * @property allSubscribed @type {Promise<number[]>}
 * A promise of a list of all subscription ids to the passed PubSubEngine.
 *
 * @property listening @type {boolean}
 * Whether or not the PubSubAsyncIterator is in listening mode (responding to incoming PubSubEngine events and next() calls).
 * Listening begins as true and turns to false once the return or throw method is called.
 *
 * @property pubsub @type {PubSubEngine}
 * The PubSubEngine whose events will be observed.
 */
class PubSubAsyncIterator {
    /**
     * Starts subscribing to every requested event immediately.
     *
     * @param pubsub Engine exposing `subscribe(eventName, handler, options)` and `unsubscribe(id)`.
     * @param eventNames A single event name, or an array of event names.
     */
    constructor(pubsub, eventNames) {
        this.pubsub = pubsub;
        this.pullQueue = [];
        this.pushQueue = [];
        this.listening = true;
        this.eventsArray =
            typeof eventNames === 'string' ? [eventNames] : eventNames;
        this.allSubscribed = this.subscribeAll();
    }
    /**
     * Waits for the subscriptions to be established, then yields the next event
     * while listening, or terminates the iteration otherwise.
     *
     * @returns A promise of `{ value, done: false }` or `{ value: undefined, done: true }`.
     */
    next() {
        return __awaiter(this, void 0, void 0, function* () {
            yield this.allSubscribed;
            return this.listening ? this.pullValue() : this.return();
        });
    }
    /**
     * Stops listening, unsubscribes and flushes both queues.
     *
     * @returns A promise of `{ value: undefined, done: true }`.
     */
    return() {
        return __awaiter(this, void 0, void 0, function* () {
            this.emptyQueue(yield this.allSubscribed);
            return { value: undefined, done: true };
        });
    }
    /**
     * Stops listening like `return()`, then rejects with the given error.
     *
     * @param error Value the returned promise is rejected with.
     */
    throw(error) {
        return __awaiter(this, void 0, void 0, function* () {
            this.emptyQueue(yield this.allSubscribed);
            return Promise.reject(error);
        });
    }
    /** Makes the instance its own async iterator. */
    [iterall_1.$$asyncIterator]() {
        return this;
    }
    /**
     * Subscription handler: hands the event to the oldest waiting `next()` call,
     * or buffers it until one is made.
     *
     * @param event Payload delivered by the PubSubEngine.
     */
    pushValue(event) {
        return __awaiter(this, void 0, void 0, function* () {
            yield this.allSubscribed;
            // Once iteration has ended nothing will ever drain pushQueue, so a
            // late or in-flight event must be dropped rather than accumulated.
            if (!this.listening) {
                return;
            }
            if (this.pullQueue.length !== 0) {
                this.pullQueue.shift()({ value: event, done: false });
            }
            else {
                this.pushQueue.push(event);
            }
        });
    }
    /**
     * Produces the next iteration result: a buffered event when available,
     * otherwise a promise that `pushValue` or `emptyQueue` resolves later.
     */
    pullValue() {
        return new Promise(resolve => {
            if (this.pushQueue.length !== 0) {
                resolve({ value: this.pushQueue.shift(), done: false });
            }
            else {
                this.pullQueue.push(resolve);
            }
        });
    }
    /**
     * Shuts the iterator down once: unsubscribes, completes every pending
     * `next()` call with `done: true` and clears both queues. Later calls are no-ops.
     *
     * @param subscriptionIds Ids resolved from `allSubscribed`.
     */
    emptyQueue(subscriptionIds) {
        if (this.listening) {
            this.listening = false;
            this.unsubscribeAll(subscriptionIds);
            this.pullQueue.forEach(resolve => resolve({ value: undefined, done: true }));
            this.pullQueue.length = 0;
            this.pushQueue.length = 0;
        }
    }
    /**
     * Subscribes `pushValue` to every event in `eventsArray`.
     *
     * @returns A promise of all subscription ids, in `eventsArray` order.
     */
    subscribeAll() {
        return Promise.all(this.eventsArray.map(eventName => this.pubsub.subscribe(eventName, this.pushValue.bind(this), {})));
    }
    /**
     * Cancels each of the given subscriptions on the PubSubEngine.
     *
     * @param subscriptionIds Ids previously returned by `pubsub.subscribe`.
     */
    unsubscribeAll(subscriptionIds) {
        for (const subscriptionId of subscriptionIds) {
            this.pubsub.unsubscribe(subscriptionId);
        }
    }
}
exports.PubSubAsyncIterator = PubSubAsyncIterator;