1 | import { Worker } from 'worker_threads'
|
2 | import { DeferredPromise, pDefer, _range, AnyObject } from '@naturalcycles/js-lib'
|
3 | import through2Concurrent = require('through2-concurrent')
|
4 | import { TransformTyped } from '../../stream.model'
|
5 | import { WorkerInput, WorkerOutput } from './transformMultiThreaded.model'
|
6 |
|
7 |
|
8 |
|
/**
 * Options accepted by `transformMultiThreaded`.
 */
export interface TransformMultiThreadedOptions {
  /**
   * Path of the worker implementation file.
   * It is forwarded to every Worker inside its `workerData` (as `workerFile`).
   */
  workerFile: string

  /**
   * Maximum number of chunks processed concurrently by the stream.
   * Falls back to `poolSize` when omitted (or falsy).
   */
  concurrency?: number

  /**
   * Desired `highWaterMark` of the resulting object-mode stream.
   */
  highWaterMark?: number

  /**
   * Number of Workers to create.
   * Defaults to 2.
   */
  poolSize?: number

  /**
   * Extra initial data, spread into the `workerData` of every Worker
   * next to `workerIndex` and `workerFile`.
   */
  workerData?: AnyObject
}
|
35 |
|
// Location of the proxy script that every Worker is started with (it sits next to this module).
const workerProxyFilePath = __dirname + '/workerClassProxy.js'
|
37 |
|
38 |
|
39 |
|
40 |
|
41 |
|
42 |
|
43 |
|
/**
 * Creates an object-mode Transform stream that spreads incoming chunks over a pool of
 * `worker_threads` Workers and emits whatever the Workers send back.
 *
 * How it works:
 * - `poolSize` Workers (default 2) are started from `workerProxyFilePath`; each one receives
 *   `{ workerIndex, workerFile, ...opt.workerData }` as its `workerData`.
 * - Every chunk gets a monotonically increasing index and is posted to the Worker at
 *   `index % poolSize` (round-robin) as `{ index, payload }`.
 * - A Worker answers with `{ index, payload }` or `{ index, error }`. A payload is pushed
 *   downstream; an error is logged with `console.error` and the chunk is skipped.
 * - On `final`, `null` is posted to every Worker and the stream waits until all of them
 *   have exited.
 *
 * Failure handling:
 * - When a Worker emits `error` or exits, all chunks still waiting on that Worker are
 *   rejected (logged and skipped) instead of hanging the stream forever. Chunks that would
 *   later be routed to that Worker are rejected immediately for the same reason.
 * - A Worker `error` is reported through the `final` callback, i.e. the stream fails at its end.
 *
 * @param opt see `TransformMultiThreadedOptions`. `concurrency` falls back to `poolSize`;
 *   `highWaterMark` falls back to `Math.max(16, concurrency)`.
 * @returns a Transform stream taking `IN` chunks and producing `OUT` chunks.
 */
export function transformMultiThreaded<IN, OUT>(
  opt: TransformMultiThreadedOptions,
): TransformTyped<IN, OUT> {
  const { workerFile, poolSize = 2, workerData } = opt
  const maxConcurrency = opt.concurrency || poolSize
  // Honor an explicitly provided highWaterMark; otherwise buffer at least 16 objects
  const highWaterMark = opt.highWaterMark ?? Math.max(16, maxConcurrency)

  console.log({
    poolSize,
    maxConcurrency,
    highWaterMark,
  })

  const workerDonePromises: DeferredPromise<Error | undefined>[] = []
  const messageDonePromises: Record<number, DeferredPromise<OUT>> = {}
  // Set (per workerIndex) once that Worker has errored or exited and can no longer answer
  const workerExitReasons: (Error | undefined)[] = []
  let index = -1

  // Rejects every chunk that is still waiting for an answer from the given Worker
  function rejectInFlight(workerIndex: number, reason: Error): void {
    for (const key of Object.keys(messageDonePromises)) {
      const messageIndex = Number(key)
      if (messageIndex % poolSize !== workerIndex) continue
      messageDonePromises[messageIndex]?.reject(reason)
    }
  }

  const workers = _range(0, poolSize).map(workerIndex => {
    const done = pDefer<Error | undefined>()
    // `done` may be rejected long before `final` awaits it. Attaching a no-op handler
    // prevents an "unhandled rejection", while `Promise.all` in `final` still observes it.
    done.catch(() => {})
    workerDonePromises.push(done)

    const worker = new Worker(workerProxyFilePath, {
      workerData: {
        workerIndex,
        workerFile,
        ...workerData,
      },
    })

    worker.on('error', err => {
      console.error(`Worker ${workerIndex} error`, err)
      workerExitReasons[workerIndex] = err
      done.reject(err)
      rejectInFlight(workerIndex, err)
    })

    worker.on('exit', exitCode => {
      // Keep the original error (if any) as the reason; otherwise describe the exit
      const reason =
        workerExitReasons[workerIndex] ??
        new Error(`Worker ${workerIndex} exited with code ${exitCode}`)
      workerExitReasons[workerIndex] = reason
      // No-op if `done` was already rejected by the 'error' handler
      done.resolve(undefined)
      // After a regular shutdown nothing is in flight, so this only matters for premature exits
      rejectInFlight(workerIndex, reason)
    })

    worker.on('message', (out: WorkerOutput<OUT>) => {
      const pending = messageDonePromises[out.index]
      // Unknown or already cleaned-up index: nothing is waiting for this answer
      if (!pending) return

      if (out.error) {
        pending.reject(out.error)
      } else {
        pending.resolve(out.payload)
      }
    })

    return worker
  })

  return through2Concurrent.obj(
    {
      maxConcurrency,
      highWaterMark,
      async final(cb) {
        try {
          // `null` is posted to every Worker once all chunks have been processed
          workers.forEach(worker => worker.postMessage(null))

          console.log(`transformMultiThreaded.final is waiting for all chains to be done`)
          await Promise.all(workerDonePromises)
          console.log(`transformMultiThreaded.final all chains done`)

          cb()
        } catch (err) {
          cb(err as Error)
        }
      },
    },
    async function transformMapFn(chunk: IN, _, cb) {
      const currentIndex = ++index
      const workerIndex = currentIndex % poolSize

      const deferred = pDefer<OUT>()
      messageDonePromises[currentIndex] = deferred

      const exitReason = workerExitReasons[workerIndex]
      if (exitReason) {
        // The target Worker is gone and would never answer: fail this chunk right away
        deferred.reject(exitReason)
      } else {
        workers[workerIndex]!.postMessage({
          index: currentIndex,
          payload: chunk,
        } as WorkerInput)
      }

      try {
        const out = await deferred
        cb(null, out)
      } catch (err) {
        // Per-chunk failures are logged and the chunk is skipped; the stream keeps going
        console.error(err)
        cb()
      }

      // Release the bookkeeping entry once the chunk is settled
      delete messageDonePromises[currentIndex]
    },
  )
}
|