UNPKG

11.1 kBJavaScriptView Raw
1const { networkInterfaces } = require('os');
2const stringifySafe = require('json-stringify-safe');
3const assign = require('lodash.assign');
4const dgram = require('dgram');
5const zlib = require('zlib');
6const axiosInstance = require('./axiosInstance');
7
8
9const nanoSecDigits = 9;
10
11exports.version = require('../package.json').version;
12
13const jsonToString = (json) => {
14 try {
15 return JSON.stringify(json);
16 } catch (ex) {
17 return stringifySafe(json, null, null, () => {});
18 }
19};
20
21const messagesToBody = messages => messages.map(jsonToString).join(`\n`);
22
23const UNAVAILABLE_CODES = ['ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ECONNABORTED'];
24
25const zlibPromised = body => new Promise(((resolve, reject) => {
26 zlib.gzip(body, (err, res) => {
27 if (err) return reject(err);
28 return resolve(res);
29 });
30}));
31
32const protocolToPortMap = {
33 udp: 5050,
34 http: 8070,
35 https: 8071,
36};
37
38const USER_AGENT = 'Logzio-Logger NodeJS';
39
40class LogzioLogger {
41 constructor({
42 token,
43 host = 'listener.logz.io',
44 type = 'nodejs',
45 sendIntervalMs = 10 * 1000,
46 bufferSize = 100,
47 debug = false,
48 numberOfRetries = 3,
49 supressErrors = false,
50 addTimestampWithNanoSecs = false,
51 compress = false,
52 internalLogger = console,
53 protocol = 'http',
54 port,
55 timeout,
56 sleepUntilNextRetry = 2 * 1000,
57 callback = this._defaultCallback,
58 setUserAgent = true,
59 extraFields = {},
60 }) {
61 if (!token) {
62 throw new Error('You are required to supply a token for logging.');
63 }
64
65 this.token = token;
66 this.host = host;
67 this.type = type;
68 this.sendIntervalMs = sendIntervalMs;
69 this.bufferSize = bufferSize;
70 this.debug = debug;
71 this.numberOfRetries = numberOfRetries;
72 this.supressErrors = supressErrors;
73 this.addTimestampWithNanoSecs = addTimestampWithNanoSecs;
74 this.compress = compress;
75 this.internalLogger = internalLogger;
76 this.sleepUntilNextRetry = sleepUntilNextRetry;
77 this.setUserAgent = setUserAgent;
78 this.timer = null;
79 this.closed = false;
80
81 this.protocol = protocol;
82 this._setProtocol(port);
83 this.url = `${this.protocol}://${this.host}:${this.port}?token=${this.token}`;
84
85 this.axiosInstance = axiosInstance;
86 this.axiosInstance.defaults.headers.post = {
87 Host: this.host,
88 Accept: '*/*',
89 'Content-Type': 'text/plain',
90 ...(this.setUserAgent ? { 'user-agent': USER_AGENT } : {}),
91 ...(this.compress ? { 'content-encoding': 'gzip' } : {}),
92
93 };
94
95 /*
96 Callback method executed on each bulk of messages sent to logzio.
97 If the bulk failed, it will be called: callback(exception), otherwise upon
98 success it will called as callback()
99 */
100 this.callback = callback;
101
102 /*
103 * the read/write/connection timeout in milliseconds of the outgoing HTTP request
104 */
105 this.timeout = timeout;
106
107 // build the url for logging
108
109 this.messages = [];
110 this.bulkId = 1;
111 this.extraFields = extraFields;
112 this.typeOfIP = 'IPv4';
113 }
114
115 _setProtocol(port) {
116 if (!protocolToPortMap[this.protocol]) {
117 throw new Error(`Invalid protocol defined. Valid options are : ${JSON.stringify(Object.keys(protocolToPortMap))}`);
118 }
119 this.port = port || protocolToPortMap[this.protocol];
120
121 if (this.protocol === 'udp') {
122 this.udpClient = dgram.createSocket('udp4');
123 }
124 }
125
126 _defaultCallback(err) {
127 if (err && !this.supressErrors) {
128 this.internalLogger.log(`logzio-logger error: ${err}`, err);
129 }
130 }
131
132 sendAndClose(callback) {
133 this.callback = callback || this._defaultCallback;
134 this._debug('Sending last messages and closing...');
135 this._popMsgsAndSend();
136 clearTimeout(this.timer);
137
138 if (this.protocol === 'udp') {
139 this.udpClient.close();
140 }
141 }
142
143 _timerSend() {
144 if (this.messages.length > 0) {
145 this._debug(`Woke up and saw ${this.messages.length} messages to send. Sending now...`);
146 this._popMsgsAndSend();
147 }
148
149 this.timer = setTimeout(() => {
150 this._timerSend();
151 }, this.sendIntervalMs);
152 }
153
154 _sendMessagesUDP() {
155 const udpSentCallback = (err) => {
156 if (err) {
157 this._debug(`Error while sending udp packets. err = ${err}`);
158 this.callback(new Error(`Failed to send udp log message. err = ${err}`));
159 }
160 };
161
162 this.messages.forEach((message) => {
163 const msg = message;
164 msg.token = this.token;
165 const buff = Buffer.from(stringifySafe(msg));
166
167 this._debug('Starting to send messages via udp.');
168 this.udpClient.send(buff, 0, buff.length, this.port, this.host, udpSentCallback);
169 });
170 }
171
172 close() {
173 // clearing the timer allows the node event loop to quit when needed
174 clearTimeout(this.timer);
175
176 // send pending messages, if any
177 if (this.messages.length > 0) {
178 this._debug('Closing, purging messages.');
179 this._popMsgsAndSend();
180 }
181
182 if (this.protocol === 'udp') {
183 this.udpClient.close();
184 }
185
186 // no more logging allowed
187 this.closed = true;
188 }
189
190 /**
191 * Attach a timestamp to the log record.
192 * If @timestamp already exists, use it. Else, use current time.
193 * The same goes for @timestamp_nano
194 * @param msg - The message (Object) to append the timestamp to.
195 * @private
196 */
197 _addTimestamp(msg) {
198 const now = (new Date()).toISOString();
199 msg['@timestamp'] = msg['@timestamp'] || now;
200
201 if (this.addTimestampWithNanoSecs) {
202 const time = process.hrtime();
203 msg['@timestamp_nano'] = msg['@timestamp_nano'] || [now, time[1].toString().padStart(nanoSecDigits, '0')].join('-');
204 }
205 }
206
207 /**
208 * Attach a Source IP to the log record.
209 * @param msg - The message (Object) to append the timestamp to.
210 * @private
211 */
212 _addSourceIP(msg) {
213 const { en0 } = networkInterfaces();
214 if (en0 && en0.length > 0) {
215 const relevantIPs = [];
216 en0.forEach((ip) => {
217 // Skip over non-IPv4 and internal (i.e. 127.0.0.1) addresses
218 // 'IPv4' is in Node <= 17, from 18 it's a number 4 or 6
219 const familyV4Value = typeof ip.family === 'string' ? this.typeOfIP : 4;
220 if (ip.family === familyV4Value && !ip.internal) {
221 relevantIPs.push(ip.address);
222 // msg.sourceIP = ip.address;
223 }
224 });
225
226 if (relevantIPs.length > 1) {
227 relevantIPs.forEach((ip, idx) => {
228 msg[`sourceIP_${idx}`] = ip;
229 });
230 } else if (relevantIPs.length === 1) {
231 const [sourceIP] = relevantIPs;
232 msg.sourceIP = sourceIP;
233 }
234 }
235 }
236
237 log(msg, obj) {
238 if (this.closed === true) {
239 throw new Error('Logging into a logger that has been closed!');
240 }
241 if (![null, undefined].includes(obj)) {
242 msg += JSON.stringify(obj);
243 }
244 if (typeof msg === 'string') {
245 msg = {
246 message: msg,
247 };
248 }
249 this._addSourceIP(msg);
250 msg = assign(msg, this.extraFields);
251 if (!msg.type) {
252 msg.type = this.type;
253 }
254 this._addTimestamp(msg);
255
256 this.messages.push(msg);
257 if (this.messages.length >= this.bufferSize) {
258 this._debug('Buffer is full - sending bulk');
259 this._popMsgsAndSend();
260 }
261 }
262
263 _popMsgsAndSend() {
264 if (this.protocol === 'udp') {
265 this._debug('Sending messages via udp');
266 this._sendMessagesUDP();
267 } else {
268 const bulk = this._createBulk(this.messages);
269 this._debug(`Sending bulk #${bulk.id}`);
270 this._send(bulk);
271 }
272
273 this.messages = [];
274 }
275
276 _createBulk(msgs) {
277 const bulk = {};
278 // creates a new copy of the array. Objects references are copied (no deep copy)
279 bulk.msgs = msgs.slice();
280 bulk.attemptNumber = 1;
281 bulk.sleepUntilNextRetry = this.sleepUntilNextRetry;
282 bulk.id = this.bulkId; // TODO test
283 this.bulkId += 1;
284
285 return bulk;
286 }
287
288 _debug(msg) {
289 if (this.debug) this.internalLogger.log(`logzio-nodejs: ${msg}`);
290 }
291
292 _tryAgainIn(sleepTimeMs, bulk) {
293 this._debug(`Bulk #${bulk.id} - Trying again in ${sleepTimeMs}[ms], attempt no. ${bulk.attemptNumber}`);
294 setTimeout(() => {
295 this._send(bulk);
296 }, sleepTimeMs);
297 }
298
299 _send(bulk) {
300 const body = messagesToBody(bulk.msgs);
301
302 if (typeof this.timeout !== 'undefined') {
303 this.axiosInstance.defaults.timeout = this.timeout;
304 }
305
306 return Promise.resolve()
307 .then(() => {
308 if (this.compress) {
309 return zlibPromised(body);
310 }
311 return body;
312 })
313 .then((finalBody) => {
314 this._tryToSend(finalBody, bulk);
315 });
316 }
317
318 _tryToSend(body, bulk) {
319 this._debug(`Sending bulk of ${bulk.msgs.length} logs`);
320 return this.axiosInstance.post(this.url, body)
321 .then(() => {
322 this._debug(`Bulk #${bulk.id} - sent successfully`);
323 this.callback();
324 })
325 .catch((err) => {
326 // In rare cases server is busy
327 const errorCode = err.code;
328 if (UNAVAILABLE_CODES.includes(errorCode)) {
329 if (bulk.attemptNumber >= this.numberOfRetries) {
330 return this.callback(new Error(`Failed after ${bulk.attemptNumber} retries on error = ${err}`), bulk);
331 }
332 this._debug(`Bulk #${bulk.id} - failed on error: ${err}`);
333 const sleepTimeMs = bulk.sleepUntilNextRetry;
334 bulk.sleepUntilNextRetry *= 2;
335 bulk.attemptNumber += 1;
336
337 return this._tryAgainIn(sleepTimeMs, bulk);
338 }
339 if (err.statusCode !== 200) {
340 return this.callback(new Error(`There was a problem with the request.\nResponse: ${err.statusCode}: ${err.message}`), bulk);
341 }
342 return this.callback(err, bulk);
343 });
344 }
345}
346
347const createLogger = (options) => {
348 const l = new LogzioLogger(options);
349 l._timerSend();
350 return l;
351};
352
353
354module.exports = {
355 jsonToString,
356 createLogger
357};