UNPKG

2.87 kBJavaScriptView Raw
1const moment = require('moment')
2
3module.exports = {
4 createProtoTimestamps: logEntry => {
5 if (logEntry && logEntry.entries && logEntry.entries.length > 0) {
6 logEntry.entries = logEntry.entries.map(entry => {
7 const dt = moment(entry.ts).valueOf()
8 return {
9 timestamp: {
10 seconds: Math.floor(dt / 1000),
11 nanos: (dt % 1000) * 1000
12 },
13 line: entry.line
14 }
15 })
16 }
17 return logEntry
18 },
19 sortBatch: (batch, replace) => {
20 let max
21
22 if (
23 batch.streams[0] &&
24 batch.streams[0].entries &&
25 batch.streams[0].entries.find(
26 entry => entry.timestamp && entry.timestamp.seconds
27 )
28 ) {
29 max = {
30 seconds: 0,
31 nanos: 0
32 }
33
34 batch.streams = batch.streams.map(stream => {
35 if (replace) {
36 stream.entries = stream.entries.map(entry => {
37 const dt = moment(Date.now()).valueOf()
38 entry.timestamp = {
39 seconds: Math.floor(dt / 1000),
40 nanos: (dt % 1000) * 1000
41 }
42 return entry
43 })
44 }
45 // Sort the entries first by seconds and then by nanoseconds
46 stream.entries = stream.entries.sort(
47 (a, b) =>
48 a.timestamp.seconds - b.timestamp.seconds ||
49 a.timestamp.nanos - b.timestamp.nanos
50 )
51
52 // Then ensure that there's no duplicate entries by nanosecond
53 stream.entries = stream.entries.map(entry => {
54 const { seconds, nanos } = entry.timestamp
55
56 if (max.seconds === seconds && max.nanos === nanos) {
57 if (nanos === 999999) {
58 entry.timestamp.nanos = 0
59 entry.timestamp.seconds++
60 } else {
61 entry.timestamp.nanos++
62 }
63 } else if (max.seconds === seconds && max.nanos > nanos) {
64 entry.timestamp = JSON.parse(JSON.stringify(max))
65 entry.timestamp.nanos++
66 }
67
68 max = JSON.parse(JSON.stringify(entry.timestamp))
69
70 return entry
71 })
72
73 return stream
74 })
75 } else {
76 max = 0
77
78 batch.streams = batch.streams.map(stream => {
79 if (replace) {
80 stream.entries = stream.entries.map(entry => {
81 entry.ts = Date.now()
82 return entry
83 })
84 }
85
86 // Sort the entries
87 stream.entries = stream.entries.sort((a, b) => a.ts - b.ts)
88
89 // Then ensure that there's no duplicate entries by nanosecond
90 stream.entries = stream.entries.map(entry => {
91 if (max === entry.ts) {
92 entry.ts++
93 } else if (max > entry.ts) {
94 entry.ts = max
95 entry.ts++
96 }
97 max = entry.ts
98 return entry
99 })
100
101 return stream
102 })
103 }
104 return batch
105 }
106}