1 | const moment = require('moment')
|
2 |
|
3 | module.exports = {
|
4 | createProtoTimestamps: logEntry => {
|
5 | if (logEntry && logEntry.entries && logEntry.entries.length > 0) {
|
6 | logEntry.entries = logEntry.entries.map(entry => {
|
7 | const dt = moment(entry.ts).valueOf()
|
8 | return {
|
9 | timestamp: {
|
10 | seconds: Math.floor(dt / 1000),
|
11 | nanos: (dt % 1000) * 1000
|
12 | },
|
13 | line: entry.line
|
14 | }
|
15 | })
|
16 | }
|
17 | return logEntry
|
18 | },
|
19 | sortBatch: (batch, replace) => {
|
20 | let max
|
21 |
|
22 | if (
|
23 | batch.streams[0] &&
|
24 | batch.streams[0].entries &&
|
25 | batch.streams[0].entries.find(
|
26 | entry => entry.timestamp && entry.timestamp.seconds
|
27 | )
|
28 | ) {
|
29 | max = {
|
30 | seconds: 0,
|
31 | nanos: 0
|
32 | }
|
33 |
|
34 | batch.streams = batch.streams.map(stream => {
|
35 | if (replace) {
|
36 | stream.entries = stream.entries.map(entry => {
|
37 | const dt = moment(Date.now()).valueOf()
|
38 | entry.timestamp = {
|
39 | seconds: Math.floor(dt / 1000),
|
40 | nanos: (dt % 1000) * 1000
|
41 | }
|
42 | return entry
|
43 | })
|
44 | }
|
45 |
|
46 | stream.entries = stream.entries.sort(
|
47 | (a, b) =>
|
48 | a.timestamp.seconds - b.timestamp.seconds ||
|
49 | a.timestamp.nanos - b.timestamp.nanos
|
50 | )
|
51 |
|
52 |
|
53 | stream.entries = stream.entries.map(entry => {
|
54 | const { seconds, nanos } = entry.timestamp
|
55 |
|
56 | if (max.seconds === seconds && max.nanos === nanos) {
|
57 | if (nanos === 999999) {
|
58 | entry.timestamp.nanos = 0
|
59 | entry.timestamp.seconds++
|
60 | } else {
|
61 | entry.timestamp.nanos++
|
62 | }
|
63 | } else if (max.seconds === seconds && max.nanos > nanos) {
|
64 | entry.timestamp = JSON.parse(JSON.stringify(max))
|
65 | entry.timestamp.nanos++
|
66 | }
|
67 |
|
68 | max = JSON.parse(JSON.stringify(entry.timestamp))
|
69 |
|
70 | return entry
|
71 | })
|
72 |
|
73 | return stream
|
74 | })
|
75 | } else {
|
76 | max = 0
|
77 |
|
78 | batch.streams = batch.streams.map(stream => {
|
79 | if (replace) {
|
80 | stream.entries = stream.entries.map(entry => {
|
81 | entry.ts = Date.now()
|
82 | return entry
|
83 | })
|
84 | }
|
85 |
|
86 |
|
87 | stream.entries = stream.entries.sort((a, b) => a.ts - b.ts)
|
88 |
|
89 |
|
90 | stream.entries = stream.entries.map(entry => {
|
91 | if (max === entry.ts) {
|
92 | entry.ts++
|
93 | } else if (max > entry.ts) {
|
94 | entry.ts = max
|
95 | entry.ts++
|
96 | }
|
97 | max = entry.ts
|
98 | return entry
|
99 | })
|
100 |
|
101 | return stream
|
102 | })
|
103 | }
|
104 | return batch
|
105 | }
|
106 | }
|