UNPKG

5.88 kBJavaScriptView Raw
1// Copyright (c) 2016 Kinvey Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4// in compliance with the License. You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software distributed under the License
9// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10// or implied. See the License for the specific language governing permissions and limitations under
11// the License.
12
13const net = require('net');
14const log = require('../log/logger');
15const { serialize } = require('../utils');
16
/**
 * Converts an arbitrary "error" value into something that is safe to embed in a reply.
 *
 * - Error instances are reduced to their `toString()` text.
 * - Any other value is returned unchanged when `serialize()` accepts it.
 * - Values that `serialize()` rejects are replaced by a fixed fallback message.
 *
 * `toString()` is only called on real Error instances, so `null`, `undefined` and objects
 * without a prototype no longer throw here.
 *
 * @param {*} err - the value reported as an error; not required to be an Error.
 * @returns {*} a string for Error instances and unserializable values, otherwise `err` itself.
 */
function getSerializableError(err) {
  if (err instanceof Error) {
    return err.toString();
  }

  log.debug('Error argument not an instance of Error');

  try {
    // Probe only: the serialized output is discarded. Presumably this throws on circular
    // references (serialize comes from ../utils and is not visible here).
    serialize(err);
    return err;
  } catch (serializeError) {
    const msg = 'Error argument not instance of Error and not stringifiable';
    log.debug(msg);
    return msg;
  }
}
34
35module.exports = (() => {
36 let server = {};
37
/**
 * Assembles the error reply object that is serialized back to the client.
 *
 * The shape mirrors the error response generated by blApi (per the original note).
 * FIXME: the error response format should be owned by the outermost layer instead of being
 * duplicated in various places.
 *
 * @param {string} errorMessage - short message, stored as `message`.
 * @param {string} debugMessage - detail text, stored as `debugMessage`.
 * @param {*} err - the originating error; its `stack` property is copied to `stackTrace`.
 * @param {Object} [metadata] - optional metadata; any falsy value becomes `{}`.
 * @returns {Object} reply with isError, error, message, debugMessage, stackTrace and metadata.
 */
function composeErrorReply(errorMessage, debugMessage, err, metadata) {
  const reply = { isError: true };

  reply.error = getSerializableError(err);
  reply.message = errorMessage;
  reply.debugMessage = debugMessage;
  // `stack` is one preformatted text string holding the printed trace, not an array of frames.
  reply.stackTrace = err.stack;
  reply.metadata = metadata ? metadata : {};

  return reply;
}
53
/**
 * Normalizes an incoming task into an object and hands it to `callback`.
 *
 * - Falsy input yields an "Invalid or missing task object" error.
 * - Input that is already an object is passed through untouched.
 * - Anything else is run through JSON.parse; a parse failure yields an
 *   "invalid task: unable to parse task json" error.
 *
 * @param {*} task - raw task line (string) or an already-parsed task object.
 * @param {Function} callback - node-style callback, invoked as (error) or (null, parsedTask).
 * @returns {*} whatever `callback` returns.
 */
function parseTask(task, callback) {
  log.debug('parseTask Invoked');
  log.debug(task);

  const isMissing = !task;
  if (isMissing) {
    return callback(new Error('Invalid or missing task object'));
  }

  const isAlreadyObject = typeof task === 'object';
  if (isAlreadyObject) {
    return callback(null, task);
  }

  let decoded;
  let failure = null;

  try {
    decoded = JSON.parse(task);
    log.debug('Task parsed');
  } catch (parseFailure) {
    log.debug('Task Parse failed');
    log.debug(parseFailure);
    failure = new Error('invalid task: unable to parse task json');
  }

  // Single exit point: report the failure if there was one, otherwise the decoded task.
  return failure ? callback(failure) : callback(null, decoded);
}
79
/**
 * Starts a TCP server that reads newline-delimited task lines and answers each one with a
 * single serialized reply line.
 *
 * Per connection, incoming chunks are appended to a buffer and split on '\n'. Every complete
 * line is handled in arrival order:
 *  - a line containing `{"healthCheck":1}` is answered immediately with `{ status: 'ready' }`;
 *  - any other line is parsed and handed to `taskReceivedCallback` on a later tick
 *    (setImmediate). Its reply is written whenever that callback completes.
 *
 * NOTE: blank lines are not skipped; they go through parseTask like any other line.
 *
 * @param {Function} taskReceivedCallback - invoked as (parsedTask, done); calling
 *   `done(err, result)` writes either an error reply or the serialized `result`.
 * @param {Function} startedCallback - invoked with no arguments once the server is listening.
 * @param {Object} [options] - optional settings; may be omitted.
 * @param {number} [options.port=7000] - port to listen on; any falsy value falls back to 7000.
 */
function startServer(taskReceivedCallback, startedCallback, options) {
  // Resolve the port up front so a missing `options` argument cannot throw later on.
  const port = (options && options.port) || 7000;

  server = net.createServer((socket) => {
    log.info('Connection established');

    // Decode at the stream level so a multi-byte UTF-8 character that is split across two
    // chunks is reassembled instead of being corrupted by per-chunk toString().
    socket.setEncoding('utf8');

    // Without a listener, a socket 'error' (e.g. client reset) becomes an uncaught exception.
    socket.on('error', (socketError) => {
      log.error(socketError);
    });

    // Holds received text that has not yet been terminated by '\n'.
    let data = '';

    // Writes one reply line: the serialized payload followed by the line terminator.
    function writeReply(payload) {
      socket.write(serialize(payload));
      socket.write('\n');
    }

    function processTask(rawTask) {
      setImmediate(() => {
        log.debug('About to parse task');
        log.debug(`Task: ${rawTask}`);

        parseTask(rawTask, (parseError, parsedTask) => {
          log.debug('Task parsing complete');

          if (parseError) {
            log.error(`Parse error! ${parseError}`);
            parseError.isError = true;
            writeReply(composeErrorReply('Internal Error', parseError.toString(), parseError));
            return;
          }

          // Set task request.query to request.params to account for difference in BL tasks
          if (parsedTask && parsedTask.request && parsedTask.request.params && !parsedTask.request.query) {
            parsedTask.request.query = parsedTask.request.params;
          }

          log.debug('About to invoke taskReceivedCallback');
          log.debug(parsedTask);

          taskReceivedCallback(parsedTask, (err, result) => {
            log.debug('About to respond');

            if (err) {
              log.debug('Responding with error');
              log.error(err);
              writeReply(composeErrorReply('Internal Error', 'Unable to execute Flex method', err));
              return;
            }

            log.debug('Responding with success');
            log.debug(result);

            // undefined is not stringified into JSON, so send an explicit null instead.
            // TODO: Check the size of the result and use a non-blocking serializer if too large
            writeReply(typeof result === 'undefined' ? null : result);
          });
        });
      });
    }

    socket.on('data', (chunk) => {
      log.debug('chunk received');

      // Always append the new data, then drain every complete line below.
      data += chunk.toString();

      // TODO: do not recopy all the data for each task line, index into data instead
      let lineEnd = data.indexOf('\n');

      while (lineEnd > -1) {
        const line = data.slice(0, lineEnd + 1);
        data = data.slice(lineEnd + 1);

        if (line.indexOf('{"healthCheck":1}') > -1) {
          log.info('healthcheck!');
          socket.write(`${serialize({ status: 'ready' })}\n`);
          // Keep looping: lines buffered behind the health check must still be handled now,
          // otherwise they would sit in the buffer until another chunk happens to arrive.
        } else {
          processTask(line);
        }

        lineEnd = data.indexOf('\n');
      }
    });
  });

  setImmediate(() => {
    server.listen(port, () => {
      startedCallback();
    });
  });
}
180
/**
 * Stops the server created by startServer.
 *
 * `server` is initialised to a plain `{}` placeholder, so calling stop() before startServer()
 * is treated as a no-op instead of throwing "server.close is not a function".
 */
function stop() {
  if (server && typeof server.close === 'function') {
    server.close();
  }
}
184
185 return {
186 startServer,
187 stop
188 };
189})();