// Retrieved from UNPKG (raw view, JavaScript, 9.92 kB).
"use strict";

// CommonJS interop flag plus a placeholder for the WorkerError export; the
// real class is assigned at the bottom of this file.
exports.__esModule = true;
exports.WorkerError = void 0;

const uuid = require(`uuid/v4`);

const path = require(`path`);

const hasha = require(`hasha`);

const fs = require(`fs-extra`);

const pDefer = require(`p-defer`);

const _ = require(`lodash`);

const {
  createContentDigest,
  slash
} = require(`gatsby-core-utils`);

const reporter = require(`gatsby-cli/lib/reporter`);

/**
 * Message types exchanged over the process IPC channel.
 * JOB_CREATED is sent with `process.send`; the other three are the types the
 * `message` listener reacts to. Frozen so the table cannot be altered at runtime.
 */
const MESSAGE_TYPES = Object.freeze({
  JOB_CREATED: `JOB_CREATED`,
  JOB_COMPLETED: `JOB_COMPLETED`,
  JOB_FAILED: `JOB_FAILED`,
  JOB_NOT_WHITELISTED: `JOB_NOT_WHITELISTED`
});

// Reporter activity that is started with the first running job and ended
// (then reset to null) once no job is active anymore.
let activityForJobs = null;

// Number of jobs currently being executed by enqueueJob.
let activeJobs = 0;

// Guards so the `message` listener is registered only once and the
// "IPC could not be detected" warning is printed only once.
let isListeningForMessages = false;
let hasShownIPCDisabledWarning = false;

/**
 * Jobs that were enqueued, keyed by the job's contentDigest.
 *
 * @type {Map<string, {id: string, deferred: pDefer.DeferredPromise<any>}>}
 */
const jobsInProcess = new Map();

/**
 * Jobs handed to an external worker that have not been answered yet,
 * keyed by job id.
 *
 * @type {Map<string, {job: InternalJob, deferred: pDefer.DeferredPromise<any>}>}
 */
const externalJobsMap = new Map();
/**
 * Validate that a path is absolute and return it passed through `slash`.
 *
 * Despite the name, nothing is converted from relative to absolute: jobs must
 * point at real filesystem locations, so a relative path is rejected outright.
 *
 * @param {string} filePath
 * @return {string} `slash(filePath)` (presumably the path with forward slashes only)
 * @throws {Error} when `filePath` is not an absolute path
 */
const convertPathsToAbsolute = filePath => {
  if (!path.isAbsolute(filePath)) {
    throw new Error(`${filePath} should be an absolute path.`);
  }

  return slash(filePath);
};
/**
 * Get the content hash of a file.
 *
 * The file is hashed synchronously with the sha1 algorithm.
 *
 * @param {string} filePath path of the file to hash
 * @return {string} whatever `hasha.fromFileSync` returns for the file
 */
const createFileHash = filePath => hasha.fromFileSync(filePath, {
  algorithm: `sha1`
});
/**
 * @typedef BaseJobInterface
 * @property {string} name
 * @property {string} outputDir
 * @property {Record<string, any>} args
 *
 * @typedef JobInputInterface
 * @property {string[]} inputPaths
 * @property {{name: string, version: string, resolve: string}} plugin
 *
 * @typedef InternalJobInterface
 * @property {string} id
 * @property {string} contentDigest
 * @property {{path: string, contentDigest: string}[]} inputPaths
 * @property {{name: string, version: string, resolve: string, isLocal: boolean}} plugin
 *
 * Intersecting the interfaces below is clumsy, but it is the only way to
 * express these combined types properly in JSDoc.
 * @typedef {BaseJobInterface & JobInputInterface} JobInput
 * @typedef {BaseJobInterface & InternalJobInterface} InternalJob
 */

/**
 * Deferred created when the first job starts running and resolved once the
 * number of active jobs drops back to zero; null until a job was enqueued.
 *
 * @type {pDefer.DeferredPromise<void>|null}
 */
let hasActiveJobs = null;

/**
 * Whether external job execution was opted into through the
 * ENABLE_GATSBY_EXTERNAL_JOBS environment variable.
 * Only the exact values `true` and `1` enable it.
 *
 * @return {boolean}
 */
const hasExternalJobsEnabled = () => {
  const flag = process.env.ENABLE_GATSBY_EXTERNAL_JOBS;
  return flag === `true` || flag === `1`;
};
/**
 * Get the local worker function and execute it on the user's machine.
 *
 * The output directory is created first. A worker that throws synchronously
 * rejects with a WorkerError; a promise returned by the worker is adopted
 * as-is, so an asynchronous rejection is passed through unwrapped.
 *
 * @template T
 * @param {function({ inputPaths: InternalJob["inputPaths"], outputDir: InternalJob["outputDir"], args: InternalJob["args"]}): T} workerFn
 * @param {InternalJob} job
 * @return {Promise<T>}
 */
