1 | var logger = require('debug')
|
2 | var traverse = require('traverse')
|
3 | var errToPOJO = require('./lib/err-serialization')
|
4 | var noop = function () {}
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 | module.exports = function (socket, tree, clientOrServer) {
|
12 | var debug = logger('socket.io-rpc:' + clientOrServer)
|
13 | |
14 |
|
15 |
|
16 |
|
17 | var eventHandlers = {
|
18 | batchStarts: noop,
|
19 | batchEnds: noop,
|
20 | calling: noop,
|
21 | wasCalled: noop,
|
22 | response: noop
|
23 | }
|
24 | var socketId
|
25 | var deferreds = []
|
26 |
|
27 | var invocationCounter = 0
|
28 | var endCounter = 0
|
29 | var remoteCallEnded = function (Id) {
|
30 | if (deferreds[Id]) {
|
31 | delete deferreds[Id]
|
32 | endCounter++
|
33 | eventHandlers.response(endCounter)
|
34 |
|
35 | if (endCounter === invocationCounter) {
|
36 | eventHandlers.batchEnds(endCounter)
|
37 | invocationCounter = 0
|
38 | endCounter = 0
|
39 | }
|
40 | } else {
|
41 |
|
42 | throw new Error('Deferred Id ' + Id + ' was resolved/rejected more than once, this should not occur')
|
43 | }
|
44 | }
|
45 |
|
46 | |
47 |
|
48 |
|
49 |
|
50 | function prepareRemoteCall (fnPath, argumentLength) {
|
51 | function remoteCall () {
|
52 | var args = Array.prototype.slice.call(arguments, 0)
|
53 | return new Promise(function (resolve, reject) {
|
54 | if (rpc.reconnecting) {
|
55 | reject(new Error('socket ' + socketId + ' disconnected, call rejected'))
|
56 | }
|
57 | invocationCounter++
|
58 | debug('calling ', fnPath, 'on ', socketId, ', invocation counter ', invocationCounter)
|
59 | var callParams = {Id: invocationCounter, fnPath: fnPath, args: args}
|
60 | socket.emit('call', callParams)
|
61 | eventHandlers.calling(callParams)
|
62 | if (invocationCounter === 1) {
|
63 | eventHandlers.batchStarts(invocationCounter)
|
64 | }
|
65 | deferreds[invocationCounter] = {resolve: resolve, reject: reject}
|
66 | })
|
67 | }
|
68 |
|
69 | remoteCall.remoteLength = argumentLength
|
70 |
|
71 | return remoteCall
|
72 | }
|
73 | var rpc = prepareRemoteCall
|
74 | socket.rpc = rpc
|
75 | socket.rpc.events = eventHandlers
|
76 | var remoteNodes = {}
|
77 |
|
78 | |
79 |
|
80 |
|
81 | rpc.reconnecting = false
|
82 |
|
83 | if (clientOrServer === 'client') {
|
84 | rpc.initializedP = new Promise(function (resolve, reject) {
|
85 | var assignAndResolveInitP = function () {
|
86 | socketId = socket.io.engine.id
|
87 | resolve()
|
88 | }
|
89 | socket.on('connect', function () {
|
90 | assignAndResolveInitP()
|
91 | debug('connected socket ', socketId)
|
92 | }).on('connect_error', function (err) {
|
93 | if (!socketId) {
|
94 | reject(err)
|
95 | }
|
96 | }).on('reconnect', function () {
|
97 | if (!socketId) {
|
98 | assignAndResolveInitP()
|
99 | }
|
100 | debug('reconnected rpc socket', socketId)
|
101 | rpc.reconnecting = false
|
102 | })
|
103 | })
|
104 | } else {
|
105 | socketId = socket.id
|
106 | }
|
107 |
|
108 | socket.on('disconnect', function onDisconnect () {
|
109 | rpc.reconnecting = true
|
110 | }).on('connect_error', function (err) {
|
111 | debug('connect error: ', err)
|
112 | for (var nodePath in remoteNodes) {
|
113 | remoteNodes[nodePath].reject(err)
|
114 | }
|
115 | }).on('call', function (data) {
|
116 | debug('invocation with ', data)
|
117 | if (!(data && typeof data.Id === 'number')) {
|
118 | return socket.emit('rpcError', {
|
119 | reason: 'Id is a required property for a call data payload'
|
120 | })
|
121 | }
|
122 |
|
123 | |
124 |
|
125 |
|
126 |
|
127 | var emitRes = function (resType, resData) {
|
128 | resData.Id = data.Id
|
129 | socket.emit(resType, resData)
|
130 | eventHandlers.wasCalled(data, resData)
|
131 | }
|
132 | try {
|
133 | var method = traverse(tree).get(data.fnPath.split('.'))
|
134 | } catch (err) {
|
135 | debug(err, ' when resolving an invocation')
|
136 | return emitRes('reject', errToPOJO(err))
|
137 | }
|
138 | if (method && method.apply) {
|
139 | var retVal
|
140 | try {
|
141 | retVal = method.apply(socket, data.args)
|
142 | } catch (err) {
|
143 |
|
144 | console.error('RPC method invocation ' + data.fnPath + 'from ' + socket.id + ' thrown an error : ', err.stack)
|
145 | emitRes('reject', errToPOJO(err))
|
146 | return
|
147 | }
|
148 |
|
149 | Promise.resolve(retVal).then(function (asyncRetVal) {
|
150 | emitRes('resolve', {value: asyncRetVal})
|
151 | }, function (error) {
|
152 | emitRes('reject', errToPOJO(error))
|
153 | })
|
154 | } else {
|
155 | var msg = 'no function exposed on: ' + data.fnPath
|
156 | debug(msg)
|
157 | emitRes('reject', {error: {message: msg}})
|
158 | }
|
159 | }).on('fetchNode', function (path) {
|
160 | debug('fetchNode handler, path ', path)
|
161 |
|
162 | var methods = tree
|
163 | if (path) {
|
164 | methods = traverse(tree).get(path.split('.'))
|
165 | } else {
|
166 | methods = tree
|
167 | }
|
168 |
|
169 | if (!methods) {
|
170 | socket.emit('noSuchNode', path)
|
171 | debug('socket ', socketId, ' requested node ' + path + ' which was not found')
|
172 | return
|
173 | }
|
174 | var localFnTree = traverse(methods).map(function (el) {
|
175 | if (this.isLeaf) {
|
176 | return el.length
|
177 | } else {
|
178 | return el
|
179 | }
|
180 | })
|
181 |
|
182 | socket.emit('node', {path: path, tree: localFnTree})
|
183 | debug('socket ', socketId, ' requested node "' + path + '" which was sent as: ', localFnTree)
|
184 | }).on('node', function (data) {
|
185 | if (remoteNodes[data.path]) {
|
186 | var remoteMethods = traverse(data.tree).map(function (el) {
|
187 | if (this.isLeaf) {
|
188 | debug('path', this.path)
|
189 | var path = this.path.join('.')
|
190 | if (data.path) {
|
191 | path = data.path + '.' + path
|
192 | }
|
193 |
|
194 | this.update(prepareRemoteCall(path, el))
|
195 | }
|
196 | })
|
197 | var promise = remoteNodes[data.path]
|
198 | promise.resolve(remoteMethods)
|
199 | } else {
|
200 | console.warn('socket ' + socketId + ' sent a node ' + data.path + ' which was not requested, ignoring')
|
201 | }
|
202 | }).on('noSuchNode', function (path) {
|
203 | var dfd = remoteNodes[path]
|
204 | var err = new Error('Node is not defined on the socket ' + socketId)
|
205 | err.path = path
|
206 | dfd.reject(err)
|
207 | }).on('resolve', function (data) {
|
208 | deferreds[data.Id].resolve(data.value)
|
209 | remoteCallEnded(data.Id)
|
210 | }).on('reject', function (data) {
|
211 | deferreds[data.Id].reject(data.error)
|
212 | remoteCallEnded(data.Id)
|
213 | })
|
214 |
|
215 | |
216 |
|
217 |
|
218 |
|
219 | socket.rpc.fetchNode = function (path) {
|
220 | if (remoteNodes.hasOwnProperty(path)) {
|
221 | return remoteNodes[path].promise
|
222 | } else {
|
223 | return new Promise(function (resolve, reject) {
|
224 | remoteNodes[path] = {resolve: resolve, reject: reject}
|
225 | debug('fetchNode ', path)
|
226 | socket.emit('fetchNode', path)
|
227 | })
|
228 | }
|
229 | }
|
230 | }
|