import * as amqp from 'amqplib';
import { Channel, Connection, Replies } from 'amqplib';
import { EventSubscriber } from './event-subscriber';
import { LoggerService } from '@nestjs/common';
import { ConsumeHandler } from './consume-handler';
import { ValidationMiddleware } from './validation/validation-middleware';
import { ClassConstructor } from 'class-transformer';
import { BrokerConfigInterface } from './broker-config.interface';
import { CustomLogger } from '../../logger/custom-logger';
import AssertQueue = Replies.AssertQueue;

// Inspired by: https://docs.nestjs.com/fundamentals/custom-providers and https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html
/**
 * Singleton {@link EventSubscriber} backed by RabbitMQ (amqplib).
 *
 * One connection and one channel are shared by all subscriptions. Every
 * subscription is a binding of the configured queue to the configured
 * exchange; incoming messages are dispatched to the {@link ConsumeHandler}
 * whose binding key equals the message's routing key.
 *
 * Recovery model:
 * - broker failures while subscribing are logged and retried after
 *   `eventRetryTimeoutMs`, with at most one pending retry per binding key;
 * - when the channel closes unexpectedly every registered binding is
 *   re-established after the same delay, and consumption is restarted;
 * - `close()` disables recovery until `subscribe()` or
 *   `startReceivingMessages()` is called again.
 */
export class RabbitMQSubscriber implements EventSubscriber {
  private static instance: RabbitMQSubscriber;

  private channel: Channel;
  private server: Connection;
  private createdQueue: AssertQueue;

  // In-flight opens/declarations. Concurrent callers share these promises so
  // that a burst of retries cannot open several connections, channels or
  // server-named queues at once.
  private pendingChannel: Promise<Channel> | undefined;
  private pendingServer: Promise<Connection> | undefined;
  private pendingQueue: Promise<AssertQueue> | undefined;

  private readonly consumeHandlers: ConsumeHandler[] = [];
  private readonly usingRandomQueueName: boolean;
  private readonly queueName: string;
  private readonly logger: LoggerService = new CustomLogger(RabbitMQSubscriber.name);

  // Both delays are in milliseconds (they are passed straight to setTimeout).
  private readonly eventRetryTimeoutMs = 2 * 1000;
  private readonly consumeTimeoutMs = 5 * 1000;
  private readonly autoAcknowledge: boolean;
  private readonly isExclusive: boolean;
  private readonly autoDeleteQueue: boolean;

  // Pending re-bind timers, keyed by binding key, so retries never pile up.
  private readonly subscribeRetryTimers = new Map<string, ReturnType<typeof setTimeout>>();
  private consumeRetryTimer: ReturnType<typeof setTimeout> | undefined;

  // True from the moment a consume attempt starts; guards against re-entrancy.
  private isConsuming = false;
  // True only once the broker has confirmed the consumer on the current channel.
  private consumerActive = false;
  // True after close() so that close events do not trigger recovery.
  private closing = false;

  private constructor(
    private readonly config: BrokerConfigInterface,
  ) {
    this.queueName = config.QUEUE_NAME;
    // An empty queue name asks the broker to generate one; such queues are
    // declared auto-delete.
    this.usingRandomQueueName = this.queueName === '';
    this.autoDeleteQueue = this.queueName === '';
    this.autoAcknowledge = config.AUTO_ACKNOWLEDGE === undefined ? false : config.AUTO_ACKNOWLEDGE;
    this.isExclusive = config.EXCLUSIVE_QUEUES === undefined ? true : config.EXCLUSIVE_QUEUES;
  }

