1 | "use strict";
|
2 |
|
3 | const Queue = require(`better-queue`);
|
4 |
|
5 | const {
|
6 | store
|
7 | } = require(`../redux`);
|
8 |
|
9 | const FastMemoryStore = require(`../query/better-queue-custom-store`);
|
10 |
|
11 | const queryRunner = require(`../query/query-runner`);
|
12 |
|
13 | const websocketManager = require(`../utils/websocket-manager`);
|
14 |
|
15 | const GraphQLRunner = require(`./graphql-runner`);
|
16 |
|
/**
 * Build the options shared by every queue created in this module.
 *
 * Each call returns a fresh object and invokes `FastMemoryStore()` again,
 * so callers never share one options object.
 *
 * @returns {{concurrent: number, store: *}} Base options handed to `Queue`.
 */
const createBaseOptions = () => ({
  concurrent: 4,
  store: FastMemoryStore()
});
|
23 |
|
/**
 * Create a queue whose worker runs every query job through one shared
 * `GraphQLRunner` constructed from the redux `store`.
 *
 * The worker reports back through the node-style `callback`:
 * `callback(null, result)` when `queryRunner` fulfils, `callback(error)` when
 * the chain rejects.
 *
 * @returns {Queue} A queue configured with the base options.
 */
const createBuildQueue = () => {
  const runner = new GraphQLRunner(store);

  const runQueryJob = (queryJob, callback) => {
    const pendingResult = queryRunner(runner, queryJob);
    const reported = pendingResult.then(result => callback(null, result));

    // The worker returns the promise chain, as the original arrow expression did.
    return reported.catch(callback);
  };

  return new Queue(runQueryJob, createBaseOptions());
};
|
31 |
|
/**
 * Create a queue whose worker obtains its runner from `getRunner()` for each
 * job and publishes every successful result through `websocketManager`.
 *
 * On top of the base options the queue is configured with:
 *  - `priority`: 10 for jobs whose `id` is in `websocketManager.activePaths`,
 *    1 for every other job (including jobs without an `id`);
 *  - `merge`: hands the new task back as the merge result.
 *
 * @param {Function} getRunner - Called once per job to get the runner that is
 *   passed to `queryRunner`.
 * @returns {Queue} The configured queue.
 */
const createDevelopQueue = getRunner => {
  const prioritize = (job, done) => {
    const isActive = job.id && websocketManager.activePaths.has(job.id);
    done(null, isActive ? 10 : 1);
  };

  const keepNewTask = (oldTask, newTask, done) => {
    done(null, newTask);
  };

  const options = Object.assign({}, createBaseOptions(), {
    priority: prioritize,
    merge: keepNewTask
  });

  // Page results and static-query results go out on different channels.
  const publishResult = (queryJob, result) => {
    if (queryJob.isPage) {
      websocketManager.emitPageData({ result, id: queryJob.id });
      return;
    }

    websocketManager.emitStaticQueryData({ result, id: queryJob.id });
  };

  const runQueryJob = (queryJob, callback) => {
    const onResult = result => {
      publishResult(queryJob, result);
      callback(null, result);
    };
    const onError = error => callback(error);

    // Two-argument `then`: `onError` only sees rejections from `queryRunner`.
    queryRunner(getRunner(), queryJob).then(onResult, onError);
  };

  return new Queue(runQueryJob, options);
};
|
66 |
|
67 |
|
68 |
|
69 |
|
70 |
|
71 |
|
72 |
|
73 |
|
/**
 * Push a batch of jobs onto `queue` and wait until the queue reports that it
 * has drained.
 *
 * Only the listeners registered by this call are detached when the batch
 * settles, so listeners that other code attached to the same queue keep
 * working afterwards.
 *
 * @param {object} queue - Event emitter exposing `on`, `removeListener` and
 *   `push`.
 * @param {Array} jobs - Jobs to push, in order.
 * @param {object} [activity] - Optional progress reporter. When it has a
 *   truthy `tick`, `activity.tick()` is called on every `task_finish` event.
 * @returns {Promise<void>} Resolves immediately for an empty batch, otherwise
 *   when the queue emits `drain`. Rejects on the first `task_failed` event
 *   with the array of arguments that event was emitted with, or with the error
 *   thrown by `queue.push`.
 */
const processBatch = async (queue, jobs, activity) => {
  if (jobs.length === 0) {
    return undefined;
  }

  return new Promise((resolve, reject) => {
    // Progress reporting is optional; tolerate a missing activity object.
    const onTaskFinish =
      activity && activity.tick ? () => activity.tick() : null;

    // Detach exactly what this call attached. `removeAllListeners` would also
    // strip listeners owned by other users of the same queue.
    function detach() {
      queue.removeListener(`task_failed`, onTaskFailed);
      queue.removeListener(`drain`, onDrain);
      if (onTaskFinish) {
        queue.removeListener(`task_finish`, onTaskFinish);
      }
    }

    function onTaskFailed(...failure) {
      detach();
      reject(failure);
    }

    function onDrain() {
      detach();
      resolve();
    }

    if (onTaskFinish) {
      queue.on(`task_finish`, onTaskFinish);
    }
    queue.on(`task_failed`, onTaskFailed);
    queue.on(`drain`, onDrain);

    try {
      jobs.forEach(job => queue.push(job));
    } catch (error) {
      // Do not leave listeners behind when pushing itself fails.
      detach();
      reject(error);
    }
  });
};
|
106 |
|
// Public API: the two queue factories and the batch helper.
module.exports = { createBuildQueue, createDevelopQueue, processBatch };
|
112 |
|
\ | No newline at end of file |