UNPKG

4.7 kBJavaScriptView Raw
1'use strict'
2
3const EventEmitter2 = require('eventemitter2').EventEmitter2
4const async = require('async')
5const log = require('debug')('interactor:interface')
6const path = require('path')
7const config = require(path.join(__dirname, '../config')).transporters
8
9module.exports = class TransporterInterface extends EventEmitter2 {
10 /**
11 * Construct new transporter interface with default options and daemon
12 * @param {Object} opts [optionnal] Default options
13 * @param {InteractorDaemon} Daemon needed by transports
14 */
15 constructor (opts, daemon) {
16 log('New transporter interface')
17
18 super({
19 delimiter: ':',
20 wildcard: true
21 })
22 this.opts = opts || {}
23 this.daemon = daemon
24 this.transporters = new Map()
25 this.transportersEndpoints = new Map()
26 this.endpoints = new Map()
27 this.config = config
28 return this
29 }
30
31 /**
32 * Add transporter
33 * @param {String} name of the transporter (in ./transporters/)
34 * @param {Object} opts [optionnal] custom options
35 */
36 bind (name, opts) {
37 if (!opts) opts = {}
38 if (!this.config[name] || !this.config[name].enabled) return this
39 log('Bind [%s] transport to transporter interface', name)
40 let Transport = this._loadTransporter(name)
41 this.transporters.set(name, new Transport(Object.assign(opts, this.opts), this.daemon))
42 this.transportersEndpoints.set(name, this.config[name].endpoints || {})
43 this._bindEvents(name)
44 return this
45 }
46
47 /**
48 * Disconnect each transporters
49 */
50 disconnect () {
51 log('Disconnect all transporters')
52 this.transporters.forEach(transporter => {
53 transporter.disconnect()
54 })
55 }
56
57 /**
58 * Connect each transporters with new endpoints
59 * @param {Object} endpoints
60 * @param {Function} callback
61 */
62 connect (endpoints, cb) {
63 async.each(this.transporters, (data, next) => {
64 let name = data[0]
65 let transport = data[1]
66 // Isn't connected, connect it
67 if (!transport.isConnected()) {
68 log(`Transporters are not connected, connect them with: ${JSON.stringify(endpoints)}`)
69 transport.connect(this._buildConnectParamsFromEndpoints(name, endpoints), next)
70 // Endpoints have changed, reconnect
71 } else if (JSON.stringify(endpoints) !== JSON.stringify(this.endpoints)) {
72 log(`Received new endpoints to connect transporters: ${JSON.stringify(endpoints)}`)
73 transport.reconnect(this._buildConnectParamsFromEndpoints(name, endpoints), next)
74 // No changes
75 } else {
76 return next(null)
77 }
78 }, (err) => {
79 // Save endpoints
80 this.endpoints = endpoints
81 cb(err)
82 })
83 }
84
85 /**
86 * Send to each transporters
87 */
88 send (channel, data) {
89 this.transporters.forEach(transporter => {
90 transporter.send(channel, data)
91 })
92 }
93
94 /**
95 * Require transporter
96 * @param {String} name of the transporter (in ./transporters/)
97 * @private
98 */
99 _loadTransporter (name) {
100 return require('./transporters/' + this._getTransportName(name))
101 }
102
103 /**
104 * Resolve transporter name
105 * @param {String} name of the transporter (in ./transporters/)
106 * @private
107 */
108 _getTransportName (name) {
109 name = name.toLowerCase()
110 name = name.charAt(0).toUpperCase() + name.slice(1)
111 return name + 'Transport'
112 }
113
114 /**
115 * Emit event on transporter event
116 * @param {String} name of the transporter
117 * @private
118 */
119 _bindEvents (name) {
120 const self = this
121 this.transporters.get(name).on('**', function (data) {
122 log('Received event from %s transporter', name)
123 self.emit(this.event, data)
124 })
125 }
126
127 /**
128 * Return an object used to connect() transport
129 * based on transporter endpoints options
130 * @param {String} transporter's name
131 * @param {Object} endpoints
132 * @private
133 */
134 _buildConnectParamsFromEndpoints (name, endpoints) {
135 if (!endpoints) endpoints = {}
136 const opts = this.transportersEndpoints.get(name)
137 if (typeof opts === 'string') {
138 return endpoints[opts] || opts
139 }
140 let params = {}
141 for (let key in opts) {
142 params[key] = endpoints[opts[key]] || opts[key]
143 }
144 return params
145 }
146
147 /**
148 * Is at least one transporter connected
149 * @return {Boolean}
150 */
151 isConnected () {
152 for (let transporter of this.transporters.values()) {
153 if (transporter.isConnected()) return true
154 }
155 return false
156 }
157
158 /**
159 * Get active transporters that are pushing data
160 * @return {String[]}
161 */
162 getActiveTransporters () {
163 let connected = []
164 for (let entry of this.transporters.values()) {
165 if (entry.isConnected()) {
166 connected.push(entry.constructor.name)
167 }
168 }
169 return connected
170 }
171}