UNPKG

3.45 kBJavaScriptView Raw
1const got = require('got')
2const url = require('url')
3const { logproto } = require('./proto')
4const snappy = require('snappy')
5const protoHelpers = require('./proto/helpers')
6
7module.exports = class Batcher {
8 constructor (options) {
9 this.options = options
10 this.url = new url.URL(this.options.host + '/api/prom/push').toString()
11 this.interval = this.options.interval
12 ? Number(this.options.interval) * 1000
13 : 5000
14 this.circuitBreakerInterval = 60000
15 this.batch = {
16 streams: []
17 }
18 this.contentType = 'application/x-protobuf'
19 if (this.options.json) {
20 this.contentType = 'application/json'
21 }
22 this.options.batching && this.run()
23 }
24
25 wait (duration) {
26 return new Promise(resolve => {
27 setTimeout(resolve, duration)
28 })
29 }
30
31 pushLogEntry (logEntry) {
32 if (this.options.replaceTimestamp) {
33 logEntry.entries[0].ts = Date.now()
34 }
35
36 if (
37 this.options.batching !== undefined &&
38 this.options.batching === false
39 ) {
40 if (!this.options.json) {
41 logEntry = protoHelpers.createProtoTimestamps(logEntry)
42 }
43 this.sendBatchToLoki(logEntry)
44 } else {
45 if (this.options.json) {
46 this.batch.streams.push(logEntry)
47 } else {
48 const { streams } = this.batch
49
50 logEntry = protoHelpers.createProtoTimestamps(logEntry)
51
52 const match = streams.findIndex(
53 stream => stream.labels === logEntry.labels
54 )
55
56 if (match > -1) {
57 logEntry.entries.forEach(entry => {
58 streams[match].entries.push(entry)
59 })
60 } else {
61 streams.push(logEntry)
62 }
63 }
64 }
65 }
66
67 clearBatch () {
68 this.batch.streams = []
69 }
70
71 sendBatchToLoki (logEntry) {
72 return new Promise((resolve, reject) => {
73 if (this.batch.streams.length === 0 && !logEntry) {
74 resolve()
75 } else {
76 let reqBody
77 if (this.options.json) {
78 if (logEntry) {
79 reqBody = JSON.stringify({ streams: [logEntry] })
80 } else {
81 reqBody = JSON.stringify(this.batch)
82 }
83 } else {
84 try {
85 let batch
86 if (logEntry) {
87 batch = { streams: [logEntry] }
88 } else {
89 batch = protoHelpers.sortBatch(this.batch)
90 }
91 const err = logproto.PushRequest.verify(batch)
92 if (err) reject(err)
93 const message = logproto.PushRequest.create(batch)
94 const buffer = logproto.PushRequest.encode(message).finish()
95 reqBody = snappy.compressSync(buffer)
96 } catch (err) {
97 reject(err)
98 }
99 }
100 got
101 .post(this.url, {
102 body: reqBody,
103 headers: {
104 'content-type': this.contentType
105 }
106 })
107 .then(res => {
108 !logEntry && this.clearBatch()
109 resolve()
110 })
111 .catch(err => {
112 this.options.clearOnError && this.clearBatch()
113 reject(err)
114 })
115 }
116 })
117 }
118
119 async run () {
120 while (true) {
121 try {
122 await this.sendBatchToLoki()
123 if (this.interval === this.circuitBreakerInterval) {
124 this.interval = Number(this.options.interval) * 1000
125 }
126 } catch (e) {
127 this.interval = this.circuitBreakerInterval
128 }
129 await this.wait(this.interval)
130 }
131 }
132}