import { ServiceBusClient, TopicClient, SubscriptionClient, QueueClient, Sender, Receiver, SendableMessageInfo, ReceiveMode } from "@azure/service-bus";

export interface ArrayOf<T> {
    [name: string]: T;
}

export interface ServiceBusClientExt extends ServiceBusClient {
    topicClients: TopicClientExt[];
    subscriptionClients: SubscriptionClientExt[];
    queueClients: QueueClientExt[];
}

export interface TopicClientExt extends TopicClient {
    sender: Sender;
}

export interface QueueClientExt extends QueueClient {
    sender: Sender;
}

export interface SubscriptionClientExt extends SubscriptionClient {
    receiver: Receiver;
}

export interface AzureServiceBusSenderInt {
    maxBatch(): number;
    sendToTopic(topicName: string, message: SendableMessageInfo): Promise<void>;
    sendBatchToTopic(topicName: string, messages: SendableMessageInfo[], moreProperties?: any): Promise<void>;
    sendToQueue(queueName: string, message: SendableMessageInfo): Promise<void>;
    sendBatchToQueue(queueName: string, messages: SendableMessageInfo[], moreProperties?: any): Promise<void>;
}

export class AzureServiceBusSender implements AzureServiceBusSenderInt {
    private busClient: ServiceBusClientExt;

    /**
     * Create the class with a given connection string that will contain all topics, queues and subscriptions.
     * @param connectionString
     */
    constructor(private senderName: string, private connectionString: string) {
        this.busClient = null;
    }

    maxBatch(): number {
        return 250;
    }

    private ensureClient(): ServiceBusClientExt {
        // make it lazy so this does not interfere with the startup which we want to be very fast
        if (!this.busClient) {
            this.busClient = <ServiceBusClientExt>ServiceBusClient.createFromConnectionString(this.connectionString);
            this.busClient.topicClients = [];
            this.busClient.subscriptionClients = [];
            this.busClient.queueClients = [];
        }

        return this.busClient;
    }

    // #region Topics
    private ensureTopicSender(topicName: string): Sender {
        let tc: TopicClientExt = this.ensureClient().topicClients[topicName];
        if (!tc) {
            tc = <TopicClientExt>this.ensureClient().createTopicClient(topicName);
            this.ensureClient().topicClients[topicName] = tc;
        }

        if (!tc.sender || tc.sender.isClosed) {
            tc.sender = tc.createSender();
        }

        return tc.sender;
    }

    private injectMessage(msg: SendableMessageInfo, moreProperties?: any): SendableMessageInfo {
        msg.contentType = 'application/json';
        msg.userProperties = msg.userProperties || {};
        msg.userProperties.senderName = this.senderName;

        if (moreProperties) {
            Object.assign(msg.userProperties, moreProperties);
        }

        return msg;
    }

    async sendToTopic(topicName: string, message: SendableMessageInfo): Promise<void> {
        await this.ensureTopicSender(topicName).send(this.injectMessage(message));
    };

    async sendBatchToTopic(topicName: string, messages: SendableMessageInfo[], moreProperties?: any): Promise<void> {
        if (messages.length > this.maxBatch()) {
            throw new Error(`batch can only be up to ${this.maxBatch} messages`);
        }

        messages.forEach(m => this.injectMessage(m, moreProperties));

        await this.ensureTopicSender(topicName).sendBatch(messages);
    };

    /**
     * Creates a subscription receiver to receive messages from the topic
     * @param topicName The topic to receive the messages from
     * @param subscriptionName The subscription name to use
     */
    createSubscriptionReceiver(topicName: string, subscriptionName: string): Receiver {
        let sc: SubscriptionClientExt = this.ensureClient().subscriptionClients[topicName];

        if (!sc) {
            sc = <SubscriptionClientExt>this.ensureClient().createSubscriptionClient(topicName, subscriptionName);
            this.ensureClient().subscriptionClients[topicName] = sc;
        }

        if (!sc.receiver || sc.receiver.isClosed) {
            // default mode is peek lock
            sc.receiver = <Receiver>sc.createReceiver(ReceiveMode.peekLock);
        }

        return sc.receiver;
    }
    //#endregion

    //#region Queue
    private ensureQueueSender(queueName: string): Sender {
        let qc: QueueClientExt = this.ensureClient().queueClients[queueName];

        if (!qc) {
            qc = <QueueClientExt>this.ensureClient().createQueueClient(queueName);
            this.ensureClient().queueClients[queueName] = qc;
        }

        if (!qc.sender || qc.sender.isClosed) {
            qc.sender = <Sender>qc.createSender();
        }

        return qc.sender;
    }

    async sendToQueue(queueName: string, message: SendableMessageInfo): Promise<void> {
        await this.ensureQueueSender(queueName).send(this.injectMessage(message));
    };

    async sendBatchToQueue(queueName: string, messages: SendableMessageInfo[], moreProperties?: any): Promise<void> {
        if (messages.length > this.maxBatch()) {
            throw new Error(`batch can only be up to ${this.maxBatch} messages`);
        }

        messages.forEach(m => this.injectMessage(m, moreProperties));

        await this.ensureQueueSender(queueName).sendBatch(messages);
    };
    //#endregion

    /**
     * Cleanup method that will close all senders, receivers, topics, subscriptions and connections
     */
    async cleanup() {
        // do not use logger to avoid any dependency. Cleanup should be as independent as possible.

        this.ensureClient().topicClients.forEach(async (x) => {
            try {
                if (x.sender && !x.sender.isClosed) {
                    await x.sender.close();
                }

                x.sender = null;
                await x.close();
            } catch (error) {
                console.error('error closing topic clients and senders', error);
            }
        });
        this.ensureClient().topicClients.splice(0);

        this.ensureClient().subscriptionClients.forEach(async (x) => {
            try {
                if (x.receiver && !x.receiver.isClosed) {
                    await x.receiver.close();
                }

                x.receiver = null;
                await x.close();
            } catch (error) {
                console.error('error closing subscriptions clients and receivers', error);
            }
        });
        this.ensureClient().subscriptionClients.splice(0);

        this.ensureClient().queueClients.forEach(async (x) => {
            try {
                if (x.sender && !x.sender.isClosed) {
                    await x.sender.close();
                }

                x.sender = null;
                await x.close();
            } catch (error) {
                console.error('error closing topic clients and senders', error);
            }
        });
        this.ensureClient().queueClients.splice(0);

        try {
            await this.ensureClient().close();
        } catch (error) {
            console.error('error closing bus client', error);
        }

        this.busClient = null;
    }
}