  /**
   * Registers a handler for `bindingKey` and binds the queue to the exchange.
   *
   * The handler is registered immediately, even if the broker is unreachable,
   * so that it is visible to `setValidationMiddleware` and to later re-binds.
   * If a handler for the same key already exists it is kept (dispatch always
   * used the first match) and only the binding is re-asserted.
   *
   * Broker failures are logged and retried in the background; the returned
   * promise does not reject because of them.
   *
   * @param bindingKey routing key to bind the queue with
   * @param onConsume callback invoked with the parsed message
   * @param messageClass class passed on to the ConsumeHandler
   * @param enableLogging passed on to the ConsumeHandler
   */
  public async subscribe(
    bindingKey: string,
    onConsume: (message: Record<string, unknown>) => Promise<void>,
    messageClass: ClassConstructor<any>,
    enableLogging = true,
  ): Promise<void> {
    this.closing = false;
    if (!this.findConsumeHandler(bindingKey)) {
      this.consumeHandlers.push(new ConsumeHandler(bindingKey, onConsume, messageClass, enableLogging));
    }
    await this.bindWithRetry(bindingKey);
  }

  /** Binds `bindingKey`, scheduling a delayed retry on any broker failure. */
  private async bindWithRetry(bindingKey: string): Promise<void> {
    let channel: Channel;
    try {
      channel = await this.getChannel();
    } catch (e) {
      this.logger.error(`Got an error when subscribing for key ${bindingKey}: ${e}`);
      this.scheduleResubscribe(bindingKey, 'channel error');
      return;
    }

    try {
      await this.startEventSubscribe(channel, bindingKey);
    } catch (e) {
      this.logger.error(`Got an error when consuming for key ${bindingKey}: ${e}`);
      this.scheduleResubscribe(bindingKey, 'consume error');
    }
  }

  /**
   * Schedules a single delayed re-bind for `bindingKey`. A failed bind usually
   * closes the channel as well, so both the failure path and the close
   * listener end up here; de-duplicating per key keeps that from multiplying
   * the number of retries on every attempt.
   */
  private scheduleResubscribe(bindingKey: string, reason: string): void {
    if (this.closing || this.subscribeRetryTimers.has(bindingKey)) {
      return;
    }
    const timer = setTimeout(() => {
      this.subscribeRetryTimers.delete(bindingKey);
      if (this.closing) {
        return;
      }
      this.logger.debug(`Retrying subscribe after ${reason} for key ${bindingKey}`);
      void this.bindWithRetry(bindingKey);
    }, this.eventRetryTimeoutMs);
    this.subscribeRetryTimers.set(bindingKey, timer);
  }

  /** Asserts the exchange and queue, then binds the queue with `bindingKey`. */
  private async startEventSubscribe(channel: Channel, bindingKey: string): Promise<void> {
    const exchangeName = this.config.EXCHANGE_NAME;
    await channel.assertExchange(
      exchangeName, this.config.EXCHANGE_TYPE, { durable: this.config.DURABLE_EXCHANGE },
    );

    const queue = await this.ensureQueue(channel);
    await channel.bindQueue(queue.queue, exchangeName, bindingKey);
    this.logger.debug(`Subscriber is now consuming on ${bindingKey}`);
  }

  /**
   * Declares the queue. A named queue is re-asserted on every call (the
   * operation is idempotent). A server-named queue is declared once and then
   * reused, and concurrent callers share the in-flight declaration; otherwise
   * each caller would receive a different generated queue.
   */
  private async ensureQueue(channel: Channel): Promise<AssertQueue> {
    const queueOptions = { durable: this.config.DURABLE_QUEUES, autoDelete: this.autoDeleteQueue };
    if (!this.usingRandomQueueName) {
      this.createdQueue = await channel.assertQueue(this.queueName, queueOptions);
      return this.createdQueue;
    }
    if (this.createdQueue) {
      return this.createdQueue;
    }
    if (!this.pendingQueue) {
      this.pendingQueue = (async () => {
        try {
          this.createdQueue = await channel.assertQueue(this.queueName, queueOptions);
          return this.createdQueue;
        } finally {
          this.pendingQueue = undefined;
        }
      })();
    }
    return this.pendingQueue;
  }

  /**
   * Starts consuming from the queue. Does nothing if consumption is already
   * running or being started. Broker failures are logged and retried after
   * `consumeTimeoutMs`; the returned promise does not reject because of them.
   */
  public async startReceivingMessages(): Promise<void> {
    this.closing = false;
    await this.beginConsuming();
  }

