1 | ;
|
2 |
|
3 | Object.defineProperty(exports, "__esModule", {
|
4 | value: true
|
5 | });
|
6 | exports.default = exports.QueueClosedError = void 0;
|
7 |
|
8 | var _autoBind = _interopRequireDefault(require("auto-bind"));
|
9 |
|
10 | var _Module = _interopRequireDefault(require("./Module"));
|
11 |
|
12 | function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
|
13 |
|
14 | function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
|
15 |
|
16 | const {
|
17 | MODULE_NAME,
|
18 | log,
|
19 | warn,
|
20 | error,
|
21 | noteGauge,
|
22 | noteCount,
|
23 | trackOp
|
24 | } = new _Module.default(__filename); // eslint-disable-line no-unused-vars
|
25 |
|
26 | const DEFAULT_MAX_QUEUE_SIZE = -1;
|
27 |
|
28 | class QueueClosedError extends Error {
|
29 | constructor(queueName) {
|
30 | super(queueName ? `Queue '${queueName}' is closed` : 'Queue closed');
|
31 | }
|
32 |
|
33 | }
|
34 |
|
35 | exports.QueueClosedError = QueueClosedError;
|
36 |
|
37 | class AsyncQueue {
|
38 | constructor(config) {
|
39 | _defineProperty(this, "name", void 0);
|
40 |
|
41 | _defineProperty(this, "_items", void 0);
|
42 |
|
43 | _defineProperty(this, "_closed", void 0);
|
44 |
|
45 | _defineProperty(this, "_maxSize", void 0);
|
46 |
|
47 | _defineProperty(this, "_enqueueWaitingList", void 0);
|
48 |
|
49 | _defineProperty(this, "_dequeueWaitingList", void 0);
|
50 |
|
51 | _defineProperty(this, "_closeWaitingList", void 0);
|
52 |
|
53 | _defineProperty(this, "getLength", () => {
|
54 | return this._items.length;
|
55 | });
|
56 |
|
57 | // config
|
58 | config = config || {
|
59 | name: undefined,
|
60 | maxSize: undefined
|
61 | };
|
62 | this.name = config.name;
|
63 | this._maxSize = config.maxSize || DEFAULT_MAX_QUEUE_SIZE; // state
|
64 |
|
65 | this._items = [];
|
66 | this._closed = false;
|
67 | this._enqueueWaitingList = [];
|
68 | this._dequeueWaitingList = [];
|
69 | this._closeWaitingList = [];
|
70 | (0, _autoBind.default)(this);
|
71 | }
|
72 |
|
73 | async enqueue(newItem) {
|
74 | if (this._closed) {
|
75 | throw new Error(`Queue is closed`);
|
76 | } // wait for space to enqueue
|
77 |
|
78 |
|
79 | if (this._maxSize > 0 && this._items.length >= this._maxSize) {
|
80 | await new Promise((res, rej) => this._enqueueWaitingList.push({
|
81 | res,
|
82 | rej
|
83 | }));
|
84 | } //log(`Enqueued '${newItem}' to queue '${this.name}'`)
|
85 | // release oldest waiting dequeue
|
86 |
|
87 |
|
88 | if (this._dequeueWaitingList.length) {
|
89 | this._dequeueWaitingList.shift().res(newItem);
|
90 | } else {
|
91 | this._items.push(newItem);
|
92 | }
|
93 |
|
94 | this.name && noteCount(`${this.name}.enqueue`, 1);
|
95 | this.name && noteGauge(`${this.name}.items`, this._items.length);
|
96 | }
|
97 |
|
98 | async dequeue() {
|
99 | let item;
|
100 |
|
101 | if (this._items.length === 0) {
|
102 | // if there are NO queued items
|
103 | if (this._closed) {
|
104 | throw new QueueClosedError(this.name);
|
105 | }
|
106 |
|
107 | item = await new Promise((res, rej) => this._dequeueWaitingList.push({
|
108 | res,
|
109 | rej
|
110 | }));
|
111 | } else {
|
112 | // if there are queued items
|
113 | item = this._items.shift(); // release oldest waiting enqueue
|
114 |
|
115 | if (this._enqueueWaitingList.length) {
|
116 | this._enqueueWaitingList.shift().res();
|
117 | }
|
118 | }
|
119 |
|
120 | this.name && noteCount(`${this.name}.dequeue`, 1);
|
121 | this.name && noteGauge(`${this.name}.items`, this._items.length); //log(`Dequeued '${item}' from queue '${this.name}'`)
|
122 | // release all waiting closes
|
123 |
|
124 | if (this._items.length === 0) {
|
125 | this._closeWaitingList.forEach(cbs => cbs.res());
|
126 | }
|
127 |
|
128 | return item;
|
129 | }
|
130 |
|
131 | dequeueAllAvailable() {
|
132 | const items = this._items.splice(0);
|
133 |
|
134 | this.name && noteCount(`${this.name}.dequeue`, items.length);
|
135 | this.name && noteGauge(`${this.name}.items`, this._items.length);
|
136 |
|
137 | if (this._enqueueWaitingList.length > 0) {
|
138 | const enqueuesToRelease = Math.min(this._enqueueWaitingList.length, this._maxSize);
|
139 |
|
140 | for (let index = 0; index < enqueuesToRelease; index++) {
|
141 | this._enqueueWaitingList.shift().res();
|
142 | }
|
143 | } // release all waiting closes
|
144 |
|
145 |
|
146 | this._closeWaitingList.forEach(cbs => cbs.res());
|
147 |
|
148 | return items;
|
149 | }
|
150 |
|
151 | async close() {
|
152 | this._closed = true; // fail all waiting enqueued
|
153 |
|
154 | this._enqueueWaitingList.forEach(cbs => cbs.rej(new Error('Queue closed'))); // wait for all existing items to be dequeued
|
155 |
|
156 |
|
157 | if (this._items.length > 0) {
|
158 | await new Promise((res, rej) => this._closeWaitingList.push({
|
159 | res,
|
160 | rej
|
161 | }));
|
162 | } // release all remaining waiting dequeues
|
163 |
|
164 |
|
165 | this._dequeueWaitingList.forEach(cbs => cbs.rej(new QueueClosedError(this.name)));
|
166 | }
|
167 |
|
168 | async enqueueRange(newItems) {
|
169 | for (const newItem of newItems) {
|
170 | await this.enqueue(newItem);
|
171 | }
|
172 | }
|
173 |
|
174 | async dequeueRange(count) {
|
175 | const results = [];
|
176 |
|
177 | while (results.length < count) {
|
178 | try {
|
179 | const item = await this.dequeue();
|
180 | results.push(item);
|
181 | } catch (err) {
|
182 | if (err instanceof QueueClosedError) break;
|
183 | throw err;
|
184 | }
|
185 | }
|
186 |
|
187 | return results;
|
188 | }
|
189 |
|
190 | }
|
191 |
|
192 | exports.default = AsyncQueue;
|
193 | //# sourceMappingURL=data:application/json;charset=utf-8;base64,{"version":3,"sources":["../src/AsyncQueue.js"],"names":["MODULE_NAME","log","warn","error","noteGauge","noteCount","trackOp","Module","__filename","DEFAULT_MAX_QUEUE_SIZE","QueueClosedError","Error","constructor","queueName","AsyncQueue","config","_items","length","name","undefined","maxSize","_maxSize","_closed","_enqueueWaitingList","_dequeueWaitingList","_closeWaitingList","enqueue","newItem","Promise","res","rej","push","shift","dequeue","item","forEach","cbs","dequeueAllAvailable","items","splice","enqueuesToRelease","Math","min","index","close","enqueueRange","newItems","dequeueRange","count","results","err"],"mappings":";;;;;;;AAEA;;AAEA;;;;;;AAEA,MAAM;AAAEA,EAAAA,WAAF;AAAeC,EAAAA,GAAf;AAAoBC,EAAAA,IAApB;AAA0BC,EAAAA,KAA1B;AAAiCC,EAAAA,SAAjC;AAA4CC,EAAAA,SAA5C;AAAuDC,EAAAA;AAAvD,IAAmE,IAAIC,eAAJ,CAAWC,UAAX,CAAzE,C,CAAgG;;AAGhG,MAAMC,sBAAsB,GAAG,CAAC,CAAhC;;AAOO,MAAMC,gBAAN,SAA+BC,KAA/B,CAAqC;AAC1CC,EAAAA,WAAW,CAACC,SAAD,EAAqB;AAC9B,UAAMA,SAAS,GAAI,UAASA,SAAU,aAAvB,GAAsC,cAArD;AACD;;AAHyC;;;;AAM7B,MAAMC,UAAN,CAAoB;AAajCF,EAAAA,WAAW,CAACG,MAAD,EAAuB;AAAA;;AAAA;;AAAA;;AAAA;;AAAA;;AAAA;;AAAA;;AAAA,uCAmItB,MAAc;AACxB,aAAO,KAAKC,MAAL,CAAYC,MAAnB;AACD,KArIiC;;AAEhC;AACAF,IAAAA,MAAM,GAAGA,MAAM,IAAI;AAAEG,MAAAA,IAAI,EAAEC,SAAR;AAAmBC,MAAAA,OAAO,EAAED;AAA5B,KAAnB;AACA,SAAKD,IAAL,GAAYH,MAAM,CAACG,IAAnB;AACA,SAAKG,QAAL,GAAgBN,MAAM,CAACK,OAAP,IAAkBX,sBAAlC,CALgC,CAOhC;;AACA,SAAKO,MAAL,GAAc,EAAd;AACA,SAAKM,OAAL,GAAe,KAAf;AAEA,SAAKC,mBAAL,GAA2B,EAA3B;AACA,SAAKC,mBAAL,GAA2B,EAA3B;AACA,SAAKC,iBAAL,GAAyB,EAAzB;AAEA,2BAAS,IAAT;AACD;;AAGD,QAAMC,OAAN,CAAcC,OAAd,EAAyC;AAEvC,QAAI,KAAKL,OAAT,EAAkB;AAChB,YAAM,IAAIX,KAAJ,CAAW,iBAAX,CAAN;AACD,KAJsC,CAMvC;;;AACA,QAAI,KAAKU,QAAL,GAAgB,CAAhB,IAAqB,KAAKL,MAAL,CAAYC,MAAZ,IAAsB,KAAKI,QAApD,EAA8D;AAC5D,YAAM,IAAIO,OAAJ,CAAY,CAACC,GAAD,EAAMC,GAAN,KAAc,KAAKP,mBAAL,CAAyBQ,IAAzB,CAA8B;AAAEF,QAAAA,GAAF;AAAOC,QAAAA;AAAP,OAA9B,CAA1B,CAAN;AACD,KATsC,CAWvC;AAEA;;;AACA,QAAI,KAAKN,mBAAL,CAAyBP,MAA7B,EAAqC;AACnC,WAAKO,mBAAL,CAAyBQ,KAAzB,GAAiCH,GAAjC,CAAqCF,OAArC;AACD,KAFD,MAEO;AACL,WAAKX,MAAL,CAAYe,IAAZ,CAAiBJ,OAAjB;AACD;;AAED,SAAKT,IAAL,IAAab,SAAS,CAAE,GAAE,KAAKa,IAAK,UAAd,EAAyB,CAAzB,CAAtB;AACA,SAAKA,IAAL,IAAad,SAAS,CAAE,GAAE,KAAKc,IAAK,QAAd,EAAuB,KAAKF,MAAL,CAAYC,MAAnC,CAAtB;AACD;;AAED,QAAMgB,OAAN,GAA4B;AAC1B,QAAIC,IAAJ;;AACA,QAAI,KAAKlB,MAAL,CAAYC,MAAZ,KAAuB,CAA3B,EAA8B;AAAE;AAE9B,UAAI,KAAKK,OAAT,EAAkB;AAChB,cAAM,IAAIZ,gBAAJ,CAAqB,KAAKQ,IAA1B,CAAN;AACD;;AACDgB,MAAAA,IAAI,GAAG,MAAM,IAAIN,OAAJ,CAAY,CAACC,GAAD,EAAMC,GAAN,KAAc,KAAKN,mBAAL,CAAyBO,IAAzB,CAA8B;AAAEF,QAAAA,GAAF;AAAOC,QAAAA;AAAP,OAA9B,CAA1B,CAAb;AAED,KAPD,MAOO;AAAE;AAEPI,MAAAA,IAAI,GAAG,KAAKlB,MAAL,CAAYgB,KAAZ,EAAP,CAFK,CAIL;;AACA,UAAI,KAAKT,mBAAL,CAAyBN,MAA7B,EAAqC;AACnC,aAAKM,mBAAL,CAAyBS,KAAzB,GAAiCH,GAAjC;AACD;AAEF;;AAED,SAAKX,IAAL,IAAab,SAAS,CAAE,GAAE,KAAKa,IAAK,UAAd,EAAyB,CAAzB,CAAtB;AACA,SAAKA,IAAL,IAAad,SAAS,CAAE,GAAE,KAAKc,IAAK,QAAd,EAAuB,KAAKF,MAAL,CAAYC,MAAnC,CAAtB,CArB0B,CAuB1B;AAEA;;AACA,QAAI,KAAKD,MAAL,CAAYC,MAAZ,KAAuB,CAA3B,EAA8B;AAC5B,WAAKQ,iBAAL,CAAuBU,OAAvB,CAA+BC,GAAG,IAAIA,GAAG,CAACP,GAAJ,EAAtC;AACD;;AAED,WAAOK,IAAP;AACD;;AAEDG,EAAAA,mBAAmB,GAAa;AAC9B,UAAMC,KAAK,GAAG,KAAKtB,MAAL,CAAYuB,MAAZ,CAAmB,CAAnB,CAAd;;AAEA,SAAKrB,IAAL,IAAab,SAAS,CAAE,GAAE,KAAKa,IAAK,UAAd,EAAyBoB,KAAK,CAACrB,MAA/B,CAAtB;AACA,SAAKC,IAAL,IAAad,SAAS,CAAE,GAAE,KAAKc,IAAK,QAAd,EAAuB,KAAKF,MAAL,CAAYC,MAAnC,CAAtB;;AAEA,QAAI,KAAKM,mBAAL,CAAyBN,MAAzB,GAAkC,CAAtC,EAAyC;AACvC,YAAMuB,iBAAiB,GAAGC,IAAI,CAACC,GAAL,CAAS,KAAKnB,mBAAL,CAAyBN,MAAlC,EAA0C,KAAKI,QAA/C,CAA1B;;AACA,WAAK,IAAIsB,KAAK,GAAG,CAAjB,EAAoBA,KAAK,GAAGH,iBAA5B,EAA+CG,KAAK,EAApD,EAAwD;AACtD,aAAKpB,mBAAL,CAAyBS,KAAzB,GAAiCH,GAAjC;AACD;AACF,KAX6B,CAa9B;;;AACA,SAAKJ,iBAAL,CAAuBU,OAAvB,CAA+BC,GAAG,IAAIA,GAAG,CAACP,GAAJ,EAAtC;;AAEA,WAAOS,KAAP;AACD;;AAED,QAAMM,KAAN,GAA6B;AAC3B,SAAKtB,OAAL,GAAe,IAAf,CAD2B,CAG3B;;AACA,SAAKC,mBAAL,CAAyBY,OAAzB,CAAiCC,GAAG,IAAIA,GAAG,CAACN,GAAJ,CAAQ,IAAInB,KAAJ,CAAU,cAAV,CAAR,CAAxC,EAJ2B,CAM3B;;;AACA,QAAI,KAAKK,MAAL,CAAYC,MAAZ,GAAqB,CAAzB,EAA4B;AAC1B,YAAM,IAAIW,OAAJ,CAAY,CAACC,GAAD,EAAMC,GAAN,KAAe,KAAKL,iBAAL,CAAuBM,IAAvB,CAA4B;AAAEF,QAAAA,GAAF;AAAOC,QAAAA;AAAP,OAA5B,CAA3B,CAAN;AACD,KAT0B,CAW3B;;;AACA,SAAKN,mBAAL,CAAyBW,OAAzB,CAAiCC,GAAG,IAAIA,GAAG,CAACN,GAAJ,CAAQ,IAAIpB,gBAAJ,CAAqB,KAAKQ,IAA1B,CAAR,CAAxC;AACD;;AAED,QAAM2B,YAAN,CAAoBC,QAApB,EAAwC;AACtC,SAAK,MAAMnB,OAAX,IAAsBmB,QAAtB,EAAgC;AAC9B,YAAM,KAAKpB,OAAL,CAAaC,OAAb,CAAN;AACD;AACF;;AAED,QAAMoB,YAAN,CAAmBC,KAAnB,EAAqD;AACnD,UAAMC,OAAO,GAAG,EAAhB;;AACA,WAAOA,OAAO,CAAChC,MAAR,GAAiB+B,KAAxB,EAA+B;AAC7B,UAAI;AACF,cAAMd,IAAI,GAAG,MAAM,KAAKD,OAAL,EAAnB;AACAgB,QAAAA,OAAO,CAAClB,IAAR,CAAaG,IAAb;AACD,OAHD,CAGE,OAAOgB,GAAP,EAAY;AACZ,YAAIA,GAAG,YAAYxC,gBAAnB,EACE;AACF,cAAMwC,GAAN;AACD;AACF;;AACD,WAAOD,OAAP;AACD;;AA9IgC","sourcesContent":["//@flow\n\nimport autoBind from 'auto-bind'\n\nimport Module from './Module'\n\nconst { MODULE_NAME, log, warn, error, noteGauge, noteCount, trackOp } = new Module(__filename) // eslint-disable-line no-unused-vars\n\n\nconst DEFAULT_MAX_QUEUE_SIZE = -1\n\ntype QueueConfig = {|\n  name?: ?string,\n  maxSize?: ?number,\n|}\n\nexport class QueueClosedError extends Error {\n  constructor(queueName: ?string) {\n    super(queueName ? `Queue '${queueName}' is closed` : 'Queue closed')\n  }\n}\n\nexport default class AsyncQueue<T> {\n  \n  name: ?string\n\n  _items: Array<T>\n  _closed: boolean\n  _maxSize: number\n\n  _enqueueWaitingList: Array<{ res: () => void, rej: (err: Error) => void }>\n  _dequeueWaitingList: Array<{ res: (T) => void, rej: (err: Error) => void }>\n  _closeWaitingList: Array<{ res: () => void, rej: (err: Error) => void }>\n\n\n  constructor(config?: QueueConfig) {\n\n    // config\n    config = config || { name: undefined, maxSize: undefined }\n    this.name = config.name\n    this._maxSize = config.maxSize || DEFAULT_MAX_QUEUE_SIZE\n\n    // state\n    this._items = []\n    this._closed = false\n\n    this._enqueueWaitingList = []\n    this._dequeueWaitingList = []\n    this._closeWaitingList = []\n\n    autoBind(this)\n  }\n\n\n  async enqueue(newItem: T): Promise<void> {\n\n    if (this._closed) {\n      throw new Error(`Queue is closed`)\n    }\n\n    // wait for space to enqueue\n    if (this._maxSize > 0 && this._items.length >= this._maxSize) {\n      await new Promise((res, rej) => this._enqueueWaitingList.push({ res, rej }))  \n    }\n\n    //log(`Enqueued '${newItem}' to queue '${this.name}'`)    \n    \n    // release oldest waiting dequeue\n    if (this._dequeueWaitingList.length) {\n      this._dequeueWaitingList.shift().res(newItem)\n    } else {\n      this._items.push(newItem)\n    }\n\n    this.name && noteCount(`${this.name}.enqueue`, 1)\n    this.name && noteGauge(`${this.name}.items`, this._items.length)\n  }\n\n  async dequeue(): Promise<T> {\n    let item: T\n    if (this._items.length === 0) { // if there are NO queued items\n     \n      if (this._closed) {\n        throw new QueueClosedError(this.name)\n      }\n      item = await new Promise((res, rej) => this._dequeueWaitingList.push({ res, rej }))  \n      \n    } else { // if there are queued items\n      \n      item = this._items.shift() \n\n      // release oldest waiting enqueue\n      if (this._enqueueWaitingList.length) {\n        this._enqueueWaitingList.shift().res()\n      }  \n\n    }\n\n    this.name && noteCount(`${this.name}.dequeue`, 1)\n    this.name && noteGauge(`${this.name}.items`, this._items.length)\n\n    //log(`Dequeued '${item}' from queue '${this.name}'`)\n\n    // release all waiting closes\n    if (this._items.length === 0) {\n      this._closeWaitingList.forEach(cbs => cbs.res())\n    }\n\n    return item   \n  }\n\n  dequeueAllAvailable(): Array<T> {\n    const items = this._items.splice(0)\n\n    this.name && noteCount(`${this.name}.dequeue`, items.length)\n    this.name && noteGauge(`${this.name}.items`, this._items.length)\n\n    if (this._enqueueWaitingList.length > 0) {\n      const enqueuesToRelease = Math.min(this._enqueueWaitingList.length, this._maxSize)\n      for (let index = 0; index < enqueuesToRelease; index++) {\n        this._enqueueWaitingList.shift().res()\n      }\n    }\n\n    // release all waiting closes\n    this._closeWaitingList.forEach(cbs => cbs.res())\n\n    return items\n  }\n\n  async close(): Promise<void> {\n    this._closed = true\n\n    // fail all waiting enqueued\n    this._enqueueWaitingList.forEach(cbs => cbs.rej(new Error('Queue closed')))\n\n    // wait for all existing items to be dequeued\n    if (this._items.length > 0) {\n      await new Promise((res, rej)  => this._closeWaitingList.push({ res, rej }))  \n    }\n\n    // release all remaining waiting dequeues\n    this._dequeueWaitingList.forEach(cbs => cbs.rej(new QueueClosedError(this.name)))\n  }\n\n  async enqueueRange (newItems: Array<T>) {\n    for (const newItem of newItems) {\n      await this.enqueue(newItem)\n    }\n  }\n\n  async dequeueRange(count: number): Promise<Array<T>> {\n    const results = []\n    while (results.length < count) {\n      try {\n        const item = await this.dequeue()\n        results.push(item)\n      } catch (err) {\n        if (err instanceof QueueClosedError)\n          break\n        throw err\n      }\n    }\n    return results\n  }\n\n  getLength = (): number => {\n    return this._items.length\n  }\n}\n"]} |
\ | No newline at end of file |