// @flow declare type SeedValue = { seed: S, value: V }; declare type TimeValue = { time: number, value: V }; declare type CreateGenerator = (...args: Array) => Generator, any, any>; export type Sink = { event(time: number, value: A): void; // end value parameter is deprecated end(time: number, value?: A): void; error(time: number, err: Error): void; } export type Task = { run(time: number): void; error(time: number, e: Error): void; dispose(): void; } export type ScheduledTask = { task: Task; run(): void; error(err: Error): void; dispose(): void; } export type Scheduler = { now(): number; asap(task: Task): ScheduledTask; delay(delay: number, task: Task): ScheduledTask; periodic(period: number, task: Task): ScheduledTask; schedule(delay: number, period: number, task: Task): ScheduledTask; cancel(task: Task): void; cancelAll(predicate: (task: Task) => boolean): void; } export type Disposable = { dispose(): void | Promise; } export type Source = { run (sink: Sink, scheduler: Scheduler): Disposable; } export type Observable = { subscribe(subscriber: Subscriber): Subscription; } export type Subscriber = { +next?: (value: A) => void; +error?: (err: Error) => void; // complete value parameter is deprecated +complete?: (value?: A) => void; } export type Subscription = { unsubscribe(): void; } declare export class Stream { constructor(source: Source): Stream; source: Source; run (sink: Sink, scheduler: Scheduler): Disposable; reduce(f: (b: B, a: A) => B, b: B): Promise; observe(f: (a: A) => any): Promise; forEach(f: (a: A) => any): Promise; drain(): Promise; subscribe(subscriber: Subscriber): Subscription; constant(b: B): Stream; map(f: (a: A) => B): Stream; tap(f: (a: A) => any): Stream; chain(f: (a: A) => Stream): Stream; flatMap(f: (a: A) => Stream): Stream; ap(fs: Stream<(a: A) => B>): Stream; continueWith(f: (a: any) => Stream): Stream; concatMap(f: (a: A) => Stream): Stream; mergeConcurrently(concurrency: number): Stream; merge(...ss: Array>): Stream; mergeArray(streams: Array>): Stream; combine( fn: (a: A, b: B) => R, b: Stream ): Stream; combine( fn: (a: A, b: B, c: C) => R, b: Stream, c: Stream ): Stream; combine( fn: (a: A, b: B, c: C, d: D) => R, b: Stream, c: Stream, d: Stream ): Stream; combine( fn: (a: A, b: B, c: C, d: D, e: E) => R, b: Stream, c: Stream, d: Stream, e: Stream ): Stream; combineArray( fn: (a: A, b: B) => R, streams: [Stream] ): Stream; combineArray( fn: (a: A, b: B, c: C) => R, streams: [Stream, Stream] ): Stream; combineArray( fn: (a: A, b: B, c: C, d: D) => R, streams: [Stream, Stream, Stream] ): Stream; combineArray( fn: (a: A, b: B, c: C, d: D, e: E) => R, streams: [Stream, Stream, Stream, Stream] ): Stream; combineArray( fn: (a: A, ...rest: V[]) => R, streams: Stream[] ): Stream; scan(f: (b: B, a: A) => B, b: B): Stream; loop(f: (seed: S, a: A) => SeedValue, seed: S): Stream; concat(s2: Stream): Stream; startWith(a: A): Stream; filter(p: (a: A) => boolean): Stream; skipRepeats(): Stream; skipRepeatsWith(eq: (a1: A, a2: A) => boolean): Stream; take(n: number): Stream; skip(n: number): Stream; takeWhile(p: (a: A) => boolean): Stream; skipWhile(p: (a: A) => boolean): Stream; skipAfter(p: (a: A) => boolean): Stream; slice(start: number, end: number): Stream; until(signal: Stream): Stream; takeUntil(signal: Stream): Stream; since(signal: Stream): Stream; skipUntil(signal: Stream): Stream; during(timeWindow: Stream>): Stream; throttle(period: number): Stream; debounce(period: number): Stream; timestamp(): Stream>; delay(dt: number): Stream; await(this: Stream>): Stream; awaitPromises(this: Stream>): Stream; sample( fn: (b: B, c: C) => R, b: Stream, c: Stream ): Stream; sample( fn: (b: B, c: C, d: D) => R, b: Stream, c: Stream, d: Stream ): Stream; sample( fn: (b: B, c: C, d: D, e: E) => R, b: Stream, c: Stream, d: Stream, e: Stream ): Stream; sampleWith(sampler: Stream): Stream; zip( fn: (a: A, b: B) => R, b: Stream ): Stream; zip( fn: (a: A, b: B, c: C) => R, b: Stream, c: Stream ): Stream; zip( fn: (a: A, b: B, c: C, d: D) => R, b: Stream, c: Stream, d: Stream ): Stream; zip( fn: (a: A, b: B, c: C, d: D, e: E) => R, b: Stream, c: Stream, d: Stream, e: Stream ): Stream; recoverWith(p: (a: B) => Stream): Stream; multicast(): Stream; thru(transform: (stream: Stream) => B): B; } declare export function just(a: A): Stream; declare export function of(a: A): Stream; declare export function empty(): Stream; declare export function never(): Stream; declare export function from(as: A[] | Iterable | Observable): Stream; declare export function periodic(period: number, a?: A): Stream; declare export function fromEvent(event: string, target: any, useCapture?: boolean): Stream; declare export function fromEvent(event: string, target: any): Stream; declare export function unfold(f: (seed: S) => SeedValue>, seed: S): Stream; declare export function iterate(f: (a: A) => A|Promise, a: A): Stream; declare export function generate(g: CreateGenerator, ...args: Array): Stream; declare export function reduce(f: (b: B, a: A) => B, b: B, s: Stream): Promise; declare export function observe(f: (a: A) => any, s: Stream): Promise; declare export function forEach(f: (a: A) => any, s: Stream): Promise; declare export function drain(s: Stream): Promise; declare export function subscribe(subscriber: Subscriber, s: Stream): Subscription; declare export function constant(b: B, s: Stream): Stream; declare export function map(f: (a: A) => B, s: Stream): Stream; declare export function tap(f: (a: A) => any, s: Stream): Stream; declare export function ap(fs: Stream<(a: A) => B>, as: Stream ): Stream; declare export function chain(f: (a: A) => Stream, s: Stream): Stream; declare export function flatMap(f: (a: A) => Stream, s: Stream): Stream; declare export function join(s: Stream>): Stream; declare export function switchLatest(s: Stream>): Stream; declare export function continueWith(f: (a: any) => Stream, s: Stream): Stream; declare export function concatMap(f: (a: A) => Stream, s: Stream): Stream; declare export function mergeConcurrently(concurrency: number, s: Stream>): Stream; declare export function merge(...ss: Array>): Stream; declare export function mergeArray(streams: Array>): Stream; declare export function combine( fn: (a: A, b: B) => R, a: Stream, b: Stream ): Stream; declare export function combine( fn: (a: A, b: B, c: C) => R, a: Stream, b: Stream, c: Stream ): Stream; declare export function combine( fn: (a: A, b: B, c: C, d: D) => R, a: Stream, b: Stream, c: Stream, d: Stream ): Stream; declare export function combine( fn: (a: A, b: B, c: C, d: D, e: E) => R, a: Stream, b: Stream, c: Stream, d: Stream, e: Stream ): Stream; declare export function combineArray( fn: (a: A, b: B) => R, streams: [Stream, Stream] ): Stream; declare export function combineArray( fn: (a: A, b: B, c: C) => R, streams: [Stream, Stream, Stream] ): Stream; declare export function combineArray( fn: (a: A, b: B, c: C, d: D) => R, streams: [Stream, Stream, Stream, Stream] ): Stream; declare export function combineArray( fn: (a: A, b: B, c: C, d: D, e: E) => R, streams: [Stream, Stream, Stream, Stream, Stream] ): Stream; declare export function combineArray ( fn: (...items: V[]) => R, items: Stream[] ): Stream; declare export function scan(f: (b: B, a: A) => B, b: B, s: Stream): Stream; declare export function loop(f: (seed: S, a: A) => SeedValue, seed: S, s: Stream): Stream; declare export function concat(s1: Stream, s2: Stream): Stream; declare export function startWith(a: A, s: Stream): Stream; declare export function filter(p: (a: A) => boolean, s: Stream): Stream; declare export function skipRepeats(s: Stream): Stream; declare export function skipRepeatsWith(eq: (a1: A, a2: A) => boolean, s: Stream): Stream; declare export function take(n: number, s: Stream): Stream; declare export function skip(n: number, s: Stream): Stream; declare export function takeWhile(p: (a: A) => boolean, s: Stream): Stream; declare export function skipWhile(p: (a: A) => boolean, s: Stream): Stream; declare export function skipAfter(p: (a: A) => boolean, s: Stream): Stream; declare export function slice(start: number, end: number, s: Stream): Stream; declare export function until(signal: Stream, s: Stream): Stream; declare export function takeUntil(signal: Stream, s: Stream): Stream; declare export function since(signal: Stream, s: Stream): Stream; declare export function skipUntil(signal: Stream, s: Stream): Stream; declare export function during(timeWindow: Stream>, s: Stream): Stream; declare export function throttle(period: number, s: Stream): Stream; declare export function debounce(period: number, s: Stream): Stream; declare export function timestamp(s: Stream): Stream>; declare export function delay(dt: number, s: Stream): Stream; declare export function fromPromise(p: Promise): Stream; declare export function await(s: Stream>): Stream; declare export function awaitPromises(s: Stream>): Stream; declare export function sample( fn: (a: A, b: B) => R, sampler: Stream, a: Stream, b: Stream ): Stream; declare export function sample( fn: (a: A, b: B, c: C) => R, sampler: Stream, a: Stream, b: Stream, c: Stream ): Stream; declare export function sample( fn: (a: A, b: B, c: C, d: D) => R, sampler: Stream, a: Stream, b: Stream, c: Stream, d: Stream ): Stream; declare export function sample( fn: (a: A, b: B, c: C, d: D, e: E) => R, sampler: Stream, a: Stream, b: Stream, c: Stream, d: Stream, e: Stream ): Stream; declare export function sampleWith(sampler: Stream, s: Stream): Stream; declare export function zip( fn: (a: A, b: B) => R, a: Stream, b: Stream ): Stream; declare export function zip( fn: (a: A, b: B, c: C) => R, a: Stream, b: Stream, c: Stream ): Stream; declare export function zip( fn: (a: A, b: B, c: C, d: D) => R, a: Stream, b: Stream, c: Stream, d: Stream ): Stream; declare export function zip( fn: (a: A, b: B, c: C, d: D, e: E) => R, a: Stream, b: Stream, c: Stream, d: Stream, e: Stream ): Stream; declare export function recoverWith(p: (a: B) => Stream, s: Stream): Stream; declare export function throwError(e: Error): Stream; declare export function multicast(s: Stream): Stream; declare export var defaultScheduler: Scheduler; declare export class PropagateTask { static event (value: T, sink: Sink): PropagateTask; static error (error: Error, sink: Sink): PropagateTask; // end value parameter is deprecated static end (value: T, sink: Sink): PropagateTask; constructor (run: (time: number, value: T, sink: Sink) => any, value: T, sink: Sink): PropagateTask; run(time: number): void; error(time: number, e: Error): void; dispose(): void; }