UNPKG

12.6 kBJavaScriptView Raw
1/*!
2
3 ----------------------------------------------------------------------------
4 | qewd: Quick and Easy Web Development |
5 | |
6 | Copyright (c) 2017-19 M/Gateway Developments Ltd, |
7 | Redhill, Surrey UK. |
8 | All rights reserved. |
9 | |
10 | http://www.mgateway.com |
11 | Email: rtweed@mgateway.com |
12 | |
13 | |
14 | Licensed under the Apache License, Version 2.0 (the "License"); |
15 | you may not use this file except in compliance with the License. |
16 | You may obtain a copy of the License at |
17 | |
18 | http://www.apache.org/licenses/LICENSE-2.0 |
19 | |
20 | Unless required by applicable law or agreed to in writing, software |
21 | distributed under the License is distributed on an "AS IS" BASIS, |
22 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
23 | See the License for the specific language governing permissions and |
24 | limitations under the License. |
25 ----------------------------------------------------------------------------
26
27 29 May 2019
28
29*/
30
31var ewdQueueDocument = 'ewdQueue';
32var resilientMode = require('./resilientMode');
33var handleJWT = require('./jwtHandler');
34
35var customSocketConnections = {};
36
37function clearSocketData() {
38 var self = this;
39 this.io.clients(function(error, clients) {
40 var clientHash = {};
41 clients.forEach(function(socketId) {
42 clientHash[socketId] = true;
43 });
44 for (var socketId in q.sockets) {
45 if (!clientHash[socketId]) {
46 delete self.sockets[socketId];
47 console.log('application mapping for defunct socket ' + socketId + ' deleted');
48 }
49 }
50 });
51}
52
53function isEmpty(obj) {
54 if (Array.isArray(obj)) return false;
55 for (var name in obj) {
56 return false;
57 }
58 return true;
59}
60
61module.exports = function(q, io, customModule) {
62
63 var customSockets;
64 if (customModule) customSockets = require(customModule);
65
66 io.on('connection', function (socket) {
67 if (!q.workerResponseHandlers) q.workerResponseHandlers = {};
68
69 /*
70 function sendMessageToClient(messageObj) {
71 //console.log('sending to socket ' + socket.id);
72 socket.emit('ewdjs', messageObj);
73 };
74 */
75
76 var handleMessage = function(data) {
77 var startTime = new Date();
78 var expectJWT = false;
79
80 if (typeof data !== 'object') {
81 console.log('Received message is not an object: ' + data);
82 return;
83 }
84 var type = data.type;
85 if (!type) {
86 console.log('No type defined for message: ' + JSON.stringify(data));
87 return;
88 }
89
90 // valid websocket message received from browser
91
92 var responseObj;
93 if (type === 'ewd-register' || type === 'ewd-reregister') {
94 data.socketId = socket.id;
95 data.ipAddress = socket.request.connection.remoteAddress;
96 if (data.jwt && type === 'ewd-register') {
97 // record application used by this new socket
98 if (!q.sockets) q.sockets = {};
99 q.sockets[socket.id] = {
100 application: data.application,
101 jwt: true
102 };
103 // clear down records for any disconnected clients
104 //clearSocketData.call(q);
105 }
106 }
107
108 //console.log('** sockets: incoming message received: ' + JSON.stringify(data));
109 // for browser-based applications that use JWTs
110 // but NOT for MicroService server connections over web sockets
111
112 //if (socket && socket.id) console.log('socket.id = ' + socket.id);
113 //if (q.sockets && q.sockets[socket.id]) console.log('q.sockets[socket.id].jwt = ' + q.sockets[socket.id].jwt);
114
115 if (type !== 'restRequest' && q.sockets && q.sockets[socket.id] && q.sockets[socket.id].jwt) expectJWT = true;
116 //console.log('** expectJWT = ' + expectJWT);
117 //console.log('** type = ' + type);
118
119 // queue & dispatch message to worker. Response handled by callback function:
120 var thisSocket = socket;
121 var ix;
122 var count;
123 var token = data.token;
124
125 if (q.resilientMode && q.db && token) {
126 ix = resilientMode.storeIncomingMessage.call(q, data);
127 data.dbIndex = ix; // so the worker can record progress to database
128 count = 0;
129 }
130
131 var handleResponse = function(resultObj) {
132 //console.log('*** /lib/sockets handleResponse: ' + JSON.stringify(resultObj));
133
134 function sendError(error) {
135 thisSocket.emit('ewdjs', {error: error});
136 }
137
138 if (resultObj.message && resultObj.message.qewd_destination && resultObj.message.qewd_application) {
139 /*
140 Incoming request is asking to re-route to a different application on another microservice
141 Make sure this is allowed before doing the re-routing
142 */
143 var destination = resultObj.message.qewd_destination;
144 var fromApp = resultObj.message.ewd_application;
145 var toApp = resultObj.message.qewd_application;
146 var config = q.userDefined.config;
147 // does the Microservice exist as a registered destination?
148 if (q.u_services.byDestination[destination]) {
149 // has permission been granted for this application switch?
150 if (config.permit_application_switch && config.permit_application_switch[fromApp] && config.permit_application_switch[fromApp][toApp]) {
151 console.log('forwarding message to ' + destination);
152 q.u_services.byDestination[destination].client.send(resultObj.message, function(responseObj) {
153 //console.log('*!*!* response from microservice: ' + JSON.stringify(responseObj, null, 2));
154 thisSocket.emit('ewdjs', responseObj);
155 });
156 return;
157 }
158 else {
159 return sendError('No permissions set to allow a switch from ' + fromApp + ' to ' + toApp);
160 }
161 }
162 else {
163 return sendError(destination + ' is not a valid microservice destination');
164 }
165 }
166
167 // ignore messages directed to specific sockets - these are handled separately - see this.on('response') above
168 if (resultObj.socketId) {
169 console.log('*** socket-specific message ' + JSON.stringify(resultObj));
170 return;
171 }
172
173 // intercept response for further processing on master process if required
174 var message = resultObj.message;
175
176 var sendMessageToClient = function(responseObj) {
177 // make sure original request Id is also added
178 if (message && message.ms_requestId) {
179 responseObj.ms_requestId = message.ms_requestId;
180 if (responseObj.message) responseObj.message.ms_requestId = message.ms_requestId;
181 }
182 // if worker response handler finished() - ie without a defined response,
183 // don't return any response to browser
184
185 if (!isEmpty(responseObj.message)) {
186 //if (responseObj.message) thisSocket.emit('ewdjs', responseObj);
187 //console.log('sending ' + JSON.stringify(responseObj));
188 responseObj.responseTime = (Date.now() - startTime) + 'ms';
189 thisSocket.emit('ewdjs', responseObj);
190 if (q.resilientMode && q.db && resultObj.type !== 'ewd-register') {
191 count++;
192 resilientMode.storeResponse.call(q, resultObj, token, ix, count, handleMessage)
193 }
194 }
195 };
196
197 if (message && message.ewd_application) {
198 var application = message.ewd_application;
199 if (!message.error) {
200 //console.log('****** check for worker response intercept ****');
201 // if this application has worker response intercept handlers defined, make sure they are loaded now
202 var appPath = application;
203 if (!q.workerResponseHandlers[application]) {
204 if (q.userDefined.config && q.userDefined.config.moduleMap && q.userDefined.config.moduleMap[application]) {
205 appPath = q.userDefined.config.moduleMap[application];
206 }
207 try {
208 q.workerResponseHandlers[application] = require(appPath).workerResponseHandlers || {};
209 }
210 catch(err) {
211 error = 'Unable to load worker response intercept handler module for: ' + application;
212 q.workerResponseHandlers[application] = {};
213 }
214 }
215
216 var type = resultObj.type.toString();
217 if (resultObj.message.use_microservice === true) {
218 message.original_type = type;
219 type = 'use_microservice';
220 }
221 if (application && type && q.workerResponseHandlers && q.workerResponseHandlers[application] && q.workerResponseHandlers[application][type]) {
222 var resp = q.workerResponseHandlers[application][type].call(q, message, sendMessageToClient);
223 //console.log('workerResponseHandler response: ' + resp);
224 if (resp === true) return; // The workerResponseHandler has sent the response to the client itself;
225 //if (resp) resultObj.message = resp;
226 if (typeof resp === 'undefined') {
227 resultObj.message = message; // WorkerResponseHandler didn't do anything to message
228 }
229 else {
230 resultObj.message = resp; // workerResponseHandler explicitly returned a message
231 }
232 }
233 }
234 if (resultObj.message) delete resultObj.message.ewd_application;
235 if (type === 'restRequest') {
236 // path was previously added to aid workerResponseHandler filtering
237 // now get rid of it before sending response
238 if (resultObj.message) delete resultObj.message.path;
239 }
240 }
241
242 // send response to browser
243 //console.log('sending to socket ' + socket.id);
244
245 sendMessageToClient(resultObj);
246
247 /*
248 var responseTime = (Date.now() - startTime) + 'ms';
249 console.log('Response time: ' + responseTime);
250 resultObj.responseTime = responseTime;
251 if (resultObj.message) {
252 //console.log('emitting');
253 thisSocket.emit('ewdjs', resultObj);
254 }
255 if (q.resilientMode && q.db && resultObj.type !== 'ewd-register') {
256 count++;
257 resilientMode.storeResponse.call(q, resultObj, token, ix, count, handleMessage)
258 }
259 */
260
261 };
262
263 if (expectJWT && data.token) {
264 // browser-based websocket application using JWTs
265 handleJWT.masterRequest.call(q, data, thisSocket, handleResponse);
266 return;
267 }
268 q.handleMessage(data, handleResponse);
269 };
270
271 socket.on('ewdjs', handleMessage);
272 socket.on('disconnect', function() {
273 delete customSocketConnections[socket.id];
274 if (q.sockets) delete q.sockets[socket.id]
275 console.log('socket ' + socket.id + ' disconnected');
276 });
277
278 if (customSockets && !customSocketConnections[socket.id]) {
279 // load the custom handlers for this socket - unless already loaded
280 console.log('** Loading custom socket module handlers for socket ' + socket.id);
281 customSockets(io, socket, q);
282 customSocketConnections[socket.id] = 'connected';
283 }
284
285 });
286 if (q.jwt) q.jwt.handlers = handleJWT;
287 q.io = io;
288 q.io.toAll = function(message) {
289 io.emit('ewdjs', message);
290 };
291 q.io.toApplication = function(message, application) {
292 for (var id in q.sockets) {
293 if (q.sockets[id].application === application) {
294 q.io.to(id).emit('ewdjs', message);
295 }
296 }
297 };
298};