1 | var assert = require('assert')
|
2 | var separator = '~', escape = '!'
|
3 | var SE = require('separator-escape')(separator, escape)
|
4 |
|
5 | var isArray = Array.isArray
|
// Report whether `f` is callable, i.e. `typeof f` is 'function'.
function isFunction (f) {
  var kind = typeof f
  return kind === 'function'
}
|
// Report whether `s` is a primitive string (String wrapper objects do not count).
function isString (s) {
  var kind = typeof s
  return kind === 'string'
}
|
// First element when `opts` is an array; otherwise `opts` itself, unchanged.
function head (opts) {
  if(!isArray(opts)) return opts
  return opts[0]
}
|
// Everything after the first element when `opts` is an array (as a new
// array); an empty array for any non-array input.
function tail (opts) {
  if(isArray(opts)) return opts.slice(1)
  return []
}
|
18 |
|
/**
 * Pass `stream` through each function in `transforms`, in order, and hand
 * the final stream to `cb`.
 *
 * Each transform is called as `transform(stream, function (err, newStream))`.
 * While stepping through, a composite address is accumulated by joining the
 * initial `stream.address` and every transformed stream's `address` with '~'.
 *
 * - On success, the final stream's `address` is set to the composite address
 *   and `cb(null, stream)` is called.
 * - On error, `err.address` is set to the address accumulated so far, joined
 *   with '~' to the error's own `address`, and `cb(err)` is called.
 * - A transformed stream without its own `meta` inherits `meta` from the
 *   stream that was fed into that transform.
 *
 * @param {object} stream - initial stream; must be truthy.
 * @param {Function[]} transforms - transform functions, applied left to right.
 * @param {Function} cb - node-style callback `(err, stream)`.
 * @throws {Error} if `stream` is falsy, or if a transform calls back with
 *   neither an error nor a stream.
 */
function compose (stream, transforms, cb) {
  if(!stream) throw new Error('multiserver.compose: *must* pass stream')
  ;(function next (err, stream, i, addr) {
    if(err) {
      // NOTE(review): if the error carries no `address`, this produces a
      // trailing "~undefined"; kept as-is because callers may rely on it.
      err.address = addr + '~' + err.address
      return cb(err)
    }
    if(i >= transforms.length) {
      stream.address = addr
      return cb(null, stream)
    }
    transforms[i](stream, function (err, _stream) {
      // Check the stream handed back by the transform (`_stream`), not the
      // input `stream`, which is always truthy at this point.
      if(!err && !_stream) throw new Error('expected error or stream')
      if(_stream) _stream.meta = _stream.meta || stream.meta
      next(err, _stream, i+1, err ? addr : (addr+'~'+_stream.address))
    })
  })(null, stream, 0, stream.address)
}
|
38 |
|
/**
 * Wrap `f` so it can be invoked with an optional node-style callback.
 *
 * - If `f` declares at least one parameter (`f.length` is non-zero), it is
 *   assumed to take the callback itself and is called as `f(cb)`.
 * - Otherwise, when no callback is given, `f()` is called directly and its
 *   result returned (exceptions propagate).
 * - Otherwise `f()` is called inside try/catch: a thrown error goes to
 *   `cb(err)`, a normal result goes to `cb(null, result)`.
 */
function asyncify (f) {
  return function (cb) {
    if(f.length) return f(cb)
    if(!cb) return f()

    var value
    try {
      value = f()
    } catch (err) {
      return cb(err)
    }
    return cb(null, value)
  }
}
|
52 |
|
53 | module.exports = function (ary, wrap) {
|
54 | if(!wrap) wrap = function (e) { return e }
|
55 | var proto = head(ary)
|
56 | var trans = tail(ary)
|
57 |
|
58 | function parse (str) {
|
59 | var parts = SE.parse(str)
|
60 | var out = []
|
61 | for(var i = 0; i < parts.length; i++) {
|
62 | var v = ary[i].parse(parts[i])
|
63 | if(!v) return null
|
64 | out[i] = v
|
65 | }
|
66 | return out
|
67 | }
|
68 |
|
69 | function parseMaybe (str) {
|
70 | return isString(str) ? parse(str) : str
|
71 | }
|
72 |
|
73 | return {
|
74 | name: ary.map(function (e) { return e.name }).join(separator),
|
75 | scope: proto.scope,
|
76 | client: function (_opts, cb) {
|
77 | var opts = parseMaybe(_opts)
|
78 | if(!opts) return cb(new Error('could not parse address:'+_opts))
|
79 | return proto.client(head(opts), function (err, stream) {
|
80 | if(err) return cb(err)
|
81 | compose(
|
82 | wrap(stream),
|
83 | trans.map(function (tr, i) { return tr.create(opts[i+1]) }),
|
84 | cb
|
85 | )
|
86 | })
|
87 | },
|
88 |
|
89 |
|
90 |
|
91 | server: function (onConnection, onError, onStart) {
|
92 | onError = onError || function (err) {
|
93 | console.error('server error, from', err.address)
|
94 | console.error(err.stack)
|
95 | }
|
96 | return asyncify(proto.server(function (stream) {
|
97 | compose(
|
98 | wrap(stream),
|
99 | trans.map(function (tr) { return tr.create() }),
|
100 | function (err, stream) {
|
101 | if(err) onError(err)
|
102 | else onConnection(stream)
|
103 | }
|
104 | )
|
105 | }, onStart))
|
106 | },
|
107 | parse: parse,
|
108 | stringify: function (scope) {
|
109 | var none
|
110 | var _ary = ary.map(function (e) {
|
111 | var v = e.stringify(scope)
|
112 | if(!v) none = true
|
113 | else return v
|
114 | })
|
115 | if(none) return
|
116 | return SE.stringify(_ary)
|
117 | }
|
118 | }
|
119 | }
|
120 |
|