'use strict';

import * as convert from '../../../../helpers/convert';
import * as helpers from '../../../../helpers/helpers';
import * as random from '../../../../helpers/random';
import * as format from '../../../../helpers/format';
import AsyncProcess from './asyncProcess';
import _ from 'lodash';
import ControlSignal from '../controlSignal';
import {Dict} from '../../../../types/util';
import UsageCounter from '../../tools/usageCounter';
import LoggerManager from '../../../../logger';
import EventEmitter from '../../../../tools/eventEmitter';
import TimeoutError from '../../../../clients/timeoutError';

/**
 * Starts, stops, restarts and provides long-running background processes (running functions within node process)
 * resolving race conditions and including process failovers until process canceled.
 *
 * A process is identified by a string ID. While an ID stays scheduled, the pool keeps a runtime instance for it:
 * a new instance is created by the provider each time the previous one finishes, until the ID is canceled.
 * Only one runtime instance per ID is tracked at a time.
 */
class AsyncProcessPool<
  Provider /* extends AsyncProcessPool.ProcessProvider<AsyncProcess> */,
  Process extends AsyncProcess
> {

  protected _logger = LoggerManager.getLogger('AsyncProcessPool');
  /** Factory creating process instances */
  private _processProvider: Provider;
  /** Processes expected to be running, by process ID */
  private _scheduledProcesses: Dict<ExpectedProcess<Provider>> = {};
  /** Currently existing process instances (starting, running or stopping), by process ID */
  private _runtimeProcesses: Dict<RuntimeProcess> = {};
  /** Reverse lookup from process instance to its runtime entry */
  private _processRuntimes = new Map<AsyncProcess, RuntimeProcess>();
  private _options?: AsyncProcessPool.Options<Process>;
  private _events = new EventEmitter<AsyncProcessPool.Events<Process>>();
  private _label: string;
  private _stopped = false;
  /** Tracks which usages hold each process ID scheduled */
  private _usages = new UsageCounter();
  /** Dependencies injected into every created process */
  private _dependencies: any[];

  /**
   * Constructs instance
   * @param provider process provider
   * @param options additional options
   */
  constructor(provider: Provider, options: AsyncProcessPool.Options<Process>) {
    this._processProvider = provider;
    this._label = options?.label || 'default';
    this._options = options;
    this._dependencies = options.dependencies;
  }

  /**
   * Returns label for logging
   * @returns label
   */
  get label(): string {
    return this._label;
  }

  /**
   * Event emitter related to this pool
   * @returns event emitter
   * @internal
   */
  get events(): EventEmitter<AsyncProcessPool.Events<Process>> {
    return this._events;
  }

  /**
   * Returns whether the scheduler was given command to stop
   * @returns whether stopped
   */
  get stopped(): boolean {
    return this._stopped;
  }

  /**
   * Schedules a process. After the process has scheduled, it is immediately available in synchronous way. But it may
   * not be started fully yet (its `start` method not completed yet), or currently running process may be an old one,
   * which is going to stop. The latest actual and fully started process can be awaited with `waitProcess` method
   * @param id process ID. If a process with same ID is already scheduled, only the usage is registered and the
   * rest of the call is ignored
   * @param options process options
   * @throws `TypeError` if the process ID contains a colon
   * @throws if the pool is stopped and the `throwIfStopped` option is enabled
   */
  scheduleProcess(id: string, options: AsyncProcessPool.ScheduleProcessOptions<Provider>) {
    // Colons are reserved: event names are built as `<event>:<id>`
    if (id.includes(':')) {
      throw new TypeError('Process ID must not contain any colons');
    }
    const usage = options.usage || 'default';
    if (this._stopped) {
      if (options.throwIfStopped ?? true) {
        throw new Error('Async pool stopped');
      } else {
        this._logger.debug(`${this._label}: won't scheduled process ${id} because the pool is stopped`);
        return;
      }
    }
    this._logger.debug(`${this._label}: scheduling process ${id} by usage ${usage}`);
    // The usage is acquired even when the process is already scheduled, so that it stays scheduled until
    // every usage releases it
    this._usages.acquire(id, usage);
    if (this._scheduledProcesses[id]) {
      this._logger.debug(`${this._label}: won't schedule process ${id} because it is already scheduled`);
      return;
    }
    this._logger.info(`${this._label}: scheduling process ${id}`);
    this._scheduledProcesses[id] = {
      args: options.args,
      failoverThrottleDelay: options.failoverThrottleDelay
    };
    this._run(id);
  }

  /**
   * Restarts a process if it is scheduled, otherwise the call will be ignored. If the process is waiting for throttle
   * delay to failover after error or unexpected stop, this method will force the failover
   * @param process process ID or process itself. If the process instance is specified, it will be restarted only if it
   * is still actual running process and not scheduled to be restarted yet
   */
  restartProcess(process: string | Process) {
    if (typeof process === 'string') {
      if (this._runtimeProcesses[process]) {
        this._logger.info(`${this._label}: restarting process ${process}`);
        this._cancelProcess(this._runtimeProcesses[process]);
      }
    } else {
      let runtime = this._processRuntimes.get(process);
      if (runtime?.process === process) {
        this._logger.info(`${this._label}: restarting process ${runtime.id} instance`);
        this._cancelProcess(runtime);
      }
    }
  }

  /**
   * Cancels a process. The process is actually unscheduled only when no usage holds it anymore
   * @param id process ID
   * @param options additional options
   * @returns promise resolving when currently running process stopped
   */
  async cancelProcess(id: string, options?: AsyncProcessPool.CancelProcessOptions) {
    const usage = options?.usage || 'default';
    if (!this._scheduledProcesses[id]) {
      this._logger.debug(`${this._label}: won't cancel process ${id} because it is not scheduled`);
      return;
    }
    if (options?.allUsages) {
      this._logger.debug(`${this._label}: releasing process ${id} by all usages`);
      this._usages.releaseAll(id);
    } else {
      this._logger.debug(`${this._label}: releasing process ${id} by usage ${usage}`);
      this._usages.release(id, usage);
    }
    if (this._usages.isInUse(id)) {
      this._logger.debug(`${this._label}: won't cancel process ${id} yet as it is still in use`);
      return;
    }
    this._logger.info(`${this._label}: canceling process ${id}`);
    delete this._scheduledProcesses[id];
    let runtime = this._runtimeProcesses[id];
    if (runtime && !runtime.cancelPromise.completed) {
      this._cancelProcess(runtime);
    }
    // Always notify listeners once the process is unscheduled. The runtime may already be canceled by a
    // pending restart, and `waitProcess` callers would otherwise never learn that the process will not start
    this._events.emit(`canceled:${id}`);
    return runtime?.stopPromise;
  }

  /**
   * Signals a runtime instance to stop: marks its context as canceled and resolves its cancel promise
   * @param runtime runtime instance to signal
   */
  private _cancelProcess(runtime: RuntimeProcess) {
    runtime.context.canceled = true;
    runtime.cancelPromise.resolve();
  }

  /**
   * Returns scheduled process IDs
   * @returns process IDs
   */
  getScheduledIds(): string[] {
    return Object.keys(this._scheduledProcesses);
  }

  /**
   * Returns IDs of processes that have a runtime instance at this moment. This includes processes that were canceled
   * but may be still running or stopping
   * @returns process IDs
   */
  getRunningIds(): string[] {
    return Object.keys(this._runtimeProcesses);
  }

  /**
   * Returns whether has scheduled process
   * @param id process ID
   * @returns true if scheduled
   */
  hasScheduled(id: string): boolean {
    return !!this._scheduledProcesses[id];
  }

  /**
   * Returns whether has a process, scheduled by specific usage
   * @param id process ID
   * @param usage usage
   * @returns true if scheduled by
   */
  hasScheduledBy(id: string, usage: any): boolean {
    return this._usages.isAcquiredBy(id, usage);
  }

  /**
   * Returns process IDs which are scheduled by specific usage
   * @param usage usage
   * @returns process IDs
   */
  getScheduledBy(usage: any): string[] {
    return this._usages.getAcquiredBy(usage);
  }

  /**
   * Returns process schedulement
   * @param id process ID
   * @returns schedulement or undefined if not scheduled
   */
  getSchedulement(id: string): AsyncProcessPool.Schedulement<Provider> | undefined {
    if (!this._scheduledProcesses[id]) {
      return;
    }
    return {
      options: this._scheduledProcesses[id],
      usages: this._usages.getUsages(id)
    };
  }

  /**
   * Returns current process in its running state which can be not started yet, starting, running, stopping or stopped
   * @param id process ID
   * @returns process in its state or undefined if there is no runtime instance for the ID
   */
  getProcess(id: string): Process | undefined {
    return this._runtimeProcesses[id]?.process as Process;
  }

  /**
   * Waits for running started process instance. If the process was rescheduled/restarted and an old process instance is
   * still running, the method will wait for the new process instance to start only. Note, that a process can become
   * stopped or canceled till the method resolves after `await` in the calling code. So this method only increases
   * probability the returned process will be running
   * @param id process ID
   * @param options additional options. A `timeoutInMs` that is not a finite number disables the timeout
   * @returns promise resolving with process or `undefined`, depending on specified options
   * @throws if process is not scheduled or becomes unscheduled during waiting due to `throwIfNotScheduled` option
   * @throws `TimeoutError` if timed out waiting for the process due to timeout options
   */
  async waitProcess(id: string, options?: AsyncProcessPool.WaitProcessOptions): Promise<Process> {
    if (!this._scheduledProcesses[id]) {
      if (options?.throwIfNotScheduled) {
        throw new Error('Process is not scheduled');
      }
      return;
    }
    let result = this._runtimeProcesses[id];
    // A running instance which is already canceled is about to be replaced, so wait for the next one instead
    if (result?.context.stage === ProcessContext.ProcessStage.RUNNING && !result.context.canceled) {
      return result.process as Process;
    }
    return new Promise<Process>((_resolve, _reject) => {
      // Detaches listeners and the timer; returns `true` so it can be chained with `&&`
      let cleanup = () => {
        this._events.off(`started:${id}`, resolve);
        this._events.off(`canceled:${id}`, processCanceledListener);
        clearTimeout(timeout);
        return true;
      };
      let resolve = (value: Process) => cleanup() && _resolve(value);
      let reject = (err: Error) => cleanup() && _reject(err);
      // Deferred with `setImmediate` so that a process canceled and synchronously scheduled again is not
      // reported as unscheduled
      let processCanceledListener = () => setImmediate(() => {
        if (!this._scheduledProcesses[id]) {
          options?.throwIfNotScheduled ? reject(new Error('Process is not scheduled')) : resolve(undefined);
        }
      });
      this._events.on(`started:${id}`, resolve);
      this._events.on(`canceled:${id}`, processCanceledListener);
      let timeout: NodeJS.Timeout;
      // Only finite timeouts arm the timer: `setTimeout` coerces `Infinity`/`null` to a ~1 ms delay, which
      // would make the wait time out almost immediately
      if (Number.isFinite(options?.timeoutInMs)) {
        timeout = setTimeout(() => {
          options.throwOnTimeout ? reject(new TimeoutError('Timed out waiting for the process')) : resolve(undefined);
        }, options.timeoutInMs);
      }
      options?.stopPromise?.then(() => resolve(undefined)).catch(err => reject(err));
    });
  }

  /**
   * Cancels all currently scheduled processes and waits until they stop
   * @returns promise resolving when stopped
   */
  async cancelAll() {
    await Promise.all(Object.keys(this._scheduledProcesses).map(id => this.cancelProcess(id, {allUsages: true})));
  }

  /**
   * Stops all processes: marks the pool as stopped, cancels every scheduled process and waits for all remaining
   * runtime instances to stop
   * @returns promise resolving when stopped
   */
  async stop() {
    this._stopped = true;
    await this.cancelAll();
    await Promise.all(Object.values(this._runtimeProcesses).map(runtime => runtime.stopPromise));
  }

  /**
   * Runs the lifecycle loop for a process ID: while the ID stays scheduled, creates a process instance with the
   * provider, starts and runs it, handles how it finished (stop, failover, cancel) and then repeats. Does nothing
   * if a runtime instance for the ID already exists, since that instance's loop is still in charge
   * @param id process ID
   */
  private async _run(id: string) {
    if (this._runtimeProcesses[id]) {
      return;
    }
    while (this._scheduledProcesses[id]) {
      // Kept outside of `try` so that the preparation failure handler can clean up a runtime that was
      // registered before the failure
      let registeredRuntime: RuntimeProcess | undefined;
      try {
        let expected = this._scheduledProcesses[id];
        let basicContext: AsyncProcessPool.Context = {
          processId: id,
          pool: this,
          stage: ProcessContext.ProcessStage.STARTING
        };
        let {process, context, args} = (this._processProvider as AsyncProcessPool.ProcessProvider<AsyncProcess>)(
          basicContext, expected.args
        );
        let runtime: RuntimeProcess = this._runtimeProcesses[id] = {
          id: id,
          process,
          context: basicContext,
          cancelPromise: helpers.createHandlePromise<void>(),
          stopPromise: helpers.createHandlePromise(),
          startPromise: undefined
        };
        registeredRuntime = runtime;
        process.inject(...this._dependencies);
        process.initialize(...args);
        context.initialize(process);
        this._processRuntimes.set(runtime.process, runtime);
        this._events.emit(`created:${id}`);
        try {
          this._logger.debug(`${this._label}: starting process ${id}`);
          // Report the start attempt result to the throttling callback (if one was set by a previous
          // throttled failover) and drop the callback afterwards
          await (runtime.startPromise = helpers.wrapHandlePromise(helpers
            .ensurePromise(() => runtime.process.start(runtime.cancelPromise))
            .then(() => expected.nextPostProcessThrottling?.(true))
            .catch(err => {
              expected.nextPostProcessThrottling?.(false);
              throw err;
            })
            .finally(() => delete expected.nextPostProcessThrottling)
          ));
          // Skip the run phase if the process was canceled/restarted while it was starting
          if (!runtime.cancelPromise.completed) {
            basicContext.stage = ProcessContext.ProcessStage.RUNNING;
            this._logger.debug(`${this._label}: running process ${id}`);
            this._events.emit(`started:${id}`, runtime.process as Process);
            this._events.emit('started', runtime.process as Process);
            await runtime.process.run(runtime.cancelPromise);
          }
          basicContext.stage = ProcessContext.ProcessStage.STOPPING;
          // Returning after a cancel request is a graceful stop; returning without one is unexpected
          await this._handleControlSignal(id, expected, runtime, runtime.cancelPromise.completed ?
            {action: 'stop', message: 'process ceased to run gracefully', severity: 'debug'} :
            {action: 'failover', message: 'process ceased to run unexpectedly'});
        } catch (err) {
          basicContext.stage = ProcessContext.ProcessStage.STOPPING;
          err instanceof ControlSignal ?
            await this._handleControlSignal(id, expected, runtime, err.options) :
            await this._handleControlSignal(id, expected, runtime, {error: err, message: 'failed to run process'});
        } finally {
          basicContext.stage = ProcessContext.ProcessStage.STOPPED;
          context.release();
          runtime.stopPromise.resolve();
          delete this._runtimeProcesses[id];
          this._processRuntimes.delete(runtime.process);
        }
      } catch (err) {
        this._logger.fatal(`${this._label}: failed to prepare process ${id} instance, it will be canceled`, err);
        this.cancelProcess(id, {allUsages: true});
        // The runtime may have been registered before the failure (e.g. `inject`/`initialize` threw) or its
        // regular cleanup may have been interrupted. Unregister it and resolve its stop promise, otherwise
        // `stop()`/`cancelProcess()` would wait forever and the ID could never be run again
        if (registeredRuntime) {
          registeredRuntime.context.stage = ProcessContext.ProcessStage.STOPPED;
          if (this._runtimeProcesses[id] === registeredRuntime) {
            delete this._runtimeProcesses[id];
          }
          this._processRuntimes.delete(registeredRuntime.process);
          if (!registeredRuntime.stopPromise.completed) {
            registeredRuntime.stopPromise.resolve();
          }
        }
      }
    }
  }

  /**
   * Handles the way a process instance finished: decides what happens next based on the control signal action
   * (defaults to `failover`), logs it with the appropriate severity, unschedules the process on `cancel` and stops
   * the instance, waiting for the failover throttle delay when applicable
   * @param id process ID
   * @param schedulement schedulement the finished instance was created for
   * @param runtime finished runtime instance
   * @param signal control signal options describing how the process finished
   */
  // eslint-disable-next-line complexity
  private async _handleControlSignal(
    id: string, schedulement: ExpectedProcess<Provider>, runtime: RuntimeProcess,
    signal: ControlSignal.Options = {}
  ) {
    const action = signal.action || 'failover';
    // Cancel only if the process has not been rescheduled with a new schedulement in the meantime
    const cancel = action === 'cancel' && this._scheduledProcesses[id] === schedulement;
    const failover = action === 'failover' && !!this._scheduledProcesses[id];
    const restart = action === 'stop' && !!this._scheduledProcesses[id];
    // A failover of an instance which was asked to cancel/restart is performed without throttling
    const forceFailover = failover && runtime.cancelPromise.completed;
    const throttleDelayInMs = (failover && !forceFailover) ? this._getThrottleDelay(schedulement, runtime) : undefined;

    let message = _.compact([
      signal.message ?
        `${this._label}: process ${id}: ${signal.message}` :
        // Use the resolved action so that a defaulted action is not printed as `undefined`
        `${this._label}: process ${id} completed with a ${action} control signal`,
      cancel && 'The process will be canceled',
      failover && `The process will be failovered in ${
        format.simplifyTimeAmount(throttleDelayInMs || 0, 'ms').description
      }`,
      restart && 'The process will be restarted'
    ]).join('. ');
    if (signal.severity) {
      signal.error ?
        this._logger[signal.severity](message + '.', signal.error) :
        this._logger[signal.severity](message);
    } else if (signal.error) {
      this._logger.error(message + '.', signal.error);
    } else if (action === 'failover') {
      this._logger.warn(message);
    } else {
      this._logger.info(message);
    }
    if (cancel) {
      this.cancelProcess(id, {allUsages: true});
    }
    await this._stopProcess(id, runtime, failover, throttleDelayInMs);
  }

  /**
   * Calculates the delay to wait before the next failover attempt. Uses the delay configured for the schedulement
   * or, if none, a fixed delay from the pool options (10 seconds by default). In exponential mode it also resets
   * the throttling state when the last successful start is old enough and registers a callback which updates the
   * throttling state after the next start attempt
   * @param schedulement schedulement holding throttle configuration and state
   * @param runtime runtime instance which is being failed over
   * @returns delay in milliseconds
   */
  // eslint-disable-next-line complexity
  private _getThrottleDelay(schedulement: ExpectedProcess<any>, runtime: RuntimeProcess): number {
    let options = schedulement.failoverThrottleDelay || {
      mode: 'fixed',
      delayInMs: this._options?.processFailoverThrottleDelayInMs ?? convert.time.secondsToMs(10)
    };
    if (options.mode === 'fixed') {
      return random.getIntegerAround(options.delayInMs, options.randomizationFactor ?? 0);
    }
    // Start over if the last successful start happened at least `resetDelayInMs` ago
    if (
      schedulement.throttlings?.lastSuccessfulConnectTime !== undefined &&
      Date.now() - schedulement.throttlings.lastSuccessfulConnectTime >= (options.resetDelayInMs ?? 0)
    ) {
      delete schedulement.throttlings;
    }
    // Invoked after the next start attempt: counts the throttling unless this runtime was canceled and
    // remembers the time of a successful start
    schedulement.nextPostProcessThrottling = (successfulStart) => schedulement.throttlings = {
      counter: (schedulement.throttlings?.counter || 0) + (runtime.cancelPromise.completed ? 0 : 1),
      lastSuccessfulConnectTime: successfulStart ? Date.now() : undefined
    };
    return random.getIntegerAround(helpers.expBackoffDelay(
      (schedulement.throttlings?.counter || 0) + 1,
      Math.max(options.minDelayInMs, 1), options.maxDelayInMs
    ), options.randomizationFactor ?? 0);
  }

  /**
   * Stops a process instance and then waits for the throttle delay, if any. The wait is interrupted as soon as the
   * runtime's cancel promise resolves (cancel or restart request). Stop failures are logged and not rethrown
   * @param id process ID
   * @param runtime runtime instance to stop
   * @param failover whether the process is going to be failed over
   * @param throttleDelayInMs delay to wait after the stop, no waiting if not specified or zero
   */
  private async _stopProcess(id: string, runtime: RuntimeProcess, failover: boolean, throttleDelayInMs?: number) {
    this._logger.debug(`${this._label}: stopping process ${id}`);
    await runtime.process.stop()
      .then(() => this._logger.debug(`${this._label}: process ${id} stopped`))
      .catch(e => this._logger.warn(`${this._label}: failed to stop process ${id} properly`, e));
    if (throttleDelayInMs) {
      let delay = helpers.delay(throttleDelayInMs);
      await Promise.race([delay, runtime.cancelPromise]);
      delay.cancel();
    }
    if (failover && runtime.cancelPromise.completed) {
      this._scheduledProcesses[id] ?
        this._logger.info(`${this._label}: forcing process ${id} failover`) :
        this._logger.info(`${this._label}: canceling process ${id} failover as it was canceled`);
    }
  }
}

