/* @flow */ import type EventEmitter from "events"; export type Event = {type: 'value', value: V} | {type: 'error', value: E} | {type: 'end', value: void}; export type Emitter = { value(value: V): boolean; event(event: Event): boolean; error(e: E): boolean; end(): void; // Deprecated methods emit(value: V): boolean; emitEvent(event: Event): boolean; }; export type Subscription = { closed: boolean; unsubscribe(): void; }; export type Observer = { +value?: (value: V) => void; +error?: (err: E) => void; +end?: () => void; }; interface ESObserver<-V,-E> { start?: Function, next?: (value: V) => any, error?: (error: E) => any, complete?: () => any, } /* * This is the interface described by https://github.com/tc39/proposal-observable * It provides interoperability between stream implementations, including Kefir, * RxJS, and zen-observable. */ type ESObservable<+V,+E=*> = { subscribe(callbacks: ESObserver): { unsubscribe: () => void }; }; declare class Observable<+V,+E=*> { toProperty($?: empty): Property; toProperty(getCurrent: () => V2): Property; changes(): Observable; observe(obs: Observer, $?: empty): Subscription; observe(onValue?: ?(v: V) => void, onError?: ?(err: E) => void, onEnd?: ?() => void): Subscription; onValue(cb: (v: V) => void): this; offValue(cb: (v: V) => void): this; onError(cb: (err: E) => void): this; offError(cb: (err: E) => void): this; onEnd(cb: () => void): this; offEnd(cb: () => void): this; onAny(cb: (event: Event) => void): this; offAny(cb: (event: Event) => void): this; log(name?: string): this; offLog(name?: string): this; spy(name?: string): this; offSpy(name?: string): this; setName(name: string): this; setName(Observable, name: string): this; toPromise(PromiseConstructor?: Function): Promise; toESObservable(): ESObservable; map(cb: (v: V) => V2): Observable; filter(cb?: typeof Boolean): Observable<$NonMaybeType,E>; filter(cb: (v: V) => any): Observable; take(n: number): Observable; takeWhile(cb?: (v: V) => boolean): Observable; last(): Observable; skip(n: number): Observable; skipWhile(cb?: (v: V) => boolean): Observable; skipDuplicates(comparator?: (a: V, b: V) => boolean): Observable; diff($?: empty): Observable<[V, V],E>; diff(fn: (prev: V, next: V) => V2, $?: empty): Observable; diff(fn: (prev: V | V3, next: V) => V2, seed: V3): Observable; diff(fn: null | void, seed: V3): Observable<[V | V3, V],E>; scan(cb: (prev: V, next: V) => V, $?: empty): Observable; scan(cb: (prev: V2, next: V) => V2, seed: V2): Observable; flatten($?: empty): Observable; flatten(transformer: (value: V) => V2[]): Observable; delay(n: number): Observable; throttle(n: number, options?: {leading?: boolean, trailing?: boolean}): Observable; debounce(n: number, options?: {immediate?: boolean}): Observable; mapErrors(fn: (error: E) => E2): Observable; filterErrors(fn?: typeof Boolean): Observable>; filterErrors(fn: (error: E) => any): Observable; takeErrors(n: number): Observable; ignoreValues(): Observable; ignoreErrors(): Observable; ignoreEnd(): Observable; beforeEnd(fn: () => V2): Observable; slidingWindow(max: number, min?: number): Observable; bufferWhile(predicate?: (value: V) => boolean, options?: {flushOnEnd?: boolean}): Observable; transduce(transducer: any): Observable; withHandler(handler: (emitter: Emitter, event: Event) => void): Observable; combine(otherObs: Observable, $?: empty): Observable<[V,V2],E|E2>; combine(otherObs: Observable, combinator: (v: V, v2: V2) => V3): Observable; combine( otherObs: Observable, passiveObss: Observable[], $?: empty ): Observable,E|E2|E3>; combine( otherObs: Observable, passiveObss: Observable[], combinator: (v: V, v2: V2, ...v3: V3[]) => V4 ): Observable; zip(otherObs: Observable, $?: empty): Observable<[V,V2],E|E2>; zip(otherObs: Observable, combinator: (v: V, v2: V2) => V3): Observable; merge(otherObs: Observable): Observable; concat(otherObs: Observable): Observable; flatMap(transform: (value: V) => Observable): Observable; flatMapLatest(transform: (value: V) => Observable): Observable; flatMapFirst(transform: (value: V) => Observable): Observable; flatMapConcat(transform: (value: V) => Observable): Observable; flatMapConcurLimit(transform: (value: V) => Observable, limit: number): Observable; flatMapErrors(transform: (error: E) => Observable): Observable; filterBy(otherObs: Observable): Observable; skipUntilBy(otherObs: Observable): Observable; takeUntilBy(otherObs: Observable): Observable; bufferBy(otherObs: Observable, options?: {flushOnEnd?: boolean}): Observable; bufferWhileBy(otherObs: Observable, options?: {flushOnEnd?: boolean, flushOnChange?: boolean}): Observable; sampledBy(otherObs: Observable, $?: empty): Observable; sampledBy(otherObs: Observable, combinator: (obsValue: V, otherObsValue: V2) => V3): Observable; bufferWithTimeOrCount(time: number, count: number, options?: {flushOnEnd: boolean}): Observable; } declare class Pool extends Observable { plug(s: Observable): () => void; } declare class Stream<+V,+E=*> extends Observable { } declare class Property<+V,+E=*> extends Observable { } // `$observedValuesTuple` is a helper type that describes a type for a tuple or // array of values in relation to a tuple or array of observables. This mapping // states that the type of each position in the new tuple type matches the type // of the value type parameter of the observable in the same position in the // input tuple. type $observedValuesTuple = $TupleMap(obs: Observable) => V> // `$observedValuesObject` is similar to `$observedValuesTuple` - except that // `$observedValuesObject` describes an object type where the type of each // property in the new object matches the value type parameter of the observable // type under the same key in the input object. type $observedValuesObject = $ObjMap(obs: Observable) => V> declare var Kefir: { Observable: typeof Observable; Pool: typeof Pool; Stream: typeof Stream; Property: typeof Property; staticLand: { Observable: { ap(obsF: Observable<(x: A) => B, E1>, obsV: Observable): Observable; bimap(fnE: (x: E1) => E2, fnV: (x: V1) => V2, obs: Observable): Observable; chain(cb: (value: V) => Observable, s: Observable): Observable; concat(obs1: Observable, obs2: Observable): Observable; empty(): Observable<*,*>; map(cb: (value: V) => V2, s: Observable): Observable; of(value: V): Observable; }; }; never(): Observable; later(delay: number, value: V): Observable; interval(interval: number, value: V): Observable; sequentially(interval: number, values: V[]): Observable; fromPoll(interval: number, f: () => V): Observable; withInterval(interval: number, f: (emitter: Emitter) => void): Observable; fromCallback(f: (cb: (value: V) => void) => void): Observable; fromNodeCallback(f: (cb: (err: E, value: ?V) => void) => void): Observable; fromEvents( target: EventTarget | EventEmitter, // `EventTarget` comes from Flow's built-in dom types eventName: string, transformer?: (...args: any[]) => V ): Observable; stream(subscribe: (emitter: Emitter) => ?() => void): Observable; constant(value: V): Property; constantError(err: E): Property; fromPromise(promise: Promise): Property; fromESObservable(observable: ESObservable): Observable; combine: // array of observables & ([]>( obss: Obss, $?: empty ) => Observable<$observedValuesTuple, E>) // array of observables, combinator & ((obss: Observable[], combinator: (...args: any[]) => V) => Observable) // array of observables, array of passive observables & ((obss: Observable[], passiveObss: Observable[], $?: empty) => Observable<(V|V2)[],E|E2>) // array of observables, array of passive observables, combinator & ((obss: Observable[], passiveObss: Observable[], combinator: (...args: any[]) => V) => Observable) // object of observables & ( }>( obss: Obss, $?: empty ) => Observable<$observedValuesObject, E>) // object of observables, combinator & ( }>( obss: Obss, combinator: (vs: $observedValuesObject) => V ) => Observable) // object of observables, object of passive observables & ( }, PassiveObss: { [key: string]: Observable }>( obss: Obss, passiveObss: PassiveObss, $?: empty ) => Observable< $observedValuesObject & $observedValuesObject, E|E2>) // object of observables, object of passive observables, combinator & ( }, PassiveObss: { [key: string]: Observable }>( obss: Obss, passiveObss: PassiveObss, combinator: (vs: $observedValuesObject & $observedValuesObject) => V, ) => Observable); zip: & ([]>( obss: Obss, $?: empty ) => Observable<$observedValuesTuple, E>) & ((obss: Observable[], combinator: (...args: any[]) => V2) => Observable); merge(obss: Observable[]): Observable; concat(obss: Observable[]): Observable; pool(): Pool<*,*>; repeat(fn: (i: number) => ?Observable): Observable; }; module.exports = Kefir;