UNPKG

22.4 kBJavaScriptView Raw
1/** @license MIT License (c) copyright 2010-2016 original author or authors */
2/** @author Brian Cavalier */
3/** @author John Hann */
4
5import Stream from './Stream'
6import * as base from '@most/prelude'
7import { of, empty, never } from './source/core'
8import { from } from './source/from'
9import { periodic } from './source/periodic'
10import symbolObservable from 'symbol-observable'
11
12/**
13 * Core stream type
14 * @type {Stream}
15 */
16export { Stream }
17
// Attach of and empty to the constructor for fantasy-land compat
Stream.of = of
Stream.empty = empty
// Attach from to the constructor for ES Observable compat
Stream.from = from
23export { of, of as just, empty, never, from, periodic }
24
25// -----------------------------------------------------------------------
26// Draft ES Observable proposal interop
27// https://github.com/zenparsing/es-observable
28
29import { subscribe } from './observable/subscribe'
30
/**
 * Draft ES Observable interop: subscribe to this stream.
 * Forwards the subscriber and this stream to the imported subscribe helper.
 * @param {*} subscriber value passed through unchanged as the helper's first
 *  argument (presumably an ES Observable observer - confirm against
 *  ./observable/subscribe)
 * @returns {*} whatever the subscribe helper returns
 */
Stream.prototype.subscribe = function (subscriber) {
  return subscribe(subscriber, this)
}
34
/**
 * Draft ES Observable interop: a stream is its own observable, so the
 * method keyed by symbolObservable returns the stream itself.
 * @returns {Stream} this stream
 */
Stream.prototype[symbolObservable] = function () {
  return this
}
38
39// -----------------------------------------------------------------------
40// Fluent adapter
41
42import { thru } from './combinator/thru'
43
/**
 * Adapt a functional stream transform to fluent style.
 * Applies f to this stream.
 * @param {function(s: Stream): Stream} f function that
 *  receives the stream itself and must return a new stream
 * @return {Stream}
 */
Stream.prototype.thru = function (f) {
  return thru(f, this)
}
54
55// -----------------------------------------------------------------------
56// Adapting other sources
57
58/**
59 * Create a stream of events from the supplied EventTarget or EventEmitter
60 * @param {String} event event name
61 * @param {EventTarget|EventEmitter} source EventTarget or EventEmitter. The source
62 * must support either addEventListener/removeEventListener (w3c EventTarget:
63 * http://www.w3.org/TR/DOM-Level-2-Events/events.html#Events-EventTarget),
64 * or addListener/removeListener (node EventEmitter: http://nodejs.org/api/events.html)
65 * @returns {Stream} stream of events of the specified type from the source
66 */
67export { fromEvent } from './source/fromEvent'
68
69// -----------------------------------------------------------------------
70// Observing
71
72import { observe, drain } from './combinator/observe'
73
74export { observe, observe as forEach, drain }
75
/**
 * Process all the events in the stream. Also available as forEach; both
 * names refer to the same function.
 * @param {function} f function forwarded to observe to process each event
 * @returns {Promise} promise that fulfills when the stream ends, or rejects
 *  if the stream fails with an unhandled error.
 */
Stream.prototype.observe = Stream.prototype.forEach = function (f) {
  return observe(f, this)
}
84
/**
 * Consume all events in the stream, without providing a function to process each.
 * This causes a stream to become active and begin emitting events, and is useful
 * in cases where all processing has been set up upstream via other combinators, and
 * there is no need to process the terminal events.
 * @returns {Promise} promise that fulfills when the stream ends, or rejects
 *  if the stream fails with an unhandled error.
 */
Stream.prototype.drain = function () {
  return drain(this)
}
96
97// -------------------------------------------------------
98
99import { loop } from './combinator/loop'
100
101export { loop }
102
/**
 * Generalized feedback loop. Call a stepper function for each event. The stepper
 * will be called with 2 params: the current seed and an event value. It must
 * return a new { seed, value } pair. The `seed` will be fed back into the next
 * invocation of stepper, and the `value` will be propagated as the event value.
 * @param {function(seed:*, value:*):{seed:*, value:*}} stepper loop step function
 * @param {*} seed initial seed value passed to first stepper call
 * @returns {Stream} new stream whose values are the `value` field of the objects
 *  returned by the stepper
 */
