import { forkJoin, Observable, of, ReplaySubject, Subject, Subscription, SubscriptionLike } from 'rxjs'

/**
 * Unsubscribes from a subscription if it is still open.
 * A null/undefined or already-closed subscription is ignored.
 * @param sub The subscription to release.
 */
function unsubscribe(sub: Subscription) {
    if (sub == undefined || sub.closed) {
        return
    }
    sub.unsubscribe()
}

/**
 * Exposes helper methods to automatically manage disposable resources (such as rxjs Subscriptions) within the scope of
 * their immediate usage.
 */
export class AutoDispose {
    private constructor() {}

    /**
     * Subscribes to at most one notification of an observable through a plain Subject proxy, releasing the source
     * subscription as soon as that notification arrives.
     *
     * The source is subscribed to before the handlers are attached to the proxy, so a notification that the source
     * delivers synchronously on subscription is not seen by the handlers; use subscribeReplayOnce() for such sources.
     * @param observable The observable to subscribe to.
     * @param next A handler for the value emitted by the observable.
     * @param error An optional handler invoked if the observable errors before emitting.
     * @param complete An optional handler invoked if the observable completes before emitting.
     * @returns The Subscription to the throw-away proxy; client code needn't unsubscribe from it.
     */
    static subscribeSubjectOnce<T>(observable: Observable<T>, next: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        // Reference the class rather than `this` so the method keeps working when detached from the class.
        return AutoDispose.observeSubjectOnce(observable).subscribe(next, error, complete)
    }

    /**
     * Returns an observable which proxies zero or one notifications raised by a source observable through a plain
     * Subject, and automatically unsubscribes from the source observable once that notification has been received.
     * @param observable The source observable to subscribe to.
     * @returns A throw-away Observable which can be subscribed to inline by client code but needn't be unsubscribed from.
     */
    static observeSubjectOnce<T>(observable: Observable<T>): Observable<T> {
        return AutoDispose.observeOnce(new Subject<T>(), observable)
    }

    /**
     * Subscribes to at most one notification of an observable through a ReplaySubject proxy (buffer size 1), releasing
     * the source subscription as soon as that notification arrives.
     * @param observable The observable to subscribe to.
     * @param next A handler for the value emitted by the observable.
     * @param error An optional handler invoked if the observable errors before emitting.
     * @param complete An optional handler invoked if the observable completes before emitting.
     * @returns The Subscription to the throw-away proxy; client code needn't unsubscribe from it.
     */
    static subscribeReplayOnce<T>(observable: Observable<T>, next: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        return AutoDispose.observeReplayOnce(observable).subscribe(next, error, complete)
    }

    /**
     * Returns an observable which proxies zero or one notifications raised by a source observable through a
     * ReplaySubject (buffer size 1), and automatically unsubscribes from the source observable once that notification
     * has been received.
     * @param observable The source observable to subscribe to.
     * @returns A throw-away Observable which can be subscribed to inline by client code but needn't be unsubscribed from.
     */
    static observeReplayOnce<T>(observable: Observable<T>): Observable<T> {
        return AutoDispose.observeOnce(new ReplaySubject<T>(1), observable)
    }

    /**
     * Returns an observable that emits `undefined` and completes once forkJoin() over the given observables has
     * emitted. An error or a completion of the joined stream is forwarded to the returned observable as well, so
     * subscribers are never left waiting.
     * @param items The observables to wait for. A null, undefined or empty array yields an observable that emits
     * `undefined` and completes immediately.
     * @returns An observable that replays its single `undefined` notification to late subscribers.
     */
    static all(items: Observable<any>[]): Observable<any> {
        // Emit the same value (`undefined`) as the non-empty path below.
        if (items == undefined || items.length === 0) return of(undefined)
        const result = new ReplaySubject<undefined>(1)

        forkJoin(items).subscribe({
            next: () => result.next(undefined),
            // Without these two handlers a failing or empty source would leave `result` pending forever.
            error: e => result.error(e),
            complete: () => result.complete()
        })

        return result
    }

    /**
     * Returns a Promise that settles when all() over the given observables settles.
     * @param items The observables to wait for.
     * @returns A Promise obtained by calling toPromise() on the observable returned by all().
     */
    static allAsPromise(items: Observable<any>[]): Promise<any> {
        return AutoDispose.all(items).toPromise()
    }

    /**
     * Subscribes to the source immediately and forwards only its first notification (value, error or completion)
     * to the given subject, releasing the source subscription at that point. After a value has been forwarded the
     * subject is left open; it is not completed.
     * @param result The subject that receives the proxied notification.
     * @param observable The source observable to subscribe to.
     * @returns The subject exposed as a plain Observable.
     */
    private static observeOnce<T>(result: Subject<T>, observable: Observable<T>): Observable<T> {
        // Declared before subscribing: the callbacks may run synchronously inside subscribe(), i.e. before the
        // subscription object exists, and must not touch an uninitialised binding.
        let sub: Subscription
        let settled = false
        const release = () => {
            settled = true
            unsubscribe(sub)
        }

        sub = observable.subscribe({
            next: value => {
                if (settled) return
                release()
                result.next(value)
            },
            error: e => {
                if (settled) return
                release()
                result.error(e)
            },
            complete: () => {
                if (settled) return
                release()
                result.complete()
            }
        })

        // A synchronous notification could not unsubscribe from inside its callback; do it now.
        if (settled) unsubscribe(sub)
        return result.asObservable()
    }
}

/**
 * Manages disposable resources such as rxjs Subscriptions.
 */
export class Disposables {
    /** A convenience reference to the static AutoDispose type. */
    static readonly global = AutoDispose

    /** Subscriptions awaiting disposal; created lazily and reset to undefined by dispose(). */
    private _subscriptions: SubscriptionLike[]

    /**
     * Registers a subscription for automatic disposal when dispose() has been invoked.
     * A null, undefined or already-closed subscription is ignored because there is nothing left to release.
     * @param subscription The subscription to unsubscribe from on dispose().
     */
    addSubscription(subscription: SubscriptionLike) {
        if (subscription == undefined || subscription.closed) return
        if (!this._subscriptions) this._subscriptions = []
        this._subscriptions.push(subscription)
    }

    /**
     * Subscribes to an observable and automatically unsubscribes from it when dispose() has been invoked.
     * @param observable The observable to subscribe to.
     * @param next A handler for each value emitted by the observable.
     * @param error An optional handler invoked if the observable errors.
     * @param complete An optional handler invoked when the observable completes.
     */
    subscribeTo<T>(observable: Observable<T>, next: (value: T) => void, error?: (error: any) => void, complete?: () => void) {
        this.addSubscription(observable.subscribe(next, error, complete))
    }

    /**
     * Subscribes to an observable for zero or one events, and automatically unsubscribes when an event has completed or
     * dispose() has been invoked, whichever occurs first.
     * @param observable The observable to subscribe to.
     * @param next A handler for the value emitted by the observable.
     * @param error An optional handler invoked if the observable errors before emitting.
     * @param complete An optional handler invoked when the proxy completes.
     * @returns A throw-away Observable which can be subscribed to inline by client code but needn't be unsubscribed from.
     */
    subscribeOnce<T>(observable: Observable<T>, next: (value: T) => void, error?: (error: any) => void, complete?: () => void) {
        const result = this.observeOnce(observable)
        result.subscribe(next, error, complete)
        return result
    }

    /**
     * Returns an observable which proxies zero or one events raised by a source observable, and automatically unsubscribes from
     * the source observable when the event has completed or dispose() has been invoked, whichever occurs first.
     * The proxy is a ReplaySubject with a buffer of one that completes right after forwarding a value.
     * @param observable The source observable to subscribe to.
     * @returns A throw-away Observable which can be subscribed to inline by client code but needn't be unsubscribed from.
     */
    observeOnce<T>(observable: Observable<T>): Observable<T> {
        const result = new ReplaySubject<T>(1)
        // Declared before subscribing: the callbacks may run synchronously inside subscribe(), i.e. before the
        // subscription object exists, and must not touch an uninitialised binding.
        let sub: Subscription
        let settled = false
        const release = () => {
            settled = true
            removeSub(this._subscriptions, sub)
        }

        sub = observable.subscribe({
            next: value => {
                if (settled) return
                release()
                result.next(value)
                result.complete()
            },
            error: e => {
                if (settled) return
                release()
                result.error(e)
            },
            complete: () => {
                if (settled) return
                release()
                result.complete()
            }
        })

        if (settled) {
            // The notification arrived synchronously, before `sub` was available to the callback: release the
            // source now instead of keeping it registered until dispose().
            removeSub(this._subscriptions, sub)
        } else {
            this.addSubscription(sub)
        }
        return result.asObservable()
    }

    /**
     * Releases all resources maintained by this instance.
     */
    dispose() {
        const pending = this._subscriptions
        if (!pending) return
        // Detach the list before tearing down so that code running during unsubscribe() cannot mutate the
        // array being iterated; anything registered meanwhile lands in a fresh list for the next dispose().
        this._subscriptions = undefined
        pending.forEach(s => {
            if (s != undefined && !s.closed) s.unsubscribe()
        })
    }
}

/**
 * Unsubscribes from a subscription (when it is still open) and removes its first occurrence from a list.
 * Does nothing when the subscription is null/undefined; skips the removal when the list is null/undefined.
 * @param array The list to remove the subscription from; mutated in place.
 * @param sub The subscription to release.
 */
function removeSub(array: SubscriptionLike[], sub: SubscriptionLike) {
    if (sub != undefined) {
        if (!sub.closed) {
            sub.unsubscribe()
        }
        const position = array != undefined ? array.indexOf(sub) : -1
        if (position !== -1) {
            array.splice(position, 1)
        }
    }
}
