UNPKG

6.13 kBJavaScriptView Raw
1const asyncRemove = require('pull-async-filter');
2const concatStreams = require('pull-cat');
3const computed = require('mutant/computed');
4const pull = require('pull-stream');
5const many = require('pull-many');
6const MutantArray = require('mutant/array');
7const MutantPullReduce = require('mutant-pull-reduce');
8
9module.exports = (sbot, ssbChessIndex) => {
// Content types that belong to a chess game. Used as the default type filter
// whenever a caller does not pass its own `messageTypes`.
const chessTypeMessages = [
  'chess_invite',
  'chess_move',
  'chess_invite_accept',
  'chess_game_end',
];
15
/**
 * Builds an observable whose value is replaced by every chess message that
 * arrives for a game the given player takes part in. The underlying stream is
 * opened with `since` set to the time of the call.
 *
 * @param playerId id of the player whose games are watched.
 * @returns the observable created by MutantPullReduce.
 */
function latestGameMessageForPlayerObs(playerId) {
  // Reducer that discards the previous state and keeps the incoming message.
  const keepIncoming = (previous, incoming) => incoming;

  const playerGameMsgs = chessMessagesForPlayerGames(playerId, {
    since: Date.now(),
    reverse: false,
  });

  return MutantPullReduce(playerGameMsgs, keepIncoming, {
    nextTick: true,
    sync: true,
  });
}
29
/**
 * Builds an observable whose value is replaced by every chess message that
 * arrives for a game the given player is NOT taking part in. The underlying
 * stream is opened with `since` set to the time of the call.
 *
 * @param playerId id of the player whose own games are excluded.
 * @returns the observable created by MutantPullReduce.
 */
function latestGameMessageForOtherPlayersObs(playerId) {
  // Reducer that discards the previous state and keeps the incoming message.
  const keepIncoming = (previous, incoming) => incoming;

  const observedGameMsgs = chessMessagesForOtherPlayersGames(playerId, {
    since: Date.now(),
    reverse: false,
  });

  return MutantPullReduce(observedGameMsgs, keepIncoming, {
    nextTick: true,
    sync: true,
  });
}
43
/**
 * Opens a log stream and narrows it to chess messages of games the given
 * player is part of.
 *
 * @param playerId id of the player whose games are of interest.
 * @param opts optional settings:
 *   - since: passed to the log stream as `gt` (null when opts is absent).
 *   - reverse: passed through to the log stream (false when opts is absent).
 *   - messageTypes: content types to accept; defaults to chessTypeMessages.
 *   - live: whether the log stream stays open; defaults to true when it is
 *     missing, undefined or null.
 * @returns a pull-stream source of the matching messages.
 */
function chessMessagesForPlayerGames(playerId, opts) {
  let since = null;
  let reverse = false;
  let messageTypes = chessTypeMessages;
  let live = true; // Default to live

  if (opts) {
    since = opts.since;
    reverse = opts.reverse;

    if (opts.messageTypes) {
      messageTypes = opts.messageTypes;
    }

    // `!= null` deliberately covers both undefined and null.
    if (opts.live != null) {
      live = opts.live;
    }
  }

  const logFeed = sbot.createLogStream({ live, gt: since, reverse });

  return pull(logFeed, msgMatchesFilter(playerId, true, messageTypes));
}
67
/**
 * Opens a live log stream and narrows it to chess messages of games the given
 * player is NOT part of.
 *
 * @param playerId id of the player whose own games are excluded.
 * @param opts optional settings:
 *   - since: passed to the log stream as `gt` (null when opts is absent).
 *   - reverse: passed through to the log stream (false when opts is absent).
 *   - messageTypes: content types to accept; defaults to chessTypeMessages.
 *   Unlike chessMessagesForPlayerGames, `opts.live` is not read here: the
 *   stream is always opened with `live: true`.
 * @returns a pull-stream source of the matching messages.
 */
function chessMessagesForOtherPlayersGames(playerId, opts) {
  const query = { live: true, gt: null, reverse: false };
  let wantedTypes = chessTypeMessages;

  if (opts) {
    query.gt = opts.since;
    query.reverse = opts.reverse;
    wantedTypes = opts.messageTypes || chessTypeMessages;
  }

  const logFeed = sbot.createLogStream(query);

  return pull(logFeed, msgMatchesFilter(playerId, false, wantedTypes));
}
88
/**
 * Works out which game a message belongs to.
 *
 * An invite starts a game, so its own key is the game id; every other message
 * points at its game through `content.root`.
 *
 * @param msg a message of the shape { key, value: { content } }.
 * @returns the game id, or null when the message has no content (for example
 *   a stream marker) or a non-invite message carries no root.
 */
function getGameId(msg) {
  // Validate the shape first: reading `content.type` on a message without
  // `value`/`content` would otherwise throw instead of yielding null.
  if (!msg || !msg.value || !msg.value.content) {
    return null;
  }

  const content = msg.value.content;

  if (content.type === 'chess_invite') {
    return msg.key;
  }

  return content.root || null;
}
98
/**
 * Tells whether a message's content type is one of the given types.
 *
 * @param msg a message of the shape { value: { content: { type } } }.
 * @param msgTypes list of accepted content types.
 * @returns true when the message has content and its type is listed in
 *   msgTypes; false otherwise (including messages without value or content).
 */
function isChessMessage(msg, msgTypes) {
  const content = msg.value ? msg.value.content : undefined;

  if (!content) {
    return false;
  }

  return msgTypes.indexOf(content.type) >= 0;
}
106
/**
 * Creates a through-stream that first keeps only messages whose content type
 * is in `messageTypes`, then runs an asynchronous per-message check of the
 * game's membership via ssbChessIndex.gameHasPlayer.
 *
 * @param playerId id passed to ssbChessIndex.gameHasPlayer for each message.
 * @param playerShouldBeInGame when truthy the membership result is negated
 *   before it is reported to asyncRemove; otherwise it is reported unchanged.
 * @param messageTypes list of content types accepted by isChessMessage.
 * @returns the composed pull-stream through.
 */
function msgMatchesFilter(playerId, playerShouldBeInGame, messageTypes) {
  const onlyWantedTypes = pull.filter(msg => isChessMessage(msg, messageTypes));

  // NOTE(review): presumably asyncRemove drops a message when the callback
  // value is true (the flag below is "true" exactly when membership does not
  // match what was asked for) — confirm against pull-async-filter.
  const membershipCheck = asyncRemove((msg, cb) => {
    const gameId = getGameId(msg);

    if (gameId == null) {
      // NOTE(review): under the assumption above, reporting false lets a
      // message without a game id through — verify this is intended.
      cb(null, false);
      return;
    }

    ssbChessIndex.gameHasPlayer(gameId, playerId, (err, hasPlayer) => {
      cb(err, playerShouldBeInGame ? !hasPlayer : hasPlayer);
    });
  });

  return pull(onlyWantedTypes, membershipCheck);
}
129
/**
 * Collects game messages of the given types for a player's games into an
 * observable list, pairing each message with the observable situation of its
 * game.
 *
 * Up to `size` stored messages are read first (newest first per type), then
 * the live log is followed from one minute before the call.
 *
 * @param id id of the player whose games are tracked.
 * @param getSituationObservable function called with a game id; its result
 *   is used as the observable the entry's `situation` is computed from.
 * @param msgTypes array of message content types to collect.
 * @param size number of stored messages to take, and the capacity used when
 *   trimming the list once live messages are pushed to the front.
 * @param opts optional; only `opts.since` is read and passed to
 *   sbot.messagesByType as `gte`.
 * @returns a computed observable of `{ msg, situation }` entries sorted by
 *   `msg.timestamp`, newest first.
 */
function getRingBufferGameMsgsForPlayer(id, getSituationObservable, msgTypes, size, opts) {
  const since = opts ? opts.since : null;

  const nonLiveMsgSources = msgTypes.map(type => pull(
    sbot.messagesByType({ type, reverse: true, gte: since }),
    msgMatchesFilter(id, true, msgTypes),
  ));

  const liveStream = chessMessagesForPlayerGames(id, {
    live: true,
    // Go back a minute in case we missed any while loading the old ones.
    since: Date.now() - 60000,
    messageTypes: msgTypes,
  });

  const oldEntries = pull(
    many(nonLiveMsgSources),
    pull.take(size),
  );

  // Take a limited amount of old messages and then add any new live messages
  // to the top of the observable list.
  const stream = concatStreams([oldEntries, liveStream]);

  const obsArray = MutantArray([]);

  let count = 0;
  let pushToFront = false;

  pull(stream, pull.drain((msg) => {
    // Handle the marker before touching `msg.value`: a marker carries no
    // value, so dereferencing its content first would throw.
    // NOTE(review): isChessMessage rejects messages without `value.content`,
    // so a bare `{ sync: true }` marker looks like it is dropped by
    // msgMatchesFilter before reaching this drain. If so, this branch never
    // runs and the capacity trimming below is never applied — confirm.
    if (msg.sync) {
      // When we have reached messages arriving live in the stream, we start
      // pushing to the front of the array rather than the end so the newest
      // messages are at the top.
      pushToFront = true;
      count += 1;
      return;
    }

    // Invites have no `root`; their own key is the game id, which is what
    // getGameId resolves for them.
    const situationObs = getSituationObservable(getGameId(msg));

    const entry = computed([situationObs], situation => ({
      msg,
      situation,
    }));

    if (pushToFront) {
      obsArray.insert(entry, 0);

      if (count > size) {
        // Remove the oldest entry if we have reached capacity.
        obsArray.pop();
      }
    } else {
      obsArray.push(entry);
    }

    count += 1;
  }));

  // Sort in descending order. Sort a copy so the array handed out by the
  // observable is not reordered in place.
  return computed(
    [obsArray],
    array => array.slice().sort((a, b) => b.msg.timestamp - a.msg.timestamp),
  );
}
190
191 return {
192 /**
193 * Watches for incoming updates to a player's games and sets the observable
194 * value to the latest chess message.
195 */
196 latestGameMessageForPlayerObs,
197
198 /**
199 * Watches for incoming updates to a games a player is not playing in and
200 * sets the observable value to the latest chess message.
201 */
202 latestGameMessageForOtherPlayersObs,
203
204 /**
205 * Get a ring buffer of game messages of a given type concerning a player's game.
206 * @playerId the id of the player to track game messages for.
207 * @msgs an array of game type messages to fill the buffer with, and a size for the
208 * @size The size of the ring buffer.
209 */
210 getRingBufferGameMsgsForPlayer,
211
212 /**
213 * A stream of chess game messages (excluding chat messages) for the given
214 * user after the given timestamp.
215 */
216 chessMessagesForPlayerGames,
217
218 /**
219 * A stream of chess game messages for games that the player of the
220 * given ID is not playing in.
221 */
222 chessMessagesForOtherPlayersGames,
223 };
224};