// Retrieved from the UNPKG raw file view (16.7 kB, JavaScript).
/**
 * Compiler-emitted helper that drives a generator function as if it were an
 * async function (used by the `yield`-based bodies throughout this file).
 *
 * An already-installed `this.__awaiter` is reused when one exists; otherwise
 * the implementation below is used.
 *
 * @param thisArg The `this` value the generator function is applied with.
 * @param _arguments The arguments the generator function is applied with
 *                   (an empty list is used when this is falsy).
 * @param P The promise constructor to use. Falls back to the global `Promise`
 *          when falsy.
 * @param generator The generator function to run. Every yielded value is
 *                  wrapped in a promise; its fulfilment value is sent back
 *                  into the generator and its rejection is thrown into it.
 * @returns A promise that resolves with the generator's return value, or
 *          rejects with any error that escapes the generator.
 */
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    return new (P || (P = Promise))(function (resolve, reject) {
        // Resume the generator with the settled value of the last yield.
        function fulfilled(value) {
            try {
                step(generator.next(value));
            }
            catch (e) {
                reject(e);
            }
        }
        // Throw the rejection reason of the last yield into the generator.
        function rejected(value) {
            try {
                step(generator["throw"](value));
            }
            catch (e) {
                reject(e);
            }
        }
        // Either finish with the return value or wait on the yielded value.
        function step(result) {
            result.done
                ? resolve(result.value)
                : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected);
        }
        // `generator` is rebound from the generator function to its iterator.
        step((generator = generator.apply(thisArg, _arguments || [])).next());
    });
};
9import { device as deviceEvent, DEVICE_ID_CLAIM, SESSION_ID_CLAIM, USERNAME_CLAIM, } from '@casual-simulation/causal-trees';
10import { Observable, Subscription, empty, } from 'rxjs';
11import { loadChannel, connectDeviceChannel, } from '@casual-simulation/causal-tree-server';
12import { switchMap, mergeMap, tap, concatMap, } from 'rxjs/operators';
13import { socketEvent } from './Utils';
14import { devicesForEvent, } from '@casual-simulation/causal-tree-server/DeviceManagerHelpers';
/**
 * Defines a class that is able to serve a set of causal trees over Socket.IO.
 *
 * Constructing an instance immediately calls `_init()`, which subscribes to
 * the channel manager and the device manager and starts handling
 * `connection` events on the socket server.
 */
export class CausalTreeServerSocketIO {
    /**
     * Creates a new causal tree server that uses the given socket server and
     * channel manager.
     * @param serverDevice The device info used as the sender of events that a
     *                     loaded tree emits towards connected sockets.
     * @param socketServer The Socket.IO server that should be used.
     * @param deviceManager The device manager used to track connected devices
     *                      and the channels they have joined.
     * @param channelManager The channel manager that should be used.
     * @param authenticator The object whose `authenticate(token)` is called
     *                      for every `login` event.
     * @param authorizer The object whose `isAllowedToLoad` and
     *                   `isAllowedAccess` are called for `join_channel` events.
     */
    constructor(serverDevice, socketServer, deviceManager, channelManager, authenticator, authorizer) {
        this._serverDevice = serverDevice;
        this._server = socketServer;
        this._subs = [];
        this._deviceManager = deviceManager;
        this._authenticator = authenticator;
        this._authorizer = authorizer;
        this._channelManager = channelManager;
        // channel id -> Map(site id -> socket that most recently used that site)
        this._channelSiteMap = new Map();
        // session id -> { socket, info } handle
        this._sessionIdMap = new Map();
        // device id -> Set of { socket, info } handles
        this._deviceIdMap = new Map();
        // username -> Set of { socket, info } handles
        this._usernameMap = new Map();
        this._init();
    }

    /**
     * Logs an error raised by an asynchronous socket listener so that the
     * listener's promise never ends up as an unhandled rejection.
     * @param eventName The socket event that was being handled.
     * @param err The error that was raised.
     */
    _reportListenerError(eventName, err) {
        console.error(`[CausalTreeServerSocketIO] Error while handling ${eventName}:`, err);
    }

