1 | /*!
|
2 |
|
3 | ----------------------------------------------------------------------------
|
4 | | qewd: Quick and Easy Web Development |
|
5 | | |
|
6 | | Copyright (c) 2017-19 M/Gateway Developments Ltd, |
|
7 | | Redhill, Surrey UK. |
|
8 | | All rights reserved. |
|
9 | | |
|
10 | | http://www.mgateway.com |
|
11 | | Email: rtweed@mgateway.com |
|
12 | | |
|
13 | | |
|
14 | | Licensed under the Apache License, Version 2.0 (the "License"); |
|
15 | | you may not use this file except in compliance with the License. |
|
16 | | You may obtain a copy of the License at |
|
17 | | |
|
18 | | http://www.apache.org/licenses/LICENSE-2.0 |
|
19 | | |
|
20 | | Unless required by applicable law or agreed to in writing, software |
|
21 | | distributed under the License is distributed on an "AS IS" BASIS, |
|
22 | | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
23 | | See the License for the specific language governing permissions and |
|
24 | | limitations under the License. |
|
25 | ----------------------------------------------------------------------------
|
26 |
|
27 | 29 May 2019
|
28 |
|
29 | */
|
30 |
|
31 | var ewdQueueDocument = 'ewdQueue';
|
32 | var resilientMode = require('./resilientMode');
|
33 | var handleJWT = require('./jwtHandler');
|
34 |
|
35 | var customSocketConnections = {};
|
36 |
|
/**
 * Removes application mappings for sockets that are no longer connected.
 *
 * Must be invoked with `this` bound to an object that exposes:
 *   - `io.clients(callback)`: calls back with `(error, clients)`, where
 *     `clients` is an array of the currently connected socket ids
 *   - `sockets`: an object keyed by socket id (may be undefined)
 *
 * Every key of `this.sockets` that is not in the connected-client list is
 * deleted and a message is logged for it. If the client lookup reports an
 * error, nothing is deleted.
 */
function clearSocketData() {
  var self = this;
  this.io.clients(function(error, clients) {
    if (error || !Array.isArray(clients)) {
      // without a reliable client list we cannot tell which sockets are defunct
      console.log('Unable to list connected socket clients: ' + error);
      return;
    }
    var clientHash = {};
    clients.forEach(function(socketId) {
      clientHash[socketId] = true;
    });
    // iterate the bound object's own socket map (there is no `q` in this scope)
    for (var socketId in self.sockets) {
      if (!clientHash[socketId]) {
        delete self.sockets[socketId];
        console.log('application mapping for defunct socket ' + socketId + ' deleted');
      }
    }
  });
}
|
52 |
|
/**
 * Reports whether a value has no enumerable properties.
 *
 * Arrays are never treated as empty, whatever their length. For any other
 * value the result is true only when a for-in loop over it visits no key,
 * so inherited enumerable keys count, and null / undefined / numbers are
 * reported as empty.
 *
 * @param {*} obj - value to inspect
 * @returns {boolean} true when no enumerable key was found and obj is not an array
 */
