UNPKG

27.1 kBJavaScriptView Raw
1"use strict";
2
3var _create = require("babel-runtime/core-js/object/create");
4
5var _create2 = _interopRequireDefault(_create);
6
7var _keys = require("babel-runtime/core-js/object/keys");
8
9var _keys2 = _interopRequireDefault(_keys);
10
11var _classCallCheck2 = require("babel-runtime/helpers/classCallCheck");
12
13var _classCallCheck3 = _interopRequireDefault(_classCallCheck2);
14
15var _createClass2 = require("babel-runtime/helpers/createClass");
16
17var _createClass3 = _interopRequireDefault(_createClass2);
18
19var _utils = require("./utils");
20
21var _utils2 = _interopRequireDefault(_utils);
22
/**
 * Babel interop helper. Modules flagged with `__esModule` are handed back
 * untouched; any other value is wrapped so that it can be reached uniformly
 * through a `default` property.
 *
 * @param {*} obj the value returned by require()
 * @return {Object} `obj` itself when it is flagged as an ES module, otherwise
 * `{ default: obj }`.
 */
function _interopRequireDefault(obj) {
    if (obj && obj.__esModule) {
        return obj;
    }
    return { default: obj };
}
24
/**
 * The purpose of this class is to accumulate /sync responses such that a
 * complete "initial" JSON response can be returned which accurately represents
 * the sum total of the /sync responses accumulated to date. It handles room
 * data (everything under the "rooms" top-level key), group data and global
 * account data.
 *
 * This class is used when persisting room data so a complete /sync response can
 * be loaded from disk and incremental syncs can be performed on the server,
 * rather than asking the server to do an initial sync on startup.
 */