    /**
     * Listens for `event_<channel id>` messages and adds the received atoms
     * to the channel. The site of the first atom is recorded in the site map
     * so that broadcasts of that site's atoms go out through this socket.
     * @param info The channel info (its `id` names the socket event).
     * @param socket The socket to listen on.
     * @param channel The loaded channel passed to `channelManager.addAtoms`.
     * @param siteMap The site id -> socket map of the channel.
     * @returns A subscription that removes the listener and the site map
     *          entries this listener created (when they still point at this
     *          socket).
     */
    _listenForAtoms(info, socket, channel, siteMap) {
        const eventName = `event_${info.id}`;
        // Sites this listener registered, so they can be released on teardown
        // instead of keeping the socket referenced forever.
        const claimedSites = new Set();
        const listener = (refs) => __awaiter(this, void 0, void 0, function* () {
            if (refs.length > 0) {
                let site = refs[0].id.site;
                siteMap.set(site, socket);
                claimedSites.add(site);
            }
            yield this._channelManager.addAtoms(channel, refs);
        }).catch(err => this._reportListenerError(eventName, err));
        socket.on(eventName, listener);
        return new Subscription(() => {
            socket.off(eventName, listener);
            for (const site of claimedSites) {
                // Only release entries that have not been taken over since.
                if (siteMap.get(site) === socket) {
                    siteMap.delete(site);
                }
            }
            claimedSites.clear();
        });
    }

    /**
     * Listens for `info_<channel id>` messages, passes the payload to
     * `channelManager.updateVersionInfo` and answers through the
     * acknowledgement callback with `(null, currentInfo)`.
     * @param info The channel info (its `id` names the socket event).
     * @param socket The socket to listen on.
     * @param channel The loaded channel.
     * @returns A subscription that removes the listener.
     */
    _listenForInfoEvents(info, socket, channel) {
        const eventName = `info_${info.id}`;
        const listener = (event, callback) => __awaiter(this, void 0, void 0, function* () {
            const currentInfo = yield this._channelManager.updateVersionInfo(channel, event);
            callback(null, currentInfo);
        }).catch(err => this._reportListenerError(eventName, err));
        socket.on(eventName, listener);
        return new Subscription(() => {
            socket.off(eventName, listener);
        });
    }

    /**
     * Listens for `siteId_<channel id>` messages. When the channel manager
     * grants the requested site, the site is mapped to this socket and the
     * request is re-emitted to the other sockets in the channel's room. The
     * acknowledgement callback receives `(null, allowed)`.
     * @param info The channel info (its `id` names the event and the room).
     * @param socket The socket to listen on.
     * @param channel The loaded channel.
     * @param siteMap The site id -> socket map of the channel.
     * @returns A subscription that removes the listener and releases the most
     *          recently granted site (when it still points at this socket).
     */
    _listenForSiteIdEvents(info, socket, channel, siteMap) {
        const eventName = `siteId_${info.id}`;
        let grantedSiteId = null;
        const listener = (site, callback) => __awaiter(this, void 0, void 0, function* () {
            const allowed = yield this._channelManager.requestSiteId(channel, site);
            if (allowed) {
                grantedSiteId = site.id;
                siteMap.set(site.id, socket);
                socket.to(info.id).emit(eventName, site);
            }
            callback(null, allowed);
        }).catch(err => this._reportListenerError(eventName, err));
        socket.on(eventName, listener);
        return new Subscription(() => {
            socket.off(eventName, listener);
            // Null check (not truthiness) so a site id of 0 is released too,
            // and never remove an entry another socket has taken over.
            if (grantedSiteId != null && siteMap.get(grantedSiteId) === socket) {
                siteMap.delete(grantedSiteId);
            }
        });
    }

    /**
     * Listens for `weave_<channel id>` messages, passes the payload to
     * `channelManager.exchangeWeaves` and answers through the acknowledgement
     * callback with `(null, exported)`.
     * @param info The channel info (its `id` names the socket event).
     * @param socket The socket to listen on.
     * @param channel The loaded channel.
     * @returns A subscription that removes the listener.
     */
    _listenForWeaveEvents(info, socket, channel) {
        const eventName = `weave_${info.id}`;
        const listener = (event, callback) => __awaiter(this, void 0, void 0, function* () {
            const exported = yield this._channelManager.exchangeWeaves(channel, event);
            callback(null, exported);
        }).catch(err => this._reportListenerError(eventName, err));
        socket.on(eventName, listener);
        return new Subscription(() => {
            socket.off(eventName, listener);
        });
    }

