1 | 'use strict'
|
2 |
|
3 | const EventEmitter = require('events')
|
4 | const pipe = require('it-pipe')
|
5 |
|
6 | const PROTOCOL = require('./protocol')
|
7 | const encoding = require('./encoding')
|
8 |
|
9 | module.exports = class Connection extends EventEmitter {
|
10 | constructor (remoteId, libp2p, room) {
|
11 | super()
|
12 | this._remoteId = remoteId
|
13 | this._libp2p = libp2p
|
14 | this._room = room
|
15 | this._connection = null
|
16 | this._connecting = false
|
17 | }
|
18 |
|
19 | push (message) {
|
20 | if (this._connection) {
|
21 | this._connection.push(encoding(message))
|
22 |
|
23 | return
|
24 | }
|
25 |
|
26 | this.once('connect', () => {
|
27 | this.push(message)
|
28 | })
|
29 |
|
30 | if (!this._connecting) {
|
31 | this._connect()
|
32 | }
|
33 | }
|
34 |
|
35 | stop () {
|
36 | if (this._connection) {
|
37 | this._connection.end()
|
38 | }
|
39 | }
|
40 |
|
41 | async _connect () {
|
42 | this._connecting = true
|
43 |
|
44 | if (!this._isConnectedToRemote()) {
|
45 | this.emit('disconnect')
|
46 | this._connecting = false
|
47 | return
|
48 | }
|
49 |
|
50 | const peerInfo = this._libp2p.peerStore.get(this._remoteId)
|
51 | const { stream } = await this._libp2p.dialProtocol(peerInfo, PROTOCOL)
|
52 | this._connection = new FiFoMessageQueue()
|
53 |
|
54 | pipe(this._connection, stream, async (source) => {
|
55 | this._connecting = false
|
56 | this.emit('connect', this._connection)
|
57 |
|
58 | for await (const message of source) {
|
59 | this.emit('message', message)
|
60 | }
|
61 | })
|
62 | .then(() => {
|
63 | this.emit('disconnect')
|
64 | }, (err) => {
|
65 | this.emit('error', err)
|
66 | })
|
67 | }
|
68 |
|
69 | _isConnectedToRemote () {
|
70 | for (const peerId of this._libp2p.connections.keys()) {
|
71 | if (peerId === this._remoteId) {
|
72 | return true
|
73 | }
|
74 | }
|
75 | }
|
76 | }
|
77 |
|
/**
 * First-in-first-out message queue that is its own async iterator.
 *
 * Producers call `push` / `end`; a consumer pulls with `next` (for example
 * through `for await`). At most one pending `next` call is tracked at a time.
 */
class FiFoMessageQueue {
  constructor () {
    // Messages pushed while no consumer is waiting.
    this._queue = []
    // Set once end() has been called.
    this._ended = false
    // Resolver of the pending next() promise, or null when nobody is waiting.
    this._resolve = null
  }

  [Symbol.asyncIterator] () {
    return this
  }

  /**
   * Deliver a message to the waiting consumer, or buffer it.
   *
   * @param {*} message
   * @throws {Error} when the queue has already been ended
   */
  push (message) {
    if (this._ended) {
      throw new Error('Message queue ended')
    }

    if (this._resolve) {
      // Clear the resolver before using it: a promise settles only once, so
      // a stale resolver would silently swallow every subsequent message.
      const resolve = this._resolve
      this._resolve = null
      resolve({
        done: false,
        value: message
      })
      return
    }

    this._queue.push(message)
  }

  /**
   * Mark the queue as finished and release a waiting consumer, if any.
   * Messages still buffered at this point are not delivered.
   */
  end () {
    this._ended = true
    if (this._resolve) {
      const resolve = this._resolve
      this._resolve = null
      resolve({
        done: true
      })
    }
  }

  /**
   * Produce the next iterator result.
   *
   * @returns {{done: boolean, value?: *}|Promise<{done: boolean, value?: *}>}
   *   a plain result when the queue is ended or has buffered messages,
   *   otherwise a promise settled by the next `push` or `end`
   */
  next () {
    if (this._ended) {
      return {
        done: true
      }
    }

    if (this._queue.length) {
      return {
        done: false,
        value: this._queue.shift()
      }
    }

    return new Promise((resolve) => {
      this._resolve = resolve
    })
  }
}
|