function isEmpty(obj) {
  var empty = !Array.isArray(obj);
  if (empty) {
    for (var key in obj) {
      // the first enumerable key is enough to decide
      empty = false;
      break;
    }
  }
  return empty;
}
|
60 |
|
61 | module.exports = function(q, io, customModule) {
|
62 |
|
63 | var customSockets;
|
64 | if (customModule) customSockets = require(customModule);
|
65 |
|
66 | io.on('connection', function (socket) {
|
67 | if (!q.workerResponseHandlers) q.workerResponseHandlers = {};
|
68 |
|
69 | /*
|
70 | function sendMessageToClient(messageObj) {
|
71 | //console.log('sending to socket ' + socket.id);
|
72 | socket.emit('ewdjs', messageObj);
|
73 | };
|
74 | */
|
75 |
|
76 | var handleMessage = function(data) {
|
77 | var startTime = new Date();
|
78 | var expectJWT = false;
|
79 |
|
80 | if (typeof data !== 'object') {
|
81 | console.log('Received message is not an object: ' + data);
|
82 | return;
|
83 | }
|
84 | var type = data.type;
|
85 | if (!type) {
|
86 | console.log('No type defined for message: ' + JSON.stringify(data));
|
87 | return;
|
88 | }
|
89 |
|
90 | // valid websocket message received from browser
|
91 |
|
92 | var responseObj;
|
93 | if (type === 'ewd-register' || type === 'ewd-reregister') {
|
94 | data.socketId = socket.id;
|
95 | data.ipAddress = socket.request.connection.remoteAddress;
|
96 | if (data.jwt && type === 'ewd-register') {
|
97 | // record application used by this new socket
|
98 | if (!q.sockets) q.sockets = {};
|
99 | q.sockets[socket.id] = {
|
100 | application: data.application,
|
101 | jwt: true
|
102 | };
|
103 | // clear down records for any disconnected clients
|
104 | //clearSocketData.call(q);
|
105 | }
|
106 | }
|
107 |
|
108 | //console.log('** sockets: incoming message received: ' + JSON.stringify(data));
|
109 | // for browser-based applications that use JWTs
|
110 | // but NOT for MicroService server connections over web sockets
|
111 |
|
112 | //if (socket && socket.id) console.log('socket.id = ' + socket.id);
|
113 | //if (q.sockets && q.sockets[socket.id]) console.log('q.sockets[socket.id].jwt = ' + q.sockets[socket.id].jwt);
|
114 |
|
115 | if (type !== 'restRequest' && q.sockets && q.sockets[socket.id] && q.sockets[socket.id].jwt) expectJWT = true;
|
116 | //console.log('** expectJWT = ' + expectJWT);
|
117 | //console.log('** type = ' + type);
|
118 |
|
119 | // queue & dispatch message to worker. Response handled by callback function:
|
120 | var thisSocket = socket;
|
121 | var ix;
|
122 | var count;
|
123 | var token = data.token;
|
124 |
|
125 | if (q.resilientMode && q.db && token) {
|
126 | ix = resilientMode.storeIncomingMessage.call(q, data);
|
127 | data.dbIndex = ix; // so the worker can record progress to database
|
128 | count = 0;
|
129 | }
|
130 |
|
131 | var handleResponse = function(resultObj) {
|
132 | //console.log('*** /lib/sockets handleResponse: ' + JSON.stringify(resultObj));
|
133 |
|
134 | function sendError(error) {
|
135 | thisSocket.emit('ewdjs', {error: error});
|
136 | }
|
137 |
|
138 | if (resultObj.message && resultObj.message.qewd_destination && resultObj.message.qewd_application) {
|
139 | /*
|
140 | Incoming request is asking to re-route to a different application on another microservice
|
141 | Make sure this is allowed before doing the re-routing
|
142 | */
|
143 | var destination = resultObj.message.qewd_destination;
|
144 | var fromApp = resultObj.message.ewd_application;
|
145 | var toApp = resultObj.message.qewd_application;
|
146 | var config = q.userDefined.config;
|
147 | // does the Microservice exist as a registered destination?
|
148 | if (q.u_services.byDestination[destination]) {
|
149 | // has permission been granted for this application switch?
|
150 | if (config.permit_application_switch && config.permit_application_switch[fromApp] && config.permit_application_switch[fromApp][toApp]) {
|
151 | console.log('forwarding message to ' + destination);
|
152 | q.u_services.byDestination[destination].client.send(resultObj.message, function(responseObj) {
|
153 | //console.log('*!*!* response from microservice: ' + JSON.stringify(responseObj, null, 2));
|
154 | thisSocket.emit('ewdjs', responseObj);
|
155 | });
|
156 | return;
|
157 | }
|
158 | else {
|
159 | return sendError('No permissions set to allow a switch from ' + fromApp + ' to ' + toApp);
|
160 | }
|
161 | }
|
162 | else {
|
163 | return sendError(destination + ' is not a valid microservice destination');
|
164 | }
|
165 | }
|
166 |
|
167 | // ignore messages directed to specific sockets - these are handled separately - see this.on('response') above
|
168 | if (resultObj.socketId) {
|
169 | console.log('*** socket-specific message ' + JSON.stringify(resultObj));
|
170 | return;
|
171 | }
|
172 |
|
173 | // intercept response for further processing on master process if required
|
174 | var message = resultObj.message;
|
175 |
|
176 | var sendMessageToClient = function(responseObj) {
|
177 | // make sure original request Id is also added
|
178 | if (message && message.ms_requestId) {
|
179 | responseObj.ms_requestId = message.ms_requestId;
|
180 | if (responseObj.message) responseObj.message.ms_requestId = message.ms_requestId;
|
181 | }
|
182 | // if worker response handler finished() - ie without a defined response,
|
183 | // don't return any response to browser
|
184 |
|
185 | if (!isEmpty(responseObj.message)) {
|
186 | //if (responseObj.message) thisSocket.emit('ewdjs', responseObj);
|
187 | //console.log('sending ' + JSON.stringify(responseObj));
|
188 | responseObj.responseTime = (Date.now() - startTime) + 'ms';
|
189 | thisSocket.emit('ewdjs', responseObj);
|
190 | if (q.resilientMode && q.db && resultObj.type !== 'ewd-register') {
|
191 | count++;
|
192 | resilientMode.storeResponse.call(q, resultObj, token, ix, count, handleMessage)
|
193 | }
|
194 | }
|
195 | };
|
196 |
|
197 | if (message && message.ewd_application) {
|
198 | var application = message.ewd_application;
|
199 | if (!message.error) {
|
200 | //console.log('****** check for worker response intercept ****');
|
201 | // if this application has worker response intercept handlers defined, make sure they are loaded now
|
202 | var appPath = application;
|
203 | if (!q.workerResponseHandlers[application]) {
|
204 | if (q.userDefined.config && q.userDefined.config.moduleMap && q.userDefined.config.moduleMap[application]) {
|
205 | appPath = q.userDefined.config.moduleMap[application];
|
206 | }
|
207 | try {
|
208 | q.workerResponseHandlers[application] = require(appPath).workerResponseHandlers || {};
|
209 | }
|
210 | catch(err) {
|
211 | error = 'Unable to load worker response intercept handler module for: ' + application;
|
212 | q.workerResponseHandlers[application] = {};
|
213 | }
|
214 | }
|
215 |
|
216 | var type = resultObj.type.toString();
|
217 | if (resultObj.message.use_microservice === true) {
|
218 | message.original_type = type;
|
219 | type = 'use_microservice';
|
220 | }
|
221 | if (application && type && q.workerResponseHandlers && q.workerResponseHandlers[application] && q.workerResponseHandlers[application][type]) {
|
222 | var resp = q.workerResponseHandlers[application][type].call(q, message, sendMessageToClient);
|
223 | //console.log('workerResponseHandler response: ' + resp);
|
224 | if (resp === true) return; // The workerResponseHandler has sent the response to the client itself;
|
225 | //if (resp) resultObj.message = resp;
|
226 | if (typeof resp === 'undefined') {
|
227 | resultObj.message = message; // WorkerResponseHandler didn't do anything to message
|
228 | }
|
229 | else {
|
230 | resultObj.message = resp; // workerResponseHandler explicitly returned a message
|
231 | }
|
232 | }
|
233 | }
|
234 | if (resultObj.message) delete resultObj.message.ewd_application;
|
235 | if (type === 'restRequest') {
|
236 | // path was previously added to aid workerResponseHandler filtering
|
237 | // now get rid of it before sending response
|
238 | if (resultObj.message) delete resultObj.message.path;
|
239 | }
|
240 | }
|
241 |
|
242 | // send response to browser
|
243 | //console.log('sending to socket ' + socket.id);
|
244 |
|
245 | sendMessageToClient(resultObj);
|
246 |
|
247 | /*
|
248 | var responseTime = (Date.now() - startTime) + 'ms';
|
249 | console.log('Response time: ' + responseTime);
|
250 | resultObj.responseTime = responseTime;
|
251 | if (resultObj.message) {
|
252 | //console.log('emitting');
|
253 | thisSocket.emit('ewdjs', resultObj);
|
254 | }
|
255 | if (q.resilientMode && q.db && resultObj.type !== 'ewd-register') {
|
256 | count++;
|
257 | resilientMode.storeResponse.call(q, resultObj, token, ix, count, handleMessage)
|
258 | }
|
259 | */
|
260 |
|
261 | };
|
262 |
|
263 | if (expectJWT && data.token) {
|
264 | // browser-based websocket application using JWTs
|
265 | handleJWT.masterRequest.call(q, data, thisSocket, handleResponse);
|
266 | return;
|
267 | }
|
268 | q.handleMessage(data, handleResponse);
|
269 | };
|
270 |
|
271 | socket.on('ewdjs', handleMessage);
|
272 | socket.on('disconnect', function() {
|
273 | delete customSocketConnections[socket.id];
|
274 | if (q.sockets) delete q.sockets[socket.id]
|
275 | console.log('socket ' + socket.id + ' disconnected');
|
276 | });
|
277 |
|
278 | if (customSockets && !customSocketConnections[socket.id]) {
|
279 | // load the custom handlers for this socket - unless already loaded
|
280 | console.log('** Loading custom socket module handlers for socket ' + socket.id);
|
281 | customSockets(io, socket, q);
|
282 | customSocketConnections[socket.id] = 'connected';
|
283 | }
|
284 |
|
285 | });
|
286 | if (q.jwt) q.jwt.handlers = handleJWT;
|
287 | q.io = io;
|
288 | q.io.toAll = function(message) {
|
289 | io.emit('ewdjs', message);
|
290 | };
|
291 | q.io.toApplication = function(message, application) {
|
292 | for (var id in q.sockets) {
|
293 | if (q.sockets[id].application === application) {
|
294 | q.io.to(id).emit('ewdjs', message);
|
295 | }
|
296 | }
|
297 | };
|
298 | };
|