1 | var arraydiff = require('arraydiff');
|
2 | var deepEqual = require('fast-deep-equal');
|
3 | var ShareDBError = require('./error');
|
4 | var util = require('./util');
|
5 |
|
// Alias for ShareDBError.CODES; used below when constructing ShareDBError
// instances.
var ERROR_CODE = ShareDBError.CODES;
|
7 |
|
/**
 * Holds the state of one subscribed query: the ids of its current results,
 * the op stream that triggers re-polling, and the polling configuration.
 *
 * @param {Object} request - query request. `backend`, `agent`, `db`, `index`,
 *   `query`, `collection`, `fields`, `options` and `snapshotProjection` are
 *   copied onto the emitter. `options` may be omitted.
 * @param {Object} stream - op stream; listeners are attached in `_open()`
 * @param {Array} ids - ids of the query's current results
 * @param {*} extra - extra query data, compared against each poll's `extra`
 */
function QueryEmitter(request, stream, ids, extra) {
  this.backend = request.backend;
  this.agent = request.agent;
  this.db = request.db;
  this.index = request.index;
  this.query = request.query;
  this.collection = request.collection;
  this.fields = request.fields;
  this.options = request.options;
  this.snapshotProjection = request.snapshotProjection;
  this.stream = stream;
  this.ids = ids;
  this.extra = extra;

  // Read the settings through a local fallback so a request without options
  // does not throw. `this.options` itself is stored exactly as given, since
  // it is handed on to the db when polling.
  var options = this.options || {};

  this.skipPoll = options.skipPoll || util.doNothing;
  this.canPollDoc = this.db.canPollDoc(this.collection, this.query);
  // Per-query numeric options win over the db's defaults; anything that is
  // not a number falls through, ending at 0 (disabled).
  this.pollDebounce =
    (typeof options.pollDebounce === 'number') ? options.pollDebounce :
      (typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce : 0;
  this.pollInterval =
    (typeof options.pollInterval === 'number') ? options.pollInterval :
      (typeof this.db.pollInterval === 'number') ? this.db.pollInterval : 0;

  // Polling state: whether a full poll is in flight, the callbacks queued
  // while one is, and the ids of the debounce and interval timers.
  this._polling = false;
  this._pendingPoll = null;
  this._pollDebounceId = null;
  this._pollIntervalId = null;
}
|
// The QueryEmitter constructor is this module's export.
module.exports = QueryEmitter;
|
37 |
|
38 |
|
/**
 * Attach the stream listeners and kick off polling.
 *
 * Installs `_defaultCallback` (reports a poll error through `onError`),
 * routes stream data to `onError` or `_update`, destroys the emitter when
 * the stream ends, and finally calls `_flushPoll()`.
 */
QueryEmitter.prototype._open = function() {
  var self = this;

  function reportPollError(err) {
    if (err) self.onError(err);
  }

  function handleData(data) {
    // Stream items carrying an `error` are reported instead of processed
    if (data.error) {
      return self.onError(data.error);
    }
    self._update(data);
  }

  function handleEnd() {
    self.destroy();
  }

  this._defaultCallback = reportPollError;
  self.stream.on('data', handleData);
  self.stream.on('end', handleEnd);

  this._flushPoll();
};
|
56 |
|
/**
 * Stop the emitter: cancel the debounce and interval timers, disable
 * interval polling, and destroy the op stream.
 */
QueryEmitter.prototype.destroy = function() {
  clearTimeout(this._pollDebounceId);
  clearTimeout(this._pollIntervalId);
  // A poll that is still in flight finishes through _finishPoll, which calls
  // _flushPoll, and _flushPoll would arm a fresh interval timer whenever
  // pollInterval is set. Zeroing it here keeps a destroyed emitter from
  // polling indefinitely.
  this.pollInterval = 0;
  this.stream.destroy();
};
|
62 |
|
/**
 * Emit a `'timing'` event on the backend with the milliseconds elapsed since
 * `start`.
 *
 * @param {string} action - name of the timed action
 * @param {number} start - `Date.now()` value taken when the action began
 */
QueryEmitter.prototype._emitTiming = function(action, start) {
  var backend = this.backend;
  var elapsed = Date.now() - start;
  backend.emit('timing', action, elapsed, this);
};
|
66 |
|
/**
 * Handle one op from the stream: decide whether the query must be re-polled
 * and whether the op itself is forwarded through `onOp`.
 *
 * @param {Object} op - op message; `op.d` is read as the id of the doc the op
 *   applies to. The full op is passed to the skipPoll hooks.
 */
QueryEmitter.prototype._update = function(op) {
  var id = op.d;
  var pollCallback = this._defaultCallback;

  // When the doc is already part of the results, forward the op once the
  // poll (or the decision to skip it) completes. The op is sent even if the
  // poll fails; the error is reported afterwards.
  if (this.ids.indexOf(id) !== -1) {
    var emitter = this;
    pollCallback = function(err) {
      emitter.onOp(op);
      if (err) emitter.onError(err);
    };
  }

  // Ask the database and the user-supplied hook whether this op can be
  // ignored. Only the hooks run inside the try: if the callback were invoked
  // in here too, an exception thrown by an onOp/onError listener would be
  // caught below and the callback would run a second time, delivering the op
  // twice.
  var shouldSkip;
  try {
    shouldSkip =
      this.db.skipPoll(this.collection, id, op, this.query) ||
      this.skipPoll(this.collection, id, op, this.query);
  } catch (err) {
    return pollCallback(err);
  }
  if (shouldSkip) return pollCallback();

  if (this.canPollDoc) {
    // Re-check only the doc the op touched
    this.queryPollDoc(id, pollCallback);
  } else {
    // Re-run the whole query
    this.queryPoll(pollCallback);
  }
};
|
131 |
|
/**
 * Run whatever polling work is due, unless a poll is in flight or the
 * debounce timer is pending.
 *
 * If callbacks were queued in `_pendingPoll`, a new poll is started.
 * Otherwise, when `pollInterval` is set, a timer is armed that polls once
 * the interval elapses.
 */
QueryEmitter.prototype._flushPoll = function() {
  var isBusy = this._polling || this._pollDebounceId;
  if (isBusy) return;

  // Callbacks queued while the previous poll ran: poll again on their behalf
  if (this._pendingPoll) {
    this.queryPoll();
    return;
  }

  if (!this.pollInterval) return;

  // Nothing pending: schedule the periodic poll
  var self = this;
  this._pollIntervalId = setTimeout(function() {
    self._pollIntervalId = null;
    self.queryPoll(self._defaultCallback);
  }, this.pollInterval);
};
|
153 |
|
/**
 * Re-run the full query and emit the changes relative to the current
 * results.
 *
 * Only one poll runs at a time. While a poll is in flight or the debounce
 * timer is pending, the callback is queued on `_pendingPoll` and completed
 * by a later poll.
 *
 * @param {Function} [callback] - called with an error, if any, once the poll
 *   finishes; may be omitted
 */
QueryEmitter.prototype.queryPoll = function(callback) {
  var emitter = this;

  // A poll is already running or was run within the debounce window: queue
  // the callback and let _flushPoll start the next poll later.
  if (this._polling || this._pollDebounceId) {
    if (this._pendingPoll) {
      this._pendingPoll.push(callback);
    } else {
      this._pendingPoll = [callback];
    }
    return;
  }
  this._polling = true;
  // Take over the callbacks queued so far; they are completed together with
  // this poll's own callback in _finishPoll.
  var pending = this._pendingPoll;
  this._pendingPoll = null;
  if (this.pollDebounce) {
    // While this timer id is set, further polls are queued rather than run;
    // when it fires, _flushPoll picks up anything queued in the meantime.
    this._pollDebounceId = setTimeout(function() {
      emitter._pollDebounceId = null;
      emitter._flushPoll();
    }, this.pollDebounce);
  }
  // A poll is starting now, so cancel the scheduled interval poll.
  clearTimeout(this._pollIntervalId);

  var start = Date.now();
  this.db.queryPoll(this.collection, this.query, this.options, function(err, ids, extra) {
    if (err) return emitter._finishPoll(err, callback, pending);
    emitter._emitTiming('queryEmitter.poll', start);

    // Report the extra data only when it actually changed.
    if (!deepEqual(emitter.extra, extra)) {
      emitter.extra = extra;
      emitter.onExtra(extra);
    }

    var idsDiff = arraydiff(emitter.ids, ids);
    if (idsDiff.length) {
      emitter.ids = ids;
      var inserted = getInserted(idsDiff);
      if (inserted.length) {
        // Newly inserted ids need their snapshots fetched and sanitized
        // before the diff is emitted.
        emitter.db.getSnapshotBulk(emitter.collection, inserted, emitter.fields, null, function(err, snapshotMap) {
          if (err) return emitter._finishPoll(err, callback, pending);
          var snapshots = emitter.backend._getSnapshotsFromMap(inserted, snapshotMap);
          var snapshotType = emitter.backend.SNAPSHOT_TYPES.current;
          emitter.backend._sanitizeSnapshots(
            emitter.agent,
            emitter.snapshotProjection,
            emitter.collection,
            snapshots,
            snapshotType,
            function(err) {
              if (err) return emitter._finishPoll(err, callback, pending);
              emitter._emitTiming('queryEmitter.pollGetSnapshotBulk', start);
              // Replace the inserted ids in the diff with their snapshots.
              var diff = mapDiff(idsDiff, snapshotMap);
              emitter.onDiff(diff);
              emitter._finishPoll(err, callback, pending);
            });
        });
      } else {
        // No insert items in the diff, so it can be emitted as is.
        emitter.onDiff(idsDiff);
        emitter._finishPoll(err, callback, pending);
      }
    } else {
      // The result ids are unchanged.
      emitter._finishPoll(err, callback, pending);
    }
  });
};
|
/**
 * Complete a full poll: clear the polling flag, notify the poll's own
 * callback followed by every queued callback, then let `_flushPoll` start
 * whatever is due next.
 *
 * @param {*} err - error passed to each callback, if any
 * @param {Function} [callback] - callback of the poll that just finished
 * @param {Array} [pending] - callbacks that were queued before the poll
 *   started; falsy entries are skipped
 */
QueryEmitter.prototype._finishPoll = function(err, callback, pending) {
  this._polling = false;

  if (callback) callback(err);

  var queued = pending || [];
  queued.forEach(function(queuedCallback) {
    if (queuedCallback) queuedCallback(err);
  });

  this._flushPoll();
};
|
242 |
|
/**
 * Re-check a single doc against the query and emit an insert or remove diff
 * when its membership in the results changed.
 *
 * @param {*} id - id of the doc to check
 * @param {Function} callback - called with an error, or with no arguments
 *   once the results are up to date
 */
QueryEmitter.prototype.queryPollDoc = function(id, callback) {
  var self = this;
  var startedAt = Date.now();

  // Fetch and sanitize the snapshot of a doc that was just appended to the
  // results at `position`, then emit the insert diff.
  function emitInsert(position) {
    self.db.getSnapshot(self.collection, id, self.fields, null, function(err, snapshot) {
      if (err) return callback(err);
      var insertedSnapshots = [snapshot];
      var currentType = self.backend.SNAPSHOT_TYPES.current;
      self.backend._sanitizeSnapshots(
        self.agent,
        self.snapshotProjection,
        self.collection,
        insertedSnapshots,
        currentType,
        function(err) {
          if (err) return callback(err);
          self.onDiff([new arraydiff.InsertDiff(position, insertedSnapshots)]);
          self._emitTiming('queryEmitter.pollDocGetSnapshot', startedAt);
          callback();
        });
    });
  }

  this.db.queryPollDoc(this.collection, id, this.query, this.options, function(err, matches) {
    if (err) return callback(err);
    self._emitTiming('queryEmitter.pollDoc', startedAt);

    var position = self.ids.indexOf(id);
    var wasInResults = position !== -1;

    // Newly matching doc: append its id right away, send the snapshot async
    if (matches && !wasInResults) {
      emitInsert(self.ids.push(id) - 1);
      return;
    }

    // Doc no longer matches: drop it from the results
    if (wasInResults && !matches) {
      self.ids.splice(position, 1);
      self.onDiff([new arraydiff.RemoveDiff(position, 1)]);
      return callback();
    }

    // Membership unchanged
    callback();
  });
};
|
288 |
|
289 |
|
290 |
|
291 |
|
292 |
|
293 |
|
/**
 * Default implementation shared by the four listener hooks. Whoever creates
 * the emitter is expected to replace `onError`, `onDiff`, `onExtra` and
 * `onOp`; calling one that was not replaced throws.
 *
 * @throws {ShareDBError} with code ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED
 */
function listenerNotAssigned() {
  throw new ShareDBError(
    ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED,
    'Required QueryEmitter listener not assigned'
  );
}

QueryEmitter.prototype.onOp = listenerNotAssigned;
QueryEmitter.prototype.onExtra = listenerNotAssigned;
QueryEmitter.prototype.onDiff = listenerNotAssigned;
QueryEmitter.prototype.onError = listenerNotAssigned;
|
303 |
|
/**
 * Collect, in order, every value carried by the insert items of a diff.
 *
 * @param {Array} diff - diff items; only `arraydiff.InsertDiff` instances
 *   contribute
 * @returns {Array} the inserted values, flattened into one array
 */
function getInserted(diff) {
  var collected = [];
  diff.forEach(function(entry) {
    if (!(entry instanceof arraydiff.InsertDiff)) return;
    entry.values.forEach(function(value) {
      collected.push(value);
    });
  });
  return collected;
}
|
316 |
|
/**
 * Build a copy of an ids diff in which every insert item carries snapshots
 * instead of ids. All other items are passed through unchanged.
 *
 * @param {Array} idsDiff - diff items whose insert values are ids
 * @param {Object} snapshotMap - snapshots keyed by id
 * @returns {Array} new diff array; insert items are fresh
 *   `arraydiff.InsertDiff` instances
 */
function mapDiff(idsDiff, snapshotMap) {
  return idsDiff.map(function(entry) {
    if (!(entry instanceof arraydiff.InsertDiff)) return entry;
    var snapshots = entry.values.map(function(id) {
      return snapshotMap[id];
    });
    return new arraydiff.InsertDiff(entry.index, snapshots);
  });
}
|