import {
  SQSClient,
  ChangeMessageVisibilityCommand,
  ChangeMessageVisibilityBatchCommand,
  DeleteMessageCommand,
  DeleteMessageBatchCommand,
  ReceiveMessageCommand,
} from "@aws-sdk/client-sqs";
import type {
  Message,
  ChangeMessageVisibilityCommandInput,
  ChangeMessageVisibilityCommandOutput,
  ChangeMessageVisibilityBatchCommandInput,
  ChangeMessageVisibilityBatchCommandOutput,
  DeleteMessageCommandInput,
  DeleteMessageBatchCommandInput,
  ReceiveMessageCommandInput,
  ReceiveMessageCommandOutput,
  QueueAttributeName,
  MessageSystemAttributeName,
} from "@aws-sdk/client-sqs";

import type { ConsumerOptions, StopOptions, UpdatableOptions } from "./types.js";
import { TypedEventEmitter } from "./emitter.js";
import {
  SQSError,
  TimeoutError,
  toStandardError,
  toTimeoutError,
  toSQSError,
  isConnectionError,
} from "./errors.js";
import { validateOption, assertOptions, hasMessages } from "./validation.js";
import { logger } from "./logger.js";

/**
 * [Usage](https://bbc.github.io/sqs-consumer/index.html#usage)
 */
export class Consumer extends TypedEventEmitter {
  private pollingTimeoutId: NodeJS.Timeout | undefined = undefined;
  private stopped = true;
  protected queueUrl: string;
  private isFifoQueue: boolean;
  private suppressFifoWarning: boolean;
  private handleMessage: (message: Message) => Promise<Message | undefined>;
  private handleMessageBatch: (messages: Message[]) => Promise<Message[] | undefined>;
  private preReceiveMessageCallback?: () => Promise<void>;
  private postReceiveMessageCallback?: () => Promise<void>;
  private sqs: SQSClient;
  private handleMessageTimeout: number;
  private attributeNames: QueueAttributeName[];
  private messageAttributeNames: string[];
  private messageSystemAttributeNames: MessageSystemAttributeName[];
  private shouldDeleteMessages: boolean;
  private alwaysAcknowledge: boolean;
  private batchSize: number;
  private visibilityTimeout: number;
  private terminateVisibilityTimeout: boolean | number | ((message: Message[]) => number);
  private waitTimeSeconds: number;
  private authenticationErrorTimeout: number;
  private pollingWaitTimeMs: number;
  private pollingCompleteWaitTimeMs: number;
  private heartbeatInterval: number;
  private isPolling = false;
  private stopRequestedAtTimestamp: number;
  public abortController: AbortController;
  private extendedAWSErrors: boolean;
  private strictReturn: boolean;

  constructor(options: ConsumerOptions) {
    super(options.queueUrl);
    assertOptions(options);
    this.queueUrl = options.queueUrl;
    this.isFifoQueue = this.queueUrl.endsWith(".fifo");
    this.suppressFifoWarning = options.suppressFifoWarning ?? false;
    this.handleMessage = options.handleMessage;
    this.handleMessageBatch = options.handleMessageBatch;
    this.preReceiveMessageCallback = options.preReceiveMessageCallback;
    this.postReceiveMessageCallback = options.postReceiveMessageCallback;
    this.handleMessageTimeout = options.handleMessageTimeout;
    this.attributeNames = options.attributeNames || [];
    this.messageAttributeNames = options.messageAttributeNames || [];
    this.messageSystemAttributeNames = options.messageSystemAttributeNames || [];
    this.batchSize = options.batchSize || 1;
    this.visibilityTimeout = options.visibilityTimeout;
    this.terminateVisibilityTimeout = options.terminateVisibilityTimeout || false;
    this.heartbeatInterval = options.heartbeatInterval;
    this.waitTimeSeconds = options.waitTimeSeconds ?? 20;
    this.authenticationErrorTimeout = options.authenticationErrorTimeout ?? 10000;
    this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 0;
    this.pollingCompleteWaitTimeMs = options.pollingCompleteWaitTimeMs ?? 0;
    this.shouldDeleteMessages = options.shouldDeleteMessages ?? true;
    this.alwaysAcknowledge = options.alwaysAcknowledge ?? false;
    this.extendedAWSErrors = options.extendedAWSErrors ?? false;
    this.strictReturn = options.strictReturn ?? false;
    this.sqs =
      options.sqs ||
      new SQSClient({
        useQueueUrlAsEndpoint: options.useQueueUrlAsEndpoint ?? true,
        region: options.region || process.env.AWS_REGION || "eu-west-1",
      });
  }