Stream.prototype.loop = function (stepper, seed) {
  return loop(stepper, seed, this)
}
116
117// -------------------------------------------------------
118
119import { scan, reduce } from './combinator/accumulate'
120
121export { scan, reduce }
122
/**
 * Create a stream containing successive reduce results of applying f to
 * the previous reduce result and the current stream item.
 * @param {function(result:*, x:*):*} f reducer function
 * @param {*} initial initial value
 * @returns {Stream} new stream containing successive reduce results
 */
Stream.prototype.scan = function (f, initial) {
  return scan(f, initial, this)
}
133
/**
 * Reduce the stream to produce a single result. Note that reducing an infinite
 * stream will return a Promise that never fulfills, but that may reject if an error
 * occurs.
 * @param {function(result:*, x:*):*} f reducer function
 * @param {*} initial optional initial value
 * @returns {Promise} promise for the final result of the reduce
 */
Stream.prototype.reduce = function (f, initial) {
  return reduce(f, initial, this)
}
145
146// -----------------------------------------------------------------------
147// Building and extending
148
149export { unfold } from './source/unfold'
150export { iterate } from './source/iterate'
151export { generate } from './source/generate'
152import { concat, cons as startWith } from './combinator/build'
153
154export { concat, startWith }
155
/**
 * Append another stream to this one.
 * @param {Stream} tail
 * @returns {Stream} new stream containing all items in this followed by
 *  all items in tail
 */
Stream.prototype.concat = function (tail) {
  return concat(this, tail)
}
164
/**
 * Prepend a value to this stream.
 * @param {*} x value to prepend
 * @returns {Stream} a new stream with x prepended
 */
Stream.prototype.startWith = function (x) {
  return startWith(x, this)
}
172
173// -----------------------------------------------------------------------
174// Transforming
175
176import { map, constant, tap } from './combinator/transform'
177import { ap } from './combinator/applicative'
178
179export { map, constant, tap, ap }
180
/**
 * Transform each value in the stream by applying f to each
 * @param {function(*):*} f mapping function
 * @returns {Stream} stream containing items transformed by f
 */
Stream.prototype.map = function (f) {
  return map(f, this)
}
189
/**
 * Assume this stream contains functions, and apply each function to each item
 * in the provided stream. This generates, in effect, a cross product.
 * @param {Stream} xs stream of items to which each function in this stream
 *  is applied
 * @returns {Stream} stream containing the cross product of items
 */
Stream.prototype.ap = function (xs) {
  return ap(this, xs)
}
199
/**
 * Replace each value in the stream with x
 * @param {*} x replacement value
 * @returns {Stream} stream containing items replaced with x
 */
Stream.prototype.constant = function (x) {
  return constant(x, this)
}
208
/**
 * Perform a side effect for each item in the stream
 * @param {function(x:*):*} f side effect to execute for each item. The
 *  return value will be discarded.
 * @returns {Stream} new stream containing the same items as this stream
 */
Stream.prototype.tap = function (f) {
  return tap(f, this)
}
218
219// -----------------------------------------------------------------------
220// Transducer support
221
222import { transduce } from './combinator/transduce'
223
224export { transduce }
225
/**
 * Transform this stream by passing its events through a transducer.
 * @param {function} transducer transducer function
 * @return {Stream} stream of events transformed by the transducer
 */
Stream.prototype.transduce = function (transducer) {
  return transduce(transducer, this)
}
234
235// -----------------------------------------------------------------------
236// FlatMapping
237
238import { flatMap, join } from './combinator/flatMap'
239
240// @deprecated flatMap, use chain instead
241export { flatMap, flatMap as chain, join }
242
/**
 * Map each value in the stream to a new stream, and merge it into the
 * returned outer stream. Event arrival times are preserved.
 * @param {function(x:*):Stream} f chaining function, must return a Stream
 * @returns {Stream} new stream containing all events from each stream returned by f
 */
