1 |
|
2 |
|
3 | import autoBind from 'auto-bind'
|
4 |
|
5 | import System from './System'
|
6 | import Module from './Module'
|
7 | import { sleep } from './asyncUtils'
|
8 | import { SECOND_MS, MINUTE_MS } from './consts'
|
9 |
|
10 | const { MODULE_NAME, log, warn, error, noteGauge, noteCount, noteTimer, trackOp } = new Module(__filename)
|
11 |
|
12 | type ExecutionOptions = {|
|
13 | dontTerminateOnCompletion?: ?boolean,
|
14 | |}
|
15 |
|
16 | export default class Execution<T> {
|
17 |
|
18 | _options: ExecutionOptions
|
19 | _startTimeMS: ?number
|
20 | _endTimeMS: ?number
|
21 | _terminating: boolean = false
|
22 | _onTerminateHandlers: Array<(err: ?Error, data: ?T) => Promise<void>> = []
|
23 |
|
24 | |
25 |
|
26 |
|
27 |
|
28 | terminationHandler: ?(err: ?Error, data: ?T) => Promise<void>
|
29 |
|
30 |
|
31 | constructor(options?: ?ExecutionOptions) {
|
32 | this._options = options || {
|
33 | dontTerminateOnCompletion: false,
|
34 | }
|
35 |
|
36 | autoBind(this)
|
37 | }
|
38 |
|
39 |
|
40 | async run(runFunc: () => Promise<T>): Promise<void> {
|
41 | try {
|
42 | noteCount(`execution.start`)
|
43 |
|
44 |
|
45 | process.on('SIGINT', async () => await this._terminate(new Error('SIGINT')))
|
46 |
|
47 | if (System.getConfig().trackVitalsIntervalMS) {
|
48 | this._trackVitals()
|
49 | }
|
50 |
|
51 | this._startTimeMS = Date.now()
|
52 | log('Execution started')
|
53 | const returnValue = await runFunc()
|
54 |
|
55 | if (!this._options.dontTerminateOnCompletion) {
|
56 | await this._terminate(null, returnValue)
|
57 | }
|
58 | } catch (err) {
|
59 | await this._terminate(err)
|
60 | }
|
61 | }
|
62 |
|
63 | |
64 |
|
65 |
|
66 |
|
67 | addOnTerminateHandler(onTerminateHandler: (err: ?Error, data: ?T) => Promise<void>): void {
|
68 | this._onTerminateHandlers.push(onTerminateHandler)
|
69 | }
|
70 |
|
71 |
|
72 | async _trackVitals() {
|
73 | while (!this._terminating) {
|
74 | try {
|
75 | const used = process.memoryUsage()
|
76 | for (const key in used) {
|
77 | noteGauge(`memory.${key}MB`, Math.round(used[key] / 1024 / 1024 * 100) / 100)
|
78 | }
|
79 | await sleep(System.getConfig().trackVitalsIntervalMS || 10 * SECOND_MS)
|
80 | } catch (err) {
|
81 | error('Failed to track vitals', { err })
|
82 | await sleep(1 * MINUTE_MS)
|
83 | }
|
84 | }
|
85 | }
|
86 |
|
87 | async _terminate(terminationError: ?Error, data: ?T) {
|
88 | if (this._terminating)
|
89 | return
|
90 | this._terminating = true
|
91 |
|
92 | this._endTimeMS = Date.now()
|
93 |
|
94 | const durationMS = this._endTimeMS - this._startTimeMS
|
95 |
|
96 | if (terminationError == null) {
|
97 | await noteCount(`execution.succeed`)
|
98 | await noteTimer('execution.succeed_duration', durationMS)
|
99 | log('Execution ended successfully', { durationMS })
|
100 | } else if (terminationError.message === 'SIGINT') {
|
101 | await noteCount(`execution.terminate`)
|
102 | await noteTimer('execution.terminate_duration', durationMS)
|
103 | warn('Execution terminated by SIGINT signal, terminating...', { durationMS })
|
104 | } else {
|
105 | await noteCount(`execution.fail`)
|
106 | await noteTimer('execution.fail_duration', durationMS)
|
107 | error('Execution threw an unhandled exception, terminating...', { durationMS, err: terminationError })
|
108 | }
|
109 | await noteCount('execution.end')
|
110 | await noteTimer('execution.end_duration', durationMS)
|
111 |
|
112 |
|
113 | for (const onTerminateHandler of this._onTerminateHandlers) {
|
114 | try {
|
115 | await onTerminateHandler(terminationError, data)
|
116 | } catch (err) {
|
117 | error('one of the onTerminate handlers threw an error', { err })
|
118 | }
|
119 | }
|
120 |
|
121 | await System.flush()
|
122 |
|
123 |
|
124 | if (!this.terminationHandler) {
|
125 | process.exit(terminationError == null ? 0 : 1)
|
126 | } else {
|
127 |
|
128 | try {
|
129 | await this.terminationHandler(terminationError, data)
|
130 | } catch (err) {
|
131 | error('terminationHandler threw an error', { err })
|
132 | await System.flush()
|
133 | process.exit(1)
|
134 | }
|
135 | }
|
136 | }
|
137 |
|
138 | }
|