  /**
   * Creates a new SQS consumer.
   */
  public static create(options: ConsumerOptions): Consumer {
    return new Consumer(options);
  }

  /**
   * Start polling the queue for messages.
   */
  public start(): void {
    if (this.stopped) {
      if (this.isFifoQueue && !this.suppressFifoWarning) {
        logger.warn(
          "WARNING: A FIFO queue was detected. SQS Consumer does not guarantee FIFO queues will work as expected. Set 'suppressFifoWarning: true' to disable this warning.",
        );
      }
      // Create a new abort controller each time the consumer is started
      this.abortController = new AbortController();
      logger.debug("starting");
      this.stopped = false;
      this.emit("started");
      this.poll();
    }
  }

  /**
   * A reusable options object for sqs.send that's used to avoid duplication.
   */
  private get sqsSendOptions(): { abortSignal: AbortSignal } {
    return {
      // return the current abortController signal or a fresh signal that has not been aborted.
      // This effectively defaults the signal sent to the AWS SDK to not aborted
      abortSignal: this.abortController?.signal || new AbortController().signal,
    };
  }

  /**
   * Stop polling the queue for messages (pre existing requests will still be made until concluded).
   */
  public stop(options?: StopOptions): void {
    if (this.stopped) {
      logger.debug("already_stopped");
      return;
    }

    logger.debug("stopping");
    this.stopped = true;

    if (this.pollingTimeoutId) {
      clearTimeout(this.pollingTimeoutId);
      this.pollingTimeoutId = undefined;
    }

    if (options?.abort) {
      logger.debug("aborting");
      this.abortController.abort();
      this.emit("aborted");
    }

    this.stopRequestedAtTimestamp = Date.now();
    this.waitForPollingToComplete();
  }

  /**
   * Wait for final poll and in flight messages to complete.
   * @private
   */
  private waitForPollingToComplete(): void {
    if (!this.isPolling || !(this.pollingCompleteWaitTimeMs > 0)) {
      this.emit("stopped");
      return;
    }

    const exceededTimeout: boolean =
      Date.now() - this.stopRequestedAtTimestamp > this.pollingCompleteWaitTimeMs;
    if (exceededTimeout) {
      this.emit("waiting_for_polling_to_complete_timeout_exceeded");
      this.emit("stopped");
      return;
    }

    this.emit("waiting_for_polling_to_complete");
    setTimeout(() => this.waitForPollingToComplete(), 1000);
  }

  /**
   * Returns the current status of the consumer.
   * This includes whether it is running or currently polling.
   */
  public get status(): {
    isRunning: boolean;
    isPolling: boolean;
  } {
    return {
      isRunning: !this.stopped,
      isPolling: this.isPolling,
    };
  }

  /**
   * Validates and then updates the provided option to the provided value.
   * @param option The option to validate and then update
   * @param value The value to set the provided option to
   */
  public updateOption(option: UpdatableOptions, value: ConsumerOptions[UpdatableOptions]): void {
    validateOption(option, value, this, true);

    this[option] = value;

    this.emit("option_updated", option, value);
  }

  /**
   * Emit one of the consumer's error events depending on the error received.
   * @param err The error object to forward on
   * @param message The message that the error occurred on
   */
  private emitError(err: Error, message?: Message): void {
    if (!message) {
      this.emit("error", err, undefined);
    } else if (err.name === SQSError.name) {
      this.emit("error", err, message);
    } else if (err instanceof TimeoutError) {
      this.emit("timeout_error", err, message);
    } else {
      this.emit("processing_error", err, message);
    }
  }

