1 | import EventEmitter from 'events'
|
2 | import { pipe } from 'it-pipe'
|
3 | import PROTOCOL from './protocol.js'
|
4 | import encoding from './encoding.js'
|
5 |
|
6 | export default class Connection extends EventEmitter {
|
7 | constructor (remoteId, libp2p, room) {
|
8 | super()
|
9 | this._remoteId = remoteId
|
10 | this._libp2p = libp2p
|
11 | this._room = room
|
12 | this._connection = null
|
13 | this._connecting = false
|
14 | }
|
15 |
|
16 | push (message) {
|
17 | if (this._connection) {
|
18 | this._connection.push(encoding(message))
|
19 |
|
20 | return
|
21 | }
|
22 |
|
23 | this.once('connect', () => {
|
24 | this.push(message)
|
25 | })
|
26 |
|
27 | if (!this._connecting) {
|
28 | this._connect()
|
29 | }
|
30 | }
|
31 |
|
32 | stop () {
|
33 | if (this._connection) {
|
34 | this._connection.end()
|
35 | }
|
36 | }
|
37 |
|
38 | async _connect () {
|
39 | this._connecting = true
|
40 |
|
41 | if (!this._isConnectedToRemote()) {
|
42 | this.emit('disconnect')
|
43 | this._connecting = false
|
44 | return
|
45 | }
|
46 |
|
47 | const peer = await this._libp2p.peerStore.get(this._remoteId)
|
48 | const { stream } = await this._libp2p.dialProtocol(peer.id, PROTOCOL)
|
49 | this._connection = new FiFoMessageQueue()
|
50 |
|
51 | pipe(this._connection, stream, async (source) => {
|
52 | this._connecting = false
|
53 | this.emit('connect', this._connection)
|
54 |
|
55 | for await (const message of source) {
|
56 | this.emit('message', message)
|
57 | }
|
58 |
|
59 | this.emit('disconnect')
|
60 | })
|
61 | .catch((err) => {
|
62 | this.emit('error', err)
|
63 | })
|
64 | }
|
65 |
|
66 | _isConnectedToRemote () {
|
67 | return this._libp2p.getConnections(this._remoteId).length !== 0
|
68 | }
|
69 | }
|
70 |
|
71 | class FiFoMessageQueue {
|
72 | constructor () {
|
73 | this._queue = []
|
74 | }
|
75 |
|
76 | [Symbol.asyncIterator] () {
|
77 | return this
|
78 | }
|
79 |
|
80 | push (message) {
|
81 | if (this._ended) {
|
82 | throw new Error('Message queue ended')
|
83 | }
|
84 |
|
85 | if (this._resolve) {
|
86 | return this._resolve({
|
87 | done: false,
|
88 | value: message
|
89 | })
|
90 | }
|
91 |
|
92 | this._queue.push(message)
|
93 | }
|
94 |
|
95 | end () {
|
96 | this._ended = true
|
97 | if (this._resolve) {
|
98 | this._resolve({
|
99 | done: true
|
100 | })
|
101 | }
|
102 | }
|
103 |
|
104 | next () {
|
105 | if (this._ended) {
|
106 | return {
|
107 | done: true
|
108 | }
|
109 | }
|
110 |
|
111 | if (this._queue.length) {
|
112 | return {
|
113 | done: false,
|
114 | value: this._queue.shift()
|
115 | }
|
116 | }
|
117 |
|
118 | return new Promise((resolve) => {
|
119 | this._resolve = resolve
|
120 | })
|
121 | }
|
122 | }
|