/** Types describing the pool's public contract: provider, options, method options and events */
namespace AsyncProcessPool {

  /**
   * Process provider. Called by the pool with the basic process context and the arguments the process was
   * scheduled with each time a new process instance has to be created
   */
  export type ProcessProvider<Process extends AsyncProcess> = (
    context: Context, args: any[]
  ) => ConstructedProcess<Process>;

  /** Constructed process */
  export type ConstructedProcess<Process extends AsyncProcess> = {
    /** Process */
    process: Process;
    /** Process context */
    context: ProcessContext;
    /** Mapped process args, passed by the pool to the process `initialize` method */
    args: any[];
  };

  /** Basic process context */
  export type Context = {
    /** Process ID */
    processId: string;
    /** Current pool */
    pool: AsyncProcessPool<any, any>;
    /** Process stage */
    stage: ProcessContext.ProcessStage;
    /** Whether the process was scheduled to cancel by external command (cancel or restart) */
    canceled?: boolean;
  };

  /** Constructing options */
  export type Options<Process> = {
    /** Logging label. Defaults to `default` */
    label?: string;
    /**
     * Process run attempt throttle delay in case of failover. May be overridden by processes when scheduling.
     * Defaults to 10 seconds
     */
    processFailoverThrottleDelayInMs?: number;
    /** Dependencies shared across all processes */
    dependencies: AsyncProcess.Dependencies<Process>;
  };