  /**
   * Poll for new messages from SQS
   */
  private poll(): void {
    if (this.stopped) {
      logger.debug("cancelling_poll", {
        detail: "Poll was called while consumer was stopped, cancelling poll...",
      });
      return;
    }

    logger.debug("polling");

    this.isPolling = true;

    let currentPollingTimeout: number = this.pollingWaitTimeMs;
    this.receiveMessage({
      QueueUrl: this.queueUrl,
      AttributeNames: this.attributeNames,
      MessageAttributeNames: this.messageAttributeNames,
      MessageSystemAttributeNames: this.messageSystemAttributeNames,
      MaxNumberOfMessages: this.batchSize,
      WaitTimeSeconds: this.waitTimeSeconds,
      VisibilityTimeout: this.visibilityTimeout,
    })
      .then((output: ReceiveMessageCommandOutput) => this.handleSqsResponse(output))
      .catch((err): void => {
        this.emitError(err);
        if (isConnectionError(err)) {
          logger.debug("authentication_error", {
            code: err.code || "Unknown",
            detail: "There was an authentication error. Pausing before retrying.",
          });
          currentPollingTimeout = this.authenticationErrorTimeout;
        }
        return;
      })
      .then((): void => {
        if (this.pollingTimeoutId) {
          clearTimeout(this.pollingTimeoutId);
        }
        this.pollingTimeoutId = setTimeout(() => this.poll(), currentPollingTimeout);
      })
      .catch((err): void => {
        this.emitError(err);
      })
      .finally((): void => {
        this.isPolling = false;
      });
  }

  /**
   * Send a request to SQS to retrieve messages
   * @param params The required params to receive messages from SQS
   */
  private async receiveMessage(
    params: ReceiveMessageCommandInput,
  ): Promise<ReceiveMessageCommandOutput> {
    try {
      if (this.preReceiveMessageCallback) {
        await this.preReceiveMessageCallback();
      }
      const result: ReceiveMessageCommandOutput = await this.sqs.send(
        new ReceiveMessageCommand(params),
        this.sqsSendOptions,
      );
      if (this.postReceiveMessageCallback) {
        await this.postReceiveMessageCallback();
      }

      return result;
    } catch (err) {
      throw toSQSError(
        err,
        `SQS receive message failed: ${err.message}`,
        this.extendedAWSErrors,
        this.queueUrl,
      );
    }
  }

  /**
   * Handles the response from AWS SQS, determining if we should proceed to
   * the message handler.
   * @param response The output from AWS SQS
   */
  private async handleSqsResponse(response: ReceiveMessageCommandOutput): Promise<void> {
    if (hasMessages(response)) {
      if (this.handleMessageBatch) {
        await this.processMessageBatch(response.Messages);
      } else {
        await Promise.all(
          response.Messages.map((message: Message) => this.processMessage(message)),
        );
      }

      this.emit("response_processed");
    } else if (response) {
      this.emit("empty");
    }
  }

  /**
   * Process a message that has been received from SQS. This will execute the message
   * handler and delete the message once complete.
   * @param message The message that was delivered from SQS
   */
  private async processMessage(message: Message): Promise<void> {
    let heartbeatTimeoutId: NodeJS.Timeout | undefined = undefined;

    try {
      this.emit("message_received", message);

      if (this.heartbeatInterval) {
        heartbeatTimeoutId = this.startHeartbeat(message);
      }

      const ackedMessage: Message = await this.executeHandler(message);

      if (ackedMessage?.MessageId === message.MessageId) {
        await this.deleteMessage(message);

        this.emit("message_processed", message);
      }
    } catch (err) {
      this.emitError(err, message);

      if (this.terminateVisibilityTimeout !== false) {
        if (typeof this.terminateVisibilityTimeout === "function") {
          const timeout = this.terminateVisibilityTimeout([message]);
          await this.changeVisibilityTimeout(message, timeout);
        } else {
          const timeout =
            this.terminateVisibilityTimeout === true ? 0 : this.terminateVisibilityTimeout;
          await this.changeVisibilityTimeout(message, timeout);
        }
      }
    } finally {
      if (this.heartbeatInterval) {
        clearInterval(heartbeatTimeoutId);
      }
    }
  }

