import {Operator, Stream} from 'xstream';

class UponStopOperator<T> implements Operator<T, T> {
  public type = 'uponStop';
  public out: Stream<T> = null as any;

  constructor(public op: () => void, public ins: Stream<T>) {}

  public _start(out: Stream<T>): void {
    this.out = out;
    this.ins._add(this);
  }

  public _stop(): void {
    this.op();
    this.ins._remove(this);
    this.out = null as any;
  }

  public _n(t: T) {
    this.out._n(t);
  }

  public _e(err: any) {
    this.out._e(err);
  }

  public _c() {
    this.out._c();
  }
}

/**
 * Delays periodic events by a given time period.
 *
 * Marble diagram:
 *
 * ```text
 * 1----2--3--4----5|
 *     delay(60)
 * ---1----2--3--4----5|
 * ```
 *
 * Example:
 *
 * ```js
 * import fromDiagram from 'xstream/extra/fromDiagram'
 * import uponStop from './uponStop'
 *
 * let is_complete = false
 * const stream = fromDiagram('1----2--3--4----5|')
 *  .compose(uponStop(() => {
 *    is_complete = true
 *  }))
 *
 * stream.addListener({
 *   next: i => console.log(i),
 *   error: err => console.error(err),
 *   complete: () => console.log('completed')
 * })
 * ```
 *
 * ```text
 * > 1  (after 60 ms)
 * > 2  (after 160 ms)
 * > 3  (after 220 ms)
 * > 4  (after 280 ms)
 * > 5  (after 380 ms)
 * > completed
 * ```
 *
 * @param {Function} op The amount of silence required in milliseconds.
 * @return {Stream}
 */
export default function uponStop<T>(
  op: () => void
): (ins: Stream<T>) => Stream<T> {
  return function uponStopOperator(ins: Stream<T>): Stream<T> {
    return new Stream<T>(new UponStopOperator(op, ins));
  };
}