Stream.prototype.chain = function (f) {
  return flatMap(f, this)
}

// @deprecated use chain instead
Stream.prototype.flatMap = Stream.prototype.chain
255
/**
 * Monadic join. Flatten a Stream<Stream<X>> to Stream<X> by merging inner
 * streams to the outer. Event arrival times are preserved.
 * @returns {Stream<X>} new stream containing all events of all inner streams
 */
Stream.prototype.join = function () {
  return join(this)
}
264
265import { continueWith } from './combinator/continueWith'
266
267// @deprecated flatMapEnd, use continueWith instead
268export { continueWith, continueWith as flatMapEnd }
269
/**
 * Map the end event to a new stream, and begin emitting its values.
 * @param {function(x:*):Stream} f function that receives the end event value,
 *  and *must* return a new Stream to continue with.
 * @returns {Stream} new stream that emits all events from the original stream,
 *  followed by all events from the stream returned by f.
 */
Stream.prototype.continueWith = function (f) {
  return continueWith(f, this)
}

// @deprecated use continueWith instead
Stream.prototype.flatMapEnd = Stream.prototype.continueWith
283
284import { concatMap } from './combinator/concatMap'
285
286export { concatMap }
287
/**
 * Fluent form of concatMap: forwards f and this stream to the imported
 * concatMap combinator.
 * NOTE(review): presumably maps each event to a stream and concatenates the
 * resulting streams in order - confirm against ./combinator/concatMap.
 * @param {function} f function forwarded as concatMap's first argument
 * @returns {*} result of concatMap(f, this), presumably a Stream
 */
Stream.prototype.concatMap = function (f) {
  return concatMap(f, this)
}
291
292// -----------------------------------------------------------------------
293// Concurrent merging
294
295import { mergeConcurrently } from './combinator/mergeConcurrently'
296
297export { mergeConcurrently }
298
/**
 * Flatten a Stream<Stream<X>> to Stream<X> by merging inner
 * streams to the outer, limiting the number of inner streams that may
 * be active concurrently.
 * @param {number} concurrency at most this many inner streams will be
 *  allowed to be active concurrently.
 * @return {Stream<X>} new stream containing all events of all inner
 *  streams, with limited concurrency.
 */
Stream.prototype.mergeConcurrently = function (concurrency) {
  return mergeConcurrently(concurrency, this)
}
311
312// -----------------------------------------------------------------------
313// Merging
314
315import { merge, mergeArray } from './combinator/merge'
316
317export { merge, mergeArray }
318
/**
 * Merge this stream and all the provided streams
 * @param {...Stream} streams streams to merge with this one (read from
 *  `arguments`)
 * @returns {Stream} stream containing items from this stream and the provided
 *  streams in time order. If two events are simultaneous they will be merged in
 *  arbitrary order.
 */
Stream.prototype.merge = function (/* ...streams */) {
  // Prepend this stream to the argument list so it is merged with the others.
  return mergeArray(base.cons(this, arguments))
}
328
329// -----------------------------------------------------------------------
330// Combining
331
332import { combine, combineArray } from './combinator/combine'
333
334export { combine, combineArray }
335
/**
 * Combine latest events from all input streams
 * @param {function(...events):*} f function to combine most recent events
 * @param {...Stream} streams additional streams (read from `arguments`)
 * @returns {Stream} stream containing the result of applying f to the most recent
 *  event of each input stream, whenever a new event arrives on any stream.
 */
