UNPKG

6.27 kBJavaScriptView Raw
1const got = require('got')
2const url = require('url')
3const exitHook = require('async-exit-hook')
4
5const { logproto } = require('./proto')
6const protoHelpers = require('./proto/helpers')
7let snappy = false
8
9/**
10 * A batching transport layer for Grafana Loki
11 *
12 * @class Batcher
13 */
14class Batcher {
15 loadSnappy () {
16 return require('snappy')
17 }
18 /**
19 * Creates an instance of Batcher.
20 * Starts the batching loop if enabled.
21 * @param {*} options
22 * @memberof Batcher
23 */
24 constructor (options) {
25 // Load given options to the object
26 this.options = options
27
28 // Construct Grafana Loki push API url
29 this.url = new url.URL(this.options.host + '/api/prom/push').toString()
30
31 // Define the batching intervals
32 this.interval = this.options.interval
33 ? Number(this.options.interval) * 1000
34 : 5000
35 this.circuitBreakerInterval = 60000
36
37 // Initialize the log batch
38 this.batch = {
39 streams: []
40 }
41
42 // If snappy binaries have not been built, fallback to JSON transport
43 if (!this.options.json) {
44 try {
45 snappy = this.loadSnappy()
46 } catch (error) {
47 this.options.json = true
48 }
49 if (!snappy) {
50 this.options.json = true
51 }
52 }
53
54 // Define the content type headers for the POST request based on the data type
55 this.contentType = 'application/x-protobuf'
56 if (this.options.json) {
57 this.contentType = 'application/json'
58 }
59
60 // If batching is enabled, run the loop
61 this.options.batching && this.run()
62
63 exitHook(callback => {
64 this.sendBatchToLoki()
65 .then(() => callback())
66 .catch(() => callback())
67 })
68 }
69
70 /**
71 * Returns a promise that resolves after the given duration.
72 *
73 * @param {*} duration
74 * @returns {Promise}
75 */
76 wait (duration) {
77 return new Promise(resolve => {
78 setTimeout(resolve, duration)
79 })
80 }
81
82 /**
83 * Pushes logs into the batch.
84 * If logEntry is given, pushes it straight to this.sendBatchToLoki()
85 *
86 * @param {*} logEntry
87 */
88 async pushLogEntry (logEntry) {
89 const noTimestamp =
90 logEntry && logEntry.entries && logEntry.entries[0].ts === undefined
91 // If user has decided to replace the given timestamps with a generated one, generate it
92 if (this.options.replaceTimestamp || noTimestamp) {
93 logEntry.entries[0].ts = Date.now()
94 }
95
96 // If protobuf is the used data type, construct the timestamps
97 if (!this.options.json) {
98 logEntry = protoHelpers.createProtoTimestamps(logEntry)
99 }
100
101 // If batching is not enabled, push the log immediately to Loki API
102 if (this.options.batching !== undefined && !this.options.batching) {
103 await this.sendBatchToLoki(logEntry)
104 } else {
105 const { streams } = this.batch
106
107 // Find if there's already a log with identical labels in the batch
108 const match = streams.findIndex(
109 stream => stream.labels === logEntry.labels
110 )
111
112 if (match > -1) {
113 // If there's a match, push the log under the same label
114 logEntry.entries.forEach(entry => {
115 streams[match].entries.push(entry)
116 })
117 } else {
118 // Otherwise, create a new label under streams
119 streams.push(logEntry)
120 }
121 }
122 }
123
124 /**
125 * Clears the batch.
126 */
127 clearBatch () {
128 this.batch.streams = []
129 }
130
131 /**
132 * Sends a batch to Grafana Loki push endpoint.
133 * If a single logEntry is given, creates a batch first around it.
134 *
135 * @param {*} logEntry
136 * @returns {Promise}
137 */
138 sendBatchToLoki (logEntry) {
139 return new Promise((resolve, reject) => {
140 // If the batch is empty, do nothing
141 if (this.batch.streams.length === 0 && !logEntry) {
142 resolve()
143 } else {
144 let reqBody
145
146 // If the data format is JSON, there's no need to construct a buffer
147 if (this.options.json) {
148 if (logEntry !== undefined) {
149 // If a single logEntry is given, wrap it according to the batch format
150 reqBody = JSON.stringify({ streams: [logEntry] })
151 } else {
152 // Stringify the JSON ready for transport
153 reqBody = JSON.stringify(reqBody)
154 }
155 } else {
156 try {
157 let batch
158 if (logEntry !== undefined) {
159 // If a single logEntry is given, wrap it according to the batch format
160 batch = { streams: [logEntry] }
161 } else {
162 batch = this.batch
163 }
164
165 // Check if the batch can be encoded in Protobuf and is correct format
166 const err = logproto.PushRequest.verify(batch)
167
168 // Reject the promise if the batch is not of correct format
169 if (err) reject(err)
170
171 // Create the PushRequest object
172 const message = logproto.PushRequest.create(batch)
173
174 // Encode the PushRequest object and create the binary buffer
175 const buffer = logproto.PushRequest.encode(message).finish()
176
177 // Compress the buffer with snappy
178 reqBody = snappy.compressSync(buffer)
179 } catch (err) {
180 reject(err)
181 }
182 }
183
184 // Send the data to Grafana Loki
185 got
186 .post(this.url, {
187 body: reqBody,
188 headers: {
189 'content-type': this.contentType
190 }
191 })
192 .then(res => {
193 // No need to clear the batch if batching is disabled
194 logEntry === undefined && this.clearBatch()
195 resolve()
196 })
197 .catch(err => {
198 // Clear the batch on error if enabled
199 this.options.clearOnError && this.clearBatch()
200 reject(err)
201 })
202 }
203 })
204 }
205
206 /**
207 * Runs the batch push loop.
208 *
209 * Sends the batch to Loki and waits for
210 * the amount of this.interval between requests.
211 */
212 async run () {
213 while (true) {
214 try {
215 await this.sendBatchToLoki()
216 if (this.interval === this.circuitBreakerInterval) {
217 this.interval = Number(this.options.interval) * 1000
218 }
219 } catch (e) {
220 this.interval = this.circuitBreakerInterval
221 }
222 await this.wait(this.interval)
223 }
224 }
225}
226
227module.exports = Batcher