UNPKG

2.28 kBJavaScriptView Raw
1import EventEmitter from 'events'
2import { pipe } from 'it-pipe'
3import PROTOCOL from './protocol.js'
4import encoding from './encoding.js'
5
/**
 * Lazily-dialled outbound message channel to a single remote peer.
 *
 * Events emitted by this class:
 *  - 'connect'    (queue)   once the protocol stream has been piped
 *  - 'message'    (message) for every item read back from the stream
 *  - 'disconnect' ()        when the remote is not connected or the stream ends
 *  - 'error'      (err)     when the lookup/dial or the piped stream fails
 */
export default class Connection extends EventEmitter {
  /**
   * @param {*} remoteId - identifier handed to `libp2p.peerStore.get` and `libp2p.getConnections`
   * @param {object} libp2p - node exposing `peerStore.get`, `dialProtocol` and `getConnections`
   * @param {*} room - stored on the instance; not used by this class itself
   */
  constructor (remoteId, libp2p, room) {
    super()
    this._remoteId = remoteId
    this._libp2p = libp2p
    this._room = room
    this._connection = null
    this._connecting = false
  }

  /**
   * Send a message to the remote peer, dialling first if necessary.
   *
   * While no stream is established the message is parked on a one-shot
   * 'connect' listener and re-pushed once the connection is up.
   *
   * @param {*} message - value passed through the imported `encoding` helper before being queued
   */
  push (message) {
    if (this._connection) {
      this._connection.push(encoding(message))

      return
    }

    this.once('connect', () => {
      this.push(message)
    })

    if (!this._connecting) {
      this._connect()
    }
  }

  /**
   * End the outbound queue, if one has been created.
   */
  stop () {
    if (this._connection) {
      this._connection.end()
    }
  }

  /**
   * Dial the remote peer on PROTOCOL and pipe the outbound queue through the stream.
   *
   * Emits 'disconnect' and gives up when there is no open connection to the
   * remote. A failure while looking up or dialling the peer is reported as an
   * 'error' event and resets the connecting flag, so that a later `push()` can
   * retry instead of waiting forever on a dial that already failed.
   *
   * @returns {Promise<void>}
   */
  async _connect () {
    this._connecting = true

    if (!this._isConnectedToRemote()) {
      this.emit('disconnect')
      this._connecting = false
      return // early
    }

    let stream
    try {
      const peer = await this._libp2p.peerStore.get(this._remoteId)
      const dialed = await this._libp2p.dialProtocol(peer.id, PROTOCOL)
      stream = dialed.stream
    } catch (err) {
      // Reset the flag before emitting: with no 'error' listener attached
      // emit() throws, and the flag must not be left stuck at true.
      this._connecting = false
      this.emit('error', err)
      return
    }

    this._connection = new FiFoMessageQueue()

    pipe(this._connection, stream, async (source) => {
      this._connecting = false
      this.emit('connect', this._connection)

      for await (const message of source) {
        this.emit('message', message)
      }

      this.emit('disconnect')
    })
      .catch((err) => {
        this.emit('error', err)
      })
  }

  /**
   * @returns {boolean} true when libp2p reports at least one connection to the remote peer
   */
  _isConnectedToRemote () {
    return this._libp2p.getConnections(this._remoteId).length !== 0
  }
}
70
/**
 * Push-driven first-in/first-out queue that acts as its own async iterator.
 *
 * Producers call `push()` / `end()`; a consumer pulls with `next()` (for
 * example through `for await`). Only one pending `next()` is tracked at a
 * time: a second `next()` issued before the first settles replaces the
 * stored resolver.
 */
class FiFoMessageQueue {
  constructor () {
    this._queue = []
    this._ended = false
    // Resolver of the consumer's pending next() promise, if one is waiting.
    this._resolve = null
  }

  [Symbol.asyncIterator] () {
    return this
  }

  /**
   * Hand a message to the waiting consumer, or buffer it until `next()` is called.
   *
   * @param {*} message
   * @throws {Error} if `end()` has already been called
   */
  push (message) {
    if (this._ended) {
      throw new Error('Message queue ended')
    }

    if (this._resolve) {
      // Clear the resolver before using it: a promise settles only once, so a
      // stale resolver would silently swallow the next pushed message.
      const resolve = this._resolve
      this._resolve = null
      resolve({
        done: false,
        value: message
      })
      return
    }

    this._queue.push(message)
  }

  /**
   * Mark the queue as finished and release a waiting consumer, if any.
   * Messages still buffered at this point are not delivered.
   */
  end () {
    this._ended = true
    if (this._resolve) {
      const resolve = this._resolve
      this._resolve = null
      resolve({
        done: true
      })
    }
  }

  /**
   * Pull the next item.
   *
   * @returns {{done: boolean, value?: *}|Promise<{done: boolean, value?: *}>}
   *   an immediate result when the queue has ended or has buffered messages,
   *   otherwise a promise settled by the next `push()` or `end()`
   */
  next () {
    if (this._ended) {
      return {
        done: true
      }
    }

    if (this._queue.length) {
      return {
        done: false,
        value: this._queue.shift()
      }
    }

    return new Promise((resolve) => {
      this._resolve = resolve
    })
  }
}