UNPKG

6.13 kBJavaScriptView Raw
1/* global define */
2
3(function (root, factory) {
4 if (typeof define === 'function' && define.amd) {
5 define(['es6-promise'], factory)
6 } else if (typeof exports === 'object') {
7 module.exports = factory(require('es6-promise'))
8 } else {
9 root.promisePool = factory(root.ES6Promise)
10 }
11})(this, function (es6promise) {
12 'use strict'
13
14 var Promise = es6promise.Promise
15
/**
 * Adapts a generator function to the producer protocol used by the pool.
 *
 * The generator function is invoked once, immediately, to obtain an iterator.
 *
 * @param {Function} gen Generator function; called with no arguments.
 * @returns {Function} Producer that returns the next yielded value on each
 *   call, or `null` once the iterator reports that it is done.
 */
var generatorFunctionToProducer = function (gen) {
  var iterator = gen()
  return function () {
    var step = iterator.next()
    if (step.done) {
      return null
    }
    return step.value
  }
}
23
/**
 * Turns a single promise into a one-shot producer.
 *
 * @param {*} promise Value to hand out on the first call.
 * @returns {Function} Producer that returns `promise` the first time it is
 *   called and `null` on every later call.
 */
var promiseToProducer = function (promise) {
  var handedOut = false
  return function () {
    var next = handedOut ? null : promise
    handedOut = true
    return next
  }
}
34
/**
 * Normalizes a pool source into a producer function.
 *
 * - A generator function (detected via `constructor.name`) is wrapped with
 *   `generatorFunctionToProducer`.
 * - Any other function is assumed to already be a producer and is returned
 *   unchanged.
 * - A thenable (object with a `then` function) is wrapped as a one-shot
 *   producer.
 * - Anything else, including `null` and `undefined`, is first converted with
 *   `Promise.resolve` and then wrapped as a one-shot producer.
 *
 * @param {*} obj Source passed to the pool.
 * @returns {Function} Producer function.
 */
var toProducer = function (obj) {
  var type = typeof obj
  if (type === 'function') {
    if (obj.constructor && obj.constructor.name === 'GeneratorFunction') {
      return generatorFunctionToProducer(obj)
    }
    return obj
  }
  // `typeof null` is 'object', so null must be excluded explicitly before
  // probing for a `then` property; otherwise the property access throws.
  var isThenable = obj !== null &&
    type === 'object' &&
    typeof obj.then === 'function'
  var promise = isThenable ? obj : Promise.resolve(obj)
  return promiseToProducer(promise)
}
49
/**
 * Event object handed to pool listeners.
 *
 * @constructor
 * @param {*} target Stored as `target`; the pool passes itself here.
 * @param {string} type Event type, stored as `type`.
 * @param {*} data Event payload, stored as `data`.
 */
var PromisePoolEvent = function (target, type, data) {
  // Assignment order is kept so the enumeration order of keys is unchanged.
  this.target = target
  this.type = type
  this.data = data
}
55
/**
 * Pool that pulls promises from a source and keeps a bounded number of them
 * in flight.
 *
 * @constructor
 * @param {*} source Anything accepted by `toProducer`.
 * @param {number} concurrency Maximum number of tracked promises; must be an
 *   integer of at least 1.
 * @param {Object} [options] Stored as-is; defaults to an empty object.
 * @throws {Error} 'Invalid concurrency' when `concurrency` is not a number,
 *   not integral, or below 1.
 */
var PromisePool = function (source, concurrency, options) {
  var isPositiveInteger = typeof concurrency === 'number' &&
    Math.floor(concurrency) === concurrency &&
    concurrency >= 1
  if (!isPositiveInteger) {
    throw new Error('Invalid concurrency')
  }

  this._producer = toProducer(source)
  this._concurrency = concurrency
  this._options = options || {}

  // Runtime state: registered listeners, progress flags and the promise /
  // settle callbacks of the current run (null while not started).
  this._listeners = {}
  this._producerDone = false
  this._size = 0
  this._promise = null
  this._callbacks = null
}
71
/**
 * Gets or sets the concurrency limit.
 *
 * When called with a value, the value is validated with the same rule the
 * constructor applies (a number that is an integer and at least 1), stored,
 * and, if the pool is active, `_proceed()` is invoked so a changed limit
 * takes effect immediately.
 *
 * @param {number} [value] New limit; omit (or pass `undefined`) to only read.
 * @returns {number} The current limit.
 * @throws {Error} 'Invalid concurrency' when `value` is given but is not a
 *   positive integer. The stored limit is left untouched in that case.
 */
PromisePool.prototype.concurrency = function (value) {
  if (typeof value !== 'undefined') {
    // Validate before assigning so an invalid value can never reach the
    // `_size < _concurrency` comparison that drives the pool.
    if (typeof value !== 'number' ||
        Math.floor(value) !== value ||
        value < 1) {
      throw new Error('Invalid concurrency')
    }
    this._concurrency = value
    if (this.active()) {
      this._proceed()
    }
  }
  return this._concurrency
}
81
/**
 * @returns {number} The pool's `_size` counter: incremented for each promise
 *   taken from the producer and decremented when one settles.
 */
PromisePool.prototype.size = function () {
  return this._size
}
85
/**
 * @returns {boolean} `true` while the pool holds a promise for the current
 *   run (`_promise` is truthy), `false` otherwise.
 */
PromisePool.prototype.active = function () {
  return Boolean(this._promise)
}
89
/**
 * @returns {?Promise} The value currently stored in `_promise`: the promise
 *   of the running pool, or `null` when none is stored.
 */
PromisePool.prototype.promise = function () {
  return this._promise
}
93
/**
 * Starts the pool and returns the promise that settles when it finishes.
 *
 * If the pool is already active, the promise of the current run is returned
 * instead of starting a second run; replacing `_callbacks` mid-run would
 * leave the first promise pending forever.
 *
 * @returns {Promise} Resolves when `_settle()` is called without an error,
 *   rejects with the error passed to `_settle(error)` or thrown synchronously
 *   while filling the pool.
 */
PromisePool.prototype.start = function () {
  if (this._promise) {
    return this._promise
  }
  var that = this
  var promise = new Promise(function (resolve, reject) {
    that._callbacks = {
      reject: reject,
      resolve: resolve
    }
  })
  // The Promise executor runs synchronously, so `_callbacks` is set here.
  // `_promise` must be assigned BEFORE `_proceed()` runs: if the pool settles
  // right away (e.g. an empty source), `_settle` clears `_promise`, and
  // assigning afterwards would leave a finished pool reporting as active.
  this._promise = promise
  try {
    this._proceed()
  } catch (error) {
    if (!this._callbacks) {
      throw error
    }
    // Mirror what throwing inside the executor did (reject the promise), but
    // also clear the run state so the pool is not left looking active.
    this._callbacks.reject(error)
    this._promise = null
    this._callbacks = null
  }
  return promise
}
105
/**
 * Registers a listener for an event type. Adding the same listener twice for
 * the same type has no effect.
 *
 * @param {string} type Event type.
 * @param {Function} listener Callback to register.
 */
PromisePool.prototype.addEventListener = function (type, listener) {
  var registered = this._listeners[type]
  if (!registered) {
    registered = []
    this._listeners[type] = registered
  }
  var alreadyRegistered = registered.indexOf(listener) >= 0
  if (!alreadyRegistered) {
    registered.push(listener)
  }
}
112
/**
 * Unregisters a listener for an event type. Unknown types and listeners that
 * were never registered are ignored.
 *
 * @param {string} type Event type.
 * @param {Function} listener Callback to remove.
 */
PromisePool.prototype.removeEventListener = function (type, listener) {
  var registered = this._listeners[type]
  if (!registered) {
    return
  }
  var position = registered.indexOf(listener)
  if (position < 0) {
    return
  }
  registered.splice(position, 1)
}
121
/**
 * Dispatches an event to every listener registered for `type`.
 *
 * Listeners are invoked with the pool as `this` and a `PromisePoolEvent` as
 * the only argument. Nothing happens when no listener is registered.
 *
 * @param {string} type Event type.
 * @param {*} data Payload exposed as `evt.data`.
 */
PromisePool.prototype._fireEvent = function (type, data) {
  var registered = this._listeners[type]
  if (!registered || !registered.length) {
    return
  }
  var evt = new PromisePoolEvent(this, type, data)
  // Iterate over a copy so listeners may add or remove listeners while the
  // event is being dispatched without affecting this round.
  registered.slice().forEach(function (listener) {
    listener.call(this, evt)
  }, this)
}
131
/**
 * Ends the current run: settles the pool promise through the stored
 * callbacks and clears `_promise` and `_callbacks`.
 *
 * @param {*} [error] When truthy, the pool promise is rejected with it;
 *   otherwise the pool promise is resolved without a value.
 */
PromisePool.prototype._settle = function (error) {
  var callbacks = this._callbacks
  if (!error) {
    callbacks.resolve()
  } else {
    callbacks.reject(error)
  }
  this._callbacks = null
  this._promise = null
}
141
/**
 * Bookkeeping for a tracked promise that fulfilled.
 *
 * Always decrements `_size`. If the pool is still active, a 'fulfilled'
 * event carrying `{ promise, result }` is fired and the pool is asked to
 * proceed.
 *
 * @param {*} promise The tracked promise.
 * @param {*} result Its fulfillment value.
 */
PromisePool.prototype._onPooledPromiseFulfilled = function (promise, result) {
  this._size -= 1
  if (!this.active()) {
    return
  }
  var details = {
    promise: promise,
    result: result
  }
  this._fireEvent('fulfilled', details)
  this._proceed()
}
152
/**
 * Bookkeeping for a tracked promise that rejected.
 *
 * Always decrements `_size`. If the pool is still active, a 'rejected' event
 * carrying `{ promise, error }` is fired and the pool is settled with the
 * error (or with a generic 'Unknown error' when the rejection reason is
 * falsy).
 *
 * @param {*} promise The tracked promise.
 * @param {*} error Its rejection reason.
 */
PromisePool.prototype._onPooledPromiseRejected = function (promise, error) {
  this._size -= 1
  if (!this.active()) {
    return
  }
  var details = {
    promise: promise,
    error: error
  }
  this._fireEvent('rejected', details)
  var reason = error || new Error('Unknown error')
  this._settle(reason)
}
163
/**
 * Hooks a promise obtained from the producer into the pool's bookkeeping.
 *
 * Fulfillment and rejection are forwarded to the matching `_onPooledPromise*`
 * handler. If one of those handlers throws, the pool is settled with a
 * 'Promise processing failed' error.
 *
 * @param {Object} promise Thenable returned by the producer.
 */
PromisePool.prototype._trackPromise = function (promise) {
  var pool = this

  var onFulfilled = function (result) {
    pool._onPooledPromiseFulfilled(promise, result)
  }
  var onRejected = function (error) {
    pool._onPooledPromiseRejected(promise, error)
  }
  var onProcessingFailure = function (err) {
    pool._settle(new Error('Promise processing failed: ' + err))
  }

  // Bracket access keeps the `catch` call parseable by ES3-era engines.
  promise.then(onFulfilled, onRejected)['catch'](onProcessingFailure)
}
175
/**
 * Fills the pool up to the concurrency limit and settles it once all work
 * is finished.
 *
 * While fewer than `_concurrency` promises are tracked, the producer is
 * called (with the pool as `this`) and each truthy value it returns is
 * counted and tracked. The producer is considered exhausted only when it
 * actually returns a falsy value. When the producer is exhausted and no
 * tracked promise remains, the pool is settled successfully.
 */
PromisePool.prototype._proceed = function () {
  if (!this._producerDone) {
    while (this._size < this._concurrency) {
      var promise = this._producer.call(this)
      if (!promise) {
        // Only a falsy return from the producer marks it as done. A pool that
        // is merely full (so the producer was never asked) must not be
        // flagged as exhausted, or the remaining work would be dropped.
        this._producerDone = true
        break
      }
      this._size++
      this._trackPromise(promise)
    }
  }
  if (this._producerDone && this._size === 0) {
    this._settle()
  }
}
192
/**
 * Translates one legacy callback option into an event listener.
 *
 * If `options[optKey]` is set, a listener is stored under
 * `listeners[eventType]` that calls the legacy callback with the pool's
 * promise, the pooled promise, and `evt.data[eventKey]`.
 *
 * @param {Object} options Legacy options object.
 * @param {Object} listeners Map of event type to listener; mutated in place.
 * @param {string} optKey Name of the legacy option.
 * @param {string} eventType Event type to register the listener under.
 * @param {string} eventKey Key in the event data passed as third argument.
 */
var modernizeOption = function (options, listeners, optKey, eventType, eventKey) {
  var legacyCallback = options[optKey]
  if (!legacyCallback) {
    return
  }
  listeners[eventType] = function (evt) {
    var pool = evt.target
    var details = evt.data
    legacyCallback(pool._promise, details.promise, details[eventKey])
  }
}
201
/**
 * Builds event listeners for the legacy `onresolve` / `onreject` options.
 *
 * @param {Object} options Legacy options object.
 * @returns {Object} Map of event type ('fulfilled', 'rejected') to listener;
 *   contains only the entries `modernizeOption` chose to add.
 */
var modernizeOptions = function (options) {
  var listeners = {}
  // [legacy option name, event type, key in the event data]
  var legacyMappings = [
    ['onresolve', 'fulfilled', 'result'],
    ['onreject', 'rejected', 'error']
  ]
  for (var i = 0; i < legacyMappings.length; ++i) {
    var mapping = legacyMappings[i]
    modernizeOption(options, listeners, mapping[0], mapping[1], mapping[2])
  }
  return listeners
}
208
/**
 * Convenience entry point: builds a pool, wires up listeners derived from
 * the legacy `options.onresolve` / `options.onreject` callbacks, and starts
 * it.
 *
 * @param {*} source Passed to the `PromisePool` constructor.
 * @param {number} concurrency Passed to the `PromisePool` constructor.
 * @param {Object} [options] Passed to the `PromisePool` constructor; also
 *   inspected for legacy callbacks when provided.
 * @returns {*} Whatever `pool.start()` returns.
 */
var createPool = function (source, concurrency, options) {
  // Legacy API: options.onresolve and options.onreject
  var listeners = null
  if (options) {
    listeners = modernizeOptions(options)
  }

  var pool = new PromisePool(source, concurrency, options)

  // `for...in` over null iterates nothing, so no separate guard is needed
  // when no options were given.
  for (var type in listeners) {
    pool.addEventListener(type, listeners[type])
  }

  return pool.start()
}
220
221 createPool.Promise = Promise
222 createPool.PromisePool = PromisePool
223 createPool.PromisePoolEvent = PromisePoolEvent
224
225 return createPool
226})