const runLocalWorker = async (workerFn, job) => {
  await fs.ensureDir(job.outputDir);
  return new Promise((resolve, reject) => {
    // Run the worker in a later event-loop iteration (setImmediate, not
    // process.nextTick) so the caller is never blocked by it synchronously.
    // TODO should we think about threading/queueing here?
    setImmediate(() => {
      try {
        resolve(workerFn({
          inputPaths: job.inputPaths,
          outputDir: job.outputDir,
          args: job.args
        }));
      } catch (err) {
        reject(new WorkerError(err));
      }
    });
  });
};

/**
 * Register the process `message` listener that settles externally executed jobs.
 *
 * Messages without a type/payload id, or whose id is not a pending external
 * job, are ignored. A pending job is removed from the map only after a
 * message type that actually settles it, so an unrecognised type can no
 * longer drop the job and leave its promise pending forever.
 */
const listenForJobMessages = () => {
  process.on(`message`, msg => {
    if (!(msg && msg.type && msg.payload && msg.payload.id)) {
      return;
    }

    const pendingJob = externalJobsMap.get(msg.payload.id);

    if (!pendingJob) {
      return;
    }

    const {
      job,
      deferred
    } = pendingJob;

    switch (msg.type) {
      case MESSAGE_TYPES.JOB_COMPLETED:
        {
          deferred.resolve(msg.payload.result);
          break;
        }

      case MESSAGE_TYPES.JOB_FAILED:
        {
          deferred.reject(new WorkerError(msg.payload.error));
          break;
        }

      case MESSAGE_TYPES.JOB_NOT_WHITELISTED:
        {
          // The job was refused externally, so run it locally instead.
          // runJob may throw synchronously; route that into the deferred
          // rather than letting it escape from the event handler.
          try {
            deferred.resolve(runJob(job, true));
          } catch (err) {
            deferred.reject(err);
          }

          break;
        }

      default:
        {
          // Unknown message type: keep the job so a later message can settle it.
          return;
        }
    }

    externalJobsMap.delete(msg.payload.id);
  });
};
/**
 * Hand a job to an external worker by sending a JOB_CREATED message over IPC.
 *
 * The job is remembered in `externalJobsMap` so the `message` listener can
 * settle the returned promise. If sending throws, the entry is removed again
 * so it does not leak, and the error is rethrown.
 *
 * @param {InternalJob} job
 * @return {Promise<any>} settled by the `message` listener
 */
const runExternalWorker = job => {
  const deferred = pDefer();
  externalJobsMap.set(job.id, {
    job,
    deferred
  });

  try {
    process.send({
      type: MESSAGE_TYPES.JOB_CREATED,
      payload: job
    });
  } catch (err) {
    externalJobsMap.delete(job.id);
    throw err;
  }

  return deferred.promise;
};
/**
 * Make sure we have everything we need to run a job, then run it.
 *
 * The plugin's `gatsby-worker.js` is loaded and must expose a function named
 * after the job. The job is offloaded over IPC when it is not forced local,
 * the plugin is not local, external jobs are enabled and `process.send`
 * exists; in every other case it is executed locally.
 *
 * @param {InternalJob} job
 * @param {boolean} [forceLocal=false] skip external execution entirely
 * @return {Promise<object>}
 * @throws {Error} when the worker file cannot be loaded, or when it has no
 *   function for `job.name`
 */
