UNPKG

14.4 kBJavaScriptView Raw
1import { Buffer } from 'buffer';
2import WaitList from 'promise-waitlist';
3import createDebug from 'debug';
4
5import Chain from './protocol/chain';
6import Channel from './protocol/channel';
7import Identity from './protocol/identity';
8import Link from './protocol/link';
9import Message from './protocol/message';
10import StorageCache from './protocol/cache';
11import MemoryStorage from './storage/memory';
12import Peer from './peer';
13
14export {
15 Chain,
16 Channel,
17 Identity,
18 Link,
19 Message,
20 StorageCache,
21
22 // Storage
23 MemoryStorage,
24
25 // Networking
26 Peer,
27};
28
29const debug = createDebug('vowlink:protocol');
30
export default class Protocol {
  /**
   * Instance of the VowLink Protocol. Responsible for managing:
   *
   * 1. List of channels (+ persistence)
   * 2. List of identities (+ persistence)
   * 3. Remote peers (+ sync)
   *
   * @class
   * @param {Object} options - configuration of the Protocol instance
   * @param {Object} options.sodium - sodium bindings used for all random and
   *     cryptographic operations (required)
   * @param {Object} [options.storage] - Storage provider instance; a new
   *     `MemoryStorage` is used when omitted
   * @param {string|Buffer} [options.passphrase] - when supplied, a key is
   *     derived from it (via `crypto_pwhash`) and used to encrypt persisted
   *     channels and identities
   * @throws {Error} when `options.sodium` is missing
   */
  constructor({ sodium, storage, passphrase } = {}) {
    if (!sodium) {
      throw new Error('Missing required `sodium` option');
    }

    this.sodium = sodium;
    this.storage = storage || new MemoryStorage();

    this.peers = new Set();

    // Random local peer id; its first 8 hex characters prefix debug output
    this.id = Buffer.alloc(Peer.ID_LENGTH);
    sodium.randombytes_buf(this.id);
    this.debugId = this.id.toString('hex').slice(0, 8);

    // See: waitForInvite()
    this.inviteWaitList = new WaitList();

    // See: waitForPeer()
    this.peerWaitList = new WaitList();

    // See: Peer#ready
    this.globalPeerIds = new Set();

    /** @member {Channel[]} in-memory list of channels */
    this.channels = [];

    /** @member {Identity[]} in-memory list of identities */
    this.identities = [];

    // Channel => WaitListEntry
    this.channelWaiters = new Map();

    this.encryptionKey = null;
    if (passphrase) {
      this.debug('generating encryption key...');

      // NOTE: the salt is a hash of a constant string, i.e. it is the same
      // for every instance. Changing it would make previously persisted data
      // undecryptable.
      const salt = Buffer.alloc(sodium.crypto_pwhash_SALTBYTES);
      sodium.crypto_generichash(salt, Buffer.from('vowlink-persistence'));

      this.encryptionKey = Buffer.alloc(sodium.crypto_secretbox_KEYBYTES);
      sodium.crypto_pwhash(
        this.encryptionKey,
        Buffer.from(passphrase),
        salt,
        sodium.crypto_pwhash_OPSLIMIT_MODERATE,
        sodium.crypto_pwhash_MEMLIMIT_MODERATE,
        sodium.crypto_pwhash_ALG_DEFAULT);

      this.debug('generated encryption key');
    }

    this.debug('created');
  }

  //
  // Persistence
  //

  /**
   * First method to be invoked on a Protocol instance. Must be awaited before
   * doing any networking or other API calls.
   *
   * Reads every persisted channel and identity from storage, decrypts them
   * (when a passphrase was supplied) and registers them in memory.
   *
   * @returns {Promise<boolean>} `false` if decryption of any stored entity
   *     has failed, `true` otherwise. Rejects if storage access,
   *     deserialization or channel registration fails.
   */
  async load() {
    const channelIds = await this.storage.getEntityKeys('channel');
    for (const id of channelIds) {
      const serialized = await this.storage.retrieveEntity('channel', id);
      const decrypted = this.decryptData(serialized);
      if (!decrypted) {
        return false;
      }
      const channel = await Channel.deserializeData(decrypted, {
        sodium: this.sodium,
        storage: this.storage,
      });

      // `addChannel()` is async: await it so that a failure (e.g. duplicate
      // channel name) rejects `load()` instead of becoming an unhandled
      // rejection. `save = false` because the channel came from storage.
      await this.addChannel(channel, false);
      this.debug('loaded channel.name=%s', channel.name);
    }

    const identityKeys = await this.storage.getEntityKeys('identity');
    for (const id of identityKeys) {
      const serialized = await this.storage.retrieveEntity('identity', id);
      const decrypted = this.decryptData(serialized);
      if (!decrypted) {
        return false;
      }
      const identity = Identity.deserializeData(decrypted, {
        sodium: this.sodium,
      });
      this.identities.push(identity);
      this.debug('loaded id.name=%s', identity.name);
    }

    this.identities.sort(Identity.compare);
    return true;
  }

  /**
   * Add and save to persistence a new Identity instance.
   *
   * @param {Identity} id - Identity instance to add
   * @returns {Promise}
   * @throws {Error} (as rejection) when an identity with the same name is
   *     already present
   */
  async addIdentity(id) {
    if (this.identities.some((existing) => id.name === existing.name)) {
      throw new Error('Duplicate identity');
    }

    this.identities.push(id);
    this.identities.sort(Identity.compare);
    await this.saveIdentity(id);
  }

  /**
   * Remove an Identity from the in-memory list and from persistence. Does
   * nothing if the instance is not in the in-memory list.
   *
   * @param {Identity} identity - Identity instance to remove
   * @returns {Promise}
   */
  async removeIdentity(identity) {
    this.debug('removing identity.id=%s', identity.debugHash);

    const index = this.identities.indexOf(identity);
    if (index === -1) {
      return;
    }
    this.identities.splice(index, 1);

    await this.storage.removeEntity('identity',
      identity.publicKey.toString('hex'));
  }

  /**
   * Add and (optionally) save to persistence a new Channel instance.
   *
   * @param {Channel} channel - Channel instance to add
   * @param {boolean} [save=true] - when `false` the channel is only
   *     registered in memory (used when loading from persistence)
   * @returns {Promise} Promise that resolves to either existing channel or
   *     supplied channel
   * @throws {Error} (as rejection) when a different channel with the same
   *     name is already present
   */
  async addChannel(channel, save = true) {
    const existing = this.getChannel(channel.name) ||
      this.channels.find((other) => other.equals(channel));
    if (existing) {
      if (existing.equals(channel)) {
        this.debug('updating existing channel.id=%s', channel.debugId);
        return existing;
      }

      throw new Error(`Channel with a duplicate name: "${channel.name}"`);
    }

    this.debug('adding channel.id=%s', channel.debugId);

    this.channels.push(channel);
    this.channels.sort(Channel.compare);

    if (save) {
      await this.saveChannel(channel);
    }

    for (const peer of this.peers) {
      peer.onNewChannel(channel);
    }

    // Re-arming loop: every time the channel reports an outgoing message,
    // broadcast it to peers and wait for the next one. Cancelling the waiter
    // (see removeChannel()) rejects the promise and ends the loop.
    const waitLoop = () => {
      // Prevent duplicates
      if (this.channelWaiters.has(channel)) {
        return;
      }

      const waiter = channel.waitForOutgoingMessage();
      this.channelWaiters.set(channel, waiter);

      waiter.promise.then(() => {
        this.onNewMessage(channel);

        this.channelWaiters.delete(channel);
        waitLoop();
      }).catch((err) => {
        this.debug('channel waiter promise error=%j', err.message);
      });
    };
    waitLoop();

    return channel;
  }

  /**
   * Remove a Channel from the in-memory list, cancel its outgoing-message
   * waiter and delete the channel and its messages from persistence. Does
   * nothing if the instance is not in the in-memory list.
   *
   * @param {Channel} channel - Channel instance to remove
   * @returns {Promise}
   */
  async removeChannel(channel) {
    this.debug('removing channel.id=%s', channel.debugId);

    const index = this.channels.indexOf(channel);
    if (index === -1) {
      return;
    }
    this.channels.splice(index, 1);

    if (this.channelWaiters.has(channel)) {
      const waiter = this.channelWaiters.get(channel);
      this.channelWaiters.delete(channel);
      waiter.cancel();
    }

    await this.storage.removeEntity('channel', channel.id.toString('hex'));
    await this.storage.removeChannelMessages(channel.id);
  }

  /**
   * Save or update instance of Channel in the persistence.
   *
   * @param {Channel} channel - Channel instance to save/update
   * @returns {Promise}
   */
  async saveChannel(channel) {
    this.debug('saving channel.name=%s', channel.name);
    await this.storage.storeEntity('channel', channel.id.toString('hex'),
      this.encryptData(channel.serializeData()));
  }

  /**
   * Save or update instance of Identity in the persistence.
   *
   * @param {Identity} id - Identity instance to save/update
   * @returns {Promise}
   */
  async saveIdentity(id) {
    this.debug('saving id.name=%s', id.name);
    await this.storage.storeEntity(
      'identity', id.publicKey.toString('hex'),
      this.encryptData(id.serializeData()));
  }

  //
  // Identity
  //

  /**
   * Create new identity and a channel associated to it. Store both in
   * persistence.
   *
   * @param {string} name - Name of identity (and channel)
   * @returns {Promise} A Promise with a tuple of newly created Identity and
   *     Channel
   */
  async createIdentityPair(name) {
    const identity = new Identity(name, { sodium: this.sodium });

    const channel = await Channel.fromIdentity(identity, {
      name: identity.name,
      sodium: this.sodium,
      storage: this.storage,
    });
    await this.addChannel(channel);

    // NOTE: Save identity after the channel to save the chain
    await this.addIdentity(identity);

    this.debug('created id.name=%s', identity.name);
    return [ identity, channel ];
  }

  /**
   * Get identity from the in-memory list.
   *
   * @param {string} name - Name of identity
   * @returns {Identity|undefined} `undefined` when there is no such identity
   */
  getIdentity(name) {
    return this.identities.find((id) => id.name === name);
  }

  /**
   * Get names of in-memory identities.
   *
   * @returns {string[]}
   */
  getIdentityNames() {
    return this.identities.map((id) => id.name);
  }

  //
  // Channels
  //

  /**
   * Get channel from the in-memory list.
   *
   * @param {string} name - Name of channel
   * @returns {Channel|undefined} `undefined` when there is no such channel
   */
  getChannel(name) {
    return this.channels.find((channel) => channel.name === name);
  }

  /**
   * Get names of in-memory channels.
   *
   * @returns {string[]}
   */
  getChannelNames() {
    return this.channels.map((channel) => channel.name);
  }

  /**
   * Create a channel from a received invite, persist the identity (its chain
   * is updated by the invite) and register the channel.
   *
   * @param {Object} invite - invite passed through to `Channel.fromInvite`
   * @param {Identity} identity - Identity the invite was issued for
   * @param {Object} [options] - extra options for `Channel.fromInvite`;
   *     `sodium`, `storage` and `identity` are always overridden
   * @returns {Promise} Promise that resolves to either an already known
   *     equal channel or the newly created one
   */
  async channelFromInvite(invite, identity, options) {
    const channelOptions = Object.assign({}, options, {
      sodium: this.sodium,
      storage: this.storage,
      identity,
    });
    const channel = await Channel.fromInvite(invite, channelOptions);

    // Save updated chain
    await this.saveIdentity(identity);

    // Save channel
    // NOTE: we return value here to handle duplicates
    return await this.addChannel(channel);
  }

  /**
   * Notify connected (and ready) peers about new message on the channel.
   * (NOTE: Mandatory to call in order to synchronize messages.)
   *
   * @param {Channel} channel - A Channel instance where the message was posted
   */
  onNewMessage(channel) {
    this.debug('broadcasting new message on channel.id=%s', channel.debugId);
    for (const peer of this.peers) {
      peer.onNewMessage(channel);
    }
  }

  //
  // Invite
  //

  /**
   * Wait for an Invite from the remote peer.
   *
   * @param {Buffer} requestId - An unique identifier pertaining to issued and
   *     sent request
   * @param {Number} [timeout] - Optional timeout value in milliseconds
   * @returns {WaitListEntry} An object with `.promise` property and `.cancel`
   *     method
   */
  waitForInvite(requestId, timeout) {
    const key = requestId.toString('hex');
    this.debug('wait for invite.id=%s', key.slice(0, 8));
    return this.inviteWaitList.waitFor(key, timeout);
  }

  /**
   * Resolve the awaited Invite when issuing access to ourselves.
   *
   * @param {Buffer} encryptedInvite - Encrypted invite from `id.issueInvite`
   * @returns {boolean} `true` if there was a waiter for invite with this
   *     `requestId`
   */
  resolveInvite(encryptedInvite) {
    return this.inviteWaitList.resolve(
      encryptedInvite.requestId.toString('hex'), encryptedInvite);
  }

  //
  // Network
  //

  /**
   * Start sending and receiving data from remote connection.
   *
   * @param {SocketBase} socket - Remote connection to be used for the data
   *     transfer
   * @returns {Promise<boolean>} `false` when `peer.ready()` reported a
   *     duplicate peer, `true` once the peer loop has finished. Rejects (after
   *     destroying the peer) on any error.
   */
  async connect(socket) {
    this.debug('connecting through socket');
    const peer = new Peer({
      localId: this.id,
      socket,
      sodium: this.sodium,
      globalPeerIds: this.globalPeerIds,
      channels: this.channels,
      inviteWaitList: this.inviteWaitList,
    });

    try {
      const isNotDuplicate = await peer.ready();

      // NOTE: Already disconnected
      if (!isNotDuplicate) {
        return false;
      }

      // NOTE: We could attempt to handle duplicates here, but the malicious
      // entity could try to evict peers by using their ids.

      this.debug('new peer.id=%s', peer.debugId);
      this.peers.add(peer);
      this.peerWaitList.resolve(peer.remoteId.toString('hex'), peer);

      await peer.loop();

      this.peers.delete(peer);
    } catch (e) {
      this.debug('got error: %s', e.stack);
      this.peers.delete(peer);
      await peer.destroy(e.message);
      throw e;
    }
    return true;
  }

  /**
   * Wait for a Peer with supplied `peerId` to appear and be ready.
   *
   * @param {Buffer} peerId - Peer identifier
   * @param {Number} [timeout] - Optional timeout value in milliseconds
   * @returns {WaitListEntry} An object with `.promise` property and `.cancel`
   *     method
   */
  waitForPeer(peerId, timeout) {
    // Fast path: the peer is already connected
    for (const existing of this.peers) {
      if (existing.remoteId.equals(peerId)) {
        this.debug('found existing peer.id=%s', existing.debugId);
        return WaitList.resolve(existing);
      }
    }

    const key = peerId.toString('hex');
    this.debug('wait for peer.id=%s', key.slice(0, 8));
    return this.peerWaitList.waitFor(key, timeout);
  }

  /**
   * Disconnect all peers. Zero private keys in identities and encryption keys
   * in channels, then close the invite and peer wait lists.
   *
   * The key zeroing and wait list closing happen even when destroying a peer
   * fails, so that pending waiters are never left hanging.
   *
   * @returns {Promise} rejects if destroying any peer fails
   */
  async close() {
    const peers = Array.from(this.peers);
    this.peers.clear();

    try {
      await Promise.all(peers.map(async (peer) => {
        await peer.destroy('Closed');
      }));
    } finally {
      // Zero private keys in all identities
      for (const id of this.identities) {
        id.clear();
      }
      // Zero encryption keys in all channels
      for (const channel of this.channels) {
        channel.clear();
      }

      this.inviteWaitList.close(new Error('Closed'));
      this.peerWaitList.close(new Error('Closed'));
    }
  }

  /**
   * **(Internal)** Log a debug message prefixed with the short id of this
   * instance.
   *
   * @param {string} fmt - `debug`-style format string
   * @param {...*} args - values for the format string
   */
  debug(fmt, ...args) {
    debug(`[%s] ${fmt}`, this.debugId, ...args);
  }

  /**
   * **(Internal)** Encrypt data before persisting it.
   *
   * @param {Buffer} data - cleartext
   * @returns {Buffer} `data` itself when no passphrase was configured,
   *     otherwise a new buffer laid out as `nonce || secretbox(data)`
   */
  encryptData(data) {
    if (!this.encryptionKey) {
      return data;
    }

    const sodium = this.sodium;

    const result = Buffer.alloc(
      sodium.crypto_secretbox_NONCEBYTES +
      data.length +
      sodium.crypto_secretbox_MACBYTES);

    // Both slices are views into `result`, so filling them fills the output
    const nonce = result.slice(0, sodium.crypto_secretbox_NONCEBYTES);
    const ciphertext = result.slice(nonce.length);

    sodium.randombytes_buf(nonce);
    sodium.crypto_secretbox_easy(ciphertext, data, nonce,
      this.encryptionKey);

    return result;
  }

  /**
   * **(Internal)** Decrypt data produced by `encryptData()`.
   *
   * @param {Buffer} encrypted - `nonce || secretbox(data)` buffer
   * @returns {Buffer|undefined} `encrypted` itself when no passphrase was
   *     configured; otherwise the cleartext, or `undefined` when the input is
   *     missing, too short to hold a nonce and a MAC, or fails
   *     authentication
   */
  decryptData(encrypted) {
    if (!this.encryptionKey) {
      return encrypted;
    }

    const sodium = this.sodium;

    // Truncated/corrupted input: report it as a decryption failure instead of
    // letting `Buffer.alloc()` throw on a negative size below.
    const overhead = sodium.crypto_secretbox_NONCEBYTES +
      sodium.crypto_secretbox_MACBYTES;
    if (!encrypted || encrypted.length < overhead) {
      return undefined;
    }

    const nonce = encrypted.slice(0, sodium.crypto_secretbox_NONCEBYTES);
    const ciphertext = encrypted.slice(nonce.length);
    const cleartext = Buffer.alloc(ciphertext.length -
      sodium.crypto_secretbox_MACBYTES);
    const success = sodium.crypto_secretbox_open_easy(
      cleartext,
      ciphertext,
      nonce,
      this.encryptionKey);

    if (!success) {
      return undefined;
    }
    return cleartext;
  }
}