1 |
|
2 |
|
3 | const { EventEmitter } = require('events');
|
4 | const generateId = require('./generate-id');
|
5 | const hasher = require('./hasher');
|
6 |
|
7 | type Options = {
|
8 | maxAge?:number,
|
9 | bufferPublishing?:number
|
10 | };
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | class ObservedRemoveSet<T> extends EventEmitter {
|
19 | declare maxAge: number;
|
20 | declare bufferPublishing: number;
|
21 | declare pairs: Map<string, [string, T]>;
|
22 | declare deletions: Map<string, string>;
|
23 | declare deleteQueue: Array<*>;
|
24 | declare insertQueue: Array<*>;
|
25 | declare publishTimeout: null | TimeoutID;
|
26 |
|
27 | |
28 |
|
29 |
|
30 |
|
31 |
|
32 |
|
33 |
|
34 | constructor(entries?: Iterable<T>, options?:Options = {}) {
|
35 | super();
|
36 | this.maxAge = typeof options.maxAge === 'undefined' ? 5000 : options.maxAge;
|
37 | this.bufferPublishing = typeof options.bufferPublishing === 'undefined' ? 30 : options.bufferPublishing;
|
38 | this.publishTimeout = null;
|
39 | this.pairs = new Map();
|
40 | this.deletions = new Map();
|
41 | this.insertQueue = [];
|
42 | this.deleteQueue = [];
|
43 | if (!entries) {
|
44 | return;
|
45 | }
|
46 | for (const value of entries) {
|
47 | this.add(value);
|
48 | }
|
49 | }
|
50 |
|
51 |
|
52 |
|
53 | * [Symbol.iterator]() {
|
54 | for (const pair of this.pairs.values()) {
|
55 | yield pair[1];
|
56 | }
|
57 | }
|
58 |
|
59 | dequeue() {
|
60 | if (this.publishTimeout) {
|
61 | return;
|
62 | }
|
63 | if (this.bufferPublishing > 0) {
|
64 | this.publishTimeout = setTimeout(() => this.publish(), this.bufferPublishing);
|
65 | } else {
|
66 | this.publish();
|
67 | }
|
68 | }
|
69 |
|
70 | publish() {
|
71 | this.publishTimeout = null;
|
72 | const insertQueue = this.insertQueue;
|
73 | const deleteQueue = this.deleteQueue;
|
74 | this.insertQueue = [];
|
75 | this.deleteQueue = [];
|
76 | this.sync([insertQueue, deleteQueue]);
|
77 | }
|
78 |
|
79 | |
80 |
|
81 |
|
82 |
|
83 |
|
84 | sync(queue?: [Array<*>, Array<*>] = this.dump()) {
|
85 | this.emit('publish', queue);
|
86 | }
|
87 |
|
88 | flush() {
|
89 | const maxAgeString = (Date.now() - this.maxAge).toString(36).padStart(9, '0');
|
90 | for (const [id] of this.deletions) {
|
91 | if (id < maxAgeString) {
|
92 | this.deletions.delete(id);
|
93 | }
|
94 | }
|
95 | }
|
96 |
|
97 | |
98 |
|
99 |
|
100 |
|
101 | dump():[Array<*>, Array<*>] {
|
102 | return [[...this.pairs], [...this.deletions]];
|
103 | }
|
104 |
|
105 | |
106 |
|
107 |
|
108 |
|
109 |
|
110 | process(queue:[Array<*>, Array<*>], skipFlush?: boolean = false) {
|
111 | const [insertions, deletions] = queue;
|
112 | for (const [id, hash] of deletions) {
|
113 | this.deletions.set(id, hash);
|
114 | }
|
115 | for (const [hash, [id, value]] of insertions) {
|
116 | if (this.deletions.has(id)) {
|
117 | continue;
|
118 | }
|
119 | const pair = this.pairs.get(hash);
|
120 | if (!pair || (pair && pair[0] < id)) {
|
121 | this.pairs.set(hash, [id, value]);
|
122 | if (!pair) {
|
123 | this.emit('add', value, pair ? pair[1] : undefined);
|
124 | }
|
125 | }
|
126 | }
|
127 | for (const [id, hash] of deletions) {
|
128 | const pair = this.pairs.get(hash);
|
129 | if (pair && pair[0] === id) {
|
130 | this.pairs.delete(hash);
|
131 | this.emit('delete', pair[1]);
|
132 | }
|
133 | }
|
134 | if (!skipFlush) {
|
135 | this.flush();
|
136 | }
|
137 | }
|
138 |
|
139 | add(value:T, id?:string = generateId()) {
|
140 | const hash = this.hash(value);
|
141 | const pair = this.pairs.get(hash);
|
142 | const insertMessage = [hash, [id, value]];
|
143 | if (pair) {
|
144 | const deleteMessage = [pair[0], hash];
|
145 | this.process([[insertMessage], [deleteMessage]], true);
|
146 | this.deleteQueue.push(deleteMessage);
|
147 | } else {
|
148 | this.process([[insertMessage], []], true);
|
149 | }
|
150 | this.insertQueue.push(insertMessage);
|
151 | this.dequeue();
|
152 | return this;
|
153 | }
|
154 |
|
155 | delete(value:T) {
|
156 | const hash = this.hash(value);
|
157 | const pair = this.pairs.get(hash);
|
158 | if (pair) {
|
159 | const message = [pair[0], hash];
|
160 | this.process([[], [message]], true);
|
161 | this.deleteQueue.push(message);
|
162 | this.dequeue();
|
163 | }
|
164 | }
|
165 |
|
166 | clear() {
|
167 | for (const value of this) {
|
168 | this.delete(value);
|
169 | }
|
170 | }
|
171 |
|
172 | * entries():Iterator<[T, T]> {
|
173 | for (const [id, value] of this.pairs.values()) {
|
174 | yield [value, value];
|
175 | }
|
176 | }
|
177 |
|
178 | forEach(callback:Function, thisArg?:any) {
|
179 | if (thisArg) {
|
180 | for (const value of this.pairs.values()) {
|
181 | callback.bind(thisArg)(value, value, this);
|
182 | }
|
183 | } else {
|
184 | for (const value of this.pairs.values()) {
|
185 | callback(value, value, this);
|
186 | }
|
187 | }
|
188 | }
|
189 |
|
190 | has(value:T):boolean {
|
191 | return !!this.pairs.get(this.hash(value));
|
192 | }
|
193 |
|
194 | activeIds(value:T):Array<string> {
|
195 | const hash = this.hash(value);
|
196 | const pair = this.pairs.get(hash);
|
197 | if (!pair) {
|
198 | return [];
|
199 | }
|
200 | return [pair[0]];
|
201 | }
|
202 |
|
203 | * keys():Iterator<T> {
|
204 | for (const [id, value] of this.pairs.values()) {
|
205 | yield value;
|
206 | }
|
207 | }
|
208 |
|
209 | * values():Iterator<T> {
|
210 | for (const [id, value] of this.pairs.values()) {
|
211 | yield value;
|
212 | }
|
213 | }
|
214 |
|
215 | hash(value:T):string {
|
216 | return hasher(value);
|
217 | }
|
218 |
|
219 | get size():number {
|
220 | return this.pairs.size;
|
221 | }
|
222 | }
|
223 |
|
224 | module.exports = ObservedRemoveSet;
|