var SyncAccumulator = function () {
    /**
     * @param {Object=} opts
     * @param {Number=} opts.maxTimelineEntries The ideal maximum number of
     * timeline entries to keep in the sync response. This is best-effort, as
     * clients do not always have a back-pagination token for each event, so
     * it's possible there may be slightly *less* than this value. There will
     * never be more. This cannot be 0 or else it makes it impossible to scroll
     * back in a room. Default: 50.
     */
    function SyncAccumulator(opts) {
        (0, _classCallCheck3.default)(this, SyncAccumulator);

        // Work on a shallow copy so that the caller's options object is never
        // written to: it may be shared with other code, or frozen (in which
        // case assigning the default would throw in strict mode).
        var options = {};
        if (opts) {
            (0, _keys2.default)(opts).forEach(function (key) {
                options[key] = opts[key];
            });
        }
        // A falsy value (including 0, which is not permitted) means "default".
        options.maxTimelineEntries = options.maxTimelineEntries || 50;
        this.opts = options;
        this.accountData = {
            //$event_type: Object
        };
        this.inviteRooms = {
            //$roomId: { ... sync 'invite' json data ... }
        };
        this.joinRooms = {
            //$roomId: {
            //    _currentState: { $event_type: { $state_key: json } },
            //    _timeline: [
            //       { event: $event, token: null|token },
            //       { event: $event, token: null|token },
            //       { event: $event, token: null|token },
            //       ...
            //    ],
            //    _summary: {
            //       m.heroes: [ $user_id ],
            //       m.joined_member_count: $count,
            //       m.invited_member_count: $count
            //    },
            //    _accountData: { $event_type: json },
            //    _unreadNotifications: { ... unread_notifications JSON ... },
            //    _readReceipts: { $user_id: { data: $json, eventId: $event_id }}
            //}
        };
        // the /sync token which corresponds to the last time rooms were
        // accumulated. We remember this so that any caller can obtain a
        // coherent /sync response and know at what point they should be
        // streaming from without losing events.
        this.nextBatch = null;

        // { ('invite'|'join'|'leave'): $groupId: { ... sync 'group' data } }
        this.groups = {
            invite: {},
            join: {},
            leave: {}
        };
    }

    (0, _createClass3.default)(SyncAccumulator, [{
        key: "accumulate",

        /**
         * Fold one /sync response into the accumulated state: rooms, groups
         * and global account data, then remember its `next_batch` token.
         * @param {Object} syncResponse the complete /sync JSON
         */
        value: function accumulate(syncResponse) {
            this._accumulateRooms(syncResponse);
            this._accumulateGroups(syncResponse);
            this._accumulateAccountData(syncResponse);
            this.nextBatch = syncResponse.next_batch;
        }
    }, {
        key: "_accumulateAccountData",

        /**
         * Accumulate global account data. Does nothing when the response
         * carries no `account_data.events`.
         * @param {Object} syncResponse the complete /sync JSON
         */
        value: function _accumulateAccountData(syncResponse) {
            var _this = this;

            if (!syncResponse.account_data || !syncResponse.account_data.events) {
                return;
            }
            // Clobbers based on event type.
            syncResponse.account_data.events.forEach(function (e) {
                _this.accountData[e.type] = e;
            });
        }

        /**
         * Accumulate incremental /sync room data.
         * @param {Object} syncResponse the complete /sync JSON
         */

    }, {
        key: "_accumulateRooms",
        value: function _accumulateRooms(syncResponse) {
            var _this2 = this;

            if (!syncResponse.rooms) {
                return;
            }
            if (syncResponse.rooms.invite) {
                (0, _keys2.default)(syncResponse.rooms.invite).forEach(function (roomId) {
                    _this2._accumulateRoom(roomId, "invite", syncResponse.rooms.invite[roomId]);
                });
            }
            if (syncResponse.rooms.join) {
                (0, _keys2.default)(syncResponse.rooms.join).forEach(function (roomId) {
                    _this2._accumulateRoom(roomId, "join", syncResponse.rooms.join[roomId]);
                });
            }
            if (syncResponse.rooms.leave) {
                (0, _keys2.default)(syncResponse.rooms.leave).forEach(function (roomId) {
                    _this2._accumulateRoom(roomId, "leave", syncResponse.rooms.leave[roomId]);
                });
            }
        }
    }, {
        key: "_accumulateRoom",

        /**
         * Apply the data for a single room according to the category it was
         * listed under. An unrecognised category is logged and ignored.
         * @param {string} roomId the room ID
         * @param {string} category one of "invite", "join" or "leave"
         * @param {Object} data the room's JSON from the /sync response
         */
        value: function _accumulateRoom(roomId, category, data) {
            // Valid /sync state transitions
            //       +--------+ <======+            1: Accept an invite
            //   +== | INVITE |        | (5)        2: Leave a room
            //   |   +--------+ =====+ |            3: Join a public room previously
            //   |(1)            (4) | |               left (handle as if new room)
            //   V         (2)       V |            4: Reject an invite
            // +------+ ========> +--------+         5: Invite to a room previously
            // | JOIN |    (3)    | LEAVE* |            left (handle as if new room)
            // +------+ <======== +--------+
            //
            // * equivalent to "no state"
            switch (category) {
                case "invite":
                    // (5)
                    this._accumulateInviteState(roomId, data);
                    break;
                case "join":
                    if (this.inviteRooms[roomId]) {
                        // (1)
                        // was previously invite, now join. We expect /sync to give
                        // the entire state and timeline on 'join', so delete previous
                        // invite state
                        delete this.inviteRooms[roomId];
                    }
                    // (3)
                    this._accumulateJoinState(roomId, data);
                    break;
                case "leave":
                    if (this.inviteRooms[roomId]) {
                        // (4)
                        delete this.inviteRooms[roomId];
                    } else {
                        // (2)
                        delete this.joinRooms[roomId];
                    }
                    break;
                default:
                    console.error("Unknown cateogory: ", category);
            }
        }
    }, {
        key: "_accumulateInviteState",

        /**
         * Accumulate the stripped state of a room we are invited to. The first
         * invite for a room is stored as-is; later ones are merged into it.
         * @param {string} roomId the room ID
         * @param {Object} data the room's 'invite' JSON from the /sync response
         */
        value: function _accumulateInviteState(roomId, data) {
            if (!data.invite_state || !data.invite_state.events) {
                // no new data
                return;
            }
            if (!this.inviteRooms[roomId]) {
                this.inviteRooms[roomId] = {
                    invite_state: data.invite_state
                };
                return;
            }
            // accumulate extra keys for invite->invite transitions
            // clobber based on event type / state key
            // We expect invite_state to be small, so just loop over the events
            var currentData = this.inviteRooms[roomId];
            data.invite_state.events.forEach(function (e) {
                var hasAdded = false;
                for (var i = 0; i < currentData.invite_state.events.length; i++) {
                    var current = currentData.invite_state.events[i];
                    if (current.type === e.type && current.state_key == e.state_key) {
                        currentData.invite_state.events[i] = e; // update
                        hasAdded = true;
                    }
                }
                if (!hasAdded) {
                    currentData.invite_state.events.push(e);
                }
            });
        }

        /**
         * Accumulate timeline and state events in a room.
         * @param {string} roomId the room ID
         * @param {Object} data the room's 'join' JSON from the /sync response
         */

    }, {
        key: "_accumulateJoinState",
        value: function _accumulateJoinState(roomId, data) {
            // We expect this function to be called a lot (every /sync) so we want
            // this to be fast. /sync stores events in an array but we often want
            // to clobber based on type/state_key. Rather than convert arrays to
            // maps all the time, just keep private maps which contain
            // the actual current accumulated sync state, and array-ify it when
            // getJSON() is called.

            // State resolution:
            // The 'state' key is the delta from the previous sync (or start of time
            // if no token was supplied), to the START of the timeline. To obtain
            // the current state, we need to "roll forward" state by reading the
            // timeline. We want to store the current state so we can drop events
            // out the end of the timeline based on opts.maxTimelineEntries.
            //
            //      'state'     'timeline'     current state
            // |-------x<======================>x
            //          T   I   M   E
            //
            // When getJSON() is called, we 'roll back' the current state by the
            // number of entries in the timeline to work out what 'state' should be.

            // Back-pagination:
            // On an initial /sync, the server provides a back-pagination token for
            // the start of the timeline. When /sync deltas come down, they also
            // include back-pagination tokens for the start of the timeline. This
            // means not all events in the timeline have back-pagination tokens, as
            // it is only the ones at the START of the timeline which have them.
            // In order for us to have a valid timeline (and back-pagination token
            // to match), we need to make sure that when we remove old timeline
            // events, that we roll forward to an event which has a back-pagination
            // token. This means we can't keep a strict sliding-window based on
            // opts.maxTimelineEntries, and we may have a few less. We should never
            // have more though, provided that the /sync limit is less than or equal
            // to opts.maxTimelineEntries.

            if (!this.joinRooms[roomId]) {
                // Create truly empty objects so event types of 'hasOwnProperty' and co
                // don't cause this code to break.
                this.joinRooms[roomId] = {
                    _currentState: (0, _create2.default)(null),
                    _timeline: [],
                    _accountData: (0, _create2.default)(null),
                    _unreadNotifications: {},
                    _summary: {},
                    _readReceipts: {}
                };
            }
            var currentData = this.joinRooms[roomId];

            if (data.account_data && data.account_data.events) {
                // clobber based on type
                data.account_data.events.forEach(function (e) {
                    currentData._accountData[e.type] = e;
                });
            }

            // these probably clobber, spec is unclear.
            if (data.unread_notifications) {
                currentData._unreadNotifications = data.unread_notifications;
            }
            if (data.summary) {
                var HEROES_KEY = "m.heroes";
                var INVITED_COUNT_KEY = "m.invited_member_count";
                var JOINED_COUNT_KEY = "m.joined_member_count";

                var acc = currentData._summary;
                var sum = data.summary;
                acc[HEROES_KEY] = sum[HEROES_KEY] || acc[HEROES_KEY];
                // The counts are numbers and 0 is a perfectly valid new value
                // (e.g. the last pending invite was accepted or rejected), so
                // only fall back to the accumulated count when the field is
                // actually absent from this summary, not merely falsy.
                [JOINED_COUNT_KEY, INVITED_COUNT_KEY].forEach(function (countKey) {
                    var latest = sum[countKey];
                    var isAbsent = latest === undefined || latest === null;
                    acc[countKey] = isAbsent ? acc[countKey] : latest;
                });
            }

            if (data.ephemeral && data.ephemeral.events) {
                data.ephemeral.events.forEach(function (e) {
                    // We purposefully do not persist m.typing events.
                    // Technically you could refresh a browser before the timer on a
                    // typing event is up, so it'll look like you aren't typing when
                    // you really still are. However, the alternative is worse. If
                    // we do persist typing events, it will look like people are
                    // typing forever until someone really does start typing (which
                    // will prompt Synapse to send down an actual m.typing event to
                    // clobber the one we persisted).
                    if (e.type !== "m.receipt" || !e.content) {
                        // This means we'll drop unknown ephemeral events but that
                        // seems okay.
                        return;
                    }
                    // Handle m.receipt events. They clobber based on:
                    //   (user_id, receipt_type)
                    // but they are keyed in the event as:
                    //   content:{ $event_id: { $receipt_type: { $user_id: {json} }}}
                    // so store them in the former so we can accumulate receipt deltas
                    // quickly and efficiently (we expect a lot of them). Fold the
                    // receipt type into the key name since we only have 1 at the
                    // moment (m.read) and nested JSON objects are slower and more
                    // of a hassle to work with. We'll inflate this back out when
                    // getJSON() is called.
                    (0, _keys2.default)(e.content).forEach(function (eventId) {
                        var receiptsForEvent = e.content[eventId];
                        // Skip malformed entries (e.g. a null value) instead of
                        // throwing half-way through accumulating the response.
                        if (!receiptsForEvent || !receiptsForEvent["m.read"]) {
                            return;
                        }
                        var readReceipts = receiptsForEvent["m.read"];
                        (0, _keys2.default)(readReceipts).forEach(function (userId) {
                            // clobber on user ID
                            currentData._readReceipts[userId] = {
                                data: readReceipts[userId],
                                eventId: eventId
                            };
                        });
                    });
                });
            }

            // if we got a limited sync, we need to remove all timeline entries or else
            // we will have gaps in the timeline.
            if (data.timeline && data.timeline.limited) {
                currentData._timeline = [];
            }

            // Work out the current state. The deltas need to be applied in the order:
            // - existing state which didn't come down /sync.
            // - State events under the 'state' key.
            // - State events in the 'timeline'.
            if (data.state && data.state.events) {
                data.state.events.forEach(function (e) {
                    setState(currentData._currentState, e);
                });
            }
            if (data.timeline && data.timeline.events) {
                data.timeline.events.forEach(function (e, index) {
                    // this nops if 'e' isn't a state event
                    setState(currentData._currentState, e);
                    // append the event to the timeline. The back-pagination token
                    // corresponds to the first event in the timeline
                    currentData._timeline.push({
                        event: e,
                        token: index === 0 ? data.timeline.prev_batch : null
                    });
                });
            }

            // attempt to prune the timeline by jumping between events which have
            // pagination tokens.
            if (currentData._timeline.length > this.opts.maxTimelineEntries) {
                var startIndex = currentData._timeline.length - this.opts.maxTimelineEntries;
                for (var i = startIndex; i < currentData._timeline.length; i++) {
                    if (currentData._timeline[i].token) {
                        // keep all events after this, including this one
                        currentData._timeline = currentData._timeline.slice(i, currentData._timeline.length);
                        break;
                    }
                }
            }
        }

        /**
         * Accumulate incremental /sync group data.
         * @param {Object} syncResponse the complete /sync JSON
         */

    }, {
        key: "_accumulateGroups",
        value: function _accumulateGroups(syncResponse) {
            var _this3 = this;

            if (!syncResponse.groups) {
                return;
            }
            if (syncResponse.groups.invite) {
                (0, _keys2.default)(syncResponse.groups.invite).forEach(function (groupId) {
                    _this3._accumulateGroup(groupId, "invite", syncResponse.groups.invite[groupId]);
                });
            }
            if (syncResponse.groups.join) {
                (0, _keys2.default)(syncResponse.groups.join).forEach(function (groupId) {
                    _this3._accumulateGroup(groupId, "join", syncResponse.groups.join[groupId]);
                });
            }
            if (syncResponse.groups.leave) {
                (0, _keys2.default)(syncResponse.groups.leave).forEach(function (groupId) {
                    _this3._accumulateGroup(groupId, "leave", syncResponse.groups.leave[groupId]);
                });
            }
        }
    }, {
        key: "_accumulateGroup",

        /**
         * Record a group under exactly one category: it is first removed from
         * every category and then stored under the given one.
         * @param {string} groupId the group ID
         * @param {string} category one of "invite", "join" or "leave"
         * @param {Object} data the group's JSON from the /sync response
         */
        value: function _accumulateGroup(groupId, category, data) {
            var _arr = ['invite', 'join', 'leave'];

            for (var _i = 0; _i < _arr.length; _i++) {
                var cat = _arr[_i];
                delete this.groups[cat][groupId];
            }
            this.groups[category][groupId] = data;
        }

        /**
         * Return everything under the 'rooms' key from a /sync response which
         * represents all room data that should be stored. This should be paired
         * with the sync token which represents the most recent /sync response
         * provided to accumulate().
         * @return {Object} An object with "nextBatch", "roomsData", "groupsData"
         * and "accountData" keys.
         * The "nextBatch" key is a string which represents at what point in the
         * /sync stream the accumulator reached. This token should be used when
         * restarting a /sync stream at startup. Failure to do so can lead to missing
         * events. The "roomsData" key is an Object which represents the entire
         * /sync response from the 'rooms' key onwards. The "groupsData" key is
         * the accumulated group data keyed by category and then group ID. The
         * "accountData" key is a list of raw events which represent global
         * account data.
         */

    }, {
        key: "getJSON",
        value: function getJSON() {
            var _this4 = this;

            var data = {
                join: {},
                invite: {},
                // always empty. This is set by /sync when a room was previously
                // in 'invite' or 'join'. On fresh startup, the client won't know
                // about any previous room being in 'invite' or 'join' so we can
                // just omit mentioning it at all, even if it has previously come
                // down /sync.
                // The notable exception is when a client is kicked or banned:
                // we may want to hold onto that room so the client can clearly see
                // why their room has disappeared. We don't persist it though because
                // it is unclear *when* we can safely remove the room from the DB.
                // Instead, we assume that if you're loading from the DB, you've
                // refreshed the page, which means you've seen the kick/ban already.
                leave: {}
            };
            (0, _keys2.default)(this.inviteRooms).forEach(function (roomId) {
                data.invite[roomId] = _this4.inviteRooms[roomId];
            });
            (0, _keys2.default)(this.joinRooms).forEach(function (roomId) {
                var roomData = _this4.joinRooms[roomId];
                var roomJson = {
                    ephemeral: { events: [] },
                    account_data: { events: [] },
                    state: { events: [] },
                    timeline: {
                        events: [],
                        prev_batch: null
                    },
                    unread_notifications: roomData._unreadNotifications,
                    summary: roomData._summary
                };
                // Add account data
                (0, _keys2.default)(roomData._accountData).forEach(function (evType) {
                    roomJson.account_data.events.push(roomData._accountData[evType]);
                });

                // Add receipt data
                var receiptEvent = {
                    type: "m.receipt",
                    room_id: roomId,
                    content: {
                        // $event_id: { "m.read": { $user_id: $json } }
                    }
                };
                (0, _keys2.default)(roomData._readReceipts).forEach(function (userId) {
                    var receiptData = roomData._readReceipts[userId];
                    if (!receiptEvent.content[receiptData.eventId]) {
                        receiptEvent.content[receiptData.eventId] = {
                            "m.read": {}
                        };
                    }
                    receiptEvent.content[receiptData.eventId]["m.read"][userId] = receiptData.data;
                });
                // add only if we have some receipt data
                if ((0, _keys2.default)(receiptEvent.content).length > 0) {
                    roomJson.ephemeral.events.push(receiptEvent);
                }

                // Add timeline data
                roomData._timeline.forEach(function (msgData) {
                    if (!roomJson.timeline.prev_batch) {
                        // the first event we add to the timeline MUST match up to
                        // the prev_batch token.
                        if (!msgData.token) {
                            return; // this shouldn't happen as we prune constantly.
                        }
                        roomJson.timeline.prev_batch = msgData.token;
                    }
                    roomJson.timeline.events.push(msgData.event);
                });

                // Add state data: roll back current state to the start of timeline,
                // by "reverse clobbering" from the end of the timeline to the start.
                // Convert maps back into arrays.
                var rollBackState = (0, _create2.default)(null);
                for (var i = roomJson.timeline.events.length - 1; i >= 0; i--) {
                    var timelineEvent = roomJson.timeline.events[i];
                    if (timelineEvent.state_key === null || timelineEvent.state_key === undefined) {
                        continue; // not a state event
                    }
                    // since we're going back in time, we need to use the previous
                    // state value else we'll break causality. We don't have the
                    // complete previous state event, so we need to create one.
                    var prevStateEvent = _utils2.default.deepCopy(timelineEvent);
                    if (prevStateEvent.unsigned) {
                        if (prevStateEvent.unsigned.prev_content) {
                            prevStateEvent.content = prevStateEvent.unsigned.prev_content;
                        }
                        if (prevStateEvent.unsigned.prev_sender) {
                            prevStateEvent.sender = prevStateEvent.unsigned.prev_sender;
                        }
                    }
                    setState(rollBackState, prevStateEvent);
                }
                (0, _keys2.default)(roomData._currentState).forEach(function (evType) {
                    (0, _keys2.default)(roomData._currentState[evType]).forEach(function (stateKey) {
                        var ev = roomData._currentState[evType][stateKey];
                        if (rollBackState[evType] && rollBackState[evType][stateKey]) {
                            // use the reverse clobbered event instead.
                            ev = rollBackState[evType][stateKey];
                        }
                        roomJson.state.events.push(ev);
                    });
                });
                data.join[roomId] = roomJson;
            });

            // Add account data
            var accData = [];
            (0, _keys2.default)(this.accountData).forEach(function (evType) {
                accData.push(_this4.accountData[evType]);
            });

            return {
                nextBatch: this.nextBatch,
                roomsData: data,
                groupsData: this.groups,
                accountData: accData
            };
        }
    }, {
        key: "getNextBatchToken",

        /**
         * @return {?string} the `next_batch` token of the most recently
         * accumulated /sync response, or null if nothing has been accumulated.
         */
        value: function getNextBatchToken() {
            return this.nextBatch;
        }
    }]);
    return SyncAccumulator;
}();

/*
Copyright 2017 Vector Creations Ltd
Copyright 2018 New Vector Ltd

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
580
581/**
582 * This is an internal module. See {@link SyncAccumulator} for the public class.
583 * @module sync-accumulator
584 */
585
/**
 * Record a state event in a two-level map of the form
 * { $event_type: { $state_key: event } }, replacing any event already stored
 * for the same (type, state_key) pair. Events with a null/undefined state_key
 * or a falsy type are ignored, so this is a no-op for non-state events.
 *
 * @param {Object} eventMap the map to update in place
 * @param {Object} event the event to store
 */
function setState(eventMap, event) {
    var stateKey = event.state_key;
    var isStateEvent = stateKey !== null && stateKey !== undefined && Boolean(event.type);
    if (!isStateEvent) {
        return;
    }
    var eventsOfType = eventMap[event.type];
    if (!eventsOfType) {
        // prototype-less object, so no state key can collide with an
        // inherited property name
        eventsOfType = (0, _create2.default)(null);
        eventMap[event.type] = eventsOfType;
    }
    eventsOfType[stateKey] = event;
}
595
596module.exports = SyncAccumulator;
597//# sourceMappingURL=sync-accumulator.js.map
\No newline at end of file