1 | import * as fs from 'fs';
|
2 |
|
3 | export class Queue<T>
|
4 | {
|
5 | protected pending: T[];
|
6 | constructor(private handler: (message: T, next: (processed: boolean) => void) => void, queue?: T[])
|
7 | {
|
8 | this.pending = queue || [];
|
9 | }
|
10 |
|
11 | public enqueue(message: T)
|
12 | {
|
13 | this.pending.push(message);
|
14 | this.save();
|
15 | this.process();
|
16 | };
|
17 |
|
18 | public save(_throw?: boolean)
|
19 | {
|
20 | if (_throw)
|
21 | throw new Error('You need to define where and how to save the queue.');
|
22 | }
|
23 |
|
24 | private processing: boolean = false;
|
25 | private current: T;
|
26 |
|
27 | public process()
|
28 | {
|
29 | if (this.processing)
|
30 | return;
|
31 | this.processing = true;
|
32 | var message = this.pending.shift();
|
33 | this.current = message;
|
34 | if (!message)
|
35 | return this.processing = false;
|
36 | this.handler(message, (processed) =>
|
37 | {
|
38 | if (processed === false)
|
39 | {
|
40 | this.enqueue(message);
|
41 | }
|
42 | this.save();
|
43 | this.processing = false;
|
44 | if (processed !== false)
|
45 | process.nextTick(this.process.bind(this));
|
46 | });
|
47 | };
|
48 | } |
\ | No newline at end of file |