Stream.prototype.combine = function (f /*, ...streams */) {
  // arguments is [f, ...streams]; swapping slot 0 for this stream yields
  // [this, ...streams] without a separate slice.
  return combineArray(f, base.replace(this, 0, arguments))
}
345
346// -----------------------------------------------------------------------
347// Sampling
348
349import { sample, sampleArray, sampleWith } from './combinator/sample'
350
351export { sample, sampleArray, sampleWith }
352
/**
 * When an event arrives on sampler, emit the latest event value from this stream.
 * @param {Stream} sampler stream of events at whose arrival time
 *  this stream's latest value will be propagated
 * @returns {Stream} sampled stream of values
 */
Stream.prototype.sampleWith = function (sampler) {
  return sampleWith(sampler, this)
}
362
/**
 * When an event arrives on this stream, emit the result of calling f with the latest
 * values of all streams being sampled
 * @param {function(...values):*} f function to apply to each set of sampled values
 * @param {...Stream} streams streams to sample (read from `arguments`)
 * @returns {Stream} stream of sampled and transformed values
 */
Stream.prototype.sample = function (f /*, ...streams */) {
  // This stream is the sampler; everything after f is a stream to sample.
  return sampleArray(f, this, base.tail(arguments))
}
372
373// -----------------------------------------------------------------------
374// Zipping
375
376import { zip, zipArray } from './combinator/zip'
377
378export { zip, zipArray }
379
/**
 * Pair-wise combine items with those in the provided streams. Given 2 streams:
 * [1,2,3] zipWith f [4,5,6] -> [f(1,4),f(2,5),f(3,6)]
 * Note: zip causes fast streams to buffer and wait for slow streams.
 * @param {function(a:*, b:*, ...):*} f function to combine items
 * @param {...Stream} streams streams to zip with this one (read from `arguments`)
 * @returns {Stream} new stream containing pairs
 */
Stream.prototype.zip = function (f /*, ...streams */) {
  // arguments is [f, ...streams]; swapping slot 0 for this stream yields
  // [this, ...streams].
  return zipArray(f, base.replace(this, 0, arguments))
}
390
391// -----------------------------------------------------------------------
392// Switching
393
394import { switchLatest } from './combinator/switch'
395
396// @deprecated switch, use switchLatest instead
397export { switchLatest, switchLatest as switch }
398
/**
 * Given a stream of streams, return a new stream that adopts the behavior
 * of the most recent inner stream.
 * @returns {Stream} switching stream
 */
Stream.prototype.switchLatest = function () {
  return switchLatest(this)
}

// @deprecated use switchLatest instead
Stream.prototype.switch = Stream.prototype.switchLatest
410
411// -----------------------------------------------------------------------
412// Filtering
413
414import { filter, skipRepeats, skipRepeatsWith } from './combinator/filter'
415
416// @deprecated distinct, use skipRepeats instead
417// @deprecated distinctBy, use skipRepeatsWith instead
418export { filter, skipRepeats, skipRepeats as distinct, skipRepeatsWith, skipRepeatsWith as distinctBy }
419
/**
 * Retain only items matching a predicate
 * stream:                           -12345678-
 * filter(x => x % 2 === 0, stream): --2-4-6-8-
 * @param {function(x:*):boolean} p filtering predicate called for each item
 * @returns {Stream} stream containing only items for which predicate returns truthy
 */
Stream.prototype.filter = function (p) {
  return filter(p, this)
}
430
/**
 * Skip repeated events, using === to compare items
 * stream:              -abbcd-
 * skipRepeats(stream): -ab-cd-
 * @returns {Stream} stream with no repeated events
 */
Stream.prototype.skipRepeats = function () {
  return skipRepeats(this)
}
440
/**
 * Skip repeated events, using supplied equals function to compare items
 * @param {function(a:*, b:*):boolean} equals function to compare items
 * @returns {Stream} stream with no repeated events
 */
Stream.prototype.skipRepeatsWith = function (equals) {
  return skipRepeatsWith(equals, this)
}
449
450// -----------------------------------------------------------------------
451// Slicing
452
453import { take, skip, slice, takeWhile, skipWhile, skipAfter } from './combinator/slice'
454
455export { take, skip, slice, takeWhile, skipWhile, skipAfter }
456
/**
 * stream:          -abcd-
 * take(2, stream): -ab|
 * @param {Number} n take up to this many events
 * @returns {Stream} stream containing at most the first n items from this stream
 */