  /**
   * Process a batch of messages from the SQS queue.
   * @param messages The messages that were delivered from SQS
   */
  private async processMessageBatch(messages: Message[]): Promise<void> {
    let heartbeatTimeoutId: NodeJS.Timeout | undefined = undefined;

    try {
      messages.forEach((message: Message): void => {
        this.emit("message_received", message);
      });

      if (this.heartbeatInterval) {
        heartbeatTimeoutId = this.startHeartbeat(null, messages);
      }

      const ackedMessages: Message[] = await this.executeBatchHandler(messages);

      if (ackedMessages?.length > 0) {
        await this.deleteMessageBatch(ackedMessages);

        ackedMessages.forEach((message: Message): void => {
          this.emit("message_processed", message);
        });
      }
    } catch (err) {
      this.emit("error", err, messages);

      if (this.terminateVisibilityTimeout !== false) {
        if (typeof this.terminateVisibilityTimeout === "function") {
          const timeout = this.terminateVisibilityTimeout(messages);
          await this.changeVisibilityTimeoutBatch(messages, timeout);
        } else {
          const timeout =
            this.terminateVisibilityTimeout === true ? 0 : this.terminateVisibilityTimeout;
          await this.changeVisibilityTimeoutBatch(messages, timeout);
        }
      }
    } finally {
      clearInterval(heartbeatTimeoutId);
    }
  }

  /**
   * Trigger a function on a set interval
   * @param heartbeatFn The function that should be triggered
   */
  private startHeartbeat(message?: Message, messages?: Message[]): NodeJS.Timeout {
    return setInterval(() => {
      if (this.handleMessageBatch) {
        return this.changeVisibilityTimeoutBatch(messages, this.visibilityTimeout);
      }

      return this.changeVisibilityTimeout(message, this.visibilityTimeout);
    }, this.heartbeatInterval * 1000);
  }

  /**
   * Change the visibility timeout on a message
   * @param message The message to change the value of
   * @param timeout The new timeout that should be set
   */
  private async changeVisibilityTimeout(
    message: Message,
    timeout: number,
  ): Promise<ChangeMessageVisibilityCommandOutput> {
    try {
      const input: ChangeMessageVisibilityCommandInput = {
        QueueUrl: this.queueUrl,
        ReceiptHandle: message.ReceiptHandle,
        VisibilityTimeout: timeout,
      };
      return await this.sqs.send(new ChangeMessageVisibilityCommand(input), this.sqsSendOptions);
    } catch (err) {
      this.emit(
        "error",
        toSQSError(
          err,
          `Error changing visibility timeout: ${err.message}`,
          this.extendedAWSErrors,
          this.queueUrl,
          message,
        ),
        message,
      );
    }
  }

  /**
   * Change the visibility timeout on a batch of messages
   * @param messages The messages to change the value of
   * @param timeout The new timeout that should be set
   */
  private async changeVisibilityTimeoutBatch(
    messages: Message[],
    timeout: number,
  ): Promise<ChangeMessageVisibilityBatchCommandOutput> {
    const params: ChangeMessageVisibilityBatchCommandInput = {
      QueueUrl: this.queueUrl,
      Entries: messages.map((message: Message) => ({
        Id: message.MessageId,
        ReceiptHandle: message.ReceiptHandle,
        VisibilityTimeout: timeout,
      })),
    };
    try {
      return await this.sqs.send(
        new ChangeMessageVisibilityBatchCommand(params),
        this.sqsSendOptions,
      );
    } catch (err) {
      this.emit(
        "error",
        toSQSError(
          err,
          `Error changing visibility timeout: ${err.message}`,
          this.extendedAWSErrors,
          this.queueUrl,
          messages,
        ),
        messages,
      );
    }
  }