    /**
     * Listens for `leave_<channel id>` messages. The socket leaves the
     * channel's room and the device manager is told that the device left.
     * @param device The connected device.
     * @param info The channel info.
     * @param socket The socket to listen on.
     * @param loaded Unused.
     * @returns A subscription that removes the listener.
     */
    _listenForLeaveEvents(device, info, socket, loaded) {
        const eventName = `leave_${info.id}`;
        const listener = () => {
            socket.leave(info.id);
            // leaveChannel may or may not return a promise; Promise.resolve
            // covers both so a rejection is logged rather than left unhandled.
            Promise.resolve(this._deviceManager.leaveChannel(device, info))
                .catch(err => this._reportListenerError(eventName, err));
        };
        socket.on(eventName, listener);
        return new Subscription(() => {
            socket.off(eventName, listener);
        });
    }

    /**
     * Listens for `remote_event_<channel id>` messages and routes the received
     * events with `_routeRemoteEvents`.
     * @param device The value used as the sender of the events.
     *               (`_setupListeners` passes the device's `extra.info` here.)
     * @param info The channel info.
     * @param socket The socket to listen on.
     * @param channel The loaded channel.
     * @returns A subscription that removes the listener.
     */
    _listenForRemoteEvents(device, info, socket, channel) {
        const eventName = `remote_event_${info.id}`;
        const listener = (events) => __awaiter(this, void 0, void 0, function* () {
            yield this._routeRemoteEvents(info, events, device, channel);
        }).catch(err => this._reportListenerError(eventName, err));
        socket.on(eventName, listener);
        return new Subscription(() => {
            socket.off(eventName, listener);
        });
    }

    /**
     * Gets the set of handles registered for the given device ID, creating
     * (and storing) an empty set when there is none yet.
     * @param deviceId The device ID.
     */
    _getSocketsForDevice(deviceId) {
        let sockets = this._deviceIdMap.get(deviceId);
        if (!sockets) {
            sockets = new Set();
            this._deviceIdMap.set(deviceId, sockets);
        }
        return sockets;
    }

    /**
     * Gets the set of handles registered for the given username, creating
     * (and storing) an empty set when there is none yet.
     * @param username The username.
     */
    _getSocketsForUsername(username) {
        let sockets = this._usernameMap.get(username);
        if (!sockets) {
            sockets = new Set();
            this._usernameMap.set(username, sockets);
        }
        return sockets;
    }

    /**
     * Registers the socket under the session ID, device ID and username found
     * in the device's claims so that events emitted by a loaded tree can be
     * delivered to it (see `_init`).
     * @param device The object whose `claims` hold the session ID, device ID
     *               and username. (`_setupListeners` passes `extra.info`.)
     * @param info The channel info stored in the handle.
     * @param socket The socket stored in the handle.
     * @param channel Unused.
     * @returns A subscription that removes the registrations made here.
     */
    _listenForServerEvents(device, info, socket, channel) {
        const handle = {
            socket,
            info,
        };
        const sessionId = device.claims[SESSION_ID_CLAIM];
        const deviceId = device.claims[DEVICE_ID_CLAIM];
        const username = device.claims[USERNAME_CLAIM];
        // NOTE: a session maps to a single handle, so registering the same
        // session for another channel replaces the previous handle.
        this._sessionIdMap.set(sessionId, handle);
        const deviceSockets = this._getSocketsForDevice(deviceId);
        const usernameSockets = this._getSocketsForUsername(username);
        deviceSockets.add(handle);
        usernameSockets.add(handle);
        return new Subscription(() => {
            // Only drop the session entry when it is still ours; otherwise
            // tearing down one channel would unregister a newer handle.
            if (this._sessionIdMap.get(sessionId) === handle) {
                this._sessionIdMap.delete(sessionId);
            }
            deviceSockets.delete(handle);
            // Prune empty sets so the maps do not grow without bound.
            if (deviceSockets.size === 0 && this._deviceIdMap.get(deviceId) === deviceSockets) {
                this._deviceIdMap.delete(deviceId);
            }
            usernameSockets.delete(handle);
            if (usernameSockets.size === 0 && this._usernameMap.get(username) === usernameSockets) {
                this._usernameMap.delete(username);
            }
        });
    }

