1 | 'use strict'
|
2 |
|
3 | const EventEmitter = require('events')
|
4 | const pipe = require('it-pipe')
|
5 |
|
6 | const emitter = new EventEmitter()
|
7 |
|
8 | function handler ({ connection, stream }) {
|
9 | const peerId = connection.remotePeer.toB58String()
|
10 |
|
11 | pipe(
|
12 | stream,
|
13 | async function (source) {
|
14 | for await (const message of source) {
|
15 | let msg
|
16 |
|
17 | try {
|
18 | msg = JSON.parse(message.toString())
|
19 | } catch (err) {
|
20 | emitter.emit('warning', err.message)
|
21 | continue
|
22 | }
|
23 |
|
24 | if (peerId !== msg.from.toString()) {
|
25 | emitter.emit('warning', 'no peerid match ' + msg.from)
|
26 | continue
|
27 | }
|
28 |
|
29 | const topicIDs = msg.topicIDs
|
30 | if (!Array.isArray(topicIDs)) {
|
31 | emitter.emit('warning', 'no topic IDs')
|
32 | continue
|
33 | }
|
34 |
|
35 | msg.data = Buffer.from(msg.data, 'hex')
|
36 | msg.seqno = Buffer.from(msg.seqno, 'hex')
|
37 |
|
38 | topicIDs.forEach((topic) => {
|
39 | emitter.emit(topic, msg)
|
40 | })
|
41 | }
|
42 | }
|
43 | )
|
44 | }
|
45 |
|
46 | exports = module.exports = {
|
47 | handler: handler,
|
48 | emitter: emitter
|
49 | }
|