  private async beginConsuming(): Promise<void> {
    if (this.isConsuming || this.closing) {
      return;
    }

    if (this.usingRandomQueueName && !this.createdQueue) {
      // The server-named queue is only known after the first successful
      // subscribe(); there is nothing to consume from until then.
      this.logger.debug('No queue has been declared yet, postponing message consumption.');
      this.scheduleConsumeRetry();
      return;
    }

    // Set before the first await so concurrent calls cannot both register a consumer.
    this.isConsuming = true;

    let channel: Channel;
    try {
      channel = await this.getChannel();
    } catch (e) {
      this.isConsuming = false;
      this.logger.error('Got an error while getting the channel to receive messages.');
      this.scheduleConsumeRetry();
      return;
    }

    // Use the name the broker actually assigned; for named queues this equals queueName.
    const queue = this.createdQueue ? this.createdQueue.queue : this.queueName;
    // With noAck the broker considers messages settled on delivery; acking or
    // nacking them is a protocol error that closes the channel.
    const mustSettle = !this.autoAcknowledge;

    try {
      await channel.consume(queue, async (msg) => {
        if (!msg) {
          // amqplib delivers null when the broker cancels the consumer
          // (for example because the queue was deleted).
          this.logger.debug(`Consumer on queue ${queue} was cancelled by the broker`);
          if (this.channel === channel) {
            this.consumerActive = false;
            this.isConsuming = false;
            this.scheduleConsumeRetry();
          }
          return;
        }

        const { content } = msg;
        if (!content) {
          return;
        }

        try {
          const { routingKey } = msg.fields;
          const consumeHandler = this.findConsumeHandler(routingKey);
          if (!consumeHandler) {
            this.logger.debug(`Could not find consume handler for routingKey: ${routingKey}. Message lost.`);
            if (mustSettle) {
              const requeue = false;
              channel.nack(msg, false, requeue);
            }
            return;
          }

          const messageObject = JSON.parse(content.toString());
          await consumeHandler.consume(messageObject);
          if (mustSettle) {
            channel.ack(msg);
          }
        } catch (err) {
          // NOTE(review): the message is neither acked nor nacked here, so in
          // manual-ack mode it stays unacknowledged until the channel closes.
          // Whether to requeue or drop it is a policy decision - confirm.
          this.logger.error(`Got an error when consuming event from queue ${queue}: ${err}`);
        }
      }, { noAck: this.autoAcknowledge, exclusive: this.isExclusive });
    } catch (e) {
      this.isConsuming = false;
      this.logger.error(`Got an error when starting to consume from queue ${queue}: ${e}`);
      this.scheduleConsumeRetry();
      return;
    }

    if (this.channel !== channel) {
      // The channel went away while the consume call was settling; its close
      // handler could not know a consumer was about to exist, so retry here.
      this.isConsuming = false;
      this.scheduleConsumeRetry();
      return;
    }
    this.consumerActive = true;
  }

  /** Schedules a single delayed attempt to (re)start consumption. */
  private scheduleConsumeRetry(): void {
    if (this.closing || this.consumeRetryTimer !== undefined) {
      return;
    }
    this.consumeRetryTimer = setTimeout(() => {
      this.consumeRetryTimer = undefined;
      void this.beginConsuming();
    }, this.consumeTimeoutMs);
  }

  /** Returns the first handler registered for exactly this routing key, if any. */
  private findConsumeHandler(routingKey: string): ConsumeHandler | undefined {
    return this.consumeHandlers.find(handler => handler.bindingKey === routingKey);
  }

  /**
   * Stops recovery, cancels pending retries and closes the channel and the
   * connection. Registered handlers are kept, so a later `subscribe()` or
   * `startReceivingMessages()` reconnects.
   */
  public async close(): Promise<void> {
    // Must be set first: closing the channel emits 'close', which would
    // otherwise immediately re-open it.
    this.closing = true;

    this.subscribeRetryTimers.forEach(timer => clearTimeout(timer));
    this.subscribeRetryTimers.clear();
    if (this.consumeRetryTimer !== undefined) {
      clearTimeout(this.consumeRetryTimer);
      this.consumeRetryTimer = undefined;
    }

    if (this.pendingChannel) {
      // Let an in-flight open finish so that what it opened can be closed below.
      try {
        await this.pendingChannel;
      } catch (e) {
        // Nothing was opened, so there is nothing to close.
      }
    }

    if (this.channel) {
      await this.channel.close();
    }
    if (this.server) {
      await this.server.close();
    }
  }