Stream.prototype.take = function (n) {
  return take(n, this)
}
466
/**
 * stream:          -abcd->
 * skip(2, stream): ---cd->
 * @param {Number} n skip this many events
 * @returns {Stream} stream not containing the first n events
 */
Stream.prototype.skip = function (n) {
  return skip(n, this)
}
476
/**
 * Slice a stream by event index. Equivalent to, but more efficient than
 * stream.take(end).skip(start);
 * NOTE: Negative start and end are not supported
 * @param {Number} start skip all events before the start index
 * @param {Number} end allow all events from the start index to the end index
 * @returns {Stream} stream containing items where start <= index < end
 */
Stream.prototype.slice = function (start, end) {
  return slice(start, end, this)
}
488
/**
 * stream:                        -123451234->
 * takeWhile(x => x < 5, stream): -1234|
 * @param {function(x:*):boolean} p predicate
 * @returns {Stream} stream containing items up to, but not including, the
 *  first item for which p returns falsy.
 */
Stream.prototype.takeWhile = function (p) {
  return takeWhile(p, this)
}
499
/**
 * stream:                        -123451234->
 * skipWhile(x => x < 5, stream): -----51234->
 * @param {function(x:*):boolean} p predicate
 * @returns {Stream} stream containing items following *and including* the
 *  first item for which p returns falsy.
 */
Stream.prototype.skipWhile = function (p) {
  return skipWhile(p, this)
}
510
/**
 * stream:                          -123456789->
 * skipAfter(x => x === 5, stream): -12345|
 * @param {function(x:*):boolean} p predicate
 * @returns {Stream} stream containing items up to, *and including*, the
 *  first item for which p returns truthy.
 */
Stream.prototype.skipAfter = function (p) {
  return skipAfter(p, this)
}
521
522// -----------------------------------------------------------------------
523// Time slicing
524
525import { takeUntil, skipUntil, during } from './combinator/timeslice'
526
527// @deprecated takeUntil, use until instead
528// @deprecated skipUntil, use since instead
529export { takeUntil, takeUntil as until, skipUntil, skipUntil as since, during }
530
/**
 * stream:                    -a-b-c-d-e-f-g->
 * signal:                    -------x
 * takeUntil(signal, stream): -a-b-c-|
 * @param {Stream} signal retain only events in stream before the first
 *  event in signal
 * @returns {Stream} new stream containing only events that occur before
 *  the first event in signal.
 */
Stream.prototype.until = function (signal) {
  return takeUntil(signal, this)
}

// @deprecated use until instead
Stream.prototype.takeUntil = Stream.prototype.until
546
/**
 * stream:                    -a-b-c-d-e-f-g->
 * signal:                    -------x
 * skipUntil(signal, stream): -------d-e-f-g->
 * @param {Stream} signal retain only events in stream at or after the first
 *  event in signal
 * @returns {Stream} new stream containing only events that occur at or after
 *  the first event in signal.
 */
Stream.prototype.since = function (signal) {
  return skipUntil(signal, this)
}

// @deprecated use since instead
Stream.prototype.skipUntil = Stream.prototype.since
562
/**
 * stream:                    -a-b-c-d-e-f-g->
 * timeWindow:                -----s
 * s:                               -----t
 * stream.during(timeWindow): -----c-d-e-|
 * @param {Stream<Stream>} timeWindow a stream whose first event (s) represents
 *  the window start time. That event (s) is itself a stream whose first event (t)
 *  represents the window end time
 * @returns {Stream} new stream containing only events within the provided timespan
 */
