// UNPKG · 12 kB · JavaScript · View Raw
// Third-party helpers: array diffing and structural equality
var arraydiff = require('arraydiff');
var deepEqual = require('fast-deep-equal');
// Project-local modules
var ShareDBError = require('./error');
var util = require('./util');

// Shorthand for the error code table exposed on ShareDBError
var ERROR_CODE = ShareDBError.CODES;
7
/**
 * Holds the state needed to keep one query's result set up to date: the
 * request details, the op stream, the ids currently in the results, and the
 * polling configuration.
 *
 * @param {Object} request Supplies `backend`, `agent`, `db`, `index`,
 *   `query`, `collection`, `fields`, `options` and `snapshotProjection`,
 *   which are copied onto the instance. A missing `options` is treated as an
 *   empty object.
 * @param {Object} stream Op stream; stored as `this.stream`.
 * @param {Array} ids Ids of the documents in the current results.
 * @param {*} extra Extra query data; stored as `this.extra`.
 */
function QueryEmitter(request, stream, ids, extra) {
  // Tolerate a request without options instead of throwing a TypeError when
  // reading options.skipPoll below
  var options = request.options || {};

  this.backend = request.backend;
  this.agent = request.agent;
  this.db = request.db;
  this.index = request.index;
  this.query = request.query;
  this.collection = request.collection;
  this.fields = request.fields;
  this.options = options;
  this.snapshotProjection = request.snapshotProjection;
  this.stream = stream;
  this.ids = ids;
  this.extra = extra;

  this.skipPoll = options.skipPoll || util.doNothing;
  this.canPollDoc = this.db.canPollDoc(this.collection, this.query);
  // For both timings, a numeric per-query option wins over a numeric db-level
  // setting; anything else falls back to 0
  this.pollDebounce =
    (typeof options.pollDebounce === 'number') ? options.pollDebounce :
      (typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce : 0;
  this.pollInterval =
    (typeof options.pollInterval === 'number') ? options.pollInterval :
      (typeof this.db.pollInterval === 'number') ? this.db.pollInterval : 0;

  // Polling state
  this._polling = false;
  this._pendingPoll = null;
  this._pollDebounceId = null;
  this._pollIntervalId = null;
}
// The constructor is the module's only export
module.exports = QueryEmitter;
37
// Start processing events from the stream
//
// Installs `this._defaultCallback` (forwards a poll error to `onError`),
// attaches 'data' and 'end' listeners to `this.stream`, and then calls
// `_flushPoll()`.
QueryEmitter.prototype._open = function() {
  var emitter = this;
  this._defaultCallback = function(err) {
    if (err) emitter.onError(err);
  };
  emitter.stream.on('data', function(data) {
    // A stream item carrying an `error` is reported and not treated as an op
    if (data.error) {
      return emitter.onError(data.error);
    }
    emitter._update(data);
  });
  emitter.stream.on('end', function() {
    emitter.destroy();
  });
  // Make sure we start polling if pollInterval is being used
  this._flushPoll();
};
56
// Cancel any scheduled debounce/interval timers and destroy the op stream.
//
// NOTE(review): the timer id fields are not reset here, and a poll that is
// already in flight is not cancelled; when it completes, _finishPoll() still
// calls _flushPoll(). Confirm whether polling can resume after destroy.
QueryEmitter.prototype.destroy = function() {
  clearTimeout(this._pollDebounceId);
  clearTimeout(this._pollIntervalId);
  this.stream.destroy();
};
62
// Emit a 'timing' event on the backend with the action name, the number of
// milliseconds elapsed since `start` (a Date.now() timestamp), and this
// emitter.
QueryEmitter.prototype._emitTiming = function(action, start) {
  this.backend.emit('timing', action, Date.now() - start, this);
};
66
// Handle one op from the stream: decide whether the op itself must be sent
// to the client, whether a poll can be skipped, and otherwise which kind of
// poll (single document or full query) to run. `op.d` is read as the id of
// the document the op applies to.
QueryEmitter.prototype._update = function(op) {
  // Note that `op` should not be projected or sanitized yet. It's possible for
  // a query to filter on a field that's not in the projection. skipPoll checks
  // to see if an op could possibly affect a query, so it should get passed the
  // full op. The onOp listener function must call backend.sanitizeOp()
  var id = op.d;
  var pollCallback = this._defaultCallback;

  // Check if the op's id matches the query before updating the query results
  // and send it through immediately if it does. The current snapshot
  // (including the op) for a newly matched document will get sent in the
  // insert diff, so we don't need to send the op that caused the doc to
  // match. If the doc already exists in the client and isn't otherwise
  // subscribed, the client will need to request the op when it receives the
  // snapshot from the query to bring itself up to date.
  //
  // The client may see the result of the op get reflected before the query
  // results update. This might prove janky in some cases, since a doc could
  // get deleted before it is removed from the results, for example. However,
  // it will mean that ops which don't end up changing the results are
  // received sooner even if query polling takes a while.
  //
  // Alternatively, we could send the op message only after the query has
  // updated, and it would perhaps be ideal to send in the same message to
  // avoid the user seeing transitional states where the doc is updated but
  // the results order is not.
  //
  // We should send the op even if it is the op that causes the document to no
  // longer match the query. If client-side filters are applied to the model
  // to figure out which documents to render in a list, we will want the op
  // that removed the doc from the query to cause the client-side computed
  // list to update.
  if (this.ids.indexOf(id) !== -1) {
    var emitter = this;
    pollCallback = function(err) {
      // Send op regardless of polling error. Clients handle subscription to ops
      // on the documents that currently match query results independently from
      // updating which docs match the query
      emitter.onOp(op);
      if (err) emitter.onError(err);
    };
  }

  // Ignore if the database or user function says we don't need to poll
  try {
    if (
      this.db.skipPoll(this.collection, id, op, this.query) ||
      this.skipPoll(this.collection, id, op, this.query)
    ) {
      return pollCallback();
    }
  } catch (err) {
    // A throwing skipPoll is reported through the same callback as a poll
    // error would be
    return pollCallback(err);
  }
  if (this.canPollDoc) {
    // We can query against only the document that was modified to see if the
    // op has changed whether or not it matches the results
    this.queryPollDoc(id, pollCallback);
  } else {
    // We need to do a full poll of the query, because the query uses limits,
    // sorts, or something special
    this.queryPoll(pollCallback);
  }
};
131
// Run whatever polling work is due: a queued poll if one is pending,
// otherwise (re)arm the pollInterval timer when an interval is configured.
QueryEmitter.prototype._flushPoll = function() {
  // Don't send another polling query at the same time or within the debounce
  // timeout. This function will be called again once the poll that is
  // currently in progress or the pollDebounce timeout completes
  if (this._polling || this._pollDebounceId) return;

  // If another polling event happened while we were polling, call poll again,
  // as the results may have changed
  if (this._pendingPoll) {
    this.queryPoll();

  // If a pollInterval is specified, poll if the query doesn't get polled in
  // the time of the interval
  } else if (this.pollInterval) {
    var emitter = this;
    // Never leave a previously armed interval timer orphaned: once its id is
    // overwritten it could no longer be cancelled and would fire an extra poll
    clearTimeout(this._pollIntervalId);
    this._pollIntervalId = setTimeout(function() {
      emitter._pollIntervalId = null;
      emitter.queryPoll(emitter._defaultCallback);
    }, this.pollInterval);
  }
};
153
// Re-run the whole query, diff the resulting ids against `this.ids`, and
// report changes through onExtra/onDiff. `callback` (optional) receives the
// poll error, if any, via _finishPoll. If a poll is already running or the
// debounce timer is active, the callback is queued in `_pendingPoll` instead
// and no query is issued.
QueryEmitter.prototype.queryPoll = function(callback) {
  var emitter = this;

  // Only run a single polling check against mongo at a time per emitter. This
  // matters for two reasons: First, one callback could return before the
  // other. Thus, our result diffs could get out of order, and the clients
  // could end up with results in a funky order and the wrong results being
  // mutated in the query. Second, only having one query executed
  // simultaneously per emitter will act as a natural adaptive rate limiting
  // in case the db is under load.
  //
  // This isn't necessary for the document polling case, since they operate
  // on a given id and won't accidentally modify the wrong doc. Also, those
  // queries should be faster and are less likely to be the same, so there is
  // less benefit to possible load reduction.
  if (this._polling || this._pollDebounceId) {
    if (this._pendingPoll) {
      this._pendingPoll.push(callback);
    } else {
      this._pendingPoll = [callback];
    }
    return;
  }
  this._polling = true;
  // Take ownership of the callbacks queued so far; they are all completed by
  // this poll
  var pending = this._pendingPoll;
  this._pendingPoll = null;
  if (this.pollDebounce) {
    this._pollDebounceId = setTimeout(function() {
      emitter._pollDebounceId = null;
      emitter._flushPoll();
    }, this.pollDebounce);
  }
  // A poll is happening now, so the scheduled interval poll is not needed
  clearTimeout(this._pollIntervalId);

  var start = Date.now();
  this.db.queryPoll(this.collection, this.query, this.options, function(err, ids, extra) {
    if (err) return emitter._finishPoll(err, callback, pending);
    emitter._emitTiming('queryEmitter.poll', start);

    // Be nice to not have to do this in such a brute force way
    if (!deepEqual(emitter.extra, extra)) {
      emitter.extra = extra;
      emitter.onExtra(extra);
    }

    var idsDiff = arraydiff(emitter.ids, ids);
    if (idsDiff.length) {
      emitter.ids = ids;
      var inserted = getInserted(idsDiff);
      if (inserted.length) {
        // Newly matching docs are sent as snapshots, so fetch and sanitize
        // them before emitting the diff
        emitter.db.getSnapshotBulk(emitter.collection, inserted, emitter.fields, null, function(err, snapshotMap) {
          if (err) return emitter._finishPoll(err, callback, pending);
          var snapshots = emitter.backend._getSnapshotsFromMap(inserted, snapshotMap);
          var snapshotType = emitter.backend.SNAPSHOT_TYPES.current;
          emitter.backend._sanitizeSnapshots(
            emitter.agent,
            emitter.snapshotProjection,
            emitter.collection,
            snapshots,
            snapshotType,
            function(err) {
              if (err) return emitter._finishPoll(err, callback, pending);
              emitter._emitTiming('queryEmitter.pollGetSnapshotBulk', start);
              var diff = mapDiff(idsDiff, snapshotMap);
              emitter.onDiff(diff);
              emitter._finishPoll(err, callback, pending);
            });
        });
      } else {
        emitter.onDiff(idsDiff);
        emitter._finishPoll(err, callback, pending);
      }
    } else {
      emitter._finishPoll(err, callback, pending);
    }
  });
};
// Complete a poll: clear the polling flag, report `err` to the poll's own
// callback and to every callback in `pending` (entries may be undefined and
// are skipped), then let _flushPoll() decide what to do next.
QueryEmitter.prototype._finishPoll = function(err, callback, pending) {
  this._polling = false;
  if (callback) callback(err);
  if (pending) {
    for (var i = 0; i < pending.length; i++) {
      callback = pending[i];
      if (callback) callback(err);
    }
  }
  this._flushPoll();
};
242
// Check a single document against the query and update `this.ids`:
// - newly matching doc: append its id, fetch + sanitize its snapshot, and
//   emit an insert diff
// - doc that stopped matching: remove its id and emit a remove diff
// - otherwise: no diff
// `callback` is called with an error, or with no arguments when done.
QueryEmitter.prototype.queryPollDoc = function(id, callback) {
  var emitter = this;
  var start = Date.now();
  this.db.queryPollDoc(this.collection, id, this.query, this.options, function(err, matches) {
    if (err) return callback(err);
    emitter._emitTiming('queryEmitter.pollDoc', start);

    // Check if the document was in the previous results set
    var i = emitter.ids.indexOf(id);

    if (i === -1 && matches) {
      // Add doc to the collection. Order isn't important, so we'll just whack
      // it at the end
      // NOTE(review): this index is computed before the async snapshot fetch
      // below; confirm it is still right if `ids` changes in the meantime
      var index = emitter.ids.push(id) - 1;
      // We can get the result to send to the client async, since there is a
      // delay in sending to the client anyway
      emitter.db.getSnapshot(emitter.collection, id, emitter.fields, null, function(err, snapshot) {
        if (err) return callback(err);
        var snapshots = [snapshot];
        var snapshotType = emitter.backend.SNAPSHOT_TYPES.current;
        emitter.backend._sanitizeSnapshots(
          emitter.agent,
          emitter.snapshotProjection,
          emitter.collection,
          snapshots,
          snapshotType,
          function(err) {
            if (err) return callback(err);
            emitter.onDiff([new arraydiff.InsertDiff(index, snapshots)]);
            emitter._emitTiming('queryEmitter.pollDocGetSnapshot', start);
            callback();
          });
      });
      return;
    }

    if (i !== -1 && !matches) {
      emitter.ids.splice(i, 1);
      emitter.onDiff([new arraydiff.RemoveDiff(i, 1)]);
      return callback();
    }

    callback();
  });
};
288
// Clients must assign each of these functions synchronously after constructing
// an instance of QueryEmitter. The instance is subscribed to an op stream at
// construction time, and does not buffer emitted events. Diff events assume
// all messages are received and applied in order, so it is critical that none
// are dropped.
//
// Until they are replaced, all four listeners share one default that throws,
// so an unassigned listener fails loudly instead of dropping an event.
QueryEmitter.prototype.onError =
QueryEmitter.prototype.onDiff =
QueryEmitter.prototype.onExtra =
QueryEmitter.prototype.onOp = function() {
  throw new ShareDBError(
    ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED,
    'Required QueryEmitter listener not assigned'
  );
};
303
// Collect, in order, every value carried by the InsertDiff items of `diff`.
// Items of any other type are ignored. Returns a new flat array.
function getInserted(diff) {
  var inserted = [];
  for (var i = 0; i < diff.length; i++) {
    var item = diff[i];
    if (item instanceof arraydiff.InsertDiff) {
      for (var j = 0; j < item.values.length; j++) {
        inserted.push(item.values[j]);
      }
    }
  }
  return inserted;
}
316
// Build a diff in which every InsertDiff of `idsDiff` carries the
// `snapshotMap` entries for its ids instead of the ids themselves. Items of
// any other type are passed through as the same objects. `idsDiff` itself is
// not modified.
function mapDiff(idsDiff, snapshotMap) {
  var diff = [];
  for (var i = 0; i < idsDiff.length; i++) {
    var item = idsDiff[i];
    if (item instanceof arraydiff.InsertDiff) {
      var values = [];
      for (var j = 0; j < item.values.length; j++) {
        var id = item.values[j];
        values.push(snapshotMap[id]);
      }
      diff.push(new arraydiff.InsertDiff(item.index, values));
    } else {
      diff.push(item);
    }
  }
  return diff;
}