UNPKG

9.79 kBJavaScriptView Raw
1const request = require('request-promise');
2const stringifySafe = require('json-stringify-safe');
3const assign = require('lodash.assign');
4const dgram = require('dgram');
5const zlib = require('zlib');
6
7const nanoSecDigits = 9;
8
9exports.version = require('../package.json').version;
10
11const jsonToString = (json) => {
12 try {
13 return JSON.stringify(json);
14 } catch (ex) {
15 return stringifySafe(json, null, null, () => {});
16 }
17};
18
19const messagesToBody = messages => messages.map(jsonToString).join(`\n`);
20
21const UNAVAILABLE_CODES = ['ETIMEDOUT', 'ECONNRESET', 'ESOCKETTIMEDOUT', 'ECONNABORTED'];
22
23const zlibPromised = body => new Promise(((resolve, reject) => {
24 zlib.gzip(body, (err, res) => {
25 if (err) return reject(err);
26 return resolve(res);
27 });
28}));
29
30const protocolToPortMap = {
31 udp: 5050,
32 http: 8070,
33 https: 8071,
34};
35
36const USER_AGENT = 'Logzio-Logger NodeJS';
37
38class LogzioLogger {
39 constructor({
40 token,
41 host = 'listener.logz.io',
42 type = 'nodejs',
43 sendIntervalMs = 10 * 1000,
44 bufferSize = 100,
45 debug = false,
46 numberOfRetries = 3,
47 supressErrors = false,
48 addTimestampWithNanoSecs = false,
49 compress = false,
50 internalLogger = console,
51 protocol = 'http',
52 port,
53 timeout,
54 sleepUntilNextRetry = 2 * 1000,
55 callback = this._defaultCallback,
56 extraFields = {},
57 }) {
58 if (!token) {
59 throw new Error('You are required to supply a token for logging.');
60 }
61
62 this.token = token;
63 this.host = host;
64 this.type = type;
65 this.sendIntervalMs = sendIntervalMs;
66 this.bufferSize = bufferSize;
67 this.debug = debug;
68 this.numberOfRetries = numberOfRetries;
69 this.supressErrors = supressErrors;
70 this.addTimestampWithNanoSecs = addTimestampWithNanoSecs;
71 this.compress = compress;
72 this.internalLogger = internalLogger;
73 this.sleepUntilNextRetry = sleepUntilNextRetry;
74
75 this.timer = null;
76 this.closed = false;
77
78 this.protocol = protocol;
79 this._setProtocol(port);
80
81 /*
82 Callback method executed on each bulk of messages sent to logzio.
83 If the bulk failed, it will be called: callback(exception), otherwise upon
84 success it will called as callback()
85 */
86 this.callback = callback;
87
88 /*
89 * the read/write/connection timeout in milliseconds of the outgoing HTTP request
90 */
91 this.timeout = timeout;
92
93 // build the url for logging
94 this.url = `${this.protocol}://${this.host}:${this.port}?token=${this.token}`;
95
96 this.messages = [];
97 this.bulkId = 1;
98 this.extraFields = extraFields;
99 }
100
101 _setProtocol(port) {
102 if (!protocolToPortMap[this.protocol]) {
103 throw new Error(`Invalid protocol defined. Valid options are : ${JSON.stringify(Object.keys(protocolToPortMap))}`);
104 }
105 this.port = port || protocolToPortMap[this.protocol];
106
107 if (this.protocol === 'udp') {
108 this.udpClient = dgram.createSocket('udp4');
109 }
110 }
111
112 _defaultCallback(err) {
113 if (err && !this.supressErrors) {
114 this.internalLogger.log(`logzio-logger error: ${err}`, err);
115 }
116 }
117
118 sendAndClose(callback) {
119 this.callback = callback || this._defaultCallback;
120 this._debug('Sending last messages and closing...');
121 this._popMsgsAndSend();
122 clearTimeout(this.timer);
123
124 if (this.protocol === 'udp') {
125 this.udpClient.close();
126 }
127 }
128
129 _timerSend() {
130 if (this.messages.length > 0) {
131 this._debug(`Woke up and saw ${this.messages.length} messages to send. Sending now...`);
132 this._popMsgsAndSend();
133 }
134
135 this.timer = setTimeout(() => {
136 this._timerSend();
137 }, this.sendIntervalMs);
138 }
139
140 _sendMessagesUDP() {
141 const udpSentCallback = (err) => {
142 if (err) {
143 this._debug(`Error while sending udp packets. err = ${err}`);
144 this.callback(new Error(`Failed to send udp log message. err = ${err}`));
145 }
146 };
147
148 this.messages.forEach((message) => {
149 const msg = message;
150 msg.token = this.token;
151 const buff = Buffer.from(stringifySafe(msg));
152
153 this._debug('Starting to send messages via udp.');
154 this.udpClient.send(buff, 0, buff.length, this.port, this.host, udpSentCallback);
155 });
156 }
157
158 close() {
159 // clearing the timer allows the node event loop to quit when needed
160 clearTimeout(this.timer);
161
162 // send pending messages, if any
163 if (this.messages.length > 0) {
164 this._debug('Closing, purging messages.');
165 this._popMsgsAndSend();
166 }
167
168 if (this.protocol === 'udp') {
169 this.udpClient.close();
170 }
171
172 // no more logging allowed
173 this.closed = true;
174 }
175
176 /**
177 * Attach a timestamp to the log record.
178 * If @timestamp already exists, use it. Else, use current time.
179 * The same goes for @timestamp_nano
180 * @param msg - The message (Object) to append the timestamp to.
181 * @private
182 */
183 _addTimestamp(msg) {
184 const now = (new Date()).toISOString();
185 msg['@timestamp'] = msg['@timestamp'] || now;
186
187 if (this.addTimestampWithNanoSecs) {
188 const time = process.hrtime();
189 msg['@timestamp_nano'] = msg['@timestamp_nano'] || [now, time[1].toString().padStart(nanoSecDigits ,'0')].join('-');
190 }
191 }
192
193 log(msg, obj) {
194 if (this.closed === true) {
195 throw new Error('Logging into a logger that has been closed!');
196 }
197 if (![null, undefined].includes(obj)){
198 msg += JSON.stringify(obj);
199 }
200 if (typeof msg === 'string') {
201 msg = {
202 message: msg,
203 };
204 }
205 msg = assign(msg, this.extraFields);
206 if (!msg.type) {
207 msg.type = this.type;
208 }
209 this._addTimestamp(msg);
210
211 this.messages.push(msg);
212 if (this.messages.length >= this.bufferSize) {
213 this._debug('Buffer is full - sending bulk');
214 this._popMsgsAndSend();
215 }
216 }
217
218 _popMsgsAndSend() {
219 if (this.protocol === 'udp') {
220 this._debug('Sending messages via udp');
221 this._sendMessagesUDP();
222 } else {
223 const bulk = this._createBulk(this.messages);
224 this._debug(`Sending bulk #${bulk.id}`);
225 this._send(bulk);
226 }
227
228 this.messages = [];
229 }
230
231 _createBulk(msgs) {
232 const bulk = {};
233 // creates a new copy of the array. Objects references are copied (no deep copy)
234 bulk.msgs = msgs.slice();
235 bulk.attemptNumber = 1;
236 bulk.sleepUntilNextRetry = this.sleepUntilNextRetry;
237 bulk.id = this.bulkId; // TODO test
238 this.bulkId += 1;
239
240 return bulk;
241 }
242
243 _debug(msg) {
244 if (this.debug) this.internalLogger.log(`logzio-nodejs: ${msg}`);
245 }
246
247 _tryAgainIn(sleepTimeMs, bulk) {
248 this._debug(`Bulk #${bulk.id} - Trying again in ${sleepTimeMs}[ms], attempt no. ${bulk.attemptNumber}`);
249 setTimeout(() => {
250 this._send(bulk);
251 }, sleepTimeMs);
252 }
253
254 _send(bulk) {
255 const body = messagesToBody(bulk.msgs);
256 const options = {
257 uri: this.url,
258 headers: {
259 host: this.host,
260 accept: '*/*',
261 'user-agent': USER_AGENT,
262 'content-type': 'text/plain',
263 },
264 };
265
266 if (typeof this.timeout !== 'undefined') {
267 options.timeout = this.timeout;
268 }
269
270 return Promise.resolve()
271 .then(() => {
272 if (this.compress) {
273 options.headers['content-encoding'] = 'gzip';
274 return zlibPromised(body);
275 }
276 return body;
277 })
278 .then((finalBody) => {
279 options.body = finalBody;
280 this._tryToSend(options, bulk);
281 });
282 }
283
284 _tryToSend(options, bulk) {
285 this._debug(`Sending bulk of ${bulk.msgs.length} logs`)
286 return request.post(options)
287 .then(() => {
288 this._debug(`Bulk #${bulk.id} - sent successfully`);
289 this.callback();
290 })
291 .catch((err) => {
292 // In rare cases server is busy
293 const errorCode = err.cause && err.cause.code;
294 if (UNAVAILABLE_CODES.includes(errorCode)) {
295 if (bulk.attemptNumber >= this.numberOfRetries) {
296 return this.callback(new Error(`Failed after ${bulk.attemptNumber} retries on error = ${err}`), bulk);
297 }
298 this._debug(`Bulk #${bulk.id} - failed on error: ${err}`);
299 const sleepTimeMs = bulk.sleepUntilNextRetry;
300 bulk.sleepUntilNextRetry *= 2;
301 bulk.attemptNumber += 1;
302
303 return this._tryAgainIn(sleepTimeMs, bulk);
304 }
305 if (err.statusCode !== 200) {
306 return this.callback(new Error(`There was a problem with the request.\nResponse: ${err.statusCode}: ${err.message}`), bulk);
307 }
308 return this.callback(err, bulk);
309 });
310 }
311}
312
313const createLogger = (options) => {
314 const l = new LogzioLogger(options);
315 l._timerSend();
316 return l;
317};
318
319
320module.exports = {
321 jsonToString,
322 createLogger
323};