Stream.prototype.during = function (timeWindow) {
  return during(timeWindow, this)
}
576
577// -----------------------------------------------------------------------
578// Delaying
579
580import { delay } from './combinator/delay'
581
582export { delay }
583
/**
 * Delay every event in the stream.
 * @param {Number} delayTime milliseconds to delay each item
 * @returns {Stream} new stream containing the same items, but delayed by
 *  delayTime ms
 */
Stream.prototype.delay = function (delayTime) {
  return delay(delayTime, this)
}
591
592// -----------------------------------------------------------------------
593// Getting event timestamp
594
595import { timestamp } from './combinator/timestamp'
596export { timestamp }
597
/**
 * Expose event timestamps into the stream. Turns a Stream<X> into
 * Stream<{time:t, value:X}>
 * @returns {Stream<{time:number, value:*}>}
 */
Stream.prototype.timestamp = function () {
  return timestamp(this)
}
606
607// -----------------------------------------------------------------------
608// Rate limiting
609
610import { throttle, debounce } from './combinator/limit'
611
612export { throttle, debounce }
613
/**
 * Limit the rate of events
 * stream:              abcd----abcd----
 * throttle(2, stream): a-c-----a-c-----
 * @param {Number} period time to suppress events
 * @returns {Stream} new stream that skips events for throttle period
 */
Stream.prototype.throttle = function (period) {
  return throttle(period, this)
}
624
/**
 * Wait for a burst of events to subside and emit only the last event in the burst
 * stream:              abcd----abcd----
 * debounce(2, stream): -----d-------d--
 * @param {Number} period events occurring more frequently than this
 *  on the provided scheduler will be suppressed
 * @returns {Stream} new debounced stream
 */
Stream.prototype.debounce = function (period) {
  return debounce(period, this)
}
636
637// -----------------------------------------------------------------------
638// Awaiting Promises
639
640import { fromPromise, awaitPromises } from './combinator/promises'
641
642// @deprecated await, use awaitPromises instead
643export { fromPromise, awaitPromises, awaitPromises as await }
644
/**
 * Await promises, turning a Stream<Promise<X>> into Stream<X>. Preserves
 * event order, but timeshifts events based on promise resolution time.
 * @returns {Stream<X>} stream containing non-promise values
 */
Stream.prototype.awaitPromises = function () {
  return awaitPromises(this)
}

// @deprecated use awaitPromises instead
Stream.prototype.await = Stream.prototype.awaitPromises
656
657// -----------------------------------------------------------------------
658// Error handling
659
660import { recoverWith, flatMapError, throwError } from './combinator/errors'
661
662// @deprecated flatMapError, use recoverWith instead
663export { recoverWith, flatMapError, throwError }
664
/**
 * If this stream encounters an error, recover and continue with items from stream
 * returned by f.
 * stream:                 -a-b-c-X-
 * f(X):                          d-e-f-g-
 * recoverWith(f, stream): -a-b-c-d-e-f-g-
 * @param {function(error:*):Stream} f function which returns a new stream
 * @returns {Stream} new stream which will recover from an error by calling f
 */
Stream.prototype.recoverWith = function (f) {
  // NOTE(review): this delegates through the deprecated-named export
  // flatMapError; presumably an alias of recoverWith - confirm in
  // ./combinator/errors before switching the call.
  return flatMapError(f, this)
}

// @deprecated use recoverWith instead
Stream.prototype.flatMapError = Stream.prototype.recoverWith
680
681// -----------------------------------------------------------------------
682// Multicasting
683
684import multicast from '@most/multicast'
685
686export { multicast }
687
/**
 * Transform the stream into a multicast stream. That means that many subscribers
 * to the stream will not cause multiple invocations of the internal machinery.
 * @returns {Stream} new stream which will multicast events to all observers.
 */
Stream.prototype.multicast = function () {
  return multicast(this)
}
696
697// export the instance of the defaultScheduler for third-party libraries
698import defaultScheduler from './scheduler/defaultScheduler'
699
700export { defaultScheduler }
701
702// export an implementation of Task used internally for third-party libraries
703import PropagateTask from './scheduler/PropagateTask'
704
705export { PropagateTask }