1 | const asyncRemove = require('pull-async-filter');
|
2 | const concatStreams = require('pull-cat');
|
3 | const computed = require('mutant/computed');
|
4 | const pull = require('pull-stream');
|
5 | const many = require('pull-many');
|
6 | const MutantArray = require('mutant/array');
|
7 | const MutantPullReduce = require('mutant-pull-reduce');
|
8 |
|
9 | module.exports = (sbot, ssbChessIndex) => {
|
// Message types that belong to a chess game. Used as the default type filter
// whenever a caller does not pass its own `messageTypes` option.
const chessTypeMessages = ['chess_invite', 'chess_move', 'chess_invite_accept', 'chess_game_end'];
|
15 |
|
/**
 * Tracks the most recent chess message arriving for games the given player
 * takes part in.
 *
 * Only messages newer than the moment of the call are requested, in forward
 * order, and the reducer simply keeps the last message it was handed.
 *
 * @param {string} playerId - id of the player whose games are watched.
 * @returns {*} the result of MutantPullReduce (presumably a Mutant
 *   observable - confirm against mutant-pull-reduce).
 */
function latestGameMessageForPlayerObs(playerId) {
  const source = chessMessagesForPlayerGames(playerId, {
    since: Date.now(),
    reverse: false,
  });

  // The accumulated state is discarded: the newest message always wins.
  const keepNewest = (previous, newest) => newest;
  const reduceOptions = { nextTick: true, sync: true };

  return MutantPullReduce(source, keepNewest, reduceOptions);
}
|
29 |
|
/**
 * Tracks the most recent chess message arriving for games the given player
 * is NOT taking part in (games they could only be observing).
 *
 * Only messages newer than the moment of the call are requested, in forward
 * order, and the reducer simply keeps the last message it was handed.
 *
 * @param {string} playerId - id of the player whose own games are excluded.
 * @returns {*} the result of MutantPullReduce (presumably a Mutant
 *   observable - confirm against mutant-pull-reduce).
 */
function latestGameMessageForOtherPlayersObs(playerId) {
  const source = chessMessagesForOtherPlayersGames(playerId, {
    since: Date.now(),
    reverse: false,
  });

  // The accumulated state is discarded: the newest message always wins.
  const keepNewest = (previous, newest) => newest;
  const reduceOptions = { nextTick: true, sync: true };

  return MutantPullReduce(source, keepNewest, reduceOptions);
}
|
43 |
|
/**
 * Pull-stream source of chess messages belonging to games that `playerId`
 * takes part in.
 *
 * @param {string} playerId - id of the player whose games are wanted.
 * @param {Object} [opts]
 * @param {*} [opts.since] - forwarded to `sbot.createLogStream` as `gt`
 *   (null when `opts` is omitted).
 * @param {boolean} [opts.reverse] - forwarded as `reverse` (false when
 *   `opts` is omitted).
 * @param {boolean} [opts.live] - forwarded as `live`; defaults to true when
 *   missing or null.
 * @param {string[]} [opts.messageTypes] - message types to let through;
 *   defaults to `chessTypeMessages`.
 * @returns {*} the log stream piped through `msgMatchesFilter`.
 */
function chessMessagesForPlayerGames(playerId, opts) {
  // Defaults that apply when no options object is given at all.
  let since = null;
  let reverse = false;
  let messageTypes = chessTypeMessages;
  let live = true;

  if (opts) {
    since = opts.since;
    reverse = opts.reverse;

    if (opts.messageTypes) {
      messageTypes = opts.messageTypes;
    }
    if (opts.live != null) {
      live = opts.live;
    }
  }

  const logStream = sbot.createLogStream({ live, gt: since, reverse });

  return pull(logStream, msgMatchesFilter(playerId, true, messageTypes));
}
|
67 |
|
/**
 * Pull-stream source of chess messages belonging to games that `playerId`
 * does NOT take part in.
 *
 * Accepts the same options as `chessMessagesForPlayerGames`, including
 * `live`, which used to be ignored here and forced to true.
 *
 * @param {string} playerId - id of the player whose own games are excluded.
 * @param {Object} [opts]
 * @param {*} [opts.since] - forwarded to `sbot.createLogStream` as `gt`
 *   (null when `opts` is omitted).
 * @param {boolean} [opts.reverse] - forwarded as `reverse` (false when
 *   `opts` is omitted).
 * @param {boolean} [opts.live] - forwarded as `live`; defaults to true when
 *   missing or null, which matches the previous hard-coded behaviour.
 * @param {string[]} [opts.messageTypes] - message types to let through;
 *   defaults to `chessTypeMessages`.
 * @returns {*} the log stream piped through `msgMatchesFilter`.
 */
function chessMessagesForOtherPlayersGames(playerId, opts) {
  const since = opts ? opts.since : null;
  const reverse = opts ? opts.reverse : false;
  const messageTypes = opts && opts.messageTypes ? opts.messageTypes : chessTypeMessages;

  // `!= null` covers both undefined and null, so callers that never set
  // `live` keep getting a live stream.
  const live = opts && opts.live != null ? opts.live : true;

  const logStream = sbot.createLogStream({
    live,
    gt: since,
    reverse,
  });

  return pull(
    logStream,
    msgMatchesFilter(playerId, false, messageTypes),
  );
}
|
88 |
|
/**
 * Works out which game a chess message belongs to.
 *
 * An invite starts a game, so its own key is the game id; every other
 * message points at its game through `content.root`.
 *
 * @param {Object} msg - message with `key` and `value.content`.
 * @returns {?string} the game id, or null when the message carries no
 *   content or no root to identify the game with.
 */
function getGameId(msg) {
  // Validate the shape before reading `type`; previously the type was read
  // first, so a message without `value.content` threw instead of giving null.
  const content = msg && msg.value ? msg.value.content : null;

  if (!content) {
    return null;
  }

  if (content.type === 'chess_invite') {
    return msg.key;
  }

  return content.root || null;
}
|
98 |
|
/**
 * Tells whether a message's content type is one of the given types.
 *
 * @param {Object} msg - message that may carry `value.content`.
 * @param {string[]} msgTypes - accepted message types.
 * @returns {boolean} false when the message has no value or content,
 *   otherwise whether `content.type` is listed in `msgTypes`.
 */
function isChessMessage(msg, msgTypes) {
  const content = msg.value && msg.value.content;

  return content ? msgTypes.includes(content.type) : false;
}
|
106 |
|
/**
 * Builds a pull-stream through that keeps only chess messages of the given
 * types whose game does (or does not) include `playerId`.
 *
 * The asynchronous stage answers "should this message be removed?": it calls
 * back with a truthy value to drop the message and a falsy one to keep it.
 *
 * @param {string} playerId - player to test game membership for.
 * @param {boolean} playerShouldBeInGame - true to keep messages from games
 *   the player is in, false to keep messages from games they are not in.
 * @param {string[]} messageTypes - message types to let through.
 * @returns {*} the composed through stream.
 */
function msgMatchesFilter(playerId, playerShouldBeInGame, messageTypes) {
  const ofRequestedType = pull.filter(msg => isChessMessage(msg, messageTypes));

  const outsideRequestedGames = asyncRemove((msg, cb) => {
    const gameId = getGameId(msg);

    if (gameId == null) {
      // Without a game id membership cannot be checked, so the message
      // belongs to neither group. Remove it; it used to be kept in both.
      cb(null, true);
      return;
    }

    ssbChessIndex.gameHasPlayer(gameId, playerId, (err, hasPlayer) => {
      cb(err, playerShouldBeInGame ? !hasPlayer : hasPlayer);
    });
  });

  return pull(ofRequestedType, outsideRequestedGames);
}
|
129 |
|
/**
 * Builds an observable list of recent chess messages for games `id` takes
 * part in, each paired with the situation of its game.
 *
 * Stored messages (up to `size`, read newest-first per type) are appended
 * first. Messages from the live stream are then inserted at the front, and
 * the entry at the back is dropped once the list exceeds `size`.
 *
 * @param {string} id - player whose games are followed.
 * @param {function(string): *} getSituationObservable - called with a game
 *   id; its result is used as the dependency of each entry.
 * @param {string[]} msgTypes - message types to include.
 * @param {number} size - maximum number of entries to keep.
 * @param {Object} [opts]
 * @param {*} [opts.since] - forwarded to `sbot.messagesByType` as `gte`.
 * @returns {*} computed observable of `{ msg, situation }` entries ordered
 *   by descending `msg.timestamp`.
 */
function getRingBufferGameMsgsForPlayer(id, getSituationObservable, msgTypes, size, opts) {
  const since = opts ? opts.since : null;

  const nonLiveMsgSources = msgTypes.map(type => pull(
    sbot.messagesByType({ type, reverse: true, gte: since }),
    msgMatchesFilter(id, true, msgTypes),
  ));

  const liveStream = chessMessagesForPlayerGames(id, {
    live: true,
    // Start one minute back when opening the live stream.
    since: Date.now() - 60000,
    messageTypes: msgTypes,
  });

  const oldEntries = pull(many(nonLiveMsgSources), pull.take(size));

  // Explicit marker between stored and live messages. Sync markers coming
  // from the log stream never get here because the message filter removes
  // anything without chess content, so without this marker the list would
  // never switch to front insertion and would never be trimmed.
  const backlogDone = pull.values([{ sync: true }]);

  const stream = concatStreams([oldEntries, backlogDone, liveStream]);

  const obsArray = MutantArray([]);
  let pushToFront = false;

  pull(stream, pull.drain((msg) => {
    // Handle the marker before touching `msg.value`, which it does not have.
    if (msg.sync) {
      pushToFront = true;
      return;
    }

    // Invites carry no `root`; getGameId resolves them to their own key.
    const situationObs = getSituationObservable(getGameId(msg));

    const entry = computed([situationObs], situation => ({
      msg,
      situation,
    }));

    if (!pushToFront) {
      obsArray.push(entry);
      return;
    }

    obsArray.insert(entry, 0);

    if (obsArray.getLength() > size) {
      obsArray.pop();
    }
  }));

  // Sort a copy so the array handed over by the observable is left untouched.
  return computed(
    [obsArray],
    array => array.slice().sort((a, b) => b.msg.timestamp - a.msg.timestamp),
  );
}
|
190 |
|
191 | return {
|
192 | |
193 |
|
194 |
|
195 |
|
196 | latestGameMessageForPlayerObs,
|
197 |
|
198 | |
199 |
|
200 |
|
201 |
|
202 | latestGameMessageForOtherPlayersObs,
|
203 |
|
204 | |
205 |
|
206 |
|
207 |
|
208 |
|
209 |
|
210 | getRingBufferGameMsgsForPlayer,
|
211 |
|
212 | |
213 |
|
214 |
|
215 |
|
216 | chessMessagesForPlayerGames,
|
217 |
|
218 | |
219 |
|
220 |
|
221 |
|
222 | chessMessagesForOtherPlayersGames,
|
223 | };
|
224 | };
|