import { from, Observable, ObservableInput, OperatorFunction, Subscriber, Subscription, TeardownLogic } from 'rxjs'
import { concatMap as rxConcatMap, mergeMap as rxMergeMap, switchMap as rxSwitchMap } from 'rxjs/operators'

/**
 * Builds the error used to reject Promises whose AbortSignal was aborted.
 * It is a plain `Error` with message `'Aborted'` whose `name` is overwritten with `'AbortError'`.
 */
const createAbortError = (): Error => Object.assign(new Error('Aborted'), { name: 'AbortError' })

/**
 * Creates an Observable just like RxJS `create`, but exposes an AbortSignal in addition to the subscriber.
 *
 * A fresh AbortController is created for every subscription. Its signal is aborted when the subscription is
 * torn down, after the teardown logic returned by `subscribe` has run. It is also aborted if `subscribe` throws
 * synchronously, so that work already started with the signal does not keep running.
 *
 * @param subscribe Optional subscribe function receiving the subscriber and the per-subscription AbortSignal.
 * It may return teardown logic, which is run on teardown before the signal is aborted.
 * @returns An Observable that invokes `subscribe` with a new AbortSignal on every subscription.
 */
export const create = <T>(
    subscribe?: (subscriber: Subscriber<T>, signal: AbortSignal) => TeardownLogic
): Observable<T> =>
    new Observable<T>(subscriber => {
        const abortController = new AbortController()
        let teardown: TeardownLogic
        try {
            teardown = subscribe && subscribe(subscriber, abortController.signal)
        } catch (err) {
            // The subscription below is never returned in this case, so nothing else would abort the signal.
            abortController.abort()
            throw err
        }
        const subscription = new Subscription()
        // Order matters: the caller's teardown runs first, then the signal is aborted.
        subscription.add(teardown)
        subscription.add(() => abortController.abort())
        return subscription
    })

/**
 * Easiest way to wrap an abortable async function into an Observable.
 * On every subscription the factory is invoked with a fresh AbortSignal and its result is converted with `from`
 * and forwarded to the subscriber. The AbortSignal is aborted on unsubscription.
 *
 * @param factory Function that receives the AbortSignal and returns the ObservableInput to mirror.
 * @returns An Observable that calls `factory` once per subscription.
 */
export const defer = <T>(factory: (signal: AbortSignal) => ObservableInput<T>): Observable<T> =>
    create<T>((subscriber, signal) => {
        const inner = from(factory(signal))
        // Returning the inner subscription makes it the teardown logic of the outer Observable.
        return inner.subscribe(subscriber)
    })

/**
 * Returns a Promise that resolves with the last emission of the given Observable,
 * rejects if the Observable errors or rejects with an `AbortError` when the AbortSignal is aborted.
 *
 * If the Observable completes without emitting, the Promise resolves with `undefined`.
 *
 * @param observable The Observable to subscribe to.
 * @param signal Optional AbortSignal. If it is already aborted, the Observable is never subscribed to.
 * If it aborts later, the subscription is unsubscribed and the Promise rejects.
 * @returns A Promise for the last emitted value.
 */
export const toPromise = <T>(observable: Observable<T>, signal?: AbortSignal): Promise<T> =>
    new Promise((resolve, reject) => {
        if (signal && signal.aborted) {
            reject(createAbortError())
            return
        }
        let subscription: Subscription | undefined
        let aborted = false
        const listener = () => {
            aborted = true
            // `subscription` is still undefined if the abort happens while `subscribe()` is running.
            if (subscription) {
                subscription.unsubscribe()
            }
            reject(createAbortError())
        }
        const cleanup = () => {
            if (signal) {
                signal.removeEventListener('abort', listener)
            }
        }
        // Register before subscribing: a synchronously completing Observable calls `cleanup()` during
        // `subscribe()`, which can only remove the listener if it has already been added.
        if (signal) {
            signal.addEventListener('abort', listener, { once: true })
        }
        let value: T
        subscription = observable.subscribe(
            val => {
                value = val
            },
            err => {
                cleanup()
                reject(err)
            },
            () => {
                cleanup()
                resolve(value)
            }
        )
        if (aborted) {
            // The signal was aborted during synchronous subscription, before the subscription was available.
            subscription.unsubscribe()
        }
    })

/**
 * Calls `next` for every emission and returns a Promise that resolves when the Observable completed, rejects if the
 * Observable errors or rejects with an `AbortError` when the AbortSignal is aborted.
 * If `next` throws, the Promise rejects with the thrown error, the source is unsubscribed and `next` is not
 * called again.
 *
 * @param source The Observable to subscribe to.
 * @param next Callback invoked with every emitted value.
 * @param signal Optional AbortSignal. If it is already aborted, the Observable is never subscribed to.
 * If it aborts later, the subscription is unsubscribed and the Promise rejects.
 * @returns A Promise that settles when the Observable terminates, `next` throws or the signal aborts.
 */
export const forEach = <T>(source: Observable<T>, next: (value: T) => void, signal?: AbortSignal): Promise<void> =>
    new Promise<void>((resolve, reject) => {
        if (signal && signal.aborted) {
            reject(createAbortError())
            return
        }
        let subscription: Subscription | undefined
        // Set once the consumer is done with the source, so later synchronous emissions can be ignored.
        let stopped = false
        const cleanup = () => {
            if (signal) {
                signal.removeEventListener('abort', listener)
            }
        }
        const stop = () => {
            stopped = true
            cleanup()
            // `subscription` is still undefined while `subscribe()` is running; handled after it returns.
            if (subscription) {
                subscription.unsubscribe()
            }
        }
        const listener = () => {
            stop()
            reject(createAbortError())
        }
        // Register before subscribing: a synchronously terminating Observable calls `cleanup()` during
        // `subscribe()`, which can only remove the listener if it has already been added.
        if (signal) {
            signal.addEventListener('abort', listener, { once: true })
        }
        subscription = source.subscribe(
            value => {
                if (stopped) {
                    return
                }
                try {
                    next(value)
                } catch (err) {
                    stop()
                    reject(err)
                }
            },
            err => {
                cleanup()
                reject(err)
            },
            () => {
                cleanup()
                resolve()
            }
        )
        if (stopped) {
            // `next` threw or the signal aborted during synchronous emission, before the subscription existed.
            subscription.unsubscribe()
        }
    })

/**
 * Like RxJS `switchMap`, but passes an AbortSignal that is aborted when the source emits another element.
 *
 * Each projection is wrapped in `defer`, so the signal belongs to that single inner subscription.
 *
 * @param project Maps a source value, its index and the AbortSignal to the inner ObservableInput.
 */
export const switchMap = <T, R>(
    project: (value: T, index: number, abortSignal: AbortSignal) => ObservableInput<R>
): OperatorFunction<T, R> => {
    const toAbortable = (value: T, index: number): Observable<R> =>
        defer<R>(abortSignal => project(value, index, abortSignal))
    return source => source.pipe(rxSwitchMap(toAbortable))
}

/**
 * Like RxJS `concatMap`, but passes an AbortSignal that is aborted when the returned Observable is unsubscribed from.
 *
 * Each projection is wrapped in `defer`, so the signal belongs to that single inner subscription.
 *
 * @param project Maps a source value, its index and the AbortSignal to the inner ObservableInput.
 */
export const concatMap = <T, R>(
    project: (value: T, index: number, abortSignal: AbortSignal) => ObservableInput<R>
): OperatorFunction<T, R> => {
    const toAbortable = (value: T, index: number): Observable<R> =>
        defer<R>(abortSignal => project(value, index, abortSignal))
    return source => source.pipe(rxConcatMap(toAbortable))
}

/**
 * Like RxJS `mergeMap`, but passes an AbortSignal that is aborted when the returned Observable is unsubscribed from.
 *
 * Each projection is wrapped in `defer`, so the signal belongs to that single inner subscription.
 *
 * @param project Maps a source value, its index and the AbortSignal to the inner ObservableInput.
 */
export const mergeMap = <T, R>(
    project: (value: T, index: number, abortSignal: AbortSignal) => ObservableInput<R>
): OperatorFunction<T, R> => {
    const toAbortable = (value: T, index: number): Observable<R> =>
        defer<R>(abortSignal => project(value, index, abortSignal))
    return source => source.pipe(rxMergeMap(toAbortable))
}
