UNPKG

3.43 kBJavaScriptView Raw
// Joins the plugin names in a composed plugin's `name`, and is handed to
// separator-escape as its first argument.
const SEPARATOR = '~'
// Second argument to separator-escape; presumably the character that escapes a
// literal separator inside a segment — confirm against that package's docs.
const ESCAPE = '!'
// Helper configured with the two characters above; its `parse` and `stringify`
// are used below to split and join composed addresses.
const SE = require('separator-escape')(SEPARATOR, ESCAPE)
4
/**
 * Returns the first element of an array; any non-array value is handed back
 * unchanged.
 *
 * @param {*} x - An array or a single value.
 * @returns {*} `x[0]` when `x` is an array, otherwise `x` itself.
 */
function head(x) {
  if (!Array.isArray(x)) return x
  return x[0]
}
8
/**
 * Returns everything after the first element of an array.
 *
 * @param {*} x - An array or any other value.
 * @returns {Array} A new array holding `x` without its first element, or an
 *   empty array when `x` is not an array.
 */
function tail(x) {
  if (!Array.isArray(x)) return []
  return x.slice(1)
}
12
/**
 * Threads `stream` through each function in `transforms`, in order, and
 * reports the final stream (or the first error) to `cb`.
 *
 * While walking the chain, an address string is accumulated: it starts as
 * `stream.address` and each successful transform appends '~' plus the
 * `address` of the stream it produced. On success the accumulated string is
 * written to the final stream's `address`. On failure the error's `address`
 * is rewritten to the accumulated string, '~', and the error's previous
 * `address`.
 *
 * @param {object} stream - Initial stream; must be truthy.
 * @param {Function[]} transforms - Each is called as
 *   `transform(stream, (err, nextStream) => ...)`.
 * @param {Function} cb - Called as `cb(err)` or `cb(null, finalStream)`.
 * @throws {Error} If `stream` is falsy, or if a transform calls back with
 *   neither an error nor a stream.
 */
function compose(stream, transforms, cb) {
  if (!stream) throw new Error('multiserver.compose: *must* pass stream')
  ;(function next(err, stream, i, addr) {
    if (err) {
      err.address = addr + '~' + err.address
      return cb(err)
    }
    if (i >= transforms.length) {
      stream.address = addr
      return cb(null, stream)
    }
    transforms[i](stream, (err, _stream) => {
      // Validate the transform's result (`_stream`), not the input `stream`,
      // which is always set at this point.
      if (!err && !_stream) throw new Error('expected error or stream')
      // Carry metadata forward unless the transform supplied its own.
      if (_stream) _stream.meta = _stream.meta || stream.meta
      next(err, _stream, i + 1, err ? addr : addr + '~' + _stream.address)
    })
  })(null, stream, 0, stream.address)
}
30
/**
 * Wraps `f` so it can be invoked with or without a node-style callback.
 *
 * - If `f` declares at least one parameter, it is treated as already
 *   callback-based and is called with `cb` directly.
 * - Otherwise, when a callback is supplied, `f()` is run synchronously and
 *   its return value or thrown error is delivered through `cb`.
 * - With no callback, `f()` is simply called and its result returned.
 *
 * @param {Function} f - Function to adapt.
 * @returns {Function} `fnAsAsync(cb)` wrapper around `f`.
 */
function asyncify(f) {
  return function fnAsAsync(cb) {
    // f takes arguments, so let it deal with the callback itself.
    if (f.length) return f(cb)

    // No callback: plain synchronous call, exceptions propagate.
    if (!cb) return f()

    let outcome
    try {
      outcome = f()
    } catch (err) {
      return cb(err)
    }
    // Called outside the try so errors thrown by cb are not fed back into cb.
    return cb(null, outcome)
  }
}
46
/**
 * Pass-through function: hands its argument back untouched.
 *
 * @param {*} value - Any value.
 * @returns {*} The same `value`.
 */
function identity(value) {
  return value
}
50
51module.exports = function Compose(ary, wrap) {
52 if (!wrap) wrap = identity
53 const proto = head(ary)
54 const trans = tail(ary)
55
56 function parse(str) {
57 const parts = SE.parse(str)
58 const out = []
59 for (let i = 0; i < parts.length; i++) {
60 const v = ary[i].parse(parts[i])
61 if (!v) return null
62 out[i] = v
63 }
64 return out
65 }
66
67 function parseMaybe(str) {
68 return typeof str === 'string' ? parse(str) : str
69 }
70
71 return {
72 name: ary.map((e) => e.name).join(SEPARATOR),
73
74 scope: proto.scope,
75
76 client(_opts, cb) {
77 const opts = parseMaybe(_opts)
78 if (!opts) return cb(new Error('could not parse address:' + _opts))
79 return proto.client(head(opts), (err, stream) => {
80 if (err) return cb(err)
81 compose(
82 wrap(stream),
83 trans.map((tr, i) => tr.create(opts[i + 1])),
84 cb
85 )
86 })
87 },
88
89 // There should be a callback , called with null when the server started to
90 // listen. (net.server.listen is async for example)
91 server(onConnection, onError, onStart) {
92 onError =
93 onError ||
94 function onServerError(err) {
95 console.error('server error, from', err.address)
96 console.error(err)
97 }
98 return asyncify(
99 proto.server(function onComposedConnection(stream) {
100 compose(
101 wrap(stream),
102 trans.map((tr) => tr.create()),
103 (err, stream) => {
104 if (err) onError(err)
105 else onConnection(stream)
106 }
107 )
108 }, onStart)
109 )
110 },
111
112 parse: parse,
113
114 stringify(scope) {
115 const addresses = []
116 const fullAddress = proto.stringify(scope)
117 if (!fullAddress) return
118 else {
119 const splittedAddresses = fullAddress.split(';')
120 if (splittedAddresses.length > 1) {
121 // More than one hostname needs to be updated
122 addresses.push(...splittedAddresses)
123 } else {
124 addresses.push(fullAddress)
125 }
126 }
127 return addresses
128 .map((addr) => {
129 const singleAddr = [addr].concat(trans.map((t) => t.stringify(scope)))
130 return SE.stringify(singleAddr)
131 })
132 .join(';')
133 },
134 }
135}