// Character used to join the per-layer names/addresses of a composed plugin.
const SEPARATOR = '~'
// Second argument handed to separator-escape.
// NOTE(review): presumably the character that escapes a literal SEPARATOR
// inside a part — confirm against the separator-escape documentation.
const ESCAPE = '!'
// Helper whose parse/stringify are used below to split and join addresses.
const SE = require('separator-escape')(SEPARATOR, ESCAPE)
|
4 |
|
/**
 * Returns the first element when given an array; any other value is
 * handed back untouched.
 *
 * @param {*} x - An array or a single value.
 * @returns {*} `x[0]` for arrays, otherwise `x` itself.
 */
function head(x) {
  if (Array.isArray(x)) {
    return x[0]
  }
  return x
}
|
8 |
|
/**
 * Returns everything after the first element of an array. Non-array input
 * has no tail, so an empty array is returned for it.
 *
 * @param {*} x - An array or a single value.
 * @returns {Array} A new array holding `x[1..]`, or `[]` for non-arrays.
 */
function tail(x) {
  if (!Array.isArray(x)) {
    return []
  }
  return x.slice(1)
}
|
12 |
|
/**
 * Threads `stream` through each transform in order and hands the final
 * stream (or the first error) to `cb`.
 *
 * Every transform is invoked as `transform(stream, (err, nextStream) => ...)`.
 * The `address` of each produced stream is appended to the running address
 * with '~', and the composite address is written onto the final stream. When
 * a transform fails, the address accumulated so far is prefixed onto
 * `err.address` before the error is passed to `cb`. A produced stream that
 * has no `meta` inherits the `meta` of the stream it was derived from.
 *
 * @param {object} stream - Initial stream; its `address` seeds the composite address.
 * @param {Function[]} transforms - Functions of the form `(stream, cb)`.
 * @param {Function} cb - Called as `cb(err)` or `cb(null, stream)`.
 * @throws {Error} If `stream` is missing, or if a transform calls back with
 *   neither an error nor a stream.
 */
function compose(stream, transforms, cb) {
  if (!stream) throw new Error('multiserver.compose: *must* pass stream')

  function next(err, current, i, addr) {
    if (err) {
      err.address = addr + '~' + err.address
      return cb(err)
    }
    if (i >= transforms.length) {
      current.address = addr
      return cb(null, current)
    }
    transforms[i](current, (err, produced) => {
      // Validate the stream the transform produced (not the one it was
      // given), so a misbehaving transform yields this explicit error rather
      // than a TypeError when `produced.address` is read below.
      if (!err && !produced) throw new Error('expected error or stream')
      if (produced) produced.meta = produced.meta || current.meta
      next(err, produced, i + 1, err ? addr : addr + '~' + produced.address)
    })
  }

  next(null, stream, 0, stream.address)
}
|
30 |
|
/**
 * Wraps `f` so the result can be invoked with or without a callback.
 *
 * - If `f` declares at least one parameter, the callback is passed straight
 *   through to it and its return value is returned.
 * - Otherwise, with a callback, `f()` is run and its return value (or the
 *   exception it throws) is delivered through the callback.
 * - Otherwise, with no callback, `f()` is simply called and its result
 *   returned; exceptions propagate to the caller.
 *
 * @param {Function} f - Function taking either no arguments or a callback.
 * @returns {Function} `fnAsAsync(cb)` behaving as described above.
 */
function asyncify(f) {
  return function fnAsAsync(cb) {
    // f declares parameters: hand the callback over as-is.
    if (f.length) return f(cb)
    if (!cb) return f()

    let result
    try {
      result = f()
    } catch (err) {
      return cb(err)
    }
    // Deliberately outside the try: errors thrown by cb itself are not caught.
    return cb(null, result)
  }
}
|
46 |
|
/**
 * Pass-through function: returns its argument unchanged.
 *
 * @param {*} x - Any value.
 * @returns {*} The same value that was passed in.
 */
function identity(x) {
  return x
}
|
50 |
|
51 | module.exports = function Compose(ary, wrap) {
|
52 | if (!wrap) wrap = identity
|
53 | const proto = head(ary)
|
54 | const trans = tail(ary)
|
55 |
|
56 | function parse(str) {
|
57 | const parts = SE.parse(str)
|
58 | const out = []
|
59 | for (let i = 0; i < parts.length; i++) {
|
60 | const v = ary[i].parse(parts[i])
|
61 | if (!v) return null
|
62 | out[i] = v
|
63 | }
|
64 | return out
|
65 | }
|
66 |
|
67 | function parseMaybe(str) {
|
68 | return typeof str === 'string' ? parse(str) : str
|
69 | }
|
70 |
|
71 | return {
|
72 | name: ary.map((e) => e.name).join(SEPARATOR),
|
73 |
|
74 | scope: proto.scope,
|
75 |
|
76 | client(_opts, cb) {
|
77 | const opts = parseMaybe(_opts)
|
78 | if (!opts) return cb(new Error('could not parse address:' + _opts))
|
79 | return proto.client(head(opts), (err, stream) => {
|
80 | if (err) return cb(err)
|
81 | compose(
|
82 | wrap(stream),
|
83 | trans.map((tr, i) => tr.create(opts[i + 1])),
|
84 | cb
|
85 | )
|
86 | })
|
87 | },
|
88 |
|
89 |
|
90 |
|
91 | server(onConnection, onError, onStart) {
|
92 | onError =
|
93 | onError ||
|
94 | function onServerError(err) {
|
95 | console.error('server error, from', err.address)
|
96 | console.error(err)
|
97 | }
|
98 | return asyncify(
|
99 | proto.server(function onComposedConnection(stream) {
|
100 | compose(
|
101 | wrap(stream),
|
102 | trans.map((tr) => tr.create()),
|
103 | (err, stream) => {
|
104 | if (err) onError(err)
|
105 | else onConnection(stream)
|
106 | }
|
107 | )
|
108 | }, onStart)
|
109 | )
|
110 | },
|
111 |
|
112 | parse: parse,
|
113 |
|
114 | stringify(scope) {
|
115 | const addresses = []
|
116 | const fullAddress = proto.stringify(scope)
|
117 | if (!fullAddress) return
|
118 | else {
|
119 | const splittedAddresses = fullAddress.split(';')
|
120 | if (splittedAddresses.length > 1) {
|
121 |
|
122 | addresses.push(...splittedAddresses)
|
123 | } else {
|
124 | addresses.push(fullAddress)
|
125 | }
|
126 | }
|
127 | return addresses
|
128 | .map((addr) => {
|
129 | const singleAddr = [addr].concat(trans.map((t) => t.stringify(scope)))
|
130 | return SE.stringify(singleAddr)
|
131 | })
|
132 | .join(';')
|
133 | },
|
134 | }
|
135 | }
|