1 | const request = require('request-promise');
|
2 | const stringifySafe = require('json-stringify-safe');
|
3 | const assign = require('lodash.assign');
|
4 | const dgram = require('dgram');
|
5 | const zlib = require('zlib');
|
6 |
|
// Width, in digits, to which the nanosecond component of `@timestamp_nano`
// is left-padded with zeros.
const nanoSecDigits = 9;
|
8 |
|
// Expose the package version as declared in ../package.json.
const packageManifest = require('../package.json');

exports.version = packageManifest.version;
|
10 |
|
/**
 * Serialize a value to JSON text.
 *
 * The native serializer is tried first; if it throws, the value is
 * serialized with `json-stringify-safe`, using a cycle replacer that returns
 * `undefined` for circular references.
 *
 * @param {*} json - Value to serialize.
 * @returns {string|undefined} JSON text (`undefined` for values JSON cannot
 *   represent at the top level, exactly as `JSON.stringify` reports them).
 */
const jsonToString = (json) => {
    let serialized;

    try {
        serialized = JSON.stringify(json);
    } catch (stringifyError) {
        serialized = stringifySafe(json, null, null, () => {});
    }

    return serialized;
};
|
18 |
|
/**
 * Build a bulk request body: one serialized message per line.
 *
 * @param {Array<*>} messages - Messages to serialize with `jsonToString`.
 * @returns {string} The serialized messages joined by newline characters.
 */
const messagesToBody = (messages) => {
    const serializedLines = messages.map(jsonToString);
    return serializedLines.join(`\n`);
};
|
20 |
|
// Error codes (read from `err.cause.code` of a failed request) for which a
// bulk is retried instead of being reported as failed right away.
const UNAVAILABLE_CODES = [
    'ETIMEDOUT',
    'ECONNRESET',
    'ESOCKETTIMEDOUT',
    'ECONNABORTED',
];
|
22 |
|
/**
 * Promise adapter around the callback-based `zlib.gzip`.
 *
 * @param {string|Buffer} body - Payload to compress.
 * @returns {Promise<Buffer>} Resolves with the gzipped payload; rejects with
 *   the error reported by zlib.
 */
const zlibPromised = (body) => new Promise((resolve, reject) => {
    zlib.gzip(body, (gzipError, compressed) => {
        if (gzipError) {
            reject(gzipError);
            return;
        }
        resolve(compressed);
    });
});
|
29 |
|
// Supported shipping protocols mapped to the port used when the caller does
// not supply one. The keys double as the list of valid `protocol` options.
const protocolToPortMap = {
    udp: 5050,
    http: 8070,
    https: 8071,
};
|
35 |
|
// Value sent in the `user-agent` header of every HTTP(S) bulk request.
const USER_AGENT = 'Logzio-Logger NodeJS';
|
37 |
|
/**
 * Buffers log messages in memory and ships them to a Logz.io listener:
 * as newline-delimited JSON bulks over HTTP(S), or as one datagram per
 * message over UDP.
 */