    /**
     * Routes events received from a device. Events that name a session ID,
     * device ID or username are emitted to the matching connected devices (as
     * selected by `devicesForEvent`); all other events are sent to the channel
     * through `channelManager.sendEvents`.
     * @param info The channel info.
     * @param events The received events.
     * @param device The sender, attached to every event via `deviceEvent`.
     * @param channel The loaded channel.
     * @returns A promise that resolves once the events have been dispatched.
     */
    _routeRemoteEvents(info, events, device, channel) {
        return __awaiter(this, void 0, void 0, function* () {
            const connectedDevices = this._deviceManager.getConnectedDevices(info);
            const devices = connectedDevices.map(d => [d, d.extra.info]);
            const server = [];
            const deviceEvents = new Map();
            for (const event of events) {
                const dEvent = deviceEvent(device, event.event);
                if (event.sessionId || event.deviceId || event.username) {
                    const targets = devicesForEvent(event, devices);
                    for (const target of targets) {
                        let list = deviceEvents.get(target);
                        if (!list) {
                            list = [];
                            deviceEvents.set(target, list);
                        }
                        list.push(dEvent);
                    }
                }
                else {
                    server.push(dEvent);
                }
            }
            if (server.length > 0) {
                yield this._channelManager.sendEvents(channel, server);
            }
            deviceEvents.forEach((targetEvents, target) => {
                const socket = target.extra.socket;
                socket.emit(`remote_event_${info.id}`, targetEvents);
            });
        });
    }

    /**
     * Attaches every per-channel socket listener for a device that connected
     * to a channel.
     * @param socket The device's socket.
     * @param device The connected device.
     * @param extra The device's `extra` data (`extra.info` is the device info).
     * @param channel The channel that was connected to (`channel.info` is used).
     * @param loaded The loaded channel.
     * @returns The subscriptions that remove the listeners again.
     */
    _setupListeners(socket, device, extra, channel, loaded) {
        let siteMap = this._channelSiteMap.get(channel.info.id);
        if (!siteMap) {
            siteMap = new Map();
            this._channelSiteMap.set(channel.info.id, siteMap);
        }
        return [
            this._listenForAtoms(channel.info, socket, loaded, siteMap),
            this._listenForInfoEvents(channel.info, socket, loaded),
            this._listenForSiteIdEvents(channel.info, socket, loaded, siteMap),
            this._listenForWeaveEvents(channel.info, socket, loaded),
            this._listenForLeaveEvents(device, channel.info, socket, loaded),
            this._listenForRemoteEvents(extra.info, channel.info, socket, loaded),
            this._listenForServerEvents(extra.info, channel.info, socket, loaded),
        ];
    }

