1 | const url = require('url')
|
2 | const exitHook = require('async-exit-hook')
|
3 |
|
4 | const { logproto } = require('./proto')
|
5 | const protoHelpers = require('./proto/helpers')
|
6 | const req = require('./requests')
|
7 | let snappy = false
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 | class Batcher {
|
15 | loadSnappy () {
|
16 | return require('snappy')
|
17 | }
|
18 |
|
19 | |
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 | constructor (options) {
|
26 |
|
27 | this.options = options
|
28 |
|
29 |
|
30 | this.url = new url.URL(this.options.host + '/loki/api/v1/push')
|
31 |
|
32 |
|
33 | if (options.basicAuth) {
|
34 | const btoa = require('btoa')
|
35 | const basicAuth = 'Basic ' + btoa(options.basicAuth)
|
36 | this.options.headers = Object.assign(this.options.headers, { Authorization: basicAuth })
|
37 | }
|
38 |
|
39 |
|
40 | this.interval = this.options.interval
|
41 | ? Number(this.options.interval) * 1000
|
42 | : 5000
|
43 | this.circuitBreakerInterval = 60000
|
44 |
|
45 |
|
46 | this.batch = {
|
47 | streams: []
|
48 | }
|
49 |
|
50 |
|
51 | if (!this.options.json) {
|
52 | try {
|
53 | snappy = this.loadSnappy()
|
54 | } catch (error) {
|
55 | this.options.json = true
|
56 | }
|
57 | if (!snappy) {
|
58 | this.options.json = true
|
59 | }
|
60 | }
|
61 |
|
62 |
|
63 | this.contentType = 'application/x-protobuf'
|
64 | if (this.options.json) {
|
65 | this.contentType = 'application/json'
|
66 | }
|
67 |
|
68 |
|
69 | this.options.batching && this.run()
|
70 |
|
71 | if (this.options.gracefulShutdown) {
|
72 | exitHook(callback => {
|
73 | this.close(() => callback())
|
74 | })
|
75 | }
|
76 | }
|
77 |
|
78 | |
79 |
|
80 |
|
81 |
|
82 |
|
83 |
|
84 | wait (duration) {
|
85 | return new Promise(resolve => {
|
86 | setTimeout(resolve, duration)
|
87 | })
|
88 | }
|
89 |
|
90 | |
91 |
|
92 |
|
93 |
|
94 |
|
95 |
|
96 | async pushLogEntry (logEntry) {
|
97 | const noTimestamp =
|
98 | logEntry && logEntry.entries && logEntry.entries[0].ts === undefined
|
99 |
|
100 | if (this.options.replaceTimestamp || noTimestamp) {
|
101 | logEntry.entries[0].ts = Date.now()
|
102 | }
|
103 |
|
104 |
|
105 | if (!this.options.json) {
|
106 | logEntry = protoHelpers.createProtoTimestamps(logEntry)
|
107 | }
|
108 |
|
109 |
|
110 | if (this.options.batching !== undefined && !this.options.batching) {
|
111 | await this.sendBatchToLoki(logEntry)
|
112 | } else {
|
113 | const { streams } = this.batch
|
114 |
|
115 |
|
116 | const match = streams.findIndex(
|
117 | stream => JSON.stringify(stream.labels) === JSON.stringify(logEntry.labels)
|
118 | )
|
119 |
|
120 | if (match > -1) {
|
121 |
|
122 | logEntry.entries.forEach(entry => {
|
123 | streams[match].entries.push(entry)
|
124 | })
|
125 | } else {
|
126 |
|
127 | streams.push(logEntry)
|
128 | }
|
129 | }
|
130 | }
|
131 |
|
132 | |
133 |
|
134 |
|
135 | clearBatch () {
|
136 | this.batch.streams = []
|
137 | }
|
138 |
|
139 | |
140 |
|
141 |
|
142 |
|
143 |
|
144 |
|
145 |
|
146 | sendBatchToLoki (logEntry) {
|
147 | return new Promise((resolve, reject) => {
|
148 |
|
149 | if (this.batch.streams.length === 0 && !logEntry) {
|
150 | resolve()
|
151 | } else {
|
152 | let reqBody
|
153 |
|
154 |
|
155 | if (this.options.json) {
|
156 | let preparedJSONBatch
|
157 | if (logEntry !== undefined) {
|
158 |
|
159 | preparedJSONBatch = protoHelpers.prepareJSONBatch({ streams: [logEntry] })
|
160 | } else {
|
161 |
|
162 | preparedJSONBatch = protoHelpers.prepareJSONBatch(this.batch)
|
163 | }
|
164 | reqBody = JSON.stringify(preparedJSONBatch)
|
165 | } else {
|
166 | try {
|
167 | let batch
|
168 | if (logEntry !== undefined) {
|
169 |
|
170 | batch = { streams: [logEntry] }
|
171 | } else {
|
172 | batch = this.batch
|
173 | }
|
174 |
|
175 | const preparedBatch = protoHelpers.prepareProtoBatch(batch)
|
176 |
|
177 |
|
178 | const err = logproto.PushRequest.verify(preparedBatch)
|
179 |
|
180 |
|
181 | if (err) reject(err)
|
182 |
|
183 |
|
184 | const message = logproto.PushRequest.create(preparedBatch)
|
185 |
|
186 | const buffer = logproto.PushRequest.encode(message).finish()
|
187 |
|
188 | reqBody = snappy.compressSync(buffer)
|
189 | } catch (err) {
|
190 | reject(err)
|
191 | }
|
192 | }
|
193 |
|
194 |
|
195 | req.post(this.url, this.contentType, this.options.headers, reqBody, this.options.timeout)
|
196 | .then(() => {
|
197 |
|
198 | logEntry === undefined && this.clearBatch()
|
199 | resolve()
|
200 | })
|
201 | .catch(err => {
|
202 |
|
203 | this.options.clearOnError && this.clearBatch()
|
204 |
|
205 | this.options.onConnectionError !== undefined && this.options.onConnectionError(err)
|
206 |
|
207 | reject(err)
|
208 | })
|
209 | }
|
210 | })
|
211 | }
|
212 |
|
213 | |
214 |
|
215 |
|
216 |
|
217 |
|
218 |
|
219 | async run () {
|
220 | this.runLoop = true
|
221 | while (this.runLoop) {
|
222 | try {
|
223 | await this.sendBatchToLoki()
|
224 | if (this.interval === this.circuitBreakerInterval) {
|
225 | if (this.options.interval !== undefined) {
|
226 | this.interval = Number(this.options.interval) * 1000
|
227 | } else {
|
228 | this.interval = 5000
|
229 | }
|
230 | }
|
231 | } catch (e) {
|
232 | this.interval = this.circuitBreakerInterval
|
233 | }
|
234 | await this.wait(this.interval)
|
235 | }
|
236 | }
|
237 |
|
238 | |
239 |
|
240 |
|
241 |
|
242 |
|
243 | close (callback) {
|
244 | this.runLoop = false
|
245 | this.sendBatchToLoki()
|
246 | .then(() => { if (callback) { callback() } })
|
247 | .catch(() => { if (callback) { callback() } })
|
248 | }
|
249 | }
|
250 |
|
251 | module.exports = Batcher
|