UNPKG

4.26 kBJavaScriptView Raw
1//@flow
2
3import autoBind from 'auto-bind'
4
5import System from './System'
6import Module from './Module'
7import { sleep } from './asyncUtils'
8import { SECOND_MS, MINUTE_MS } from './consts'
9
10const { MODULE_NAME, log, warn, error, noteGauge, noteCount, noteTimer, trackOp } = new Module(__filename) // eslint-disable-line no-unused-vars
11
12type ExecutionOptions = {|
13 dontTerminateOnCompletion?: ?boolean, // Used for cases where code returns but some callbacks still keep the process alive (like web server)
14|}
15
16export default class Execution<T> {
17
18 _options: ExecutionOptions
19 _startTimeMS: ?number
20 _endTimeMS: ?number
21 _terminating: boolean = false
22 _onTerminateHandlers: Array<(err: ?Error, data: ?T) => Promise<void>> = []
23
24 /**
25 * An async function that is called AFTER a termination has accured and infra resources were released,
26 * This handler is called after infra resources were flushed, so it should avoid using them.
27 */
28 terminationHandler: ?(err: ?Error, data: ?T) => Promise<void>
29
30
31 constructor(options?: ?ExecutionOptions) {
32 this._options = options || {
33 dontTerminateOnCompletion: false,
34 }
35
36 autoBind(this)
37 }
38
39
40 async run(runFunc: () => Promise<T>): Promise<void> {
41 try {
42 noteCount(`execution.start`)
43
44 // Catches ctrl+c and PM2 shutdown
45 process.on('SIGINT', async () => await this._terminate(new Error('SIGINT')))
46
47 if (System.getConfig().trackVitalsIntervalMS) {
48 this._trackVitals()
49 }
50
51 this._startTimeMS = Date.now()
52 log('Execution started')
53 const returnValue = await runFunc()
54
55 if (!this._options.dontTerminateOnCompletion) {
56 await this._terminate(null, returnValue)
57 }
58 } catch (err) {
59 await this._terminate(err)
60 }
61 }
62
63 /**
64 * Allows adding an async function that is called BEFORE a termination has accured.
65 * @param {*} terminationHandler
66 */
67 addOnTerminateHandler(onTerminateHandler: (err: ?Error, data: ?T) => Promise<void>): void {
68 this._onTerminateHandlers.push(onTerminateHandler)
69 }
70
71
72 async _trackVitals() {
73 while (!this._terminating) {
74 try {
75 const used = process.memoryUsage()
76 for (const key in used) {
77 noteGauge(`memory.${key}MB`, Math.round(used[key] / 1024 / 1024 * 100) / 100)
78 }
79 await sleep(System.getConfig().trackVitalsIntervalMS || 10 * SECOND_MS)
80 } catch (err) {
81 error('Failed to track vitals', { err })
82 await sleep(1 * MINUTE_MS)
83 }
84 }
85 }
86
87 async _terminate(terminationError: ?Error, data: ?T) {
88 if (this._terminating)
89 return
90 this._terminating = true
91
92 this._endTimeMS = Date.now()
93 // $FlowIgnore
94 const durationMS = this._endTimeMS - this._startTimeMS
95
96 if (terminationError == null) {
97 await noteCount(`execution.succeed`)
98 await noteTimer('execution.succeed_duration', durationMS)
99 log('Execution ended successfully', { durationMS })
100 } else if (terminationError.message === 'SIGINT') {
101 await noteCount(`execution.terminate`)
102 await noteTimer('execution.terminate_duration', durationMS)
103 warn('Execution terminated by SIGINT signal, terminating...', { durationMS })
104 } else {
105 await noteCount(`execution.fail`)
106 await noteTimer('execution.fail_duration', durationMS)
107 error('Execution threw an unhandled exception, terminating...', { durationMS, err: terminationError })
108 }
109 await noteCount('execution.end')
110 await noteTimer('execution.end_duration', durationMS)
111
112 // onTerniate handlers
113 for (const onTerminateHandler of this._onTerminateHandlers) {
114 try {
115 await onTerminateHandler(terminationError, data)
116 } catch (err) {
117 error('one of the onTerminate handlers threw an error', { err })
118 }
119 }
120
121 await System.flush()
122 // NOTE: logging or tracking behind this point are not guaranteed to be transmitted
123
124 if (!this.terminationHandler) {
125 process.exit(terminationError == null ? 0 : 1)
126 } else {
127 // terminationHandler
128 try {
129 await this.terminationHandler(terminationError, data)
130 } catch (err) {
131 error('terminationHandler threw an error', { err })
132 await System.flush()
133 process.exit(1)
134 }
135 }
136 }
137
138}