UNPKG

2.38 kBJavaScriptView Raw
1'use strict'
2
3const EventEmitter = require('events')
4const pipe = require('it-pipe')
5
6const PROTOCOL = require('./protocol')
7const encoding = require('./encoding')
8
9module.exports = class Connection extends EventEmitter {
10 constructor (remoteId, libp2p, room) {
11 super()
12 this._remoteId = remoteId
13 this._libp2p = libp2p
14 this._room = room
15 this._connection = null
16 this._connecting = false
17 }
18
19 push (message) {
20 if (this._connection) {
21 this._connection.push(encoding(message))
22
23 return
24 }
25
26 this.once('connect', () => {
27 this.push(message)
28 })
29
30 if (!this._connecting) {
31 this._connect()
32 }
33 }
34
35 stop () {
36 if (this._connection) {
37 this._connection.end()
38 }
39 }
40
41 async _connect () {
42 this._connecting = true
43
44 if (!this._isConnectedToRemote()) {
45 this.emit('disconnect')
46 this._connecting = false
47 return // early
48 }
49
50 const peerInfo = this._libp2p.peerStore.get(this._remoteId)
51 const { stream } = await this._libp2p.dialProtocol(peerInfo, PROTOCOL)
52 this._connection = new FiFoMessageQueue()
53
54 pipe(this._connection, stream, async (source) => {
55 this._connecting = false
56 this.emit('connect', this._connection)
57
58 for await (const message of source) {
59 this.emit('message', message)
60 }
61 })
62 .then(() => {
63 this.emit('disconnect')
64 }, (err) => {
65 this.emit('error', err)
66 })
67 }
68
69 _isConnectedToRemote () {
70 for (const peerId of this._libp2p.connections.keys()) {
71 if (peerId === this._remoteId) {
72 return true
73 }
74 }
75 }
76}
77
/**
 * Unbounded first-in/first-out queue that is its own async iterator.
 *
 * Producers call `push()` / `end()`; a single consumer pulls with `next()`
 * (typically through `for await`). When the consumer is already waiting,
 * a pushed message is handed over directly instead of being buffered.
 */
class FiFoMessageQueue {
  constructor () {
    this._queue = []
    this._ended = false
    // Resolver of the promise returned by a pending next(), or null when no
    // consumer is currently waiting.
    this._resolve = null
  }

  [Symbol.asyncIterator] () {
    return this
  }

  /**
   * Enqueue a message, or deliver it straight to a waiting consumer.
   *
   * @param {*} message
   * @throws {Error} if the queue has already been ended
   */
  push (message) {
    if (this._ended) {
      throw new Error('Message queue ended')
    }

    if (this._resolve) {
      // Clear the resolver before using it: a promise can only be settled
      // once, so a second push() arriving before the consumer calls next()
      // again must fall through to the buffer rather than be dropped.
      const resolve = this._resolve
      this._resolve = null
      resolve({
        done: false,
        value: message
      })
      return
    }

    this._queue.push(message)
  }

  /**
   * Mark the queue as finished and wake a waiting consumer with `done`.
   */
  end () {
    this._ended = true
    if (this._resolve) {
      const resolve = this._resolve
      this._resolve = null
      resolve({
        done: true
      })
    }
  }

  /**
   * Pull the next message.
   *
   * Once the queue is ended this reports `done` immediately, even if
   * messages are still buffered.
   *
   * @returns {{done: boolean, value?: *}|Promise<{done: boolean, value?: *}>}
   *   an iterator result when one is available right away, otherwise a
   *   promise settled by the next `push()` or `end()`
   */
  next () {
    if (this._ended) {
      return {
        done: true
      }
    }

    if (this._queue.length) {
      return {
        done: false,
        value: this._queue.shift()
      }
    }

    return new Promise((resolve) => {
      this._resolve = resolve
    })
  }
}