UNPKG

4.38 kBJavaScriptView Raw
1//@flow
2
3import autoBind from 'auto-bind'
4
5import Module from './Module'
6
7const { MODULE_NAME, log, warn, error, noteGauge, noteCount, trackOp } = new Module(__filename) // eslint-disable-line no-unused-vars
8
9
10const DEFAULT_MAX_QUEUE_SIZE = -1
11
12type QueueConfig = {|
13 name?: ?string,
14 maxSize?: ?number,
15|}
16
17export class QueueClosedError extends Error {
18 constructor(queueName: ?string) {
19 super(queueName ? `Queue '${queueName}' is closed` : 'Queue closed')
20 }
21}
22
23export default class AsyncQueue<T> {
24
25 name: ?string
26
27 _items: Array<T>
28 _closed: boolean
29 _maxSize: number
30
31 _enqueueWaitingList: Array<{ res: () => void, rej: (err: Error) => void }>
32 _dequeueWaitingList: Array<{ res: (T) => void, rej: (err: Error) => void }>
33 _closeWaitingList: Array<{ res: () => void, rej: (err: Error) => void }>
34
35
36 constructor(config?: QueueConfig) {
37
38 // config
39 config = config || { name: undefined, maxSize: undefined }
40 this.name = config.name
41 this._maxSize = config.maxSize || DEFAULT_MAX_QUEUE_SIZE
42
43 // state
44 this._items = []
45 this._closed = false
46
47 this._enqueueWaitingList = []
48 this._dequeueWaitingList = []
49 this._closeWaitingList = []
50
51 autoBind(this)
52 }
53
54
55 async enqueue(newItem: T): Promise<void> {
56
57 if (this._closed) {
58 throw new Error(`Queue is closed`)
59 }
60
61 // wait for space to enqueue
62 if (this._maxSize > 0 && this._items.length >= this._maxSize) {
63 await new Promise((res, rej) => this._enqueueWaitingList.push({ res, rej }))
64 }
65
66 //log(`Enqueued '${newItem}' to queue '${this.name}'`)
67
68 // release oldest waiting dequeue
69 if (this._dequeueWaitingList.length) {
70 this._dequeueWaitingList.shift().res(newItem)
71 } else {
72 this._items.push(newItem)
73 }
74
75 this.name && noteCount(`${this.name}.enqueue`, 1)
76 this.name && noteGauge(`${this.name}.items`, this._items.length)
77 }
78
79 async dequeue(): Promise<T> {
80 let item: T
81 if (this._items.length === 0) { // if there are NO queued items
82
83 if (this._closed) {
84 throw new QueueClosedError(this.name)
85 }
86 item = await new Promise((res, rej) => this._dequeueWaitingList.push({ res, rej }))
87
88 } else { // if there are queued items
89
90 item = this._items.shift()
91
92 // release oldest waiting enqueue
93 if (this._enqueueWaitingList.length) {
94 this._enqueueWaitingList.shift().res()
95 }
96
97 }
98
99 this.name && noteCount(`${this.name}.dequeue`, 1)
100 this.name && noteGauge(`${this.name}.items`, this._items.length)
101
102 //log(`Dequeued '${item}' from queue '${this.name}'`)
103
104 // release all waiting closes
105 if (this._items.length === 0) {
106 this._closeWaitingList.forEach(cbs => cbs.res())
107 }
108
109 return item
110 }
111
112 dequeueAllAvailable(): Array<T> {
113 const items = this._items.splice(0)
114
115 this.name && noteCount(`${this.name}.dequeue`, items.length)
116 this.name && noteGauge(`${this.name}.items`, this._items.length)
117
118 if (this._enqueueWaitingList.length > 0) {
119 const enqueuesToRelease = Math.min(this._enqueueWaitingList.length, this._maxSize)
120 for (let index = 0; index < enqueuesToRelease; index++) {
121 this._enqueueWaitingList.shift().res()
122 }
123 }
124
125 // release all waiting closes
126 this._closeWaitingList.forEach(cbs => cbs.res())
127
128 return items
129 }
130
131 async close(): Promise<void> {
132 this._closed = true
133
134 // fail all waiting enqueued
135 this._enqueueWaitingList.forEach(cbs => cbs.rej(new Error('Queue closed')))
136
137 // wait for all existing items to be dequeued
138 if (this._items.length > 0) {
139 await new Promise((res, rej) => this._closeWaitingList.push({ res, rej }))
140 }
141
142 // release all remaining waiting dequeues
143 this._dequeueWaitingList.forEach(cbs => cbs.rej(new QueueClosedError(this.name)))
144 }
145
146 async enqueueRange (newItems: Array<T>) {
147 for (const newItem of newItems) {
148 await this.enqueue(newItem)
149 }
150 }
151
152 async dequeueRange(count: number): Promise<Array<T>> {
153 const results = []
154 while (results.length < count) {
155 try {
156 const item = await this.dequeue()
157 results.push(item)
158 } catch (err) {
159 if (err instanceof QueueClosedError)
160 break
161 throw err
162 }
163 }
164 return results
165 }
166
167 getLength = (): number => {
168 return this._items.length
169 }
170}