UNPKG

6.45 kBJavaScriptView Raw
1const got = require('got')
2const url = require('url')
3const { logproto } = require('./proto')
4const protoHelpers = require('./proto/helpers')
5let snappy = false
6
7/**
8 * A batching transport layer for Grafana Loki
9 *
10 * @class Batcher
11 */
12class Batcher {
13 loadSnappy () {
14 return require('snappy')
15 }
16 /**
17 * Creates an instance of Batcher.
18 * Starts the batching loop if enabled.
19 * @param {*} options
20 * @memberof Batcher
21 */
22 constructor (options) {
23 // Load given options to the object
24 this.options = options
25
26 // Construct Grafana Loki push API url
27 this.url = new url.URL(this.options.host + '/api/prom/push').toString()
28
29 // Define the batching intervals
30 this.interval = this.options.interval
31 ? Number(this.options.interval) * 1000
32 : 5000
33 this.circuitBreakerInterval = 60000
34
35 // Initialize the log batch
36 this.batch = {
37 streams: []
38 }
39
40 // If snappy binaries have not been built, fallback to JSON transport
41 if (!this.options.json) {
42 try {
43 snappy = this.loadSnappy()
44 } catch (error) {
45 this.options.json = true
46 }
47 if (!snappy) {
48 this.options.json = true
49 }
50 }
51
52 // Define the content type headers for the POST request based on the data type
53 this.contentType = 'application/x-protobuf'
54 if (this.options.json) {
55 this.contentType = 'application/json'
56 }
57
58 // If batching is enabled, run the loop
59 this.options.batching && this.run()
60 }
61
62 /**
63 * Returns a promise that resolves after the given duration.
64 *
65 * @param {*} duration
66 * @returns {Promise}
67 */
68 wait (duration) {
69 return new Promise(resolve => {
70 setTimeout(resolve, duration)
71 })
72 }
73
74 /**
75 * Pushes logs into the batch.
76 * If logEntry is given, pushes it straight to this.sendBatchToLoki()
77 *
78 * @param {*} logEntry
79 */
80 async pushLogEntry (logEntry) {
81 // If user has decided to replace the given timestamps with a generated one, generate it
82 if (this.options.replaceTimestamp) {
83 logEntry.entries[0].ts = Date.now()
84 }
85
86 // If protobuf is the used data type, construct the timestamps
87 if (!this.options.json) {
88 logEntry = protoHelpers.createProtoTimestamps(logEntry)
89 }
90
91 // If batching is not enabled, push the log immediately to Loki API
92 if (this.options.batching !== undefined && !this.options.batching) {
93 console.log('Sending batch straight to loki')
94 await this.sendBatchToLoki(logEntry)
95 } else {
96 const { streams } = this.batch
97
98 // Find if there's already a log with identical labels in the batch
99 const match = streams.findIndex(
100 stream => stream.labels === logEntry.labels
101 )
102
103 if (match > -1) {
104 // If there's a match, push the log under the same label
105 logEntry.entries.forEach(entry => {
106 streams[match].entries.push(entry)
107 })
108 } else {
109 // Otherwise, create a new label under streams
110 streams.push(logEntry)
111 }
112 }
113 }
114
115 /**
116 * Clears the batch.
117 */
118 clearBatch () {
119 this.batch.streams = []
120 }
121
122 /**
123 * Sends a batch to Grafana Loki push endpoint.
124 * If a single logEntry is given, creates a batch first around it.
125 *
126 * @param {*} logEntry
127 * @returns {Promise}
128 */
129 sendBatchToLoki (logEntry) {
130 // Flag of replacing timestamps on error
131 const replace =
132 this.interval === this.circuitBreakerInterval &&
133 this.options.replaceOnError
134
135 return new Promise((resolve, reject) => {
136 // If the batch is empty, do nothing
137 if (this.batch.streams.length === 0 && !logEntry) {
138 resolve()
139 } else {
140 let reqBody
141
142 // If the data format is JSON, there's no need to construct a buffer
143 if (this.options.json) {
144 if (logEntry !== undefined) {
145 // If a single logEntry is given, wrap it according to the batch format
146 reqBody = JSON.stringify({ streams: [logEntry] })
147 } else {
148 // Sort the batch and ensure that there are no duplicate timestamps
149 reqBody = protoHelpers.sortBatch(this.batch, replace)
150 // Stringify the JSON ready for transport
151 reqBody = JSON.stringify(reqBody)
152 }
153 } else {
154 try {
155 let batch
156 if (logEntry !== undefined) {
157 // If a single logEntry is given, wrap it according to the batch format
158 batch = { streams: [logEntry] }
159 } else {
160 // Sort the batch and ensure that there are no duplicate timestamps
161 batch = protoHelpers.sortBatch(this.batch, replace)
162 }
163
164 // Check if the batch can be encoded in Protobuf and is correct format
165 const err = logproto.PushRequest.verify(batch)
166
167 // Reject the promise if the batch is not of correct format
168 if (err) reject(err)
169
170 // Create the PushRequest object
171 const message = logproto.PushRequest.create(batch)
172
173 // Encode the PushRequest object and create the binary buffer
174 const buffer = logproto.PushRequest.encode(message).finish()
175
176 // Compress the buffer with snappy
177 reqBody = snappy.compressSync(buffer)
178 } catch (err) {
179 reject(err)
180 }
181 }
182
183 // Send the data to Grafana Loki
184 got
185 .post(this.url, {
186 body: reqBody,
187 headers: {
188 'content-type': this.contentType
189 }
190 })
191 .then(res => {
192 // No need to clear the batch if batching is disabled
193 logEntry === undefined && this.clearBatch()
194 resolve()
195 })
196 .catch(err => {
197 // Clear the batch on error if enabled
198 this.options.clearOnError && this.clearBatch()
199 reject(err)
200 })
201 }
202 })
203 }
204
205 /**
206 * Runs the batch push loop.
207 *
208 * Sends the batch to Loki and waits for
209 * the amount of this.interval between requests.
210 */
211 async run () {
212 while (true) {
213 try {
214 await this.sendBatchToLoki()
215 if (this.interval === this.circuitBreakerInterval) {
216 this.interval = Number(this.options.interval) * 1000
217 }
218 } catch (e) {
219 this.interval = this.circuitBreakerInterval
220 }
221 await this.wait(this.interval)
222 }
223 }
224}
225
226module.exports = Batcher