UNPKG

6.52 kBJavaScriptView Raw
1// Copyright (c) 2018 Kinvey Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4// in compliance with the License. You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software distributed under the License
9// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10// or implied. See the License for the specific language governing permissions and limitations under
11// the License.
12
13const net = require('net');
14const log = require('../log/logger');
15const { serialize } = require('../utils');
16
/**
 * Produces a representation of `err` that can be embedded in a serialized reply.
 *
 * - Error instances are converted with `toString()` (e.g. "Error: boom").
 * - Any other value is returned unchanged when `serialize` accepts it.
 * - If `serialize` throws for that value, a fixed explanatory message is returned instead.
 *
 * The string conversion is only attempted for Error instances, so values that have no
 * usable `toString` (null, undefined, prototype-less objects) no longer throw here.
 *
 * @param {*} err - the error value to describe; not required to be an Error.
 * @returns {*} a string for Error instances and unserializable values, otherwise `err` itself.
 */
function getSerializableError(err) {
  if (err instanceof Error) {
    return err.toString();
  }

  log.debug('Error argument not an instance of Error');

  try {
    // Probe only: the serialized output is discarded. Per the original note, serialize
    // is expected to throw for values with circular references.
    serialize(err);
    return err;
  } catch (serializeError) {
    const msg = 'Error argument not instance of Error and not stringifiable';
    log.debug(msg);
    return msg;
  }
}
34
35module.exports = (() => {
36 let server = {};
37
38 function composeErrorReply(errorMessage, debugMessage, err, metadata) {
39 // build an error response as generated by blApi
40 // FIXME: error response format should be handled by the outermost layer
41 // and not be duplicated in various places.
42
43 return {
44 isError: true,
45 error: getSerializableError(err),
46 message: errorMessage,
47 debugMessage,
48 // note: stack is not a stack array, it's a formatted text string that contains a printed stack trace
49 stackTrace: err.stack,
50 metadata: metadata || {}
51 };
52 }
53
54 function parseTask(task, callback) {
55 log.debug('parseTask Invoked');
56 log.debug(task);
57
58 if (!task) {
59 return callback(new Error('Invalid or missing task object'));
60 }
61
62 if (typeof task === 'object') {
63 return callback(null, task);
64 }
65
66 let parsedTask;
67
68 try {
69 parsedTask = JSON.parse(task);
70 log.debug('Task parsed');
71 } catch (e) {
72 log.debug('Task Parse failed');
73 log.debug(e);
74 return callback(new Error('invalid task: unable to parse task json'));
75 }
76
77 return callback(null, parsedTask);
78 }
79
80 function startServer(taskReceivedCallback, startedCallback, options) {
81 server = net.createServer((socket) => {
82 log.info('Connection established');
83 let clientDisconnected = false;
84 let data = '';
85 let lineEnd;
86 let task;
87
88 function validateSocketReadiness() {
89 if (clientDisconnected) {
90 return 'Connection ended by client.';
91 } else if (socket.destroyed) {
92 return 'Connection lost - cannot write response.';
93 }
94 return null;
95 }
96
97 function writeResponse(data) {
98 const msg = validateSocketReadiness();
99 if (msg) {
100 log.error(msg);
101 return msg;
102 }
103 socket.write(serialize(data));
104 socket.write('\n');
105 return null;
106 }
107
108 function processTask(task) {
109 setImmediate(() => {
110 log.debug('About to parse task');
111 log.debug(`Task: ${task}`);
112
113 parseTask(task, (parseError, parsedTask) => {
114 log.debug('Task parsing complete');
115
116 if (parseError) {
117 log.error(`Parse error! ${parseError}`);
118 parseError.isError = true;
119 writeResponse(composeErrorReply('Internal Error', parseError.toString(), parseError));
120 return;
121 }
122
123 // Set task request.query to request.params to account for difference in BL tasks
124 if (parsedTask && parsedTask.request && parsedTask.request.params && !parsedTask.request.query) {
125 parsedTask.request.query = parsedTask.request.params;
126 }
127
128 log.debug('About to invoke taskReceivedCallback');
129 log.debug(parsedTask);
130
131 taskReceivedCallback(parsedTask, (err, result) => {
132 log.debug('About to respond');
133 // processBL returns pre-assembled response object
134
135 if (err) {
136 log.debug('Responding with error');
137 log.error(err);
138 return writeResponse(composeErrorReply('Internal Error', 'Unable to execute Flex method', err));
139 }
140
141 log.debug('Responding with success');
142 log.debug(result);
143
144 // Make sure result is present, undefined is not stringified into JSON
145 if (typeof result === 'undefined') {
146 result = null;
147 }
148
149 // TODO: Check the size of ret and use a non-blocking serializer if too large
150 return writeResponse(result);
151 });
152 });
153 });
154 }
155
156 socket.on('error', (err) => {
157 // Avoid crashing the process on errors for a single connection
158 log.error(err.stack);
159 });
160
161 socket.on('data', (chunk) => {
162 log.debug('chunk received');
163
164 // Always append the new data, then process task below
165 data += chunk.toString();
166
167 // process one task at a time in arrival order, sending back responses in the same order
168 // an already running processing loop will handle the newly arrived task as well
169 lineEnd = data.indexOf('\n');
170
171 while (lineEnd > -1) {
172 task = data.slice(0, lineEnd + 1);
173 data = data.slice(lineEnd + 1);
174
175 if (task == null) {
176 // Task is invalid, can't process. Can't send an error back because we don't have a task Id.
177 // Break out of the loop as no more data will be in data.slice
178 break;
179 }
180
181 // TODO: do not ^^^ recopy all the data for each task line, index into data instead
182 // skip blank lines, processBL non-blank ones
183
184 if (task.indexOf('{"healthCheck":1}') > -1) {
185 log.info('healthcheck!');
186 writeResponse({ status: 'ready' });
187 return;
188 }
189
190 processTask(task);
191 lineEnd = data.indexOf('\n');
192 }
193 });
194
195 socket.on('end', () => {
196 clientDisconnected = true;
197 });
198 });
199
200 setImmediate(() => {
201 server.listen(options.port || 7000, () => {
202 startedCallback();
203 });
204 });
205 }
206
207 function stop(callback) {
208 server.close(callback);
209 }
210
211 return {
212 startServer,
213 stop
214 };
215})();