import { Channel, connect, Connection, ConsumeMessage, Message } from "amqplib";

// const el = new BrokerProxy({ host: 'localhost' })
// await el.connect()
// await el.createChannel('docService.start');
// el.emit('docService.start','[*] Service start ' + new Date().toJSON());

// const el = new BrokerProxy({ host: 'localhost' })
// await el.connect()
// await el.createChannel('docService.start');
// el.on('docService.start').subscribe((msg) => { console.log(msg) });
export interface BrokerConfig {
  host: string;
}
export type TypeExchange = "fanout" | "direct" | "headers" | "topic";
export class BrokerProxy {
  private queue: string[] = [];
  private exchanges: string[] = [];

  private channel: Channel | undefined;
  private connection: Connection | undefined;

  constructor(private config: BrokerConfig) {}

  public async connect() {
    this.connection = await connect("amqp://" + this.config.host);
    this.channel = await this.connection.createChannel();
    console.log("[*] Доступно подключение rebbitmq");
  }

  /**
   * В случае использования подтверждения сообщений onAck параметр 
   * будет определять размер предворительной выборки
   * @param n количество сообщений
   */
  public async prefetch(n = 1) {
    this.channel?.prefetch(n);
  }

  public async createChannel(queue: string, durable = false) {
    if (this.queue.findIndex((r) => r === queue) !== -1) return;
    this.queue.push(queue);
    await this.channel?.assertQueue(queue, {
      durable: durable,
    });
  }
  public  async createExchange(name: string, type: TypeExchange, durable = false) {
    if (~this.exchanges.findIndex((r) => r === name)) return;
    this.exchanges.push(name);
    await this.channel?.assertExchange(name, type, {
      durable: durable,
    });
  }
  /**
   * Отправляет в брокер сообщений событие
   * @param queue Имя очереди
   * @param msg сообщение
   * @returns успех
   */
   public  emit<T>(queue: string, msg: T | Buffer): boolean {
    if (Buffer.isBuffer(msg)) {
      return !!this.channel?.sendToQueue(queue, msg);
    }
    return !!this.channel?.sendToQueue(queue, Buffer.from(JSON.stringify(msg)));
  }

  public publish<T>(exchange: string, routingKey: string, msg: T | Buffer) {
    if (Buffer.isBuffer(msg)) {
      return !!this.channel?.publish(exchange, routingKey, msg);
    }
    return !!this.channel?.publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(msg))
    );
  }

  public async bind<T>(exchange: string, routingKey: string, durable: boolean = false) {
    const q = await this.channel?.assertQueue("", {
      exclusive: true,
      durable: durable,
    });
    if (!q?.queue) throw new Error("Not have queue");
    this.channel?.bindQueue(q?.queue, exchange, routingKey);
    return this.on<T>(q.queue);
  }

  /**
   * Создаёт подписку на очередь
   * @param queue Очередь
   * @returns объект с возможностью подписаться через subscribe
   */
   public on<T>(queue: string) {
    return {
      subscribe: (fn: (msg?: T) => void) => {
        this.channel?.consume(
          queue,
          (msg: ConsumeMessage | null) => {
            if (msg == null) return fn?.();
            try {
              fn?.(JSON.parse(msg.content.toString("utf8")) as T);
            } catch (ex) {
              // TODO - Сделать отправку в логгер ошибок
              console.error(ex, msg);
            }
          },
          {
            noAck: true,
          }
        );
      },
    };
  }

  /**
   * Создаёт синхронную подписку на очередь
   * @param queue Очередь
   * @returns объект с возможностью подписаться через subscribe
   */
   public onAck<T>(queue: string) {
    return {
      subscribe: (fn: (msg?: T) => Promise<boolean>) => {
        this.channel?.consume(
          queue,
          async (msg: ConsumeMessage | null) => {
            if (msg == null) return fn?.();
            try {
              const success = await fn?.(JSON.parse(msg.content.toString("utf8")) as T);
              if (success) {
                this.channel?.ack(msg);
              } else {
                this.channel?.nack(msg);
              }
            } catch (ex) {
              // TODO - Сделать отправку в логгер ошибок
              console.error(ex, msg);
            }
          },
          {
            noAck: false,
          }
        );
      },
    };
  }
}
