UNPKG

5.89 kBJavaScriptView Raw
1//
2
3const { EventEmitter } = require('events');
4const generateId = require('./generate-id');
5const hasher = require('./hasher');
6
7
8
9
10
11
12/**
13 * Class representing an observed-remove set
14 *
15 * Implements all methods and iterators of the native `Set` object in addition to the following.
16 * See: {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Set}
17 */
18class ObservedRemoveSet extends EventEmitter {
19
20
21
22
23
24
25
26
27 /**
28 * Create an observed-remove set.
29 * @param {Iterable<T>} [entries=[]] Iterable of initial values
30 * @param {Object} [options={}]
31 * @param {String} [options.maxAge=5000] Max age of insertion/deletion identifiers
32 * @param {String} [options.bufferPublishing=20] Interval by which to buffer 'publish' events
33 */
34 constructor(entries , options = {}) {
35 super();
36 this.maxAge = typeof options.maxAge === 'undefined' ? 5000 : options.maxAge;
37 this.bufferPublishing = typeof options.bufferPublishing === 'undefined' ? 30 : options.bufferPublishing;
38 this.publishTimeout = null;
39 this.pairs = new Map();
40 this.deletions = new Map();
41 this.insertQueue = [];
42 this.deleteQueue = [];
43 if (!entries) {
44 return;
45 }
46 for (const value of entries) {
47 this.add(value);
48 }
49 }
50
51 /* :: @@iterator(): Iterator<T> { return ({}: any); } */
52 // $FlowFixMe: computed property
53 * [Symbol.iterator]() {
54 for (const pair of this.pairs.values()) {
55 yield pair[1];
56 }
57 }
58
59 dequeue() {
60 if (this.publishTimeout) {
61 return;
62 }
63 if (this.bufferPublishing > 0) {
64 this.publishTimeout = setTimeout(() => this.publish(), this.bufferPublishing);
65 } else {
66 this.publish();
67 }
68 }
69
70 publish() {
71 this.publishTimeout = null;
72 const insertQueue = this.insertQueue;
73 const deleteQueue = this.deleteQueue;
74 this.insertQueue = [];
75 this.deleteQueue = [];
76 this.sync([insertQueue, deleteQueue]);
77 }
78
79 /**
80 * Emit a 'publish' event containing a specified queue or all of the set's insertions and deletions.
81 * @param {Array<Array<any>>} queue - Array of insertions and deletions
82 * @return {void}
83 */
84 sync(queue = this.dump()) {
85 this.emit('publish', queue);
86 }
87
88 flush() {
89 const maxAgeString = (Date.now() - this.maxAge).toString(36).padStart(9, '0');
90 for (const [id] of this.deletions) {
91 if (id < maxAgeString) {
92 this.deletions.delete(id);
93 }
94 }
95 }
96
97 /**
98 * Return an array containing all of the set's insertions and deletions.
99 * @return {[Array<*>, Array<*>]>}
100 */
101 dump() {
102 return [[...this.pairs], [...this.deletions]];
103 }
104
105 /**
106 * Process an array of insertion and deletions.
107 * @param {Array<Array<any>>} queue - Array of insertions and deletions
108 * @return {void}
109 */
110 process(queue , skipFlush = false) {
111 const [insertions, deletions] = queue;
112 for (const [id, hash] of deletions) {
113 this.deletions.set(id, hash);
114 }
115 for (const [hash, [id, value]] of insertions) {
116 if (this.deletions.has(id)) {
117 continue;
118 }
119 const pair = this.pairs.get(hash);
120 if (!pair || (pair && pair[0] < id)) {
121 this.pairs.set(hash, [id, value]);
122 if (!pair) {
123 this.emit('add', value, pair ? pair[1] : undefined);
124 }
125 }
126 }
127 for (const [id, hash] of deletions) {
128 const pair = this.pairs.get(hash);
129 if (pair && pair[0] === id) {
130 this.pairs.delete(hash);
131 this.emit('delete', pair[1]);
132 }
133 }
134 if (!skipFlush) {
135 this.flush();
136 }
137 }
138
139 add(value , id = generateId()) {
140 const hash = this.hash(value);
141 const pair = this.pairs.get(hash);
142 const insertMessage = [hash, [id, value]];
143 if (pair) {
144 const deleteMessage = [pair[0], hash];
145 this.process([[insertMessage], [deleteMessage]], true);
146 this.deleteQueue.push(deleteMessage);
147 } else {
148 this.process([[insertMessage], []], true);
149 }
150 this.insertQueue.push(insertMessage);
151 this.dequeue();
152 return this;
153 }
154
155 delete(value ) {
156 const hash = this.hash(value);
157 const pair = this.pairs.get(hash);
158 if (pair) {
159 const message = [pair[0], hash];
160 this.process([[], [message]], true);
161 this.deleteQueue.push(message);
162 this.dequeue();
163 }
164 }
165
166 clear() {
167 for (const value of this) {
168 this.delete(value);
169 }
170 }
171
172 * entries() {
173 for (const [id, value] of this.pairs.values()) { // eslint-disable-line no-unused-vars
174 yield [value, value];
175 }
176 }
177
178 forEach(callback , thisArg ) {
179 if (thisArg) {
180 for (const value of this.pairs.values()) {
181 callback.bind(thisArg)(value, value, this);
182 }
183 } else {
184 for (const value of this.pairs.values()) {
185 callback(value, value, this);
186 }
187 }
188 }
189
190 has(value ) {
191 return !!this.pairs.get(this.hash(value));
192 }
193
194 activeIds(value ) {
195 const hash = this.hash(value);
196 const pair = this.pairs.get(hash);
197 if (!pair) {
198 return [];
199 }
200 return [pair[0]];
201 }
202
203 * keys() {
204 for (const [id, value] of this.pairs.values()) { // eslint-disable-line no-unused-vars
205 yield value;
206 }
207 }
208
209 * values() {
210 for (const [id, value] of this.pairs.values()) { // eslint-disable-line no-unused-vars
211 yield value;
212 }
213 }
214
215 hash(value ) {
216 return hasher(value);
217 }
218
219 get size() {
220 return this.pairs.size;
221 }
222}
223
224module.exports = ObservedRemoveSet;