  /** Method options */
  export type ScheduleProcessOptions<Provider /* extends AsyncProcessPool.ProcessProvider<any> */> = {
    /** Process constructor parameters */
    args: ProcessArgs<Provider>;
    /** Whether to throw an error if the pool is already stopped. Defaults to `true` */
    throwIfStopped?: boolean;
    /**
     * Usage to track to make it possible to cancel process only when all usages are canceled.
     * Defaults to `default`
     */
    usage?: any;
    /** Overridden failover throttle delay to use */
    failoverThrottleDelay?: ThrottleDelay;
  };

  /** Throttle delay */
  export type ThrottleDelay = FixedDelay | ExponentialDelay;

  /** Fixed delay */
  export type FixedDelay = {
    /** Mode */
    mode: 'fixed';
    /** Delay value */
    delayInMs: number;
    /** Delay randomization factor in range [0; 1]. Defaults to `0` */
    randomizationFactor?: number;
  };

  /** Exponential delay */
  export type ExponentialDelay = {
    /** Mode */
    mode: 'exponential';
    /** Minimum (start) delay. Values below 1 are treated as 1 */
    minDelayInMs: number;
    /** Maximum (end) delay */
    maxDelayInMs: number;
    /**
     * Delay to wait after successful connection to reset current exponential delay to its minimum value. If the process
     * should be throttled again and this delay has not timed out yet, the throttle delay will increase exponentially.
     * Otherwise, the throttle delay will start from beginning. Defaults to `0`
     */
    resetDelayInMs?: number;
    /** Delay randomization factor in range [0; 1]. Defaults to `0` */
    randomizationFactor?: number;
  };