    /**
     * Wires the server up:
     *  - while a causal tree is loaded, forwards its targeted events and its
     *    added atoms to the connected sockets,
     *  - when a device connects to a channel, loads the channel and attaches
     *    the per-channel socket listeners,
     *  - for every new socket connection, runs the login / join_channel flow.
     */
    _init() {
        this._subs.push(this._channelManager.whileCausalTreeLoaded((tree, info, events) => {
            let siteMap = this._channelSiteMap.get(info.id);
            if (!siteMap) {
                siteMap = new Map();
                this._channelSiteMap.set(info.id, siteMap);
            }
            return [
                events.subscribe(events => {
                    // handle -> Set of events, so that an event matching a
                    // handle in more than one way is delivered only once.
                    const socketEvents = new Map();
                    const addEvent = (handle, event) => {
                        let handleEvents = socketEvents.get(handle);
                        if (!handleEvents) {
                            handleEvents = new Set([event]);
                            socketEvents.set(handle, handleEvents);
                        }
                        else {
                            handleEvents.add(event);
                        }
                    };
                    for (const event of events) {
                        if (event.sessionId) {
                            const session = this._sessionIdMap.get(event.sessionId);
                            if (session) {
                                addEvent(session, event);
                            }
                        }
                        if (event.deviceId) {
                            const sockets = this._deviceIdMap.get(event.deviceId);
                            if (sockets) {
                                for (const handle of sockets) {
                                    addEvent(handle, event);
                                }
                            }
                        }
                        if (event.username) {
                            const sockets = this._usernameMap.get(event.username);
                            if (sockets) {
                                for (const handle of sockets) {
                                    addEvent(handle, event);
                                }
                            }
                        }
                    }
                    // NOTE(review): the event name uses the channel stored in
                    // the handle, which is not necessarily the channel (`info`)
                    // whose tree produced the events - confirm this is intended.
                    for (const [handle, handleEvents] of socketEvents) {
                        const mapped = [...handleEvents].map(e => deviceEvent(this._serverDevice, e.event));
                        handle.socket.emit(`remote_event_${handle.info.id}`, mapped);
                    }
                }),
                tree.atomAdded.subscribe(atoms => {
                    if (atoms.length > 0) {
                        const site = atoms[0].id.site;
                        const siteSocket = siteMap.get(site);
                        if (siteSocket) {
                            // Emitting through the originating socket sends
                            // to the room without echoing back to the sender.
                            siteSocket
                                .to(info.id)
                                .emit(`event_${info.id}`, atoms);
                        }
                        else {
                            this._server
                                .to(info.id)
                                .emit(`event_${info.id}`, atoms);
                        }
                    }
                }),
            ];
        }), this._deviceManager.whenConnectedToChannel((device, channel) => __awaiter(this, void 0, void 0, function* () {
            const extra = device.extra;
            const socket = extra.socket;
            const loaded = yield this._channelManager.loadChannel(channel.info);
            try {
                const connection = yield this._channelManager.connect(loaded, extra.info);
                return [
                    connection,
                    loaded.subscription,
                    ...this._setupListeners(socket, device, extra, channel, loaded),
                ];
            }
            catch (err) {
                // Release the channel that was just loaded; nobody else holds
                // its subscription when connecting or listener setup fails.
                loaded.subscription.unsubscribe();
                throw err;
            }
        })));
        this._server.on('connection', (socket) => __awaiter(this, void 0, void 0, function* () {
            const loginEvents = socketEvent(socket, 'login', (token) => ({ token }));
            const loginFlow = loginEvents.pipe(
                tap(({ token }) => {
                    console.log(`[CasualTreeServerSocketIO] Logging ${token.username} in...`);
                }),
                // A new login supersedes any authentication still in flight.
                switchMap(({ token }) => this._authenticator.authenticate(token), (data, result) => (Object.assign({}, data, { result }))),
                tap(({ token, result }) => {
                    if (!result.success) {
                        console.log(`[CasualTreeServerSocketIO] ${token.username} not authenticated.`);
                        socket.emit('login_result', {
                            error: result.error,
                            message: 'Unable to authenticate',
                        }, null);
                    }
                    else {
                        console.log(`[CasualTreeServerSocketIO] ${token.username} logged in!`);
                    }
                }),
                // Switching unsubscribes the previous device connection, which
                // disconnects the previously logged-in device (see connectDevice).
                switchMap(({ result }) => !result.success
                    ? empty()
                    : connectDevice(this._deviceManager, socket.id, {
                        info: result.info,
                        socket: socket,
                    }).pipe(
                        tap(device => {
                            socket.emit('login_result', null, result.info);
                        }),
                        mergeMap(device => socketEvent(socket, 'join_channel', (info) => ({ info })), (device, info) => (Object.assign({ device }, info))))),
                mergeMap(({ info, device }) => this._authorizer.isAllowedToLoad(device.extra.info, info), (data, canLoad) => (Object.assign({}, data, { canLoad }))),
                tap(({ info, canLoad }) => {
                    if (!canLoad) {
                        console.log('[CausalTreeServerSocketIO] Not allowed to load channel: ' +
                            info.id);
                        socket.emit(`join_channel_result_${info.id}`, 'channel_doesnt_exist');
                    }
                    else {
                        console.log('[CausalTreeServerSocketIO] Allowed to load channel: ' +
                            info.id);
                    }
                }),
                mergeMap(({ canLoad, info }) => !canLoad
                    ? empty()
                    : loadChannel(this._channelManager, info), (data, loaded) => (Object.assign({}, data, { loaded }))),
                // NOTE(review): this stage and the join stage below use
                // switchMap, so a later `join_channel` request unsubscribes
                // the inner observable of the previous one (for the join stage
                // that runs the teardown of `join`, i.e. `socket.leave`).
                // Confirm this is intended when one socket joins several
                // channels.
                switchMap(({ device, loaded }) => this._authorizer.isAllowedAccess(device.extra.info, loaded), (data, authorized) => (Object.assign({}, data, { authorized }))),
                tap(({ info, authorized, loaded }) => {
                    if (!authorized) {
                        console.log('[CausalTreeServerSocketIO] Not authorized:' +
                            info.id);
                        loaded.subscription.unsubscribe();
                        socket.emit(`join_channel_result_${info.id}`, 'unauthorized');
                    }
                    else {
                        console.log('[CausalTreeServerSocketIO] Authorized:' + info.id);
                    }
                }),
                switchMap(({ authorized, info, device, loaded }) => !authorized
                    ? empty()
                    : join(socket, info.id).pipe(
                        concatMap(() => connectDeviceChannel(this._deviceManager, device, info)),
                        tap(() => {
                            console.log('[CausalTreeServerSocketIO] Finish join channel:' +
                                info.id);
                            // The channel was only loaded here for the access
                            // check; the whenConnectedToChannel handler above
                            // loads it again for the connected device.
                            loaded.subscription.unsubscribe();
                            socket.emit(`join_channel_result_${info.id}`, null);
                        }))));
            const sub = loginFlow.subscribe(null, err => console.error(err));
            socket.on('disconnect', () => {
                if (sub) {
                    sub.unsubscribe();
                }
            });
        }));
    }
}
/**
 * Creates an observable that connects a device through the given manager when
 * subscribed and disconnects it again when unsubscribed.
 *
 * The observable emits the connected device once and does not complete on its
 * own. Failures of `manager.connectDevice` are reported through the
 * observer's error channel.
 *
 * @param manager The device manager (`connectDevice` / `disconnectDevice`).
 * @param id The ID to connect the device with.
 * @param extra The extra data to attach to the device.
 */
