UNPKG

2.82 kBJavaScriptView Raw
1const got = require('got')
2const url = require('url')
3const { logproto } = require('./proto')
4const snappy = require('snappy')
5const protoHelpers = require('./proto/helpers')
6
7module.exports = class Batcher {
8 constructor (options) {
9 this.options = options
10 this.url = new url.URL(this.options.host + '/api/prom/push').toString()
11 this.interval = this.options.interval
12 ? Number(this.options.interval) * 1000
13 : 5000
14 this.circuitBreakerInterval = 60000
15 this.batch = {
16 streams: []
17 }
18 this.contentType = 'application/x-protobuf'
19 if (this.options.json) {
20 this.contentType = 'application/json'
21 }
22 }
23
24 wait (duration) {
25 return new Promise(resolve => {
26 setTimeout(resolve, duration)
27 })
28 }
29
30 pushLogEntry (logEntry) {
31 if (this.options.json) {
32 this.batch.streams.push(logEntry)
33 } else {
34 const { streams } = this.batch
35 logEntry = protoHelpers.createProtoTimestamps(logEntry)
36 const match = streams.findIndex(
37 stream => stream.labels === logEntry.labels
38 )
39 if (match > -1) {
40 logEntry.entries.forEach(entry => {
41 streams[match].entries.push(entry)
42 })
43 } else {
44 streams.push(logEntry)
45 }
46 }
47 if (
48 this.options.batching !== undefined &&
49 this.options.batching === false
50 ) {
51 this.sendBatchToLoki()
52 }
53 }
54
55 clearBatch () {
56 this.batch.streams = []
57 }
58
59 sendBatchToLoki () {
60 return new Promise((resolve, reject) => {
61 if (this.batch.streams.length === 0) {
62 resolve()
63 } else {
64 let reqBody
65 if (this.options.json) {
66 reqBody = JSON.stringify(this.batch)
67 } else {
68 try {
69 const batch = protoHelpers.sortBatch(this.batch)
70 const err = logproto.PushRequest.verify(batch)
71 if (err) reject(err)
72 const message = logproto.PushRequest.create(batch)
73 const buffer = logproto.PushRequest.encode(message).finish()
74 reqBody = snappy.compressSync(buffer)
75 } catch (err) {
76 reject(err)
77 }
78 }
79 got
80 .post(this.url, {
81 body: reqBody,
82 headers: {
83 'content-type': this.contentType
84 }
85 })
86 .then(res => {
87 this.clearBatch()
88 resolve()
89 })
90 .catch(err => {
91 reject(err)
92 })
93 }
94 })
95 }
96
97 async run () {
98 while (true) {
99 try {
100 await this.sendBatchToLoki()
101 if (this.interval === this.circuitBreakerInterval) {
102 this.interval = Number(this.options.interval) * 1000
103 }
104 } catch (e) {
105 this.interval = this.circuitBreakerInterval
106 }
107 await this.wait(this.interval)
108 }
109 }
110}