1 |
|
2 |
|
3 | import autoBind from 'auto-bind'
|
4 |
|
5 | import Module from './Module'
|
6 |
|
7 | const { MODULE_NAME, log, warn, error, noteGauge, noteCount, trackOp } = new Module(__filename)
|
8 |
|
9 |
|
10 | const DEFAULT_MAX_QUEUE_SIZE = -1
|
11 |
|
12 | type QueueConfig = {|
|
13 | name?: ?string,
|
14 | maxSize?: ?number,
|
15 | |}
|
16 |
|
17 | export class QueueClosedError extends Error {
|
18 | constructor(queueName: ?string) {
|
19 | super(queueName ? `Queue '${queueName}' is closed` : 'Queue closed')
|
20 | }
|
21 | }
|
22 |
|
23 | export default class AsyncQueue<T> {
|
24 |
|
25 | name: ?string
|
26 |
|
27 | _items: Array<T>
|
28 | _closed: boolean
|
29 | _maxSize: number
|
30 |
|
31 | _enqueueWaitingList: Array<{ res: () => void, rej: (err: Error) => void }>
|
32 | _dequeueWaitingList: Array<{ res: (T) => void, rej: (err: Error) => void }>
|
33 | _closeWaitingList: Array<{ res: () => void, rej: (err: Error) => void }>
|
34 |
|
35 |
|
36 | constructor(config?: QueueConfig) {
|
37 |
|
38 |
|
39 | config = config || { name: undefined, maxSize: undefined }
|
40 | this.name = config.name
|
41 | this._maxSize = config.maxSize || DEFAULT_MAX_QUEUE_SIZE
|
42 |
|
43 |
|
44 | this._items = []
|
45 | this._closed = false
|
46 |
|
47 | this._enqueueWaitingList = []
|
48 | this._dequeueWaitingList = []
|
49 | this._closeWaitingList = []
|
50 |
|
51 | autoBind(this)
|
52 | }
|
53 |
|
54 |
|
55 | async enqueue(newItem: T): Promise<void> {
|
56 |
|
57 | if (this._closed) {
|
58 | throw new Error(`Queue is closed`)
|
59 | }
|
60 |
|
61 |
|
62 | if (this._maxSize > 0 && this._items.length >= this._maxSize) {
|
63 | await new Promise((res, rej) => this._enqueueWaitingList.push({ res, rej }))
|
64 | }
|
65 |
|
66 |
|
67 |
|
68 |
|
69 | if (this._dequeueWaitingList.length) {
|
70 | this._dequeueWaitingList.shift().res(newItem)
|
71 | } else {
|
72 | this._items.push(newItem)
|
73 | }
|
74 |
|
75 | this.name && noteCount(`${this.name}.enqueue`, 1)
|
76 | this.name && noteGauge(`${this.name}.items`, this._items.length)
|
77 | }
|
78 |
|
79 | async dequeue(): Promise<T> {
|
80 | let item: T
|
81 | if (this._items.length === 0) {
|
82 |
|
83 | if (this._closed) {
|
84 | throw new QueueClosedError(this.name)
|
85 | }
|
86 | item = await new Promise((res, rej) => this._dequeueWaitingList.push({ res, rej }))
|
87 |
|
88 | } else {
|
89 |
|
90 | item = this._items.shift()
|
91 |
|
92 |
|
93 | if (this._enqueueWaitingList.length) {
|
94 | this._enqueueWaitingList.shift().res()
|
95 | }
|
96 |
|
97 | }
|
98 |
|
99 | this.name && noteCount(`${this.name}.dequeue`, 1)
|
100 | this.name && noteGauge(`${this.name}.items`, this._items.length)
|
101 |
|
102 |
|
103 |
|
104 |
|
105 | if (this._items.length === 0) {
|
106 | this._closeWaitingList.forEach(cbs => cbs.res())
|
107 | }
|
108 |
|
109 | return item
|
110 | }
|
111 |
|
112 | dequeueAllAvailable(): Array<T> {
|
113 | const items = this._items.splice(0)
|
114 |
|
115 | this.name && noteCount(`${this.name}.dequeue`, items.length)
|
116 | this.name && noteGauge(`${this.name}.items`, this._items.length)
|
117 |
|
118 | if (this._enqueueWaitingList.length > 0) {
|
119 | const enqueuesToRelease = Math.min(this._enqueueWaitingList.length, this._maxSize)
|
120 | for (let index = 0; index < enqueuesToRelease; index++) {
|
121 | this._enqueueWaitingList.shift().res()
|
122 | }
|
123 | }
|
124 |
|
125 |
|
126 | this._closeWaitingList.forEach(cbs => cbs.res())
|
127 |
|
128 | return items
|
129 | }
|
130 |
|
131 | async close(): Promise<void> {
|
132 | this._closed = true
|
133 |
|
134 |
|
135 | this._enqueueWaitingList.forEach(cbs => cbs.rej(new Error('Queue closed')))
|
136 |
|
137 |
|
138 | if (this._items.length > 0) {
|
139 | await new Promise((res, rej) => this._closeWaitingList.push({ res, rej }))
|
140 | }
|
141 |
|
142 |
|
143 | this._dequeueWaitingList.forEach(cbs => cbs.rej(new QueueClosedError(this.name)))
|
144 | }
|
145 |
|
146 | async enqueueRange (newItems: Array<T>) {
|
147 | for (const newItem of newItems) {
|
148 | await this.enqueue(newItem)
|
149 | }
|
150 | }
|
151 |
|
152 | async dequeueRange(count: number): Promise<Array<T>> {
|
153 | const results = []
|
154 | while (results.length < count) {
|
155 | try {
|
156 | const item = await this.dequeue()
|
157 | results.push(item)
|
158 | } catch (err) {
|
159 | if (err instanceof QueueClosedError)
|
160 | break
|
161 | throw err
|
162 | }
|
163 | }
|
164 | return results
|
165 | }
|
166 |
|
167 | getLength = (): number => {
|
168 | return this._items.length
|
169 | }
|
170 | }
|