UNPKG

5.31 kBJavaScriptView Raw
'use strict';

// Mark the export object as a transpiled ES module; this is the same
// `__esModule` flag that `_interopRequireDefault` checks on required modules.
Object.defineProperty(exports, "__esModule", {
    value: true
});
// `queue` is a function declaration further down, so it is hoisted and
// already available here.
exports.default = queue;

var _arrayEach = require('lodash/_arrayEach');

var _arrayEach2 = _interopRequireDefault(_arrayEach);

var _isArray = require('lodash/isArray');

var _isArray2 = _interopRequireDefault(_isArray);

var _noop = require('lodash/noop');

var _noop2 = _interopRequireDefault(_noop);

var _rest = require('lodash/rest');

var _rest2 = _interopRequireDefault(_rest);

var _onlyOnce = require('./onlyOnce');

var _onlyOnce2 = _interopRequireDefault(_onlyOnce);

var _setImmediate = require('./setImmediate');

var _setImmediate2 = _interopRequireDefault(_setImmediate);

var _DoublyLinkedList = require('./DoublyLinkedList');

var _DoublyLinkedList2 = _interopRequireDefault(_DoublyLinkedList);
35
/**
 * Normalizes a required module so its main export can always be read from
 * `.default`.
 *
 * @param {*} obj - The value returned by `require`.
 * @returns {*} `obj` itself when it is truthy and carries a truthy
 *   `__esModule` property; otherwise a new object `{ default: obj }`.
 */
function _interopRequireDefault(obj) {
    if (obj && obj.__esModule) {
        return obj;
    }
    return { default: obj };
}
37
/**
 * Builds a queue object that feeds queued tasks to `worker` in batches, with
 * at most `concurrency` batches in flight at once.
 *
 * @param {Function} worker - Invoked as `worker(data, callback)`. `data` is an
 *   array holding the payloads of every task in the batch. The arguments
 *   passed to `callback` are forwarded to each task's own callback, and a
 *   non-null first argument is additionally reported through `q.error`.
 *   `callback` is wrapped with the project's `onlyOnce` helper (presumably a
 *   guard against repeated invocation; see that module).
 * @param {number} [concurrency=1] - Upper bound on batches running at once.
 *   `null`/`undefined` defaults to 1.
 * @param {number} [payload] - Maximum number of tasks per batch. When falsy,
 *   a batch takes every task that is queued at that moment.
 * @returns {Object} The queue: user-assignable hooks (`saturated`,
 *   `unsaturated`, `empty`, `drain`, `error`), state (`concurrency`,
 *   `payload`, `buffer`, `started`, `paused`, `_tasks`) and the methods
 *   `push`, `unshift`, `kill`, `process`, `length`, `running`, `workersList`,
 *   `idle`, `pause` and `resume`.
 * @throws {Error} When `concurrency` is exactly `0`. `push`/`unshift` throw
 *   when given a non-null callback that is not a function.
 */
function queue(worker, concurrency, payload) {
    if (concurrency == null) {
        concurrency = 1;
    } else if (concurrency === 0) {
        throw new Error('Concurrency must not be zero');
    }

    // Enqueues one task (or each element of an array of tasks) at the back or
    // the front of the list, then schedules a processing pass.
    function _insert(data, insertAtFront, callback) {
        if (callback != null && typeof callback !== 'function') {
            throw new Error('task callback must be a function');
        }
        q.started = true;
        if (!(0, _isArray2.default)(data)) {
            data = [data];
        }
        if (data.length === 0 && q.idle()) {
            // Nothing to enqueue and nothing queued or running: schedule the
            // drain hook instead of a processing pass.
            return (0, _setImmediate2.default)(function () {
                q.drain();
            });
        }
        (0, _arrayEach2.default)(data, function (task) {
            var item = {
                data: task,
                callback: callback || _noop2.default
            };

            if (insertAtFront) {
                q._tasks.unshift(item);
            } else {
                q._tasks.push(item);
            }
        });
        (0, _setImmediate2.default)(q.process);
    }

    // Builds the completion callback for one batch. When the worker reports
    // back it releases the worker slot, notifies every task in the batch, fires
    // the unsaturated/drain hooks as applicable and starts the next pass.
    function _next(tasks) {
        return (0, _rest2.default)(function (args) {
            workers -= 1;

            (0, _arrayEach2.default)(tasks, function (task) {
                // Only the first task of a batch is recorded in workersList,
                // so at most one task per batch is found and removed here.
                // Returning false is meant to end the scan after the removal.
                (0, _arrayEach2.default)(workersList, function (activeTask, index) {
                    if (activeTask === task) {
                        workersList.splice(index, 1);
                        return false;
                    }
                });

                task.callback.apply(task, args);

                if (args[0] != null) {
                    q.error(args[0], task.data);
                }
            });

            if (workers <= q.concurrency - q.buffer) {
                q.unsaturated();
            }

            if (q.idle()) {
                q.drain();
            }
            q.process();
        });
    }

    // Number of batches currently handed to the worker.
    var workers = 0;
    // First task of each in-flight batch (exposed through q.workersList()).
    var workersList = [];
    var q = {
        _tasks: new _DoublyLinkedList2.default(),
        concurrency: concurrency,
        payload: payload,
        saturated: _noop2.default,
        unsaturated: _noop2.default,
        // Slack below `concurrency` at which `unsaturated` starts firing.
        buffer: concurrency / 4,
        empty: _noop2.default,
        drain: _noop2.default,
        error: _noop2.default,
        started: false,
        paused: false,
        push: function (data, callback) {
            _insert(data, false, callback);
        },
        kill: function () {
            // Silence the drain hook and discard everything still queued;
            // batches already handed to the worker are not touched here.
            q.drain = _noop2.default;
            q._tasks.empty();
        },
        unshift: function (data, callback) {
            _insert(data, true, callback);
        },
        process: function () {
            while (!q.paused && workers < q.concurrency && q._tasks.length) {
                var tasks = [],
                    data = [];
                // A batch takes `payload` tasks when payload is set, otherwise
                // everything currently queued.
                var l = q._tasks.length;
                if (q.payload) l = Math.min(l, q.payload);
                for (var i = 0; i < l; i++) {
                    var node = q._tasks.shift();
                    tasks.push(node);
                    data.push(node.data);
                }

                if (q._tasks.length === 0) {
                    q.empty();
                }
                workers += 1;
                workersList.push(tasks[0]);

                if (workers === q.concurrency) {
                    q.saturated();
                }

                var cb = (0, _onlyOnce2.default)(_next(tasks));
                worker(data, cb);
            }
        },
        length: function () {
            return q._tasks.length;
        },
        running: function () {
            return workers;
        },
        workersList: function () {
            return workersList;
        },
        idle: function () {
            return q._tasks.length + workers === 0;
        },
        pause: function () {
            q.paused = true;
        },
        resume: function () {
            if (q.paused === false) {
                return;
            }
            q.paused = false;
            var resumeCount = Math.min(q.concurrency, q._tasks.length);
            // Need to call q.process once per concurrent
            // worker to preserve full concurrency after pause
            for (var w = 1; w <= resumeCount; w++) {
                (0, _setImmediate2.default)(q.process);
            }
        }
    };
    return q;
}
// Expose the default export directly so plain CommonJS `require` of this
// module yields whatever was assigned to `exports.default`.
module.exports = exports['default'];
\No newline at end of file