function connectDevice(manager, id, extra) {
    return Observable.create((observer) => {
        let device = null;
        let unsubscribed = false;
        // The promise executor runs synchronously, so manager.connectDevice is
        // still invoked during subscription; a synchronous throw becomes a
        // rejection and is reported through observer.error below.
        new Promise((resolve) => resolve(manager.connectDevice(id, extra)))
            .then((connected) => {
                if (unsubscribed) {
                    // The subscriber went away while the connection was still
                    // pending: the teardown could not see the device, so it
                    // has to be disconnected here or it would stay connected.
                    if (connected) {
                        manager.disconnectDevice(connected);
                    }
                    return;
                }
                device = connected;
                observer.next(device);
            })
            .catch((err) => {
                observer.error(err);
            });
        return new Subscription(() => {
            unsubscribed = true;
            if (device) {
                manager.disconnectDevice(device);
            }
        });
    });
}
/**
 * Creates an observable that makes the socket join the given room when
 * subscribed and leave it again when unsubscribed.
 *
 * The observable emits once (with no value) after the join succeeded and does
 * not complete on its own. A failed join is reported through the observer's
 * error channel.
 *
 * @param socket The socket that should join the room.
 * @param id The ID of the room.
 */
function join(socket, id) {
    return Observable.create((observer) => {
        socket.join(id, err => {
            if (err) {
                observer.error(err);
                return;
            }
            observer.next();
        });
        // Teardown: leave the room when the subscriber goes away.
        return () => {
            socket.leave(id);
        };
    });
}
//# sourceMappingURL=CausalTreeServerSocketIO.js.map