class LogzioLogger {
    /**
     * @param {object} [options] - Logger settings.
     * @param {string} options.token - Shipping token; required.
     * @param {string} [options.host='listener.logz.io'] - Listener host name.
     * @param {string} [options.type='nodejs'] - `type` given to messages that carry none.
     * @param {number} [options.sendIntervalMs=10000] - Delay between timer-driven flushes.
     * @param {number} [options.bufferSize=100] - Buffered message count that triggers an immediate flush.
     * @param {boolean} [options.debug=false] - Emit diagnostics through `internalLogger`.
     * @param {number} [options.numberOfRetries=3] - Maximum send attempts per bulk on connection-level errors.
     * @param {boolean} [options.supressErrors=false] - Silences the default callback's error output.
     * @param {boolean} [options.addTimestampWithNanoSecs=false] - Also stamp `@timestamp_nano` on messages.
     * @param {boolean} [options.compress=false] - Gzip HTTP(S) bodies.
     * @param {{log: Function}} [options.internalLogger=console] - Sink for debug and default error output.
     * @param {string} [options.protocol='http'] - One of 'udp', 'http', 'https'.
     * @param {number} [options.port] - Overrides the protocol's default port.
     * @param {number} [options.timeout] - Forwarded as the HTTP request timeout when defined.
     * @param {number} [options.sleepUntilNextRetry=2000] - First retry delay in ms; doubled after every failed attempt.
     * @param {Function} [options.callback] - Called with no arguments after a successful HTTP(S) bulk,
     *   with `(err, bulk)` after a failed one, and with `(err)` after a failed UDP send.
     * @param {object} [options.extraFields={}] - Fields merged into every message (they win on key clashes).
     * @throws {Error} When no token is supplied or the protocol is not supported.
     */
    constructor({
        token,
        host = 'listener.logz.io',
        type = 'nodejs',
        sendIntervalMs = 10 * 1000,
        bufferSize = 100,
        debug = false,
        numberOfRetries = 3,
        supressErrors = false,
        addTimestampWithNanoSecs = false,
        compress = false,
        internalLogger = console,
        protocol = 'http',
        port,
        timeout,
        sleepUntilNextRetry = 2 * 1000,
        callback = this._defaultCallback,
        extraFields = {},
    } = {}) {
        // The `= {}` default lets `new LogzioLogger()` reach this check and
        // report the missing token instead of failing while destructuring.
        if (!token) {
            throw new Error('You are required to supply a token for logging.');
        }

        this.token = token;
        this.host = host;
        this.type = type;
        this.sendIntervalMs = sendIntervalMs;
        this.bufferSize = bufferSize;
        this.debug = debug;
        this.numberOfRetries = numberOfRetries;
        this.supressErrors = supressErrors;
        this.addTimestampWithNanoSecs = addTimestampWithNanoSecs;
        this.compress = compress;
        this.internalLogger = internalLogger;
        this.sleepUntilNextRetry = sleepUntilNextRetry;

        this.timer = null;
        this.closed = false;

        this.protocol = protocol;
        this._setProtocol(port);

        this.callback = callback;
        this.timeout = timeout;

        this.url = `${this.protocol}://${this.host}:${this.port}?token=${this.token}`;

        this.messages = [];
        this.bulkId = 1;
        this.extraFields = extraFields;
    }

    /**
     * Validate `this.protocol`, resolve the port and, for UDP, open the socket.
     *
     * @param {number} [port] - Explicit port; falls back to the protocol default.
     * @throws {Error} When `this.protocol` is not a key of `protocolToPortMap`.
     */
    _setProtocol(port) {
        if (!protocolToPortMap[this.protocol]) {
            throw new Error(`Invalid protocol defined. Valid options are : ${JSON.stringify(Object.keys(protocolToPortMap))}`);
        }
        this.port = port || protocolToPortMap[this.protocol];

        if (this.protocol === 'udp') {
            this.udpClient = dgram.createSocket('udp4');
        }
    }

    /**
     * Callback used when the caller supplies none: reports errors through
     * `internalLogger` unless `supressErrors` is set.
     *
     * @param {Error} [err] - Failure to report, if any.
     */
    _defaultCallback(err) {
        if (err && !this.supressErrors) {
            this.internalLogger.log(`logzio-logger error: ${err}`, err);
        }
    }

    /**
     * Install `callback` (or the default one), flush the buffer
     * unconditionally, cancel the flush timer and close the UDP socket if any.
     * Note that this does not mark the logger as closed.
     *
     * @param {Function} [callback] - Replaces the configured callback.
     */
    sendAndClose(callback) {
        this.callback = callback || this._defaultCallback;
        this._debug('Sending last messages and closing...');
        this._popMsgsAndSend();
        clearTimeout(this.timer);

        if (this.protocol === 'udp') {
            this.udpClient.close();
        }
    }

    /**
     * Flush buffered messages, if any, then re-arm the timer so this method
     * runs again after `sendIntervalMs`.
     */
    _timerSend() {
        if (this.messages.length > 0) {
            this._debug(`Woke up and saw ${this.messages.length} messages to send. Sending now...`);
            this._popMsgsAndSend();
        }

        this.timer = setTimeout(() => {
            this._timerSend();
        }, this.sendIntervalMs);
    }

