UNPKG

7.21 kBJavaScriptView Raw
1const url = require('url')
2const exitHook = require('async-exit-hook')
3
4const { logproto } = require('./proto')
5const protoHelpers = require('./proto/helpers')
6const req = require('./requests')
7let snappy = false
8
9/**
10 * A batching transport layer for Grafana Loki
11 *
12 * @class Batcher
13 */
14class Batcher {
15 loadSnappy () {
16 return require('snappy')
17 }
18
19 /**
20 * Creates an instance of Batcher.
21 * Starts the batching loop if enabled.
22 * @param {*} options
23 * @memberof Batcher
24 */
25 constructor (options) {
26 // Load given options to the object
27 this.options = options
28
29 // Construct Grafana Loki push API url
30 this.url = new url.URL(this.options.host + '/loki/api/v1/push')
31
32 // Parse basic auth parameters if given
33 if (options.basicAuth) {
34 const btoa = require('btoa')
35 const basicAuth = 'Basic ' + btoa(options.basicAuth)
36 this.options.headers = Object.assign(this.options.headers, { 'Authorization': basicAuth })
37 }
38
39 // Define the batching intervals
40 this.interval = this.options.interval
41 ? Number(this.options.interval) * 1000
42 : 5000
43 this.circuitBreakerInterval = 60000
44
45 // Initialize the log batch
46 this.batch = {
47 streams: []
48 }
49
50 // If snappy binaries have not been built, fallback to JSON transport
51 if (!this.options.json) {
52 try {
53 snappy = this.loadSnappy()
54 } catch (error) {
55 this.options.json = true
56 }
57 if (!snappy) {
58 this.options.json = true
59 }
60 }
61
62 // Define the content type headers for the POST request based on the data type
63 this.contentType = 'application/x-protobuf'
64 if (this.options.json) {
65 this.contentType = 'application/json'
66 }
67
68 // If batching is enabled, run the loop
69 this.options.batching && this.run()
70
71 if (this.options.gracefulShutdown) {
72 exitHook(callback => {
73 this.close(() => callback())
74 })
75 }
76 }
77
78 /**
79 * Returns a promise that resolves after the given duration.
80 *
81 * @param {*} duration
82 * @returns {Promise}
83 */
84 wait (duration) {
85 return new Promise(resolve => {
86 setTimeout(resolve, duration)
87 })
88 }
89
90 /**
91 * Pushes logs into the batch.
92 * If logEntry is given, pushes it straight to this.sendBatchToLoki()
93 *
94 * @param {*} logEntry
95 */
96 async pushLogEntry (logEntry) {
97 const noTimestamp =
98 logEntry && logEntry.entries && logEntry.entries[0].ts === undefined
99 // If user has decided to replace the given timestamps with a generated one, generate it
100 if (this.options.replaceTimestamp || noTimestamp) {
101 logEntry.entries[0].ts = Date.now()
102 }
103
104 // If protobuf is the used data type, construct the timestamps
105 if (!this.options.json) {
106 logEntry = protoHelpers.createProtoTimestamps(logEntry)
107 }
108
109 // If batching is not enabled, push the log immediately to Loki API
110 if (this.options.batching !== undefined && !this.options.batching) {
111 await this.sendBatchToLoki(logEntry)
112 } else {
113 const { streams } = this.batch
114
115 // Find if there's already a log with identical labels in the batch
116 const match = streams.findIndex(
117 stream => JSON.stringify(stream.labels) === JSON.stringify(logEntry.labels)
118 )
119
120 if (match > -1) {
121 // If there's a match, push the log under the same label
122 logEntry.entries.forEach(entry => {
123 streams[match].entries.push(entry)
124 })
125 } else {
126 // Otherwise, create a new label under streams
127 streams.push(logEntry)
128 }
129 }
130 }
131
132 /**
133 * Clears the batch.
134 */
135 clearBatch () {
136 this.batch.streams = []
137 }
138
139 /**
140 * Sends a batch to Grafana Loki push endpoint.
141 * If a single logEntry is given, creates a batch first around it.
142 *
143 * @param {*} logEntry
144 * @returns {Promise}
145 */
146 sendBatchToLoki (logEntry) {
147 return new Promise((resolve, reject) => {
148 // If the batch is empty, do nothing
149 if (this.batch.streams.length === 0 && !logEntry) {
150 resolve()
151 } else {
152 let reqBody
153
154 // If the data format is JSON, there's no need to construct a buffer
155 if (this.options.json) {
156 let preparedJSONBatch
157 if (logEntry !== undefined) {
158 // If a single logEntry is given, wrap it according to the batch format
159 preparedJSONBatch = protoHelpers.prepareJSONBatch({ streams: [logEntry] })
160 } else {
161 // Stringify the JSON ready for transport
162 preparedJSONBatch = protoHelpers.prepareJSONBatch(this.batch)
163 }
164 reqBody = JSON.stringify(preparedJSONBatch)
165 } else {
166 try {
167 let batch
168 if (logEntry !== undefined) {
169 // If a single logEntry is given, wrap it according to the batch format
170 batch = { streams: [logEntry] }
171 } else {
172 batch = this.batch
173 }
174
175 const preparedBatch = protoHelpers.prepareProtoBatch(batch)
176
177 // Check if the batch can be encoded in Protobuf and is correct format
178 const err = logproto.PushRequest.verify(preparedBatch)
179
180 // Reject the promise if the batch is not of correct format
181 if (err) reject(err)
182
183 // Create the PushRequest object
184 const message = logproto.PushRequest.create(preparedBatch)
185 // Encode the PushRequest object and create the binary buffer
186 const buffer = logproto.PushRequest.encode(message).finish()
187 // Compress the buffer with snappy
188 reqBody = snappy.compressSync(buffer)
189 } catch (err) {
190 reject(err)
191 }
192 }
193
194 // Send the data to Grafana Loki
195 req.post(this.url, this.contentType, this.options.headers, reqBody, this.options.timeout)
196 .then(() => {
197 // No need to clear the batch if batching is disabled
198 logEntry === undefined && this.clearBatch()
199 resolve()
200 })
201 .catch(err => {
202 // Clear the batch on error if enabled
203 this.options.clearOnError && this.clearBatch()
204 reject(err)
205 })
206 }
207 })
208 }
209
210 /**
211 * Runs the batch push loop.
212 *
213 * Sends the batch to Loki and waits for
214 * the amount of this.interval between requests.
215 */
216 async run () {
217 this.runLoop = true
218 while (this.runLoop) {
219 try {
220 await this.sendBatchToLoki()
221 if (this.interval === this.circuitBreakerInterval) {
222 if (this.options.interval !== undefined) {
223 this.interval = Number(this.options.interval) * 1000
224 } else {
225 this.interval = 5000
226 }
227 }
228 } catch (e) {
229 this.interval = this.circuitBreakerInterval
230 }
231 await this.wait(this.interval)
232 }
233 }
234
235 /**
236 * Stops the batch push loop
237 *
238 * @param {() => void} [callback]
239 */
240 close (callback) {
241 this.runLoop = false
242 this.sendBatchToLoki()
243 .then(() => { if (callback) { callback() } }) // maybe should emit something here
244 .catch(() => { if (callback) { callback() } }) // maybe should emit something here
245 }
246}
247
248module.exports = Batcher