  /**
   * Trigger the applications handleMessage function
   * @param message The message that was received from SQS
   */
  private async executeHandler(message: Message): Promise<Message> {
    let handleMessageTimeoutId: NodeJS.Timeout | undefined = undefined;

    try {
      let result: Message | undefined | null;

      if (this.handleMessageTimeout) {
        const pending: Promise<never> = new Promise<never>((_, reject) => {
          handleMessageTimeoutId = setTimeout(() => {
            reject(new TimeoutError());
          }, this.handleMessageTimeout);
        });
        result = await Promise.race([this.handleMessage(message), pending]);
      } else {
        result = await this.handleMessage(message);
      }

      if (this.alwaysAcknowledge) {
        return message;
      }

      if (result instanceof Object) {
        return result;
      }

      if (result === undefined) {
        return null;
      }

      if (result === null) {
        if (this.strictReturn) {
          throw new Error(
            "strictReturn is enabled: handleMessage must return a Message object or an object with the same MessageId. Returning null is not allowed.",
          );
        }
        console.warn(
          "[DEPRECATION] Future versions will throw on void/null returns. Enable `strictReturn` now to prepare.",
        );
        return null;
      }

      return null;
    } catch (err) {
      if (err instanceof TimeoutError) {
        throw toTimeoutError(
          err,
          `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`,
          message,
        );
      }
      if (err instanceof Error) {
        throw toStandardError(err, `Unexpected message handler failure: ${err.message}`, message);
      }
      throw err;
    } finally {
      if (handleMessageTimeoutId) {
        clearTimeout(handleMessageTimeoutId);
      }
    }
  }

  /**
   * Execute the application's message batch handler
   * @param messages The messages that should be forwarded from the SQS queue
   */
  private async executeBatchHandler(messages: Message[]): Promise<Message[]> {
    try {
      const result: Message[] | undefined | null = await this.handleMessageBatch(messages);

      if (this.alwaysAcknowledge) {
        return messages;
      }

      if (Array.isArray(result)) {
        return result;
      }

      if (result === undefined) {
        return [];
      }

      if (result === null) {
        if (this.strictReturn) {
          throw new Error(
            "strictReturn is enabled: handleMessageBatch must return an array of Message objects. Returning null is not allowed.",
          );
        }

        console.warn(
          "[DEPRECATION] Future versions will throw on void/null returns. Enable `strictReturn` now to prepare.",
        );
        return [];
      }

      return [];
    } catch (err) {
      if (err instanceof Error) {
        throw toStandardError(
          err,
          `Unexpected message batch handler failure: ${err.message}`,
          messages,
        );
      }
      throw err;
    }
  }

  /**
   * Delete a single message from SQS
   * @param message The message to delete from the SQS queue
   */
  private async deleteMessage(message: Message): Promise<void> {
    if (!this.shouldDeleteMessages) {
      logger.debug("skipping_delete", {
        detail: "Skipping message delete since shouldDeleteMessages is set to false",
      });
      return;
    }
    logger.debug("deleting_message", { messageId: message.MessageId });

    const deleteParams: DeleteMessageCommandInput = {
      QueueUrl: this.queueUrl,
      ReceiptHandle: message.ReceiptHandle,
    };

    try {
      await this.sqs.send(new DeleteMessageCommand(deleteParams), this.sqsSendOptions);
    } catch (err) {
      throw toSQSError(
        err,
        `SQS delete message failed: ${err.message}`,
        this.extendedAWSErrors,
        this.queueUrl,
        message,
      );
    }
  }

  /**
   * Delete a batch of messages from the SQS queue.
   * @param messages The messages that should be deleted from SQS
   */
  private async deleteMessageBatch(messages: Message[]): Promise<void> {
    if (!this.shouldDeleteMessages) {
      logger.debug("skipping_delete", {
        detail: "Skipping message delete since shouldDeleteMessages is set to false",
      });
      return;
    }
    logger.debug("deleting_messages", {
      messageIds: messages.map((msg: Message) => msg.MessageId),
    });

    const deleteParams: DeleteMessageBatchCommandInput = {
      QueueUrl: this.queueUrl,
      Entries: messages.map((message: Message) => ({
        Id: message.MessageId,
        ReceiptHandle: message.ReceiptHandle,
      })),
    };

    try {
      await this.sqs.send(new DeleteMessageBatchCommand(deleteParams), this.sqsSendOptions);
    } catch (err) {
      throw toSQSError(
        err,
        `SQS delete message failed: ${err.message}`,
        this.extendedAWSErrors,
        this.queueUrl,
        messages,
      );
    }
  }
}