    /**
     * Send every buffered message as its own UDP datagram. The token is
     * written onto each message object before it is serialized. The callback
     * is invoked only for failed sends.
     */
    _sendMessagesUDP() {
        const udpSentCallback = (err) => {
            if (err) {
                this._debug(`Error while sending udp packets. err = ${err}`);
                this.callback(new Error(`Failed to send udp log message. err = ${err}`));
            }
        };

        this.messages.forEach((message) => {
            const msg = message;
            msg.token = this.token;
            const buff = Buffer.from(stringifySafe(msg));

            this._debug('Starting to send messages via udp.');
            this.udpClient.send(buff, 0, buff.length, this.port, this.host, udpSentCallback);
        });
    }

    /**
     * Stop the flush timer, send whatever is still buffered, close the UDP
     * socket if any, and refuse further `log` calls. Calling it again after
     * the logger is closed does nothing.
     */
    close() {
        // Already closed: return early so the dgram socket is not closed a
        // second time, which would throw.
        if (this.closed === true) {
            return;
        }

        // Cancelling the timer removes the pending flush from the event loop.
        clearTimeout(this.timer);

        if (this.messages.length > 0) {
            this._debug('Closing, purging messages.');
            this._popMsgsAndSend();
        }

        if (this.protocol === 'udp') {
            this.udpClient.close();
        }

        this.closed = true;
    }

    /**
     * Stamp `@timestamp` (ISO-8601, current time) and, when enabled,
     * `@timestamp_nano` onto `msg`. Values already present are kept.
     *
     * @param {object} msg - Message to stamp; mutated in place.
     */
    _addTimestamp(msg) {
        const now = (new Date()).toISOString();
        msg['@timestamp'] = msg['@timestamp'] || now;

        if (this.addTimestampWithNanoSecs) {
            const time = process.hrtime();
            // time[1] is the nanosecond part of the hrtime tuple, zero-padded
            // to a fixed width and appended to the ISO timestamp.
            msg['@timestamp_nano'] = msg['@timestamp_nano'] || [now, time[1].toString().padStart(nanoSecDigits, '0')].join('-');
        }
    }

    /**
     * Queue a message and flush once the buffer reaches `bufferSize`.
     *
     * A string `msg` becomes `{ message: msg }`. When `obj` is neither `null`
     * nor `undefined`, its JSON form is string-appended to `msg` first. An
     * object `msg` is extended in place with `extraFields`, `type` and the
     * timestamp fields.
     *
     * @param {string|object} msg - Message text or message object.
     * @param {*} [obj] - Optional context appended to the message as JSON.
     * @throws {Error} When the logger has been closed.
     */
    log(msg, obj) {
        if (this.closed === true) {
            throw new Error('Logging into a logger that has been closed!');
        }

        let entry = msg;
        if (obj !== null && obj !== undefined) {
            // jsonToString falls back to a cycle-safe serializer, so a
            // circular context object cannot make log() throw.
            entry += jsonToString(obj);
        }
        if (typeof entry === 'string') {
            entry = {
                message: entry,
            };
        }
        entry = assign(entry, this.extraFields);
        if (!entry.type) {
            entry.type = this.type;
        }
        this._addTimestamp(entry);

        this.messages.push(entry);
        if (this.messages.length >= this.bufferSize) {
            this._debug('Buffer is full - sending bulk');
            this._popMsgsAndSend();
        }
    }

    /**
     * Hand the buffered messages to the configured transport and reset the
     * buffer. Over HTTP(S) a bulk is sent even when the buffer is empty.
     */
    _popMsgsAndSend() {
        if (this.protocol === 'udp') {
            this._debug('Sending messages via udp');
            this._sendMessagesUDP();
        } else {
            const bulk = this._createBulk(this.messages);
            this._debug(`Sending bulk #${bulk.id}`);
            this._send(bulk);
        }

        this.messages = [];
    }

    /**
     * Wrap a snapshot of `msgs` with retry bookkeeping and a sequential id.
     *
     * @param {Array<object>} msgs - Messages to include (shallow-copied).
     * @returns {{msgs: Array<object>, attemptNumber: number, sleepUntilNextRetry: number, id: number}}
     */
    _createBulk(msgs) {
        const bulk = {};

        bulk.msgs = msgs.slice();
        bulk.attemptNumber = 1;
        bulk.sleepUntilNextRetry = this.sleepUntilNextRetry;
        bulk.id = this.bulkId;
        this.bulkId += 1;

        return bulk;
    }

