const got = require('got')
const url = require('url')
const exitHook = require('async-exit-hook')

const { logproto } = require('./proto')
const protoHelpers = require('./proto/helpers')
let snappy = false

/**
 * A batching transport layer for Grafana Loki
 *
 * @class Batcher
 */
class Batcher {
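  /**
   * Loads the optional snappy compression bindings.
   * Throws if the native module has not been built, in which case
   * the constructor falls back to the JSON transport.
   */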
  loadSnappy () {
    return require('snappy')
  }

  /**
   * Creates an instance of Batcher.
   * Starts the batching loop if enabled.
   * @param {*} options
   * @memberof Batcher
   */
  constructor (options) {
    // Load given options to the object
    this.options = options

    // Construct Grafana Loki push API url
    this.url = new url.URL(this.options.host + '/api/prom/push').toString()

    // Define the batching intervals
    this.interval = this.options.interval
      ? Number(this.options.interval) * 1000
      : 5000
    this.circuitBreakerInterval = 60000

    // Initialize the log batch
    this.batch = {
      streams: []
    }

    // If the snappy binaries have not been built, fall back to JSON transport
    if (!this.options.json) {
      try {
        snappy = this.loadSnappy()
      } catch (error) {
        this.options.json = true
      }
      if (!snappy) {
        this.options.json = true
      }
    }

    // Define the content-type header for the POST request based on the data type
    this.contentType = 'application/x-protobuf'
    if (this.options.json) {
      this.contentType = 'application/json'
    }

    // If batching is enabled, run the loop
    this.options.batching && this.run()

    exitHook(callback => {
      this.sendBatchToLoki()
        .then(() => callback())
        .catch(() => callback())
    })
  }

  /**
   * Returns a promise that resolves after the given duration.
   *
   * @param {*} duration
   * @returns {Promise}
   */
  wait (duration) {
    return new Promise(resolve => {
      setTimeout(resolve, duration)
    })
  }

  /**
   * Pushes a log entry into the batch.
   * If batching is disabled, the entry is pushed straight to this.sendBatchToLoki()
   *
   * @param {*} logEntry
   */
  async pushLogEntry (logEntry) {
    // If the user has decided to replace the given timestamps with a generated one, generate it
    if (this.options.replaceTimestamp) {
      logEntry.entries[0].ts = Date.now()
    }

    // If protobuf is the used data type, construct the timestamps
    if (!this.options.json) {
      logEntry = protoHelpers.createProtoTimestamps(logEntry)
    }

    // If batching is not enabled, push the log immediately to the Loki API
    if (this.options.batching !== undefined && !this.options.batching) {
      await this.sendBatchToLoki(logEntry)
    } else {
      const { streams } = this.batch

      // Find out if there's already a stream with identical labels in the batch
      const match = streams.findIndex(
        stream => stream.labels === logEntry.labels
      )

      if (match > -1) {
        // If there's a match, push the log entries under the same label
        logEntry.entries.forEach(entry => {
          streams[match].entries.push(entry)
        })
      } else {
        // Otherwise, add the new stream to the batch
        streams.push(logEntry)
      }
    }
  }

  /**
   * Clears the batch.
   */
  clearBatch () {
    this.batch.streams = []
  }

  /**
   * Sends a batch to the Grafana Loki push endpoint.
   * If a single logEntry is given, a batch is created around it first.
   *
   * @param {*} logEntry
   * @returns {Promise}
   */
  sendBatchToLoki (logEntry) {
    // Flag for replacing timestamps on error
    const replace =
      this.interval === this.circuitBreakerInterval &&
      this.options.replaceOnError

    return new Promise((resolve, reject) => {
      // If the batch is empty, do nothing
      if (this.batch.streams.length === 0 && !logEntry) {
        resolve()
      } else {
        let reqBody

        // If the data format is JSON, there's no need to construct a buffer
        if (this.options.json) {
          if (logEntry !== undefined) {
            // If a single logEntry is given, wrap it according to the batch format
            reqBody = JSON.stringify({ streams: [logEntry] })
          } else {
            // Sort the batch and ensure that there are no duplicate timestamps
            reqBody = protoHelpers.sortBatch(this.batch, replace)
            // Stringify the JSON ready for transport
            reqBody = JSON.stringify(reqBody)
          }
        } else {
          try {
            let batch
            if (logEntry !== undefined) {
              // If a single logEntry is given, wrap it according to the batch format
              batch = { streams: [logEntry] }
            } else {
              // Sort the batch and ensure that there are no duplicate timestamps
              batch = protoHelpers.sortBatch(this.batch, replace)
            }

            // Check that the batch is in the correct format and can be encoded as protobuf
            const err = logproto.PushRequest.verify(batch)

            // Reject the promise if the batch is not in the correct format
            if (err) return reject(err)

            // Create the PushRequest object
            const message = logproto.PushRequest.create(batch)

            // Encode the PushRequest object and create the binary buffer
            const buffer = logproto.PushRequest.encode(message).finish()

            // Compress the buffer with snappy
            reqBody = snappy.compressSync(buffer)
          } catch (err) {
            return reject(err)
          }
        }

        // Send the data to Grafana Loki
        got
          .post(this.url, {
            body: reqBody,
            headers: {
              'content-type': this.contentType
            }
          })
          .then(res => {
            // No need to clear the batch if batching is disabled
            logEntry === undefined && this.clearBatch()
            resolve()
          })
          .catch(err => {
            // Clear the batch on error if enabled
            this.options.clearOnError && this.clearBatch()
            reject(err)
          })
      }
    })
  }

  /**
   * Runs the batch push loop.
   *
   * Sends the batch to Loki and waits for
   * this.interval between requests.
   */
  async run () {
    while (true) {
      try {
        await this.sendBatchToLoki()
        if (this.interval === this.circuitBreakerInterval) {
          // Restore the configured interval once requests succeed again
          this.interval = this.options.interval
            ? Number(this.options.interval) * 1000
            : 5000
        }
      } catch (e) {
        // Back off to the circuit breaker interval on failure
        this.interval = this.circuitBreakerInterval
      }
      await this.wait(this.interval)
    }
  }
}

module.exports = Batcher
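
/*
 * Usage sketch (not part of the published module): a minimal example of how
 * this Batcher could be driven directly, assuming a Loki instance reachable at
 * http://localhost:3100 and the JSON transport. The host value and the label /
 * entry shapes below are illustrative assumptions that simply mirror what
 * pushLogEntry() and sendBatchToLoki() expect above.
 *
 *   const Batcher = require('./batcher')
 *
 *   const batcher = new Batcher({
 *     host: 'http://localhost:3100', // assumed local Loki endpoint
 *     json: true,                    // send JSON instead of snappy-compressed protobuf
 *     batching: true,                // collect logs and push them on an interval
 *     interval: 5                    // seconds between batch pushes
 *   })
 *
 *   batcher.pushLogEntry({
 *     labels: '{job="example"}',
 *     entries: [{ ts: Date.now(), line: 'hello from the batcher' }]
 *   })
 */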