UNPKG

2.72 kBJavaScriptView Raw
1"use strict";
2
3const Queue = require(`better-queue`);
4
5const {
6 store
7} = require(`../redux`);
8
9const FastMemoryStore = require(`../query/better-queue-custom-store`);
10
11const queryRunner = require(`../query/query-runner`);
12
13const websocketManager = require(`../utils/websocket-manager`);
14
15const GraphQLRunner = require(`./graphql-runner`);
16
17const createBaseOptions = () => {
18 return {
19 concurrent: 4,
20 store: FastMemoryStore()
21 };
22};
23
24const createBuildQueue = () => {
25 const graphqlRunner = new GraphQLRunner(store);
26
27 const handler = (queryJob, callback) => queryRunner(graphqlRunner, queryJob).then(result => callback(null, result)).catch(callback);
28
29 return new Queue(handler, createBaseOptions());
30};
31
32const createDevelopQueue = getRunner => {
33 const queueOptions = Object.assign({}, createBaseOptions(), {
34 priority: (job, cb) => {
35 if (job.id && websocketManager.activePaths.has(job.id)) {
36 cb(null, 10);
37 } else {
38 cb(null, 1);
39 }
40 },
41 merge: (oldTask, newTask, cb) => {
42 cb(null, newTask);
43 }
44 });
45
46 const handler = (queryJob, callback) => {
47 queryRunner(getRunner(), queryJob).then(result => {
48 if (queryJob.isPage) {
49 websocketManager.emitPageData({
50 result,
51 id: queryJob.id
52 });
53 } else {
54 websocketManager.emitStaticQueryData({
55 result,
56 id: queryJob.id
57 });
58 }
59
60 callback(null, result);
61 }, error => callback(error));
62 };
63
64 return new Queue(handler, queueOptions);
65};
66/**
67 * Returns a promise that pushes jobs onto queue and resolves onces
68 * they're all finished processing (or rejects if one or more jobs
69 * fail)
70 * Note: queue is reused in develop so make sure to thoroughly cleanup hooks
71 */
72
73
74const processBatch = async (queue, jobs, activity) => {
75 let numJobs = jobs.length;
76
77 if (numJobs === 0) {
78 return Promise.resolve();
79 }
80
81 return new Promise((resolve, reject) => {
82 if (activity.tick) {
83 queue.on(`task_finish`, () => activity.tick());
84 }
85
86 const gc = () => {
87 queue.removeAllListeners(`task_failed`);
88 queue.removeAllListeners(`drain`);
89 queue.removeAllListeners(`task_finish`);
90 queue = null;
91 };
92
93 queue // Note: the first arg is the path, the second the error
94 .on(`task_failed`, (...err) => {
95 gc();
96 reject(err);
97 }) // Note: `drain` fires when all tasks _finish_
98 // `empty` fires when queue is empty (but tasks are still running)
99 .on(`drain`, () => {
100 gc();
101 resolve();
102 });
103 jobs.forEach(job => queue.push(job));
104 });
105};
106
107module.exports = {
108 createBuildQueue,
109 createDevelopQueue,
110 processBatch
111};
112//# sourceMappingURL=queue.js.map
\No newline at end of file