1 | import createDebug from 'debug';
|
2 | import WaitList from 'promise-waitlist';
|
3 |
|
4 | import Channel from './protocol/channel';
|
5 | import Identity from './protocol/identity';
|
6 | import Message from './protocol/message';
|
7 | import {
|
8 | Hello as PHello,
|
9 | Shake as PShake,
|
10 | Packet as PPacket,
|
11 | } from './messages';
|
12 | import SyncAgent from './sync-agent';
|
13 | import { compareDistance } from './utils';
|
14 |
|
15 | const debug = createDebug('vowlink:peer');
|
16 |
|
17 | export const VERSION = 2;
|
18 | export const MAX_ERROR_REASON_LEN = 1024;
|
19 | export const ID_LENGTH = 32;
|
20 | export const HANDSHAKE_TIMEOUT = 5000;
|
21 |
|
22 |
|
23 | let uid = 0;
|
24 |
|
25 | export default class Peer {
|
26 |
|
27 | constructor(options = {}) {
|
28 | const {
|
29 | sodium,
|
30 | localId,
|
31 | socket,
|
32 | globalPeerIds = new Set(),
|
33 | channels = [],
|
34 | inviteWaitList = new WaitList(),
|
35 | } = options;
|
36 |
|
37 | if (!sodium) {
|
38 | throw new Error('Missing required `sodium` option');
|
39 | }
|
40 | if (!localId) {
|
41 | throw new Error('Missing required `localId` option');
|
42 | }
|
43 | if (!socket) {
|
44 | throw new Error('Missing required `socket` option');
|
45 | }
|
46 |
|
47 | this.sodium = sodium;
|
48 |
|
49 | this.localId = localId;
|
50 | this.remoteId = null;
|
51 | this.destroyed = false;
|
52 |
|
53 | this.debugId = '[not ready]';
|
54 |
|
55 | this.socket = socket;
|
56 |
|
57 | this.globalPeerIds = globalPeerIds;
|
58 | this.channels = channels;
|
59 | this.inviteWaitList = inviteWaitList;
|
60 |
|
61 |
|
62 | this.syncAgents = new Map();
|
63 | }
|
64 |
|
65 |
|
66 |
|
67 |
|
68 |
|
69 |
|
70 | async ready() {
|
71 | await this.socket.send(PHello.encode({
|
72 | version: VERSION,
|
73 | peerId: this.localId,
|
74 | }).finish());
|
75 |
|
76 | const first = await this.socket.receive(HANDSHAKE_TIMEOUT);
|
77 | const hello = PHello.decode(first);
|
78 | if (hello.version !== VERSION) {
|
79 | throw new Error('Unsupported protocol version: ' + hello.version);
|
80 | }
|
81 | if (hello.peerId.length !== ID_LENGTH) {
|
82 | throw new Error('Invalid remote peer id length: ' + hello.peerId.length);
|
83 | }
|
84 | this.remoteId = hello.peerId;
|
85 | this.debugId = this.remoteId.toString('hex').slice(0, 8) + ':' + uid;
|
86 | uid = (uid + 1) >>> 0;
|
87 |
|
88 | this.debug('got hello');
|
89 |
|
90 | const compare = compareDistance(this.localId, this.remoteId);
|
91 | const shouldShake = compare < 0;
|
92 |
|
93 | const remoteHexId = this.remoteId.toString('hex');
|
94 | const isDuplicate = this.globalPeerIds.has(remoteHexId) || compare === 0;
|
95 | this.globalPeerIds.add(remoteHexId);
|
96 |
|
97 | if (shouldShake) {
|
98 | this.debug('shaking isDuplicate=%j', isDuplicate);
|
99 | await this.socket.send(PShake.encode({
|
100 | isDuplicate,
|
101 | }).finish());
|
102 |
|
103 | if (isDuplicate) {
|
104 | this.remoteId = null;
|
105 | await this.destroy('Duplicate');
|
106 | return false;
|
107 | }
|
108 |
|
109 | return true;
|
110 | }
|
111 |
|
112 | const second = await this.socket.receive(HANDSHAKE_TIMEOUT);
|
113 | const shake = PShake.decode(second);
|
114 | this.debug('got shake isDuplicate=%j', shake.isDuplicate);
|
115 |
|
116 | if (shake.isDuplicate) {
|
117 | this.remoteId = null;
|
118 | await this.destroy('Duplicate');
|
119 | return false;
|
120 | }
|
121 |
|
122 | return true;
|
123 | }
|
124 |
|
125 |
|
126 | async loop() {
|
127 | this.debug('starting loop');
|
128 |
|
129 | for (const channel of this.channels) {
|
130 | this.synchronize(channel);
|
131 | }
|
132 |
|
133 | for (;;) {
|
134 | const data = await this.socket.receive();
|
135 | const packet = PPacket.decode(data);
|
136 | this.debug('got packet.type=%s', packet.content);
|
137 |
|
138 | switch (packet.content) {
|
139 | case 'error':
|
140 | throw new Error('Got error: ' +
|
141 | packet.error.reason.slice(0, MAX_ERROR_REASON_LEN));
|
142 | case 'invite':
|
143 | await this.onInvite(packet.invite);
|
144 | break;
|
145 | case 'query':
|
146 | await this.onQuery(packet.query);
|
147 | break;
|
148 | case 'queryResponse':
|
149 | await this.onQueryResponse(packet.queryResponse);
|
150 | break;
|
151 | case 'bulk':
|
152 | await this.onBulk(packet.bulk);
|
153 | break;
|
154 | case 'bulkResponse':
|
155 | await this.onBulkResponse(packet.bulkResponse);
|
156 | break;
|
157 | case 'notification':
|
158 | await this.onNotification(packet.notification);
|
159 | break;
|
160 | default:
|
161 | throw new Error('Unsupported packet type: ' + packet.content);
|
162 | }
|
163 | }
|
164 | }
|
165 |
|
166 | |
167 |
|
168 |
|
169 |
|
170 |
|
171 |
|
172 |
|
173 | async sendInvite(invite) {
|
174 | this.debug('sending invite');
|
175 | const packet = PPacket.encode({
|
176 | invite,
|
177 | }).finish();
|
178 | await this.socket.send(packet);
|
179 | }
|
180 |
|
181 |
|
182 | async destroy(reason) {
|
183 | if (this.destroyed) {
|
184 | throw new Error('Already destroyed');
|
185 | }
|
186 |
|
187 | this.destroyed = true;
|
188 | this.debug('destroying due to reason=%s', reason);
|
189 |
|
190 |
|
191 | if (this.remoteId !== null) {
|
192 | const remoteHexId = this.remoteId.toString('hex');
|
193 | this.globalPeerIds.delete(remoteHexId);
|
194 | }
|
195 |
|
196 | for (const agent of this.syncAgents.values()) {
|
197 | agent.destroy();
|
198 | }
|
199 |
|
200 | const packet = PPacket.encode({
|
201 | error: { reason },
|
202 | }).finish();
|
203 |
|
204 | try {
|
205 | await this.socket.send(packet);
|
206 | } catch (e) {
|
207 |
|
208 | }
|
209 | await this.socket.close();
|
210 | }
|
211 |
|
212 |
|
213 |
|
214 |
|
215 |
|
216 |
|
217 | onNewChannel(channel) {
|
218 | this.synchronize(channel);
|
219 | }
|
220 |
|
221 |
|
222 | async onNewMessage(channel) {
|
223 | await this.socket.send(PPacket.encode({
|
224 | notification: {
|
225 | channelId: channel.id,
|
226 | },
|
227 | }).finish());
|
228 | }
|
229 |
|
230 |
|
231 |
|
232 |
|
233 |
|
234 |
|
235 | async onInvite(packet) {
|
236 | if (packet.requestId.length !== Identity.INVITE_REQUEST_ID_LENGTH) {
|
237 | throw new Error('Invalid requestId in EncryptedInvite');
|
238 | }
|
239 |
|
240 | this.debug('got invite.id=%s',
|
241 | packet.requestId.toString('hex').slice(0, 8));
|
242 | this.inviteWaitList.resolve(packet.requestId.toString('hex'), packet);
|
243 | }
|
244 |
|
245 |
|
246 | async onQuery(packet) {
|
247 | Channel.checkId(packet.channelId, 'Invalid channelId in Query');
|
248 | if (packet.cursor === 'hash') {
|
249 | Message.checkHash(packet.hash, 'Invalid cursor.hash in Query');
|
250 | }
|
251 |
|
252 | const channel = this.getChannel(packet.channelId);
|
253 | if (!channel) {
|
254 | this.debug('responding to query for unknown channel');
|
255 | await this.socket.send(PPacket.encode({
|
256 | queryResponse: {
|
257 | channelId: packet.channelId,
|
258 | abbreviatedMessages: [],
|
259 | forwardHash: null,
|
260 | backwardHash: null,
|
261 | },
|
262 | }).finish());
|
263 | return;
|
264 | }
|
265 |
|
266 | this.debug('query for channel.id=%s', channel.debugId);
|
267 | return await this.getSyncAgent(channel).receiveQuery(packet);
|
268 | }
|
269 |
|
270 |
|
271 | async onQueryResponse(packet) {
|
272 | Channel.checkId(packet.channelId, 'Invalid channelId in QueryResponse');
|
273 | for (const abbr of packet.abbreviatedMessages) {
|
274 | Message.checkHash(abbr.hash, 'Invalid abbreviated message hash');
|
275 | for (const hash of abbr.parents) {
|
276 | Message.checkHash(hash, 'Invalid abbreviated message parent hash');
|
277 | }
|
278 | }
|
279 | if (packet.forwardHash.length !== 0) {
|
280 | Message.checkHash(packet.forwardHash, 'Invalid forward hash');
|
281 | }
|
282 | if (packet.backwardHash.length !== 0) {
|
283 | Message.checkHash(packet.backwardHash, 'Invalid backward hash');
|
284 | }
|
285 |
|
286 | const channel = this.getChannel(packet.channelId);
|
287 | if (!channel) {
|
288 | this.debug('ignoring query response for unknown channel');
|
289 | return;
|
290 | }
|
291 |
|
292 | return await this.getSyncAgent(channel).receiveQueryResponse(packet);
|
293 | }
|
294 |
|
295 |
|
296 | async onBulk(packet) {
|
297 | Channel.checkId(packet.channelId, 'Invalid channelId in Bulk');
|
298 | for (const hash of packet.hashes) {
|
299 | Message.checkHash(hash, 'Invalid message hash in Bulk');
|
300 | }
|
301 |
|
302 | const channel = this.getChannel(packet.channelId);
|
303 | if (!channel) {
|
304 | this.debug('responding bulk for unknown channel');
|
305 | await this.socket.send(PPacket.encode({
|
306 | bulkResponse: {
|
307 | channelId: packet.channelId,
|
308 | messages: [],
|
309 | forwardIndex: packet.hashes.length,
|
310 | },
|
311 | }).finish());
|
312 | return;
|
313 | }
|
314 |
|
315 | this.debug('bulk for channel.id=%s hashes.length=%d', channel.debugId,
|
316 | packet.hashes.length);
|
317 | return await this.getSyncAgent(channel).receiveBulk(packet);
|
318 | }
|
319 |
|
320 |
|
321 | async onBulkResponse(packet) {
|
322 | Channel.checkId(packet.channelId, 'Invalid channelId in BulkResponse');
|
323 |
|
324 | const channel = this.getChannel(packet.channelId);
|
325 | if (!channel) {
|
326 | this.debug('ignoring bulk response for unknown channel');
|
327 | return;
|
328 | }
|
329 |
|
330 |
|
331 | return await this.getSyncAgent(channel).receiveBulkResponse(packet);
|
332 | }
|
333 |
|
334 |
|
335 | async onNotification(packet) {
|
336 | Channel.checkId(packet.channelId, 'Invalid channelId in Notification');
|
337 |
|
338 | const channel = this.getChannel(packet.channelId);
|
339 | if (!channel) {
|
340 | return;
|
341 | }
|
342 |
|
343 | this.debug('notification for channel.id=%s', channel.debugId);
|
344 | this.synchronize(channel);
|
345 | }
|
346 |
|
347 |
|
348 |
|
349 |
|
350 |
|
351 |
|
352 | synchronize(channel) {
|
353 | this.debug('channel.id=%s sync start', channel.debugId);
|
354 | this.getSyncAgent(channel).synchronize().catch((e) => {
|
355 | this.debug('channel.id=%s sync error.message=%s', channel.debugId,
|
356 | e.stack);
|
357 | });
|
358 | }
|
359 |
|
360 |
|
361 |
|
362 |
|
363 |
|
364 |
|
365 | getChannel(channelId) {
|
366 | return this.channels.find((channel) => {
|
367 | return channel.id.equals(channelId);
|
368 | });
|
369 | }
|
370 |
|
371 |
|
372 | getSyncAgent(channel) {
|
373 | let agent;
|
374 | if (this.syncAgents.has(channel)) {
|
375 | agent = this.syncAgents.get(channel);
|
376 | } else {
|
377 | agent = new SyncAgent({
|
378 | channel,
|
379 | socket: this.socket,
|
380 | sodium: this.sodium,
|
381 | });
|
382 | this.syncAgents.set(channel, agent);
|
383 | }
|
384 | return agent;
|
385 | }
|
386 |
|
387 |
|
388 | debug(fmt, ...args) {
|
389 | debug('id=%s ' + fmt, ...[ this.debugId ].concat(args));
|
390 | }
|
391 | }
|
392 |
|
393 |
|
394 | Peer.VERSION = VERSION;
|
395 | Peer.MAX_ERROR_REASON_LEN = MAX_ERROR_REASON_LEN;
|
396 | Peer.ID_LENGTH = ID_LENGTH;
|