UNPKG

4.27 kBPlain TextView Raw
1import { Worker } from 'worker_threads'
2import { DeferredPromise, pDefer, _range, AnyObject } from '@naturalcycles/js-lib'
3import through2Concurrent = require('through2-concurrent')
4import { TransformTyped } from '../../stream.model'
5import { WorkerInput, WorkerOutput } from './transformMultiThreaded.model'
6
7/* eslint-disable unicorn/require-post-message-target-origin */
8
/**
 * Options for `transformMultiThreaded`.
 */
export interface TransformMultiThreadedOptions {
  /**
   * Absolute path to a js file with worker code.
   * It is passed to the worker proxy (as `workerData.workerFile`), which require()s it.
   */
  workerFile: string

  /**
   * Number of Worker threads to spawn. Inputs are distributed over them round-robin.
   *
   * @default 2, to match CircleCI and Github Actions environments
   */
  poolSize?: number

  /**
   * Max number of inputs processed at the same time (across all Workers).
   * Passed to through2-concurrent as `maxConcurrency`.
   *
   * @default poolSize
   */
  concurrency?: number

  /**
   * Stream highWaterMark (object mode, so it counts objects).
   *
   * @default Math.max(16, concurrency)
   */
  highWaterMark?: number

  /**
   * Passed to each Worker as part of its `workerData` property (initial data).
   * Should not contain the keys `workerIndex` or `workerFile`: those are also
   * passed to the worker proxy in the same object.
   */
  workerData?: AnyObject
}
35
/** Absolute path of the proxy script that every pooled Worker thread is started with. */
const workerProxyFilePath = __dirname + '/workerClassProxy.js'
37
/**
 * Spawns a pool of Workers (threads) and distributes all inputs over them
 * round-robin: input #i is sent to worker `i % poolSize`.
 * Each Worker is expected to emit exactly 1 output message for each 1 input message.
 * Outputs are passed down the stream in completion order, which is effectively
 * RANDOM (it's a multi-threaded environment).
 *
 * Error handling (currently only "suppress" mode is supported):
 * - If a Worker reports an error for an input, the error is logged and nothing is
 *   emitted for that input; the stream continues.
 * - If a Worker emits 'error' or exits, every input still waiting on it (and every
 *   later input routed to it) is failed the same way instead of hanging the stream.
 *   A Worker 'error' is additionally propagated as the stream error from `final`.
 *
 * @param opt see TransformMultiThreadedOptions
 * @returns object-mode Transform stream of IN -> OUT
 */
export function transformMultiThreaded<IN, OUT>(
  opt: TransformMultiThreadedOptions,
): TransformTyped<IN, OUT> {
  const { workerFile, poolSize = 2, workerData } = opt
  // `||` (not `??`) on purpose: a concurrency of 0 would stall the stream, treat it as "unset"
  const maxConcurrency = opt.concurrency || poolSize
  // Respect the caller's highWaterMark (it used to be silently ignored)
  const highWaterMark = opt.highWaterMark ?? Math.max(16, maxConcurrency)

  // One per worker: resolves on 'exit', rejects on 'error'
  const workerDonePromises: DeferredPromise<Error | undefined>[] = []
  // Deferred result for each in-flight input chunk, keyed by chunk index
  const pendingResults = new Map<number, DeferredPromise<OUT>>()
  // Workers that emitted 'error' or 'exit'; messages posted to them would never be answered
  const deadWorkers = new Set<number>()
  let index = -1 // input chunk index, will start from 0

  /** Rejects every in-flight result routed to `workerIndex`, since it will never be answered. */
  const failPendingOfWorker = (workerIndex: number, err: Error): void => {
    for (const [chunkIndex, deferred] of pendingResults) {
      if (chunkIndex % poolSize === workerIndex) deferred.reject(err)
    }
  }

  const workers = _range(0, poolSize).map(workerIndex => {
    const done = pDefer<Error | undefined>()
    // Mark as handled right away, so a mid-stream worker error is not an unhandled
    // rejection; `final` still observes the rejection through Promise.all
    void done.catch(() => {})
    workerDonePromises.push(done)

    const worker = new Worker(workerProxyFilePath, {
      workerData: {
        // User data goes first, so it cannot clobber the keys the worker proxy relies on
        ...workerData,
        workerIndex,
        workerFile, // pass it, so workerProxy can require() it
      },
    })

    worker.on('error', err => {
      console.error(`Worker ${workerIndex} error`, err)
      deadWorkers.add(workerIndex)
      failPendingOfWorker(workerIndex, err)
      done.reject(err)
    })

    worker.on('exit', exitCode => {
      deadWorkers.add(workerIndex)
      // Normally nothing is pending here (the worker answers everything before the
      // final null message); anything still pending would otherwise hang forever
      failPendingOfWorker(
        workerIndex,
        new Error(`Worker ${workerIndex} exited with code ${exitCode} before responding`),
      )
      done.resolve(undefined)
    })

    worker.on('message', (out: WorkerOutput<OUT>) => {
      const deferred = pendingResults.get(out.index)
      if (!deferred) {
        // Nothing is waiting for this index (unknown or already settled)
        console.warn(`Worker ${workerIndex} sent a message for unknown index ${out.index}`)
        return
      }

      if (out.error) {
        deferred.reject(out.error)
      } else {
        deferred.resolve(out.payload)
      }
    })

    return worker
  })

  return through2Concurrent.obj(
    {
      maxConcurrency,
      highWaterMark,
      async final(cb) {
        try {
          // Push null (complete) to all sub-streams; presumably the worker proxy exits on it
          workers.forEach(worker => worker.postMessage(null))

          console.log(`transformMultiThreaded.final is waiting for all chains to be done`)
          await Promise.all(workerDonePromises)
        } catch (err) {
          cb(err as Error)
          return
        }

        // cb() is outside the try, so a throw from it can't trigger a second cb(err)
        console.log(`transformMultiThreaded.final all chains done`)
        cb()
      },
    },
    async function transformMapFn(chunk: IN, _, cb) {
      // Freezing the index, because `index` may change while we await (concurrency)
      const currentIndex = ++index
      const workerIndex = currentIndex % poolSize // round-robin

      let out: OUT
      try {
        if (deadWorkers.has(workerIndex)) {
          throw new Error(`Worker ${workerIndex} is not running, input ${currentIndex} is skipped`)
        }

        // Register the unresolved promise BEFORE posting, so the reply cannot be missed
        const deferred = pDefer<OUT>()
        pendingResults.set(currentIndex, deferred)

        // May throw synchronously (e.g. a non-cloneable chunk); handled below like any error
        workers[workerIndex]!.postMessage({
          index: currentIndex,
          payload: chunk,
        } as WorkerInput)

        // awaiting for the result
        out = await deferred
      } catch (err) {
        // Currently we only support ErrorMode.SUPPRESS
        // Error is logged and output continues
        pendingResults.delete(currentIndex)
        console.error(err)
        cb() // emit nothing in case of an error
        return
      }

      // clean up, then pass the result down the stream (exactly one cb call per chunk)
      pendingResults.delete(currentIndex)
      cb(null, out)
    },
  )
}