  /** Applies `middleware` to every handler registered so far. */
  public setValidationMiddleware(middleware: ValidationMiddleware): void {
    this.consumeHandlers.forEach(handler => handler.setValidationMiddleware(middleware));
  }

  /** Returns the open channel, opening one (shared between concurrent callers) if needed. */
  private async getChannel(): Promise<Channel> {
    if (this.channel) {
      return this.channel;
    }
    if (!this.pendingChannel) {
      this.pendingChannel = this.openChannel();
    }
    return this.pendingChannel;
  }

  private async openChannel(): Promise<Channel> {
    try {
      const server = await this.getServer();
      const channel: Channel = await server.createChannel();

      channel.on('close', () => this.onChannelClosed(channel));

      // amqplib emits 'error' (not 'err'). An 'error' event without a
      // listener is thrown by the emitter. 'close' follows and performs the
      // recovery, so only the stale reference is dropped here.
      channel.on('error', (err) => {
        this.logger.debug(`Channel got an error: ${err}`);
        if (this.channel === channel) {
          this.channel = undefined;
        }
      });

      this.channel = channel;
      return channel;
    } finally {
      this.pendingChannel = undefined;
    }
  }

  /** Resets channel-bound state and, unless closing, re-binds and resumes consumption. */
  private onChannelClosed(closed: Channel): void {
    this.logger.debug('Channel closed');
    if (this.channel === closed) {
      this.channel = undefined;
    }

    const hadConsumer = this.consumerActive;
    this.consumerActive = false;
    this.isConsuming = false;

    if (this.usingRandomQueueName) {
      // The server-named queue is auto-delete and may be gone together with
      // its consumer; declare a fresh one on the next bind.
      this.createdQueue = undefined;
    }

    if (this.closing) {
      return;
    }

    this.consumeHandlers.forEach((handler) => {
      this.logger.debug(`Channel closed for key ${handler.bindingKey}`);
      this.scheduleResubscribe(handler.bindingKey, 'channel close');
    });

    // Only an established consumer is resumed right away. A consume call that
    // was still in flight schedules its own delayed retry when it fails, which
    // prevents a tight reconnect loop when consuming itself closes the channel.
    if (hadConsumer) {
      void this.beginConsuming();
    }
  }

  /** Returns the open connection, opening one (shared between concurrent callers) if needed. */
  private async getServer(): Promise<Connection> {
    if (this.server) {
      return this.server;
    }
    if (!this.pendingServer) {
      this.pendingServer = this.openServer();
    }
    return this.pendingServer;
  }

  private async openServer(): Promise<Connection> {
    try {
      const server: Connection = await this.connect();

      server.on('close', () => {
        this.logger.debug('RabbitMQ server closed');
        if (this.server === server) {
          this.server = undefined;
        }
      });

      // See the channel 'error' listener: the event name is 'error'.
      server.on('error', (err) => {
        this.logger.debug(`RabbitMQ server got an error: ${err}`);
        if (this.server === server) {
          this.server = undefined;
        }
      });

      this.server = server;
      return server;
    } finally {
      this.pendingServer = undefined;
    }
  }

  private async connect(): Promise<Connection> {
    return amqp.connect(this.config.AMQP_CONNECTION_URI);
  }

  /**
   * Returns the process-wide subscriber, creating it on first use.
   * `config` is only used by the call that creates the instance.
   */
  public static getInstance(config: BrokerConfigInterface): RabbitMQSubscriber {
    if (!RabbitMQSubscriber.instance) {
      RabbitMQSubscriber.instance = new RabbitMQSubscriber(config);
    }
    return RabbitMQSubscriber.instance;
  }
}
