1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | const tslib_1 = require("tslib");
|
4 | const lifecycle_1 = require("@stoplight/lifecycle");
|
5 | const immer_1 = require("immer");
|
6 | const lodash_1 = require("lodash");
|
7 | const mobx_1 = require("mobx");
|
8 | const neo_async_1 = require("neo-async");
|
9 | const graph_1 = require("../graph");
|
10 | const reporter_1 = require("../reporter");
|
11 | const scheduler_1 = require("../scheduler");
|
12 | const types_1 = require("../types");
|
/**
 * Factory for {@link Scheduler}.
 *
 * @param {object} props - Forwarded untouched to the Scheduler constructor
 *   (which reads `props.graph` and `props.resolver`).
 * @returns {Scheduler} A newly constructed scheduler.
 */
function createScheduler(props) {
    const scheduler = new Scheduler(props);
    return scheduler;
}
|
16 | exports.createScheduler = createScheduler;
|
/**
 * Runs graph tasks, either through a priority queue (`queue`) or right away
 * (`run`), by dispatching each task to a handler registered for its op.
 *
 * Bookkeeping kept on the instance:
 * - `_queued`   task id (`<op>:<nodeId>`) -> promise of the pending/running task.
 *               `queuedCount` and `isBusy` are derived from it.
 * - `_running`  task ids whose handler is executing right now.
 * - `_handlers` task op -> list of registered handlers.
 */