const runJob = (job, forceLocal = false) => {
  const {
    plugin
  } = job;
  let worker;

  // Only a failing require means the worker file is unusable. Keeping the
  // catch this narrow stops later errors from being reported as a missing file.
  try {
    worker = require(path.posix.join(plugin.resolve, `gatsby-worker.js`));
  } catch (err) {
    throw new Error(`We couldn't find a gatsby-worker.js(${plugin.resolve}/gatsby-worker.js) file for ${plugin.name}@${plugin.version}`);
  }

  if (!worker[job.name]) {
    throw new Error(`No worker function found for ${job.name}`);
  }

  if (!forceLocal && !plugin.isLocal && hasExternalJobsEnabled()) {
    if (process.send) {
      if (!isListeningForMessages) {
        isListeningForMessages = true;
        listenForJobMessages();
      }

      return runExternalWorker(job);
    }

    // only show the offloading warning once
    if (!hasShownIPCDisabledWarning) {
      hasShownIPCDisabledWarning = true;
      reporter.warn(`Offloading of a job failed as IPC could not be detected. Running job locally.`);
    }
  }

  return runLocalWorker(worker[job.name], job);
};
214/**
215 * Create an internal job object
216 *
217 * @param {JobInput|InternalJob} job
218 * @param {{name: string, version: string, resolve: string}} plugin
219 * @return {InternalJob}
220 */
221
222
223exports.createInternalJob = (job, plugin) => {
224 // It looks like we already have an augmented job so we shouldn't redo this work
225 // @ts-ignore
226 if (job.id && job.contentDigest) {
227 return job;
228 }
229
230 const {
231 name,
232 inputPaths,
233 outputDir,
234 args
235 } = job; // TODO see if we can make this async, filehashing might be expensive to wait for
236 // currently this needs to be sync as we could miss jobs to have been scheduled and
237 // are still processing their hashes
238
239 const inputPathsWithContentDigest = inputPaths.map(path => {
240 return {
241 path: convertPathsToAbsolute(path),
242 contentDigest: createFileHash(path)
243 };
244 });
245 /** @type {InternalJob} */
246
247 const internalJob = {
248 id: uuid(),
249 name,
250 contentDigest: ``,
251 inputPaths: inputPathsWithContentDigest,
252 outputDir: convertPathsToAbsolute(outputDir),
253 args,
254 plugin: {
255 name: plugin.name,
256 version: plugin.version,
257 resolve: plugin.resolve,
258 isLocal: !plugin.resolve.includes(`/node_modules/`)
259 }
260 }; // generate a contentDigest based on all parameters including file content
261
262 internalJob.contentDigest = createContentDigest({
263 name: job.name,
264 inputPaths: internalJob.inputPaths.map(inputPath => inputPath.contentDigest),
265 outputDir: internalJob.outputDir,
266 args: internalJob.args,
267 plugin: internalJob.plugin
268 });
269 return internalJob;
270};
271/**
272 * Creates a job
273 *
274 * @param {InternalJob} job
275 * @return {Promise<object>}
276 */
277
278
279exports.enqueueJob = async job => {
280 // When we already have a job that's executing, return the same promise.
281 // we have another check in our createJobV2 action to return jobs that have been done in a previous gatsby run
282 if (jobsInProcess.has(job.contentDigest)) {
283 return jobsInProcess.get(job.contentDigest).deferred.promise;
284 }
285
286 if (activeJobs === 0) {
287 hasActiveJobs = pDefer();
288 } // Bump active jobs
289
290
291 activeJobs++;
292
293 if (!activityForJobs) {
294 activityForJobs = reporter.phantomActivity(`Running jobs v2`);
295 activityForJobs.start();
296 }
297
298 const deferred = pDefer();
299 jobsInProcess.set(job.contentDigest, {
300 id: job.id,
301 deferred
302 });
303
304 try {
305 const result = await runJob(job); // this check is to keep our worker results consistent for cloud
306
307 if (result != null && !_.isPlainObject(result)) {
308 throw new Error(`Result of a worker should be an object, type of "${typeof result}" was given`);
309 }
310
311 deferred.resolve(result);
312 } catch (err) {
313 if (err instanceof Error) {
314 deferred.reject(new WorkerError(err.message));
315 }
316
317 deferred.reject(new WorkerError(err));
318 } finally {
319 // when all jobs are done we end the activity
320 if (--activeJobs === 0) {
321 hasActiveJobs.resolve();
322 activityForJobs.end(); // eslint-disable-next-line require-atomic-updates
323
324 activityForJobs = null;
325 }
326 }
327
328 return deferred.promise;
329};
330/**
331 * Get in progress job promise
332 *
333 * @param {string} contentDigest
334 * @return {Promise<void>}
335 */
336
337
338exports.getInProcessJobPromise = contentDigest => {
339 var _jobsInProcess$get;
340
341 return (_jobsInProcess$get = jobsInProcess.get(contentDigest)) === null || _jobsInProcess$get === void 0 ? void 0 : _jobsInProcess$get.deferred.promise;
342};
343/**
344 * Remove a job from our inProgressQueue to reduce memory usage
345 *
346 * @param {string} contentDigest
347 */
348
349
350exports.removeInProgressJob = contentDigest => {
351 jobsInProcess.delete(contentDigest);
352};
353/**
354 * Wait for all processing jobs to have finished
355 *
356 * @return {Promise<void>}
357 */
358
359
360exports.waitUntilAllJobsComplete = () => hasActiveJobs ? hasActiveJobs.promise : Promise.resolve();
361/**
362 * @param {Partial<InternalJob> & {inputPaths: InternalJob['inputPaths']}} job
363 * @return {boolean}
364 */
365
366
367exports.isJobStale = job => {
368 const areInputPathsStale = job.inputPaths.some(inputPath => {
369 // does the inputPath still exists?
370 if (!fs.existsSync(inputPath.path)) {
371 return true;
372 } // check if we're talking about the same file
373
374
375 const fileHash = createFileHash(inputPath.path);
376 return fileHash !== inputPath.contentDigest;
377 });
378 return areInputPathsStale;
379};
380
381class WorkerError extends Error {
382 constructor(message) {
383 super(message);
384 this.name = `WorkerError`;
385 Error.captureStackTrace(this, WorkerError);
386 }
387
388}
389
390exports.WorkerError = WorkerError;
//# sourceMappingURL=jobs-manager.js.map