1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 | const net = require('net');
|
14 | const log = require('../log/logger');
|
15 | const { serialize } = require('../utils');
|
16 |
|
/**
 * Reduce an arbitrary "error" value to something safe to place in a reply.
 *
 * - `Error` instances are returned as their `toString()` text.
 * - Any other value is returned unchanged when `serialize` accepts it.
 * - Values that make `serialize` throw are replaced by a fixed message.
 *
 * `toString()` is only invoked on real `Error` instances, so `null`,
 * `undefined` and prototype-less objects no longer raise a TypeError here.
 *
 * @param {*} err - Value supplied as an error; need not be an `Error`.
 * @returns {*} The error text, the original value, or the fallback message.
 */
function getSerializableError(err) {
  if (err instanceof Error) {
    return err.toString();
  }

  log.debug('Error argument not an instance of Error');

  try {
    // Probe only: the serialized output is discarded and the raw value is
    // returned so the caller serializes it as part of the full reply.
    serialize(err);
    return err;
  } catch (serializeError) {
    const msg = 'Error argument not instance of Error and not stringifiable';
    log.debug(msg);
    return msg;
  }
}
|
34 |
|
35 | module.exports = (() => {
|
36 | let server = {};
|
37 |
|
/**
 * Build the error envelope that is written back to the client.
 *
 * @param {string} errorMessage - Value stored under `message`.
 * @param {string} debugMessage - Value stored under `debugMessage`.
 * @param {*} err - The failure; its serializable form goes under `error`
 *   and its `stack` property under `stackTrace`.
 * @param {Object} [metadata] - Stored under `metadata`; an empty object is
 *   used when this is falsy.
 * @returns {Object} Reply object with `isError` set to `true`.
 */
function composeErrorReply(errorMessage, debugMessage, err, metadata) {
  const reply = { isError: true };

  // Properties are attached in the same order the reply has always used.
  reply.error = getSerializableError(err);
  reply.message = errorMessage;
  reply.debugMessage = debugMessage;
  reply.stackTrace = err.stack;
  reply.metadata = metadata ? metadata : {};

  return reply;
}
|
53 |
|
/**
 * Turn a raw task into a task object and hand it to `callback`.
 *
 * Values that are already objects are passed through untouched. Anything
 * else is parsed as JSON. The callback receives an `Error` when the task is
 * falsy, is not valid JSON, or parses to something that is not an object
 * (for example the JSON literals `null`, `42` or `"text"`).
 *
 * @param {*} task - A task object or its JSON text.
 * @param {function(?Error, Object=)} callback - Node-style callback.
 * @returns {*} Whatever `callback` returns.
 */
function parseTask(task, callback) {
  log.debug('parseTask Invoked');
  log.debug(task);

  if (!task) {
    return callback(new Error('Invalid or missing task object'));
  }

  if (typeof task === 'object') {
    return callback(null, task);
  }

  let parsedTask;

  // Keep the try block limited to the parse so only JSON failures are
  // reported as parse errors.
  try {
    parsedTask = JSON.parse(task);
  } catch (e) {
    log.debug('Task Parse failed');
    log.debug(e);
    return callback(new Error('invalid task: unable to parse task json'));
  }

  log.debug('Task parsed');

  // Valid JSON is not necessarily a task: primitives and null must be
  // rejected the same way a missing task is.
  if (parsedTask === null || typeof parsedTask !== 'object') {
    return callback(new Error('Invalid or missing task object'));
  }

  return callback(null, parsedTask);
}
|
79 |
|
/**
 * Create the TCP server and start listening for newline-delimited tasks.
 *
 * Each newline-terminated line received on a connection is one message:
 * - a line containing `{"healthCheck":1}` is answered with
 *   `{ status: 'ready' }`;
 * - any other line is parsed with `parseTask` and passed to
 *   `taskReceivedCallback(parsedTask, done)`. `done(err, result)` writes an
 *   error reply when `err` is truthy, otherwise `result` (`undefined` is
 *   sent as `null`).
 * Every response is `serialize(payload)` followed by a newline.
 *
 * @param {function(Object, function(*, *))} taskReceivedCallback - Invoked
 *   once per successfully parsed task.
 * @param {function()} startedCallback - Invoked with no arguments once the
 *   server is listening.
 * @param {{port: (number|undefined)}} [options] - `port` defaults to 7000
 *   when it is falsy or when `options` is omitted.
 */
function startServer(taskReceivedCallback, startedCallback, options) {
  server = net.createServer((socket) => {
    log.info('Connection established');
    let clientDisconnected = false;
    let buffered = '';

    // Let the stream decode incoming bytes so a multi-byte UTF-8 character
    // split across two chunks is reassembled instead of being corrupted.
    socket.setEncoding('utf8');

    // Returns a reason string when the socket can no longer be written to,
    // or null when it is usable.
    function validateSocketReadiness() {
      if (clientDisconnected) {
        return 'Connection ended by client.';
      } else if (socket.destroyed) {
        return 'Connection lost - cannot write response.';
      }
      return null;
    }

    // Writes one serialized payload plus the line terminator. Returns the
    // reason string when nothing was written, otherwise null.
    function writeResponse(payload) {
      const msg = validateSocketReadiness();
      if (msg) {
        log.error(msg);
        return msg;
      }
      socket.write(serialize(payload));
      socket.write('\n');
      return null;
    }

    // Parses and dispatches a single line on a later event-loop turn.
    function processTask(rawTask) {
      setImmediate(() => {
        log.debug('About to parse task');
        log.debug(`Task: ${rawTask}`);

        parseTask(rawTask, (parseError, parsedTask) => {
          log.debug('Task parsing complete');

          if (parseError) {
            log.error(`Parse error! ${parseError}`);
            parseError.isError = true;
            writeResponse(composeErrorReply('Internal Error', parseError.toString(), parseError));
            return;
          }

          // When a request carries `params` but no `query`, expose the same
          // object under `query` as well.
          if (parsedTask && parsedTask.request && parsedTask.request.params && !parsedTask.request.query) {
            parsedTask.request.query = parsedTask.request.params;
          }

          log.debug('About to invoke taskReceivedCallback');
          log.debug(parsedTask);

          taskReceivedCallback(parsedTask, (err, result) => {
            log.debug('About to respond');

            if (err) {
              log.debug('Responding with error');
              log.error(err);
              return writeResponse(composeErrorReply('Internal Error', 'Unable to execute Flex method', err));
            }

            log.debug('Responding with success');
            log.debug(result);

            // An undefined result is sent as an explicit null.
            return writeResponse(typeof result === 'undefined' ? null : result);
          });
        });
      });
    }

    socket.on('error', (err) => {
      log.error(err.stack);
    });

    socket.on('data', (chunk) => {
      log.debug('chunk received');

      // Chunks are already strings because of setEncoding(); String() keeps
      // this working for any socket that still hands over a Buffer.
      buffered += String(chunk);

      let lineEnd = buffered.indexOf('\n');

      while (lineEnd > -1) {
        const line = buffered.slice(0, lineEnd + 1);
        buffered = buffered.slice(lineEnd + 1);

        if (line.indexOf('{"healthCheck":1}') > -1) {
          log.info('healthcheck!');
          writeResponse({ status: 'ready' });
        } else {
          processTask(line);
        }

        // Keep draining: lines that arrived in the same chunk as a health
        // check must not wait for the next chunk to be processed.
        lineEnd = buffered.indexOf('\n');
      }
    });

    socket.on('end', () => {
      clientDisconnected = true;
    });
  });

  setImmediate(() => {
    const port = (options && options.port) || 7000;

    server.listen(port, () => {
      startedCallback();
    });
  });
}
|
206 |
|
/**
 * Stop the server created by `startServer`.
 *
 * @param {function(Error=)} [callback] - Passed to the server's `close`.
 *   When no server has been created yet, it is invoked on a later
 *   event-loop turn with an `Error` instead.
 */
function stop(callback) {
  // `server` is a plain `{}` placeholder until startServer replaces it, so
  // there is nothing to close yet; report that instead of throwing
  // "server.close is not a function".
  if (typeof server.close !== 'function') {
    if (typeof callback === 'function') {
      setImmediate(() => callback(new Error('Server is not running')));
    }
    return;
  }

  server.close(callback);
}
|
210 |
|
211 | return {
|
212 | startServer,
|
213 | stop
|
214 | };
|
215 | })();
|