class Scheduler {
    /**
     * @param {object} props
     * @param {object} props.graph - Graph the handlers operate on.
     * @param {*} props.resolver - Passed through to handlers as `resolver`.
     */
    constructor(props) {
        this._queued = {};
        this._handlers = handlerMap();
        this._running = {};
        this.mightBeStuck = false;

        /**
         * Queues a task and returns a promise that settles when it has run.
         *
         * Resolves immediately when no handler is registered for the op.
         * A task whose id is already pending is not queued twice: the
         * existing promise is returned instead.
         *
         * @param {object} task - Needs `op` and `nodeId`; `trace` is used for error reports.
         * @returns {Promise<void>} Rejects with the task error (after reporting it).
         */
        this.queue = (task) => {
            if (!this._handlers[task.op].length)
                return Promise.resolve();
            const taskId = computeTaskId(task);
            let queued;
            if (taskId) {
                queued = this._queued[taskId];
            }
            if (!queued) {
                queued = new Promise((resolve, reject) => {
                    // Ops without an explicit priority fall back to 99.
                    this._queue.push(task, priorityMap[task.op] || 99, err => {
                        if (taskId) {
                            delete this._queued[taskId];
                        }
                        if (err) {
                            reporter_1.reporter.reportError({
                                code: types_1.GraphiteErrorCode.Generic,
                                message: `Error running task ${task.op} ${err}`,
                                trace: task.trace,
                                nodeId: task.nodeId,
                            });
                            reject(err);
                            return;
                        }
                        resolve();
                    });
                });
                if (taskId) {
                    this._queued[taskId] = queued;
                }
            }
            return queued;
        };

        /**
         * Queues every task in `tasks`.
         *
         * @param {object[]} tasks
         * @returns {Promise<void>[]} One promise per task, in input order.
         */
        this.queueAll = (tasks) => {
            return tasks.map(task => this.queue(task));
        };

        /**
         * Registers `handler` for `op`.
         *
         * @param {*} op - Key into the handler registry.
         * @param {object} handler - Object exposing `selector(node)` and `run(node, api)`.
         * @returns {*} Disposable that unregisters the handler again.
         */
        this.registerHandler = (op, handler) => {
            this._handlers[op].push(handler);
            return lifecycle_1.createDisposable(() => {
                const idx = this._handlers[op].indexOf(handler);
                if (idx >= 0) {
                    this._handlers[op].splice(idx, 1);
                }
            });
        };

        /**
         * Waits for the queue to drain and then for `isBusy` to become false.
         *
         * @returns {Promise<void>}
         */
        this.drain = async () => {
            await this.drainQueue();
            await mobx_1.when(() => !this.isBusy);
        };

        /**
         * Looks up the task's node and the first matching handler, then runs
         * the handler with an API object bound to this task.
         *
         * @param {object} task
         * @returns {*} Whatever the handler returns, or `undefined` when no handler matches.
         * @throws {Error} Synchronously, when the node cannot be found.
         */
        this._run = (task) => {
            const node = this._graph.getNodeById(task.nodeId);
            if (!node) {
                throw new Error(`Cannot find node with id ${task.nodeId} for task ${scheduler_1.GraphTaskOp[task.op]}.`);
            }
            const handler = findHandler(this._handlers[task.op], node);
            if (!handler)
                return;
            // Every mutating helper below stamps `sourceOp: task.op` onto the trace
            // it forwards to the graph.
            return handler.run(node, {
                task,
                getNodeById: this._graph.getNodeById,
                getNodeByUri: this._graph.getNodeByUri,
                getNodesByType: this._graph.getNodesByType,
                moveNode: this._graph.moveNode,
                indexNode: this._graph.indexNode,
                applyPatch: patch => {
                    return this._graph.applyPatch({
                        operations: patch.operations,
                        trace: Object.assign(Object.assign({}, patch.trace), { sourceOp: task.op }),
                    });
                },
                removeNode: (id, trace = {}) => {
                    return this._graph.removeNode(id, Object.assign(Object.assign({}, trace), { sourceOp: task.op }));
                },
                addNode: (props, trace = {}) => {
                    return this._graph.addNode(props, Object.assign(Object.assign({}, trace), { sourceOp: task.op }));
                },
                setSourceNodeProp: (id, prop, value, trace = {}) => {
                    return this._graph.setSourceNodeProp(id, prop, value, Object.assign(Object.assign({}, trace), { sourceOp: task.op }));
                },
                patchSourceNodeProp: (id, prop, parsed, trace = {}) => {
                    return this._graph.patchSourceNodeProp(id, prop, parsed, Object.assign(Object.assign({}, trace), { sourceOp: task.op }));
                },
                reportError: (nodeId, error, trace = {}) => {
                    return this._graph.reportError(nodeId, error, immer_1.default(trace, source => {
                        source.sourceOp = task.op;
                    }));
                },
                setSourceNodeDiagnostics: (id, source, diagnostics, trace = {}) => {
                    return this._graph.setSourceNodeDiagnostics(id, source, diagnostics, Object.assign(Object.assign({}, trace), { sourceOp: task.op }));
                },
                runTask: (t) => {
                    return this.run(t);
                },
                queueTask: (t) => {
                    return this.queue(t);
                },
                resolver: this._resolver,
            });
        };

        /**
         * Resolves once the queue reports it has drained, or immediately when
         * nothing is waiting. Rejects if the queue's `error` callback fires.
         *
         * @returns {Promise<void>}
         */
        this.drainQueue = () => {
            return new Promise((resolve, reject) => {
                if (this.queueLength()) {
                    this._queue.drain = resolve;
                    this._queue.error = reject;
                }
                else {
                    resolve();
                }
            });
        };

        /**
         * Queue worker: runs one task, tracks it in `_running` while it
         * executes, and reports the outcome through `cb`.
         *
         * Because this is an async function it executes synchronously up to
         * its first `await`; a synchronous throw from `_run` therefore
         * invokes `cb` before this function returns to its caller.
         *
         * @param {object} task
         * @param {function(Error=): void} cb - Called with the error on failure, with no arguments on success.
         */
        this._runQueueTask = async (task, cb) => {
            const taskId = computeTaskId(task);
            this._running[taskId] = true;
            let error = null;
            try {
                const result = await this._run(task);
                this._handleRunResult(result);
            }
            catch (e) {
                error = e;
            }
            finally {
                delete this._running[taskId];
                if (error) {
                    cb(error);
                }
                else {
                    cb();
                }
            }
        };

        /**
         * Queues the follow-up tasks a handler returned, if any.
         *
         * @param {object[]|undefined} result
         */
        this._handleRunResult = (result) => {
            if (!result) {
                return;
            }
            for (const pending of this.queueAll(result)) {
                // Nobody awaits these promises. `queue` already reports task
                // failures through the reporter, so only mark the rejection as
                // handled to avoid an unhandled-rejection crash/warning.
                pending.catch(() => { });
            }
        };

        this._graph = props.graph;
        // Concurrency 1: the queue runs a single task at a time.
        this._queue = neo_async_1.priorityQueue(this._runQueueTask, 1);
        this._resolver = props.resolver;

        // Whenever the pending count changes (debounced by 250ms) clear the
        // `mightBeStuck` flag and restart a 4750ms timer; when the timer fires
        // the flag is raised if tasks are still pending.
        mobx_1.reaction(() => this.queuedCount, mobx_1.action(() => {
            this.mightBeStuck = false;
            if (this._mightBeStuckTimeout)
                clearTimeout(this._mightBeStuckTimeout);
            this._mightBeStuckTimeout = setTimeout(mobx_1.action(() => {
                this.mightBeStuck = this.queuedCount !== 0;
                this._mightBeStuckTimeout = void 0;
            }), 4750);
        }), {
            delay: 250,
        });
    }

    /** @returns {boolean} True while at least one task id is registered in `_queued`. */
    get isBusy() {
        return this.queuedCount !== 0;
    }

    /** @returns {number} Number of task ids registered in `_queued`. */
    get queuedCount() {
        return Object.keys(this._queued).length;
    }

    /**
     * Describes the scheduler's current state for debugging.
     *
     * @returns {{running: object[], queued: string[], tasks: object[]}}
     *   `running` — one entry per running task id with the node's details when
     *   the node can be looked up; `queued` — the pending task ids; `tasks` —
     *   the task payloads found by walking the queue's internal linked list.
     */
    snapshot() {
        const tasks = [];
        // Reads the queue's internal `_tasks` list; each list entry carries the task under `data`.
        let task = lodash_1.get(this._queue, '_tasks.head');
        while (task !== null && typeof task === 'object') {
            tasks.push(task.data);
            task = task.next;
        }
        const runningKeys = Object.keys(this._running);
        const running = [];
        for (const key of runningKeys) {
            // Keys look like `<op>:<nodeId>`. Split on the FIRST colon only so
            // node ids that themselves contain ':' (e.g. URIs) stay intact.
            const separatorAt = key.indexOf(':');
            const op = separatorAt === -1 ? key : key.slice(0, separatorAt);
            const id = separatorAt === -1 ? undefined : key.slice(separatorAt + 1);
            let node;
            try {
                node = this._graph.getNodeById(id);
            }
            catch (e) {
                // Best effort: a failed lookup just leaves the node details undefined.
            }
            running.push(Object.assign({ id, op: Number(op), type: node && node.type, path: node && node.path }, (node &&
                node.category === graph_1.NodeCategory.Source && {
                spec: node.spec,
                language: node.language,
            })));
        }
        return {
            running,
            queued: Object.keys(this._queued),
            tasks,
        };
    }

    /** @returns {number} The value of the underlying queue's `length()`. */
    queueLength() {
        return this._queue.length();
    }

    /**
     * Runs a task immediately, bypassing the priority queue.
     *
     * If the same task id is already running, the promise registered for it
     * is returned instead of starting a second run.
     *
     * @param {object} task
     * @returns {Promise<void>} Rejects with the error thrown while running the task.
     */
    run(task) {
        const taskId = computeTaskId(task);
        const queued = this._queued[taskId];
        if (queued && this._running[taskId]) {
            return queued;
        }
        // NOTE(review): the queue may hand this predicate its internal list
        // entry (task under `.data`, as snapshot() reads it) rather than the
        // task itself — confirm against the neo-async version in use.
        this._queue.remove((t) => taskId === computeTaskId(t));
        // `_runQueueTask` can invoke the callback synchronously (when `_run`
        // throws before the first await). Registering the promise afterwards
        // in that case would leave a dead entry in `_queued` forever, keeping
        // `isBusy` true and `drain()` pending.
        let settled = false;
        const taskPromise = new Promise((resolve, reject) => this._runQueueTask(task, err => {
            settled = true;
            if (taskId) {
                delete this._queued[taskId];
            }
            if (err) {
                reject(err);
                return;
            }
            resolve();
        }));
        if (!settled) {
            this._queued[taskId] = taskPromise;
        }
        return taskPromise;
    }
}
|
// MobX wiring for Scheduler (TypeScript decorator output):
// `_queued` and `mightBeStuck` are observable fields,
// `isBusy` and `queuedCount` are computed accessors.
for (const observableField of ["_queued", "mightBeStuck"]) {
    tslib_1.__decorate([
        mobx_1.observable,
        tslib_1.__metadata("design:type", Object)
    ], Scheduler.prototype, observableField, void 0);
}
for (const computedAccessor of ["isBusy", "queuedCount"]) {
    tslib_1.__decorate([
        mobx_1.computed,
        tslib_1.__metadata("design:type", Object),
        tslib_1.__metadata("design:paramtypes", [])
    ], Scheduler.prototype, computedAccessor, null);
}
|
/**
 * Builds the bookkeeping key for a task: its op and node id joined by a colon.
 *
 * @param {{op: *, nodeId: *}} task
 * @returns {string} `<op>:<nodeId>`
 */
function computeTaskId(task) {
    const { op, nodeId } = task;
    return `${op}:${nodeId}`;
}
|
// Queue priority per task op, grouped by tier. Ops missing from this table
// are queued with priority 99 by Scheduler#queue.
// NOTE(review): presumably a lower number is processed earlier by the
// priority queue — confirm against the queue implementation.
const priorityMap = (() => {
    const ops = scheduler_1.GraphTaskOp;
    const tiers = [
        [1, [ops.SerializeSourceNode, ops.DeserializeSourceNode, ops.DiffRawToParsed]],
        [2, [ops.ComputeSourceMap]],
        [3, [ops.ReadSourceNode, ops.WriteSourceNode, ops.DeleteSourceNode, ops.MoveSourceNode]],
        [4, [ops.ResolveSourceNode]],
        [5, [ops.TransformParsed]],
        [6, [ops.ValidateSourceNode]],
    ];
    const priorities = {};
    for (const [priority, tierOps] of tiers) {
        for (const op of tierOps) {
            priorities[op] = priority;
        }
    }
    return priorities;
})();
|
/**
 * Creates a fresh handler registry: one new, empty handler list per task op.
 *
 * @returns {Object<string, Array>} Registry keyed by task op.
 */
const handlerMap = () => {
    const ops = scheduler_1.GraphTaskOp;
    const registry = {};
    for (const op of [
        ops.SerializeSourceNode,
        ops.DeserializeSourceNode,
        ops.DiffRawToParsed,
        ops.ComputeSourceMap,
        ops.ReadSourceNode,
        ops.WriteSourceNode,
        ops.DeleteSourceNode,
        ops.TransformParsed,
        ops.MoveSourceNode,
        ops.ResolveSourceNode,
        ops.ValidateSourceNode,
    ]) {
        registry[op] = [];
    }
    return registry;
};
|
/**
 * Picks the first handler whose `selector` accepts the node.
 *
 * @param {Array<{selector: function(*): *}>} handlers - Candidates, checked in order.
 * @param {*} node - Value handed to each selector.
 * @returns {object|undefined} The first matching handler, or `undefined` when none matches.
 */
const findHandler = (handlers, node) => {
    for (const candidate of handlers) {
        if (candidate.selector(node)) {
            return candidate;
        }
    }
    return undefined;
};
|
281 |
|
\ | No newline at end of file |