  /** Method options */
  export type CancelProcessOptions = {
    /** Usage to actually cancel process only when last usage is canceled. Defaults to `default` */
    usage?: any;
    /** Whether to cancel all usages */
    allUsages?: boolean;
  };

  /** Latest schedulement data */
  export type Schedulement<Provider /* extends ProcessProvider<any> */> = {
    /** Options with which the process was scheduled */
    options: {
      /** Process constructor parameters */
      args: ProcessArgs<Provider>;
    };
    /** Usages */
    usages: Set<any>;
  };

  /** Method options */
  export type WaitProcessOptions = {
    /** Waiting timeout */
    timeoutInMs?: number;
    /**
     * Promise signalizing when waiting should be stopped. Makes the method return `undefined` if resolved or rejects
     * with the promise's error
     */
    stopPromise?: Promise<void>;
    /**
     * Whether to throw error if process is not scheduled or becomes canceled during waiting. If `false` and process is
     * not scheduled, `undefined` will be returned. The error won't be thrown if the process was canceled and then
     * immediately scheduled again in synchronous way
     */
    throwIfNotScheduled?: boolean;
    /** Whether to throw `TimeoutError` error if waiting timeout exceeded */
    throwOnTimeout?: boolean;
  };

  /** Wait process attempt options */
  export type WaitAttemptOptions = {
    /** One attempt timeout */
    timeoutInMsPerAttempt?: number;
    /** Whether to throw `TimeoutError` error if waiting attempts exceeded */
    throwIfExceeded?: boolean;
    /**
     * Max attempts to wait. If the attempts exceeded, then either:
     * - if the process has still been scheduled, `undefined` will be returned
     * - otherwise, depends on `throwIfNotScheduled` option
     * @default Infinity
     */
    maxAmount?: number;
    /**
     * Predicate to check if new waiting attempt should be performed.
     * If specified, has larger priority than `maxAmount`
     */
    waitIf?: (attempt: number) => boolean;
  };

  /** Event emitter events */
  export type Events<Process extends AsyncProcess> = {
    /** Called when a specific process created */
    [event: `created:${ProcessId}`]: () => void;
    /** Called when a specific process fully successfully started */
    [event: `started:${ProcessId}`]: (process: Process) => void;
    /** Called when a specific process canceled */
    [event: `canceled:${ProcessId}`]: () => void;
    /** Called when any process fully successfully started */
    started: (process: Process) => void;
  };

  /** Process ID. Must not contain colons, as they separate the event name from the ID in event keys */
  export type ProcessId = string;
}

export default AsyncProcessPool;

// Imported at the end of the file to avoid errors caused by circular (recursive) imports
import ProcessContext from './processContext';

/** Schedulement record kept by the pool for each scheduled process ID */
type ExpectedProcess<Provider /* extends AsyncProcessPool.ProcessProvider<any> */> = {
  /** Arguments passed to the process provider when a process instance is created */
  args: ProcessArgs<Provider>;
  /** Failover throttle delay overriding the pool-level default, if specified when scheduling */
  failoverThrottleDelay?: AsyncProcessPool.ThrottleDelay;
  /** Throttling state used to calculate the delay in exponential mode */
  throttlings?: RecentThrottlings;
  /** Function to call after next start attempt, set after throttling */
  nextPostProcessThrottling?: (successfulStart: boolean) => void;
};

/** Type of the second (`args`) parameter of a process provider, or `never` if the type is not a process provider */
type ProcessArgs<Provider> = Provider extends AsyncProcessPool.ProcessProvider<any> ? Parameters<Provider>[1] : never;

/** State of a single created process instance tracked by the pool */
type RuntimeProcess = {
  /** Process ID */
  id: string;
  /** Basic context the instance was created with; holds the current stage and the canceled flag */
  context: AsyncProcessPool.Context;
  /** Process instance */
  process: AsyncProcess<ProcessContext>;
  /** Promise of the instance `start` call, undefined until the start is initiated */
  startPromise: helpers.HandlePromise<void>;
  /** Resolved when the instance is requested to cancel or restart */
  cancelPromise: helpers.HandlePromise<void>;
  /** Resolved when the instance lifecycle is finished and it is removed from the pool */
  stopPromise: helpers.HandlePromise<void>;
};

/** Throttling state of a scheduled process, used to calculate exponential failover delays */
type RecentThrottlings = {
  /** Number of counted throttlings; used as the base of the next exponential delay */
  counter: number;
  /** Time (`Date.now()`) of the start attempt after the last throttling if it succeeded, otherwise undefined */
  lastSuccessfulConnectTime?: number;
};
