UNPKG

4.22 kBJavaScriptView Raw
1'use strict';
2var PacketStream = require('packet-stream')
3var pull = require('pull-stream')
4var pullWeird = require('./pull-weird')
5var goodbye = require('pull-goodbye')
6var u = require('./util')
7var explain = require('explain-error')
8
/**
 * Report whether a value is callable.
 *
 * @param {*} f - value to inspect
 * @returns {boolean} true when `typeof f` is 'function'
 */
function isFunction (f) {
  return typeof f === 'function'
}
12
/**
 * Report whether a value is a primitive string.
 *
 * @param {*} s - value to inspect
 * @returns {boolean} true when `typeof s` is 'string'
 */
function isString (s) {
  return typeof s === 'string'
}
16
/**
 * Report whether a value is a non-null object (arrays included).
 *
 * Always returns a real boolean: falsy inputs such as `null`, `0` or ''
 * yield `false` rather than being passed through as the result.
 *
 * @param {*} o - value to inspect
 * @returns {boolean} true when `o` is not null and `typeof o` is 'object'
 */
function isObject (o) {
  return o !== null && typeof o === 'object'
}
20
// Call-type predicates. Each one takes the `type` string of a call and
// answers with a strict boolean.

/** True when `t` is exactly 'source'. */
function isSource (t) {
  return t === 'source'
}

/** True when `t` is exactly 'sink'. */
function isSink (t) {
  return t === 'sink'
}

/** True when `t` is exactly 'duplex'. */
function isDuplex (t) {
  return t === 'duplex'
}

/** True when `t` is exactly 'sync'. */
function isSync (t) {
  return t === 'sync'
}

/** True when `t` is exactly 'async'. */
function isAsync (t) {
  return t === 'async'
}

/** True for the request types: 'sync' or 'async'. */
function isRequest (t) {
  return t === 'sync' || t === 'async'
}

/** True for the stream types: 'source', 'sink' or 'duplex'. */
function isStream (t) {
  return t === 'source' || t === 'sink' || t === 'duplex'
}
28
29module.exports = function initStream (localCall, codec, onClose) {
30
31 var ps = PacketStream({
32 message: function (msg) {
33// if(isString(msg)) return
34// if(msg.length > 0 && isString(msg[0]))
35// localCall('msg', 'emit', msg)
36 },
37 request: function (opts, cb) {
38 if(!Array.isArray(opts.args))
39 return cb(new Error('invalid request, args should be array, was:'+JSON.stringify(opts)))
40 var name = opts.name, args = opts.args
41 var inCB = false, called = false, async = false, value
42
43 args.push(function (err, value) {
44 called = true
45 inCB = true; cb(err, value)
46 })
47 try {
48 value = localCall('async', name, args)
49 } catch (err) {
50 if(inCB || called) throw explain(err, 'no callback provided to muxrpc async funtion')
51 return cb(err)
52 }
53
54 },
55 stream: function (stream) {
56 stream.read = function (data, end) {
57 var name = data.name
58 var type = data.type
59 var err, value
60
61 stream.read = null
62
63 if(!isStream(type))
64 return stream.write(null, new Error('unsupported stream type:'+type))
65
66 //how would this actually happen?
67 if(end) return stream.write(null, end)
68
69 try { value = localCall(type, name, data.args) }
70 catch (_err) { err = _err }
71
72 var _stream = pullWeird[
73 {source: 'sink', sink: 'source'}[type] || 'duplex'
74 ](stream)
75
76 return u.pipeToStream(
77 type, _stream,
78 err ? u.errorAsStream(type, err) : value
79 )
80
81// if(isSource(type))
82// _stream(err ? pull.error(err) : value)
83// else if (isSink(type))
84// (err ? abortSink(err) : value)(_stream)
85// else if (isDuplex(type))
86// pull(_stream, err ? abortDuplex(err) : value, _stream)
87 }
88 },
89
90 close: function (err) {
91 ps = null // deallocate
92 ws.ended = true
93 if(ws.closed) return
94 ws.closed = true
95 if(onClose) {
96 var close = onClose; onClose = null; close(err)
97 }
98 }
99 })
100
101 var ws = goodbye(pullWeird(ps, function (_) {
102 //this error will be handled in PacketStream.close
103 }))
104
105 ws = codec ? codec(ws) : ws
106
107 ws.remoteCall = function (type, name, args, cb) {
108 if(name === 'emit') return ps.message(args)
109
110 if(!(isRequest(type) || isStream(type)))
111 throw new Error('unsupported type:' + JSON.stringify(type))
112
113 if(isRequest(type))
114 return ps.request({name: name, args: args}, cb)
115
116 var ws = ps.stream(), s = pullWeird[type](ws, cb)
117 ws.write({name: name, args: args, type: type})
118 return s
119 }
120
121
122 //hack to work around ordering in setting ps.ended.
123 //Question: if an object has subobjects, which
124 //all have close events, should the subobjects fire close
125 //before the parent? or should parents close after?
126 //should there be a preclose event on the parent
127 //that fires when it's about to close all the children?
128 ws.isOpen = function () {
129 return !ps.ended
130 }
131
132 ws.close = function (err, cb) {
133 if(isFunction(err))
134 cb = err, err = false
135 if(!ps) return (cb && cb())
136 if(err) return ps.destroy(err), (cb && cb())
137
138 ps.close(function (err) {
139 if(cb) cb(err)
140 else if(err) throw explain(err, 'no callback provided for muxrpc close')
141 })
142
143 return this
144 }
145 ws.closed = false
146
147 return ws
148}
149
150
151