1 | 'use strict';
|
2 | var PacketStream = require('packet-stream')
|
3 | var pull = require('pull-stream')
|
4 | var pullWeird = require('./pull-weird')
|
5 | var goodbye = require('pull-goodbye')
|
6 | var u = require('./util')
|
7 | var explain = require('explain-error')
|
8 |
|
/**
 * Tell whether a value is callable.
 *
 * @param {*} f - value to inspect
 * @returns {boolean} true when `typeof f` is 'function'
 */
function isFunction (f) {
  var kind = typeof f
  return kind === 'function'
}
|
12 |
|
/**
 * Tell whether a value is a primitive string.
 *
 * @param {*} s - value to inspect
 * @returns {boolean} true when `typeof s` is 'string'
 */
function isString (s) {
  var kind = typeof s
  return kind === 'string'
}
|
16 |
|
/**
 * Tell whether a value is a non-null object (arrays included).
 *
 * Always returns a real boolean. The previous `o && ...` form leaked the
 * falsy operand itself (null, 0, '', undefined) to the caller instead of
 * `false`; truthiness of the result is unchanged.
 *
 * @param {*} o - value to inspect
 * @returns {boolean} true when `o` is not null and `typeof o` is 'object'
 */
function isObject (o) {
  return o !== null && typeof o === 'object'
}
|
20 |
|
// Predicates over the call-type strings used by this module. Each one takes
// the type tag `t` and returns a boolean using strict comparison only.

// True for the 'source' stream type.
function isSource (t) {
  return t === 'source'
}

// True for the 'sink' stream type.
function isSink (t) {
  return t === 'sink'
}

// True for the 'duplex' stream type.
function isDuplex (t) {
  return t === 'duplex'
}

// True for the 'sync' request type.
function isSync (t) {
  return t === 'sync'
}

// True for the 'async' request type.
function isAsync (t) {
  return t === 'async'
}

// True for either request type ('sync' or 'async').
function isRequest (t) {
  if (isSync(t)) return true
  return isAsync(t)
}

// True for any of the stream types ('source', 'sink' or 'duplex').
function isStream (t) {
  if (isSource(t)) return true
  if (isSink(t)) return true
  return isDuplex(t)
}
|
28 |
|
29 | module.exports = function initStream (localCall, codec, onClose) {
|
30 |
|
31 | var ps = PacketStream({
|
32 | message: function (msg) {
|
33 |
|
34 |
|
35 |
|
36 | },
|
37 | request: function (opts, cb) {
|
38 | if(!Array.isArray(opts.args))
|
39 | return cb(new Error('invalid request, args should be array, was:'+JSON.stringify(opts)))
|
40 | var name = opts.name, args = opts.args
|
41 | var inCB = false, called = false, async = false, value
|
42 |
|
43 | args.push(function (err, value) {
|
44 | called = true
|
45 | inCB = true; cb(err, value)
|
46 | })
|
47 | try {
|
48 | value = localCall('async', name, args)
|
49 | } catch (err) {
|
50 | if(inCB || called) throw explain(err, 'no callback provided to muxrpc async funtion')
|
51 | return cb(err)
|
52 | }
|
53 |
|
54 | },
|
55 | stream: function (stream) {
|
56 | stream.read = function (data, end) {
|
57 | var name = data.name
|
58 | var type = data.type
|
59 | var err, value
|
60 |
|
61 | stream.read = null
|
62 |
|
63 | if(!isStream(type))
|
64 | return stream.write(null, new Error('unsupported stream type:'+type))
|
65 |
|
66 |
|
67 | if(end) return stream.write(null, end)
|
68 |
|
69 | try { value = localCall(type, name, data.args) }
|
70 | catch (_err) { err = _err }
|
71 |
|
72 | var _stream = pullWeird[
|
73 | {source: 'sink', sink: 'source'}[type] || 'duplex'
|
74 | ](stream)
|
75 |
|
76 | return u.pipeToStream(
|
77 | type, _stream,
|
78 | err ? u.errorAsStream(type, err) : value
|
79 | )
|
80 |
|
81 |
|
82 |
|
83 |
|
84 |
|
85 |
|
86 |
|
87 | }
|
88 | },
|
89 |
|
90 | close: function (err) {
|
91 | ps = null
|
92 | ws.ended = true
|
93 | if(ws.closed) return
|
94 | ws.closed = true
|
95 | if(onClose) {
|
96 | var close = onClose; onClose = null; close(err)
|
97 | }
|
98 | }
|
99 | })
|
100 |
|
101 | var ws = goodbye(pullWeird(ps, function (_) {
|
102 |
|
103 | }))
|
104 |
|
105 | ws = codec ? codec(ws) : ws
|
106 |
|
107 | ws.remoteCall = function (type, name, args, cb) {
|
108 | if(name === 'emit') return ps.message(args)
|
109 |
|
110 | if(!(isRequest(type) || isStream(type)))
|
111 | throw new Error('unsupported type:' + JSON.stringify(type))
|
112 |
|
113 | if(isRequest(type))
|
114 | return ps.request({name: name, args: args}, cb)
|
115 |
|
116 | var ws = ps.stream(), s = pullWeird[type](ws, cb)
|
117 | ws.write({name: name, args: args, type: type})
|
118 | return s
|
119 | }
|
120 |
|
121 |
|
122 |
|
123 |
|
124 |
|
125 |
|
126 |
|
127 |
|
128 | ws.isOpen = function () {
|
129 | return !ps.ended
|
130 | }
|
131 |
|
132 | ws.close = function (err, cb) {
|
133 | if(isFunction(err))
|
134 | cb = err, err = false
|
135 | if(!ps) return (cb && cb())
|
136 | if(err) return ps.destroy(err), (cb && cb())
|
137 |
|
138 | ps.close(function (err) {
|
139 | if(cb) cb(err)
|
140 | else if(err) throw explain(err, 'no callback provided for muxrpc close')
|
141 | })
|
142 |
|
143 | return this
|
144 | }
|
145 | ws.closed = false
|
146 |
|
147 | return ws
|
148 | }
|
149 |
|
150 |
|
151 |
|