UNPKG

17.3 kBJavaScriptView Raw
1"use strict";
2
3Object.defineProperty(exports, "__esModule", {
4 value: true
5});
6exports.default = exports.QueueClosedError = void 0;
7
8var _autoBind = _interopRequireDefault(require("auto-bind"));
9
10var _Module = _interopRequireDefault(require("./Module"));
11
12function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
13
14function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
15
16const {
17 MODULE_NAME,
18 log,
19 warn,
20 error,
21 noteGauge,
22 noteCount,
23 trackOp
24} = new _Module.default(__filename); // eslint-disable-line no-unused-vars
25
26const DEFAULT_MAX_QUEUE_SIZE = -1;
27
28class QueueClosedError extends Error {
29 constructor(queueName) {
30 super(queueName ? `Queue '${queueName}' is closed` : 'Queue closed');
31 }
32
33}
34
35exports.QueueClosedError = QueueClosedError;
36
37class AsyncQueue {
38 constructor(config) {
39 _defineProperty(this, "name", void 0);
40
41 _defineProperty(this, "_items", void 0);
42
43 _defineProperty(this, "_closed", void 0);
44
45 _defineProperty(this, "_maxSize", void 0);
46
47 _defineProperty(this, "_enqueueWaitingList", void 0);
48
49 _defineProperty(this, "_dequeueWaitingList", void 0);
50
51 _defineProperty(this, "_closeWaitingList", void 0);
52
53 _defineProperty(this, "getLength", () => {
54 return this._items.length;
55 });
56
57 // config
58 config = config || {
59 name: undefined,
60 maxSize: undefined
61 };
62 this.name = config.name;
63 this._maxSize = config.maxSize || DEFAULT_MAX_QUEUE_SIZE; // state
64
65 this._items = [];
66 this._closed = false;
67 this._enqueueWaitingList = [];
68 this._dequeueWaitingList = [];
69 this._closeWaitingList = [];
70 (0, _autoBind.default)(this);
71 }
72
73 async enqueue(newItem) {
74 if (this._closed) {
75 throw new Error(`Queue is closed`);
76 } // wait for space to enqueue
77
78
79 if (this._maxSize > 0 && this._items.length >= this._maxSize) {
80 await new Promise((res, rej) => this._enqueueWaitingList.push({
81 res,
82 rej
83 }));
84 } //log(`Enqueued '${newItem}' to queue '${this.name}'`)
85 // release oldest waiting dequeue
86
87
88 if (this._dequeueWaitingList.length) {
89 this._dequeueWaitingList.shift().res(newItem);
90 } else {
91 this._items.push(newItem);
92 }
93
94 this.name && noteCount(`${this.name}.enqueue`, 1);
95 this.name && noteGauge(`${this.name}.items`, this._items.length);
96 }
97
98 async dequeue() {
99 let item;
100
101 if (this._items.length === 0) {
102 // if there are NO queued items
103 if (this._closed) {
104 throw new QueueClosedError(this.name);
105 }
106
107 item = await new Promise((res, rej) => this._dequeueWaitingList.push({
108 res,
109 rej
110 }));
111 } else {
112 // if there are queued items
113 item = this._items.shift(); // release oldest waiting enqueue
114
115 if (this._enqueueWaitingList.length) {
116 this._enqueueWaitingList.shift().res();
117 }
118 }
119
120 this.name && noteCount(`${this.name}.dequeue`, 1);
121 this.name && noteGauge(`${this.name}.items`, this._items.length); //log(`Dequeued '${item}' from queue '${this.name}'`)
122 // release all waiting closes
123
124 if (this._items.length === 0) {
125 this._closeWaitingList.forEach(cbs => cbs.res());
126 }
127
128 return item;
129 }
130
131 dequeueAllAvailable() {
132 const items = this._items.splice(0);
133
134 this.name && noteCount(`${this.name}.dequeue`, items.length);
135 this.name && noteGauge(`${this.name}.items`, this._items.length);
136
137 if (this._enqueueWaitingList.length > 0) {
138 const enqueuesToRelease = Math.min(this._enqueueWaitingList.length, this._maxSize);
139
140 for (let index = 0; index < enqueuesToRelease; index++) {
141 this._enqueueWaitingList.shift().res();
142 }
143 } // release all waiting closes
144
145
146 this._closeWaitingList.forEach(cbs => cbs.res());
147
148 return items;
149 }
150
151 async close() {
152 this._closed = true; // fail all waiting enqueued
153
154 this._enqueueWaitingList.forEach(cbs => cbs.rej(new Error('Queue closed'))); // wait for all existing items to be dequeued
155
156
157 if (this._items.length > 0) {
158 await new Promise((res, rej) => this._closeWaitingList.push({
159 res,
160 rej
161 }));
162 } // release all remaining waiting dequeues
163
164
165 this._dequeueWaitingList.forEach(cbs => cbs.rej(new QueueClosedError(this.name)));
166 }
167
168 async enqueueRange(newItems) {
169 for (const newItem of newItems) {
170 await this.enqueue(newItem);
171 }
172 }
173
174 async dequeueRange(count) {
175 const results = [];
176
177 while (results.length < count) {
178 try {
179 const item = await this.dequeue();
180 results.push(item);
181 } catch (err) {
182 if (err instanceof QueueClosedError) break;
183 throw err;
184 }
185 }
186
187 return results;
188 }
189
190}
191
192exports.default = AsyncQueue;
193//# sourceMappingURL=data:application/json;charset=utf-8;base64,{"version":3,"sources":["../src/AsyncQueue.js"],"names":["MODULE_NAME","log","warn","error","noteGauge","noteCount","trackOp","Module","__filename","DEFAULT_MAX_QUEUE_SIZE","QueueClosedError","Error","constructor","queueName","AsyncQueue","config","_items","length","name","undefined","maxSize","_maxSize","_closed","_enqueueWaitingList","_dequeueWaitingList","_closeWaitingList","enqueue","newItem","Promise","res","rej","push","shift","dequeue","item","forEach","cbs","dequeueAllAvailable","items","splice","enqueuesToRelease","Math","min","index","close","enqueueRange","newItems","dequeueRange","count","results","err"],"mappings":";;;;;;;AAEA;;AAEA;;;;;;AAEA,MAAM;AAAEA,EAAAA,WAAF;AAAeC,EAAAA,GAAf;AAAoBC,EAAAA,IAApB;AAA0BC,EAAAA,KAA1B;AAAiCC,EAAAA,SAAjC;AAA4CC,EAAAA,SAA5C;AAAuDC,EAAAA;AAAvD,IAAmE,IAAIC,eAAJ,CAAWC,UAAX,CAAzE,C,CAAgG;;AAGhG,MAAMC,sBAAsB,GAAG,CAAC,CAAhC;;AAOO,MAAMC,gBAAN,SAA+BC,KAA/B,CAAqC;AAC1CC,EAAAA,WAAW,CAACC,SAAD,EAAqB;AAC9B,UAAMA,SAAS,GAAI,UAASA,SAAU,aAAvB,GAAsC,cAArD;AACD;;AAHyC;;;;AAM7B,MAAMC,UAAN,CAAoB;AAajCF,EAAAA,WAAW,CAACG,MAAD,EAAuB;AAAA;;AAAA;;AAAA;;AAAA;;AAAA;;AAAA;;AAAA;;AAAA,uCAmItB,MAAc;AACxB,aAAO,KAAKC,MAAL,CAAYC,MAAnB;AACD,KArIiC;;AAEhC;AACAF,IAAAA,MAAM,GAAGA,MAAM,IAAI;AAAEG,MAAAA,IAAI,EAAEC,SAAR;AAAmBC,MAAAA,OAAO,EAAED;AAA5B,KAAnB;AACA,SAAKD,IAAL,GAAYH,MAAM,CAACG,IAAnB;AACA,SAAKG,QAAL,GAAgBN,MAAM,CAACK,OAAP,IAAkBX,sBAAlC,CALgC,CAOhC;;AACA,SAAKO,MAAL,GAAc,EAAd;AACA,SAAKM,OAAL,GAAe,KAAf;AAEA,SAAKC,mBAAL,GAA2B,EAA3B;AACA,SAAKC,mBAAL,GAA2B,EAA3B;AACA,SAAKC,iBAAL,GAAyB,EAAzB;AAEA,2BAAS,IAAT;AACD;;AAGD,QAAMC,OAAN,CAAcC,OAAd,EAAyC;AAEvC,QAAI,KAAKL,OAAT,EAAkB;AAChB,YAAM,IAAIX,KAAJ,CAAW,iBAAX,CAAN;AACD,KAJsC,CAMvC;;;AACA,QAAI,KAAKU,QAAL,GAAgB,CAAhB,IAAqB,KAAKL,MAAL,CAAYC,MAAZ,IAAsB,KAAKI,QAApD,EAA8D;AAC5D,YAAM,IAAIO,OAAJ,CAAY,CAACC,GAAD,EAAMC,GAAN,KAAc,KAAKP,mBAAL,CAAyBQ,IAAzB,CAA8B;AAAEF,QAAAA,GAAF;AAAOC,QAAAA;AAAP,OAA9B,CAA1B,CAAN;AACD,KATsC,CAWvC;AAEA;;;AACA,QAAI,KAAKN,mBAAL,CAAyBP,MAA7B,EAAqC;AACnC,WAAKO,mBAAL,CAAyBQ,KAAzB,GAAiCH,GAAjC,CAAqCF,OAArC;AACD,KAFD,MAEO;AACL,WAAKX,MAAL,CAAYe,IAAZ,CAAiBJ,OAAjB;AACD;;AAED,SAAKT,IAAL,IAAab,SAAS,CAAE,GAAE,KAAKa,IAAK,UAAd,EAAyB,CAAzB,CAAtB;AACA,SAAKA,IAAL,IAAad,SAAS,CAAE,GAAE,KAAKc,IAAK,QAAd,EAAuB,KAAKF,MAAL,CAAYC,MAAnC,CAAtB;AACD;;AAED,QAAMgB,OAAN,GAA4B;AAC1B,QAAIC,IAAJ;;AACA,QAAI,KAAKlB,MAAL,CAAYC,MAAZ,KAAuB,CAA3B,EAA8B;AAAE;AAE9B,UAAI,KAAKK,OAAT,EAAkB;AAChB,cAAM,IAAIZ,gBAAJ,CAAqB,KAAKQ,IAA1B,CAAN;AACD;;AACDgB,MAAAA,IAAI,GAAG,MAAM,IAAIN,OAAJ,CAAY,CAACC,GAAD,EAAMC,GAAN,KAAc,KAAKN,mBAAL,CAAyBO,IAAzB,CAA8B;AAAEF,QAAAA,GAAF;AAAOC,QAAAA;AAAP,OAA9B,CAA1B,CAAb;AAED,KAPD,MAOO;AAAE;AAEPI,MAAAA,IAAI,GAAG,KAAKlB,MAAL,CAAYgB,KAAZ,EAAP,CAFK,CAIL;;AACA,UAAI,KAAKT,mBAAL,CAAyBN,MAA7B,EAAqC;AACnC,aAAKM,mBAAL,CAAyBS,KAAzB,GAAiCH,GAAjC;AACD;AAEF;;AAED,SAAKX,IAAL,IAAab,SAAS,CAAE,GAAE,KAAKa,IAAK,UAAd,EAAyB,CAAzB,CAAtB;AACA,SAAKA,IAAL,IAAad,SAAS,CAAE,GAAE,KAAKc,IAAK,QAAd,EAAuB,KAAKF,MAAL,CAAYC,MAAnC,CAAtB,CArB0B,CAuB1B;AAEA;;AACA,QAAI,KAAKD,MAAL,CAAYC,MAAZ,KAAuB,CAA3B,EAA8B;AAC5B,WAAKQ,iBAAL,CAAuBU,OAAvB,CAA+BC,GAAG,IAAIA,GAAG,CAACP,GAAJ,EAAtC;AACD;;AAED,WAAOK,IAAP;AACD;;AAEDG,EAAAA,mBAAmB,GAAa;AAC9B,UAAMC,KAAK,GAAG,KAAKtB,MAAL,CAAYuB,MAAZ,CAAmB,CAAnB,CAAd;;AAEA,SAAKrB,IAAL,IAAab,SAAS,CAAE,GAAE,KAAKa,IAAK,UAAd,EAAyBoB,KAAK,CAACrB,MAA/B,CAAtB;AACA,SAAKC,IAAL,IAAad,SAAS,CAAE,GAAE,KAAKc,IAAK,QAAd,EAAuB,KAAKF,MAAL,CAAYC,MAAnC,CAAtB;;AAEA,QAAI,KAAKM,mBAAL,CAAyBN,MAAzB,GAAkC,CAAtC,EAAyC;AACvC,YAAMuB,iBAAiB,GAAGC,IAAI,CAACC,GAAL,CAAS,KAAKnB,mBAAL,CAAyBN,MAAlC,EAA0C,KAAKI,QAA/C,CAA1B;;AACA,WAAK,IAAIsB,KAAK,GAAG,CAAjB,EAAoBA,KAAK,GAAGH,iBAA5B,EAA+CG,KAAK,EAApD,EAAwD;AACtD,aAAKpB,mBAAL,CAAyBS,KAAzB,GAAiCH,GAAjC;AACD;AACF,KAX6B,CAa9B;;;AACA,SAAKJ,iBAAL,CAAuBU,OAAvB,CAA+BC,GAAG,IAAIA,GAAG,CAACP,GAAJ,EAAtC;;AAEA,WAAOS,KAAP;AACD;;AAED,QAAMM,KAAN,GAA6B;AAC3B,SAAKtB,OAAL,GAAe,IAAf,CAD2B,CAG3B;;AACA,SAAKC,mBAAL,CAAyBY,OAAzB,CAAiCC,GAAG,IAAIA,GAAG,CAACN,GAAJ,CAAQ,IAAInB,KAAJ,CAAU,cAAV,CAAR,CAAxC,EAJ2B,CAM3B;;;AACA,QAAI,KAAKK,MAAL,CAAYC,MAAZ,GAAqB,CAAzB,EAA4B;AAC1B,YAAM,IAAIW,OAAJ,CAAY,CAACC,GAAD,EAAMC,GAAN,KAAe,KAAKL,iBAAL,CAAuBM,IAAvB,CAA4B;AAAEF,QAAAA,GAAF;AAAOC,QAAAA;AAAP,OAA5B,CAA3B,CAAN;AACD,KAT0B,CAW3B;;;AACA,SAAKN,mBAAL,CAAyBW,OAAzB,CAAiCC,GAAG,IAAIA,GAAG,CAACN,GAAJ,CAAQ,IAAIpB,gBAAJ,CAAqB,KAAKQ,IAA1B,CAAR,CAAxC;AACD;;AAED,QAAM2B,YAAN,CAAoBC,QAApB,EAAwC;AACtC,SAAK,MAAMnB,OAAX,IAAsBmB,QAAtB,EAAgC;AAC9B,YAAM,KAAKpB,OAAL,CAAaC,OAAb,CAAN;AACD;AACF;;AAED,QAAMoB,YAAN,CAAmBC,KAAnB,EAAqD;AACnD,UAAMC,OAAO,GAAG,EAAhB;;AACA,WAAOA,OAAO,CAAChC,MAAR,GAAiB+B,KAAxB,EAA+B;AAC7B,UAAI;AACF,cAAMd,IAAI,GAAG,MAAM,KAAKD,OAAL,EAAnB;AACAgB,QAAAA,OAAO,CAAClB,IAAR,CAAaG,IAAb;AACD,OAHD,CAGE,OAAOgB,GAAP,EAAY;AACZ,YAAIA,GAAG,YAAYxC,gBAAnB,EACE;AACF,cAAMwC,GAAN;AACD;AACF;;AACD,WAAOD,OAAP;AACD;;AA9IgC","sourcesContent":["//@flow\n\nimport autoBind from 'auto-bind'\n\nimport Module from './Module'\n\nconst { MODULE_NAME, log, warn, error, noteGauge, noteCount, trackOp } = new Module(__filename) // eslint-disable-line no-unused-vars\n\n\nconst DEFAULT_MAX_QUEUE_SIZE = -1\n\ntype QueueConfig = {|\n  name?: ?string,\n  maxSize?: ?number,\n|}\n\nexport class QueueClosedError extends Error {\n  constructor(queueName: ?string) {\n    super(queueName ? `Queue '${queueName}' is closed` : 'Queue closed')\n  }\n}\n\nexport default class AsyncQueue<T> {\n  \n  name: ?string\n\n  _items: Array<T>\n  _closed: boolean\n  _maxSize: number\n\n  _enqueueWaitingList: Array<{ res: () => void, rej: (err: Error) => void }>\n  _dequeueWaitingList: Array<{ res: (T) => void, rej: (err: Error) => void }>\n  _closeWaitingList: Array<{ res: () => void, rej: (err: Error) => void }>\n\n\n  constructor(config?: QueueConfig) {\n\n    // config\n    config = config || { name: undefined, maxSize: undefined }\n    this.name = config.name\n    this._maxSize = config.maxSize || DEFAULT_MAX_QUEUE_SIZE\n\n    // state\n    this._items = []\n    this._closed = false\n\n    this._enqueueWaitingList = []\n    this._dequeueWaitingList = []\n    this._closeWaitingList = []\n\n    autoBind(this)\n  }\n\n\n  async enqueue(newItem: T): Promise<void> {\n\n    if (this._closed) {\n      throw new Error(`Queue is closed`)\n    }\n\n    // wait for space to enqueue\n    if (this._maxSize > 0 && this._items.length >= this._maxSize) {\n      await new Promise((res, rej) => this._enqueueWaitingList.push({ res, rej }))  \n    }\n\n    //log(`Enqueued '${newItem}' to queue '${this.name}'`)    \n    \n    // release oldest waiting dequeue\n    if (this._dequeueWaitingList.length) {\n      this._dequeueWaitingList.shift().res(newItem)\n    } else {\n      this._items.push(newItem)\n    }\n\n    this.name && noteCount(`${this.name}.enqueue`, 1)\n    this.name && noteGauge(`${this.name}.items`, this._items.length)\n  }\n\n  async dequeue(): Promise<T> {\n    let item: T\n    if (this._items.length === 0) { // if there are NO queued items\n     \n      if (this._closed) {\n        throw new QueueClosedError(this.name)\n      }\n      item = await new Promise((res, rej) => this._dequeueWaitingList.push({ res, rej }))  \n      \n    } else { // if there are queued items\n      \n      item = this._items.shift() \n\n      // release oldest waiting enqueue\n      if (this._enqueueWaitingList.length) {\n        this._enqueueWaitingList.shift().res()\n      }  \n\n    }\n\n    this.name && noteCount(`${this.name}.dequeue`, 1)\n    this.name && noteGauge(`${this.name}.items`, this._items.length)\n\n    //log(`Dequeued '${item}' from queue '${this.name}'`)\n\n    // release all waiting closes\n    if (this._items.length === 0) {\n      this._closeWaitingList.forEach(cbs => cbs.res())\n    }\n\n    return item   \n  }\n\n  dequeueAllAvailable(): Array<T> {\n    const items = this._items.splice(0)\n\n    this.name && noteCount(`${this.name}.dequeue`, items.length)\n    this.name && noteGauge(`${this.name}.items`, this._items.length)\n\n    if (this._enqueueWaitingList.length > 0) {\n      const enqueuesToRelease = Math.min(this._enqueueWaitingList.length, this._maxSize)\n      for (let index = 0; index < enqueuesToRelease; index++) {\n        this._enqueueWaitingList.shift().res()\n      }\n    }\n\n    // release all waiting closes\n    this._closeWaitingList.forEach(cbs => cbs.res())\n\n    return items\n  }\n\n  async close(): Promise<void> {\n    this._closed = true\n\n    // fail all waiting enqueued\n    this._enqueueWaitingList.forEach(cbs => cbs.rej(new Error('Queue closed')))\n\n    // wait for all existing items to be dequeued\n    if (this._items.length > 0) {\n      await new Promise((res, rej)  => this._closeWaitingList.push({ res, rej }))  \n    }\n\n    // release all remaining waiting dequeues\n    this._dequeueWaitingList.forEach(cbs => cbs.rej(new QueueClosedError(this.name)))\n  }\n\n  async enqueueRange (newItems: Array<T>) {\n    for (const newItem of newItems) {\n      await this.enqueue(newItem)\n    }\n  }\n\n  async dequeueRange(count: number): Promise<Array<T>> {\n    const results = []\n    while (results.length < count) {\n      try {\n        const item = await this.dequeue()\n        results.push(item)\n      } catch (err) {\n        if (err instanceof QueueClosedError)\n          break\n        throw err\n      }\n    }\n    return results\n  }\n\n  getLength = (): number => {\n    return this._items.length\n  }\n}\n"]}
\No newline at end of file