UNPKG

10.2 kBJavaScriptView Raw
1import createDebug from 'debug';
2import WaitList from 'promise-waitlist';
3
4import Channel from './protocol/channel';
5import Identity from './protocol/identity';
6import Message from './protocol/message';
7import {
8 Hello as PHello,
9 Shake as PShake,
10 Packet as PPacket,
11} from './messages';
12import SyncAgent from './sync-agent';
13import { compareDistance } from './utils';
14
// Logger for the `vowlink:peer` namespace; `Peer#debug` prefixes every line
// with the peer's `debugId` before handing it to this function.
const debug = createDebug('vowlink:peer');

/** Protocol version sent in `Hello` and required from the remote side. */
export const VERSION = 2;

/** Upper bound on how much of a remote error reason is kept when reported. */
export const MAX_ERROR_REASON_LEN = 1024;

/** Required length of the `peerId` field of a remote `Hello`. */
export const ID_LENGTH = 32;

/** Value passed to `socket.receive()` for each handshake packet. */
export const HANDSHAKE_TIMEOUT = 5000;

// "Unique" id for logging: appended to `debugId` and bumped (wrapping at
// 32 bits) every time a handshake receives a valid `Hello`.
let uid = 0;
24
25export default class Peer {
26 /** **(Internal)** */
27 constructor(options = {}) {
28 const {
29 sodium,
30 localId,
31 socket,
32 globalPeerIds = new Set(),
33 channels = [],
34 inviteWaitList = new WaitList(),
35 } = options;
36
37 if (!sodium) {
38 throw new Error('Missing required `sodium` option');
39 }
40 if (!localId) {
41 throw new Error('Missing required `localId` option');
42 }
43 if (!socket) {
44 throw new Error('Missing required `socket` option');
45 }
46
47 this.sodium = sodium;
48
49 this.localId = localId;
50 this.remoteId = null;
51 this.destroyed = false;
52
53 this.debugId = '[not ready]';
54
55 this.socket = socket;
56
57 this.globalPeerIds = globalPeerIds;
58 this.channels = channels;
59 this.inviteWaitList = inviteWaitList;
60
61 // Channel => "active" | "pending"
62 this.syncAgents = new Map();
63 }
64
65 //
66 // High-level protocol
67 //
68
69 /** **(Internal)** */
70 async ready() {
71 await this.socket.send(PHello.encode({
72 version: VERSION,
73 peerId: this.localId,
74 }).finish());
75
76 const first = await this.socket.receive(HANDSHAKE_TIMEOUT);
77 const hello = PHello.decode(first);
78 if (hello.version !== VERSION) {
79 throw new Error('Unsupported protocol version: ' + hello.version);
80 }
81 if (hello.peerId.length !== ID_LENGTH) {
82 throw new Error('Invalid remote peer id length: ' + hello.peerId.length);
83 }
84 this.remoteId = hello.peerId;
85 this.debugId = this.remoteId.toString('hex').slice(0, 8) + ':' + uid;
86 uid = (uid + 1) >>> 0;
87
88 this.debug('got hello');
89
90 const compare = compareDistance(this.localId, this.remoteId);
91 const shouldShake = compare < 0;
92
93 const remoteHexId = this.remoteId.toString('hex');
94 const isDuplicate = this.globalPeerIds.has(remoteHexId) || compare === 0;
95 this.globalPeerIds.add(remoteHexId);
96
97 if (shouldShake) {
98 this.debug('shaking isDuplicate=%j', isDuplicate);
99 await this.socket.send(PShake.encode({
100 isDuplicate,
101 }).finish());
102
103 if (isDuplicate) {
104 this.remoteId = null;
105 await this.destroy('Duplicate');
106 return false;
107 }
108
109 return true;
110 }
111
112 const second = await this.socket.receive(HANDSHAKE_TIMEOUT);
113 const shake = PShake.decode(second);
114 this.debug('got shake isDuplicate=%j', shake.isDuplicate);
115
116 if (shake.isDuplicate) {
117 this.remoteId = null;
118 await this.destroy('Duplicate');
119 return false;
120 }
121
122 return true;
123 }
124
125 /** **(Internal)** */
126 async loop() {
127 this.debug('starting loop');
128
129 for (const channel of this.channels) {
130 this.synchronize(channel);
131 }
132
133 for (;;) {
134 const data = await this.socket.receive();
135 const packet = PPacket.decode(data);
136 this.debug('got packet.type=%s', packet.content);
137
138 switch (packet.content) {
139 case 'error':
140 throw new Error('Got error: ' +
141 packet.error.reason.slice(0, MAX_ERROR_REASON_LEN));
142 case 'invite':
143 await this.onInvite(packet.invite);
144 break;
145 case 'query':
146 await this.onQuery(packet.query);
147 break;
148 case 'queryResponse':
149 await this.onQueryResponse(packet.queryResponse);
150 break;
151 case 'bulk':
152 await this.onBulk(packet.bulk);
153 break;
154 case 'bulkResponse':
155 await this.onBulkResponse(packet.bulkResponse);
156 break;
157 case 'notification':
158 await this.onNotification(packet.notification);
159 break;
160 default:
161 throw new Error('Unsupported packet type: ' + packet.content);
162 }
163 }
164 }
165
166 /**
167 * Send an invite to remote peer.
168 *
169 * @param {Object} invite - `encryptedInvite` property of
170 * `Identity#issueInvite`
171 * @returns Promise
172 */
173 async sendInvite(invite) {
174 this.debug('sending invite');
175 const packet = PPacket.encode({
176 invite,
177 }).finish();
178 await this.socket.send(packet);
179 }
180
181 /** **(Internal)** */
182 async destroy(reason) {
183 if (this.destroyed) {
184 throw new Error('Already destroyed');
185 }
186
187 this.destroyed = true;
188 this.debug('destroying due to reason=%s', reason);
189
190 // Cleanup
191 if (this.remoteId !== null) {
192 const remoteHexId = this.remoteId.toString('hex');
193 this.globalPeerIds.delete(remoteHexId);
194 }
195
196 for (const agent of this.syncAgents.values()) {
197 agent.destroy();
198 }
199
200 const packet = PPacket.encode({
201 error: { reason },
202 }).finish();
203
204 try {
205 await this.socket.send(packet);
206 } catch (e) {
207 // swallow error
208 }
209 await this.socket.close();
210 }
211
212 //
213 // Miscellaneous events
214 //
215
216 /** **(Internal)** */
217 onNewChannel(channel) {
218 this.synchronize(channel);
219 }
220
221 /** **(Internal)** */
222 async onNewMessage(channel) {
223 await this.socket.send(PPacket.encode({
224 notification: {
225 channelId: channel.id,
226 },
227 }).finish());
228 }
229
230 //
231 // Handling packets
232 //
233
234 /** **(Internal)** */
235 async onInvite(packet) {
236 if (packet.requestId.length !== Identity.INVITE_REQUEST_ID_LENGTH) {
237 throw new Error('Invalid requestId in EncryptedInvite');
238 }
239
240 this.debug('got invite.id=%s',
241 packet.requestId.toString('hex').slice(0, 8));
242 this.inviteWaitList.resolve(packet.requestId.toString('hex'), packet);
243 }
244
245 /** **(Internal)** */
246 async onQuery(packet) {
247 Channel.checkId(packet.channelId, 'Invalid channelId in Query');
248 if (packet.cursor === 'hash') {
249 Message.checkHash(packet.hash, 'Invalid cursor.hash in Query');
250 }
251
252 const channel = this.getChannel(packet.channelId);
253 if (!channel) {
254 this.debug('responding to query for unknown channel');
255 await this.socket.send(PPacket.encode({
256 queryResponse: {
257 channelId: packet.channelId,
258 abbreviatedMessages: [],
259 forwardHash: null,
260 backwardHash: null,
261 },
262 }).finish());
263 return;
264 }
265
266 this.debug('query for channel.id=%s', channel.debugId);
267 return await this.getSyncAgent(channel).receiveQuery(packet);
268 }
269
270 /** **(Internal)** */
271 async onQueryResponse(packet) {
272 Channel.checkId(packet.channelId, 'Invalid channelId in QueryResponse');
273 for (const abbr of packet.abbreviatedMessages) {
274 Message.checkHash(abbr.hash, 'Invalid abbreviated message hash');
275 for (const hash of abbr.parents) {
276 Message.checkHash(hash, 'Invalid abbreviated message parent hash');
277 }
278 }
279 if (packet.forwardHash.length !== 0) {
280 Message.checkHash(packet.forwardHash, 'Invalid forward hash');
281 }
282 if (packet.backwardHash.length !== 0) {
283 Message.checkHash(packet.backwardHash, 'Invalid backward hash');
284 }
285
286 const channel = this.getChannel(packet.channelId);
287 if (!channel) {
288 this.debug('ignoring query response for unknown channel');
289 return;
290 }
291
292 return await this.getSyncAgent(channel).receiveQueryResponse(packet);
293 }
294
295 /** **(Internal)** */
296 async onBulk(packet) {
297 Channel.checkId(packet.channelId, 'Invalid channelId in Bulk');
298 for (const hash of packet.hashes) {
299 Message.checkHash(hash, 'Invalid message hash in Bulk');
300 }
301
302 const channel = this.getChannel(packet.channelId);
303 if (!channel) {
304 this.debug('responding bulk for unknown channel');
305 await this.socket.send(PPacket.encode({
306 bulkResponse: {
307 channelId: packet.channelId,
308 messages: [],
309 forwardIndex: packet.hashes.length,
310 },
311 }).finish());
312 return;
313 }
314
315 this.debug('bulk for channel.id=%s hashes.length=%d', channel.debugId,
316 packet.hashes.length);
317 return await this.getSyncAgent(channel).receiveBulk(packet);
318 }
319
320 /** **(Internal)** */
321 async onBulkResponse(packet) {
322 Channel.checkId(packet.channelId, 'Invalid channelId in BulkResponse');
323
324 const channel = this.getChannel(packet.channelId);
325 if (!channel) {
326 this.debug('ignoring bulk response for unknown channel');
327 return;
328 }
329
330 // NOTE: `Message` constructor will check each message
331 return await this.getSyncAgent(channel).receiveBulkResponse(packet);
332 }
333
334 /** **(Internal)** */
335 async onNotification(packet) {
336 Channel.checkId(packet.channelId, 'Invalid channelId in Notification');
337
338 const channel = this.getChannel(packet.channelId);
339 if (!channel) {
340 return;
341 }
342
343 this.debug('notification for channel.id=%s', channel.debugId);
344 this.synchronize(channel);
345 }
346
347 //
348 // Synchronization
349 //
350
351 /** **(Internal)** */
352 synchronize(channel) {
353 this.debug('channel.id=%s sync start', channel.debugId);
354 this.getSyncAgent(channel).synchronize().catch((e) => {
355 this.debug('channel.id=%s sync error.message=%s', channel.debugId,
356 e.stack);
357 });
358 }
359
360 //
361 // Utils
362 //
363
364 /** **(Internal)** */
365 getChannel(channelId) {
366 return this.channels.find((channel) => {
367 return channel.id.equals(channelId);
368 });
369 }
370
371 /** **(Internal)** */
372 getSyncAgent(channel) {
373 let agent;
374 if (this.syncAgents.has(channel)) {
375 agent = this.syncAgents.get(channel);
376 } else {
377 agent = new SyncAgent({
378 channel,
379 socket: this.socket,
380 sodium: this.sodium,
381 });
382 this.syncAgents.set(channel, agent);
383 }
384 return agent;
385 }
386
387 /** **(Internal)** */
388 debug(fmt, ...args) {
389 debug('id=%s ' + fmt, ...[ this.debugId ].concat(args));
390 }
391}
392
// Convenience: expose the protocol constants as static properties of `Peer`
Object.assign(Peer, { VERSION, MAX_ERROR_REASON_LEN, ID_LENGTH });