import {map} from 'rxjs/operators';
import {ModuleRef} from '@nestjs/core';
import {defer, lastValueFrom} from 'rxjs';
import {DynamicModule, Module, Global, Provider, OnModuleDestroy, Inject, Logger} from '@nestjs/common';

import {ScyllaModuleOptions, ScyllaModuleAsyncOptions, ScyllaOptionsFactory} from './interfaces';
import {getConnectionToken, handleRetry, generateString} from './utils/orm.utils';
import {ConnectionOptions, Connection} from './orm';

/**
 * Value registered under the `SCYLLA_MODULE_ID` provider token; the async
 * registration path binds a generated string to it.
 */
export const SCYLLA_MODULE_ID = 'ScyllaModuleId';

/**
 * Injection token under which the resolved Scylla module options are
 * provided and later injected into the core module instance.
 */
export const SCYLLA_MODULE_OPTIONS = 'ScyllaModuleOptions';

/**
 * Global core module that registers the Scylla connection provider and the
 * resolved module options, and closes the connection when the module is
 * destroyed (unless `keepConnectionAlive` is set).
 */
@Global()
@Module({})
export class ScyllaCoreModule implements OnModuleDestroy {
    constructor(
        @Inject(SCYLLA_MODULE_OPTIONS)
        private readonly options: ScyllaModuleOptions,
        private readonly moduleRef: ModuleRef,
    ) {}

    /**
     * Builds the dynamic module from statically known options.
     *
     * @param options Module options; they are provided as-is under
     * `SCYLLA_MODULE_OPTIONS` and used to create the connection.
     * @returns A dynamic module that exports the connection provider.
     */
    static forRoot(options: ScyllaModuleOptions = {}): DynamicModule {
        const moduleOptionsProvider = {
            provide: SCYLLA_MODULE_OPTIONS,
            useValue: options,
        };
        const connectionProvider = {
            provide: getConnectionToken(options as ConnectionOptions),
            useFactory: () => this.createConnectionFactory(options),
        };
        return {
            module: ScyllaCoreModule,
            providers: [moduleOptionsProvider, connectionProvider],
            exports: [connectionProvider],
        };
    }

    /**
     * Builds the dynamic module from options that are resolved at bootstrap
     * through `useFactory`, `useClass` or `useExisting`.
     *
     * When `options.name` is set it overrides the `name` of the resolved
     * options, both for the created connection and for the value provided
     * under `SCYLLA_MODULE_OPTIONS`.
     *
     * @param options Async registration options.
     * @returns A dynamic module that exports the connection provider.
     * @throws Error when no options strategy is configured.
     */
    static forRootAsync(options: ScyllaModuleAsyncOptions): DynamicModule {
        const connectionProvider = {
            provide: getConnectionToken(options as ConnectionOptions),
            // The merge is idempotent; it is repeated here so the connection still
            // receives the configured name if SCYLLA_MODULE_OPTIONS is overridden.
            useFactory: (moduleOptions: ScyllaModuleOptions) =>
                this.createConnectionFactory(this.withConnectionName(moduleOptions, options.name)),
            inject: [SCYLLA_MODULE_OPTIONS],
        };

        const asyncProviders = this.createAsyncProviders(options);
        return {
            module: ScyllaCoreModule,
            imports: options.imports,
            providers: [
                ...asyncProviders,
                connectionProvider,
                {
                    provide: SCYLLA_MODULE_ID,
                    useValue: generateString(),
                },
            ],
            exports: [connectionProvider],
        };
    }

    /**
     * Closes the connection registered for this module's options.
     * Does nothing when `keepConnectionAlive` is set or when the resolved
     * connection provider is falsy.
     */
    async onModuleDestroy() {
        if (this.options.keepConnectionAlive) {
            return;
        }
        Logger.log('Closing connection', 'ScyllaModule');
        // The token is derived from the injected options, so those options must
        // describe the same connection that forRoot/forRootAsync registered.
        const connection = this.moduleRef.get<Connection>(getConnectionToken(this.options as ConnectionOptions) as any);
        if (!connection) {
            return;
        }
        await connection.closeAsync();
    }

    /**
     * Returns the providers needed to resolve `SCYLLA_MODULE_OPTIONS`
     * asynchronously. With `useClass` the factory class itself is registered
     * too, so that it can be instantiated and injected.
     *
     * @throws Error when none of `useFactory`, `useClass` or `useExisting` is set.
     */
    private static createAsyncProviders(options: ScyllaModuleAsyncOptions): Provider[] {
        if (options.useExisting || options.useFactory) {
            return [this.createAsyncOptionsProvider(options)];
        }
        const {useClass} = options;
        if (!useClass) {
            // Without this guard a provider with an undefined token would be emitted.
            throw new Error('ScyllaModule.forRootAsync() requires one of useFactory, useClass or useExisting');
        }
        return [
            this.createAsyncOptionsProvider(options),
            {
                provide: useClass,
                useClass,
            },
        ];
    }

    /**
     * Creates the `SCYLLA_MODULE_OPTIONS` provider for the async path.
     *
     * The resolved options get `options.name` merged in so that the options
     * injected into the module instance identify the same connection that
     * `forRootAsync` registered. NOTE(review): this assumes
     * `getConnectionToken` derives the token from `name` - confirm in
     * `./utils/orm.utils`.
     */
    private static createAsyncOptionsProvider(options: ScyllaModuleAsyncOptions): Provider {
        const {useFactory} = options;
        if (useFactory) {
            return {
                provide: SCYLLA_MODULE_OPTIONS,
                useFactory: async (...args: any[]) => this.withConnectionName(await useFactory(...args), options.name),
                inject: options.inject || [],
            };
        }
        return {
            provide: SCYLLA_MODULE_OPTIONS,
            useFactory: async (optionsFactory: ScyllaOptionsFactory) =>
                this.withConnectionName(await optionsFactory.createScyllaOptions(), options.name),
            inject: [options.useClass || options.useExisting],
        };
    }

    /**
     * Returns `resolved` unchanged when `name` is falsy; otherwise returns a
     * copy of `resolved` whose `name` is replaced by `name`.
     */
    private static withConnectionName(
        resolved: ScyllaModuleOptions,
        name: ScyllaModuleAsyncOptions['name'],
    ): ScyllaModuleOptions {
        if (!name) {
            return resolved;
        }
        return {...resolved, name};
    }

    /**
     * Creates a connection and initializes it, passing `retryAttempts` and
     * `retryDelay` to `handleRetry`; every other option is forwarded to the
     * `Connection` constructor.
     *
     * @returns The connection once `initAsync()` has completed.
     */
    private static async createConnectionFactory({
        retryDelay,
        retryAttempts,
        ...cassandraOptions
    }: ScyllaModuleOptions): Promise<Connection> {
        const connection = new Connection(cassandraOptions);
        // defer() makes each (re)subscription call initAsync() again on the same connection.
        const initialized$ = defer(() => connection.initAsync()).pipe(
            handleRetry(retryAttempts, retryDelay),
            map(() => connection),
        );
        return lastValueFrom(initialized$);
    }
}