    /**
     * Write a diagnostic line through `internalLogger` when `debug` is on.
     *
     * @param {string} msg - Text to log.
     */
    _debug(msg) {
        if (this.debug) this.internalLogger.log(`logzio-nodejs: ${msg}`);
    }

    /**
     * Schedule another send of `bulk` after `sleepTimeMs` milliseconds.
     *
     * @param {number} sleepTimeMs - Delay before the next attempt.
     * @param {object} bulk - Bulk to resend.
     */
    _tryAgainIn(sleepTimeMs, bulk) {
        this._debug(`Bulk #${bulk.id} - Trying again in ${sleepTimeMs}[ms], attempt no. ${bulk.attemptNumber}`);
        setTimeout(() => {
            this._send(bulk);
        }, sleepTimeMs);
    }

    /**
     * Serialize `bulk`, optionally gzip it, and post it.
     *
     * @param {object} bulk - Bulk produced by `_createBulk`.
     * @returns {Promise<*>} Settles once the send attempt (or the report of a
     *   compression failure to the callback) has completed.
     */
    _send(bulk) {
        const body = messagesToBody(bulk.msgs);
        const options = {
            uri: this.url,
            headers: {
                host: this.host,
                accept: '*/*',
                'user-agent': USER_AGENT,
                'content-type': 'text/plain',
            },
        };

        if (typeof this.timeout !== 'undefined') {
            options.timeout = this.timeout;
        }

        return Promise.resolve()
            .then(() => {
                if (this.compress) {
                    options.headers['content-encoding'] = 'gzip';
                    return zlibPromised(body);
                }
                return body;
            })
            .then(
                (finalBody) => {
                    options.body = finalBody;
                    return this._tryToSend(options, bulk);
                },
                // Second argument of then(): handles only a failure of the
                // compression step, so the bulk's fate is reported to the
                // callback instead of surfacing as an unhandled rejection.
                (compressionError) => {
                    this._debug(`Bulk #${bulk.id} - failed to prepare body: ${compressionError}`);
                    return this.callback(compressionError, bulk);
                },
            );
    }

    /**
     * Post a prepared request. Connection-level failures (see
     * `UNAVAILABLE_CODES`) are retried with a doubling delay until
     * `numberOfRetries` attempts have been made; every other failure is
     * reported to the callback immediately.
     *
     * @param {object} options - Request options built by `_send`.
     * @param {object} bulk - Bulk being sent; its retry fields are updated.
     * @returns {Promise<*>}
     */
    _tryToSend(options, bulk) {
        this._debug(`Sending bulk of ${bulk.msgs.length} logs`);
        return request.post(options)
            .then(() => {
                this._debug(`Bulk #${bulk.id} - sent successfully`);
                this.callback();
            })
            .catch((err) => {
                const errorCode = err.cause && err.cause.code;
                if (UNAVAILABLE_CODES.includes(errorCode)) {
                    if (bulk.attemptNumber >= this.numberOfRetries) {
                        return this.callback(new Error(`Failed after ${bulk.attemptNumber} retries on error = ${err}`), bulk);
                    }
                    this._debug(`Bulk #${bulk.id} - failed on error: ${err}`);
                    const sleepTimeMs = bulk.sleepUntilNextRetry;
                    bulk.sleepUntilNextRetry *= 2;
                    bulk.attemptNumber += 1;

                    return this._tryAgainIn(sleepTimeMs, bulk);
                }
                if (err.statusCode !== 200) {
                    return this.callback(new Error(`There was a problem with the request.\nResponse: ${err.statusCode}: ${err.message}`), bulk);
                }
                return this.callback(err, bulk);
            });
    }
}
|
312 |
|
/**
 * Construct a logger and start its periodic flush timer.
 *
 * @param {object} options - Passed unchanged to the `LogzioLogger` constructor.
 * @returns {LogzioLogger} The logger, with its first timer cycle already run.
 */
const createLogger = (options) => {
    const logger = new LogzioLogger(options);
    logger._timerSend();
    return logger;
};
|
318 |
|
319 |
|
320 | module.exports = {
|
321 | jsonToString,
|
322 | createLogger
|
323 | };
|