UNPKG

3.15 kBJavaScriptView Raw
1var assert = require('assert')
2var separator = '~', escape = '!'
3var SE = require('separator-escape')(separator, escape)
4
5var isArray = Array.isArray
6function isFunction (f) {
7 return 'function' === typeof f
8}
9function isString (s) {
10 return 'string' === typeof s
11}
12function head (opts) {
13 return isArray(opts) ? opts[0] : opts
14}
15function tail (opts) {
16 return isArray(opts) ? opts.slice(1) : []
17}
18
19function compose (stream, transforms, cb) {
20 if(!stream) throw new Error('multiserver.compose: *must* pass stream')
21 ;(function next (err, stream, i, addr) {
22 if(err) {
23 err.address = addr + '~' + err.address
24 return cb(err)
25 }
26 else if(i >= transforms.length) {
27 stream.address = addr
28 return cb(null, stream)
29 }
30 else
31 transforms[i](stream, function (err, _stream) {
32 if(!err && !stream) throw new Error('expected error or stream')
33 if(_stream) _stream.meta = _stream.meta || stream.meta
34 next(err, _stream, i+1, err ? addr : (addr+'~'+_stream.address))
35 })
36 })(null, stream, 0, stream.address)
37}
38
39function asyncify(f) {
40 return function(cb) {
41 if (f.length) return f(cb)
42 if (cb) {
43 var result
44 try{
45 result = f()
46 } catch(err) {return cb(err)}
47 return cb(null, result)
48 }
49 return f()
50 }
51}
52
53module.exports = function (ary, wrap) {
54 if(!wrap) wrap = function (e) { return e }
55 var proto = head(ary)
56 var trans = tail(ary)
57
58 function parse (str) {
59 var parts = SE.parse(str)
60 var out = []
61 for(var i = 0; i < parts.length; i++) {
62 var v = ary[i].parse(parts[i])
63 if(!v) return null
64 out[i] = v
65 }
66 return out
67 }
68
69 function parseMaybe (str) {
70 return isString(str) ? parse(str) : str
71 }
72
73 return {
74 name: ary.map(function (e) { return e.name }).join(separator),
75 scope: proto.scope,
76 client: function (_opts, cb) {
77 var opts = parseMaybe(_opts)
78 if(!opts) return cb(new Error('could not parse address:'+_opts))
79 return proto.client(head(opts), function (err, stream) {
80 if(err) return cb(err)
81 compose(
82 wrap(stream),
83 trans.map(function (tr, i) { return tr.create(opts[i+1]) }),
84 cb
85 )
86 })
87 },
88 // There should be a callback , called with
89 // null when the server started to listen.
90 // (net.server.listen is async for example)
91 server: function (onConnection, onError, onStart) {
92 onError = onError || function (err) {
93 console.error('server error, from', err.address)
94 console.error(err.stack)
95 }
96 return asyncify(proto.server(function (stream) {
97 compose(
98 wrap(stream),
99 trans.map(function (tr) { return tr.create() }),
100 function (err, stream) {
101 if(err) onError(err)
102 else onConnection(stream)
103 }
104 )
105 }, onStart))
106 },
107 parse: parse,
108 stringify: function (scope) {
109 var none
110 var _ary = ary.map(function (e) {
111 var v = e.stringify(scope)
112 if(!v) none = true
113 else return v
114 })
115 if(none) return
116 return SE.stringify(_ary)
117 }
118 }
119}
120