import {
    Observable,
    Subject,
    filter,
    firstValueFrom,
    map,
    shareReplay
} from 'rxjs';
import {
    isBulkWriteConflictError,
    newRxError
} from '../../rx-error.ts';
import type {
    NumberFunctionMap,
    RxCollection,
    RxDatabase,
    RxError,
    RxReplicationWriteToMasterRow,
    RxStorageInstance,
    RxTypeError
} from '../../types/index.d.ts';
import {
    MIGRATION_DEFAULT_BATCH_SIZE,
    addMigrationStateToDatabase,
    getOldCollectionMeta,
    migrateDocumentData,
    mustMigrate
} from './migration-helpers.ts';
import {
    PROMISE_RESOLVE_TRUE,
    RXJS_SHARE_REPLAY_DEFAULTS,
    clone,
    deepEqual,
    ensureNotFalsy,
    errorToPlainJson,
    getDefaultRevision,
    getDefaultRxDocumentMeta
} from '../utils/index.ts';
import type {
    MigrationStatusUpdate,
    RxMigrationStatus,
    RxMigrationStatusDocument
} from './migration-types.ts';
import {
    getSingleDocument,
    hasEncryption,
    observeSingle,
    writeSingle
} from '../../rx-storage-helper.ts';
import {
    BroadcastChannel,
    createLeaderElection
} from 'broadcast-channel';
import {
    META_INSTANCE_SCHEMA_TITLE,
    awaitRxStorageReplicationFirstInSync,
    cancelRxStorageReplication,
    defaultConflictHandler,
    getRxReplicationMetaInstanceSchema,
    replicateRxStorageInstance,
    rxStorageInstanceToReplicationHandler
} from '../../replication-protocol/index.ts';
import { overwritable } from '../../overwritable.ts';
import {
    INTERNAL_CONTEXT_MIGRATION_STATUS,
    addConnectedStorageToCollection,
    getPrimaryKeyOfInternalDocument
} from '../../rx-database-internal-store.ts';
import { prepareQuery } from '../../rx-query.ts';
import { normalizeMangoQuery } from '../../rx-query-helper.ts';



export class RxMigrationState {

    public database: RxDatabase;


    private started: boolean = false;
    public readonly oldCollectionMeta: ReturnType<typeof getOldCollectionMeta>;
    public readonly mustMigrate: ReturnType<typeof mustMigrate>;
    public readonly statusDocId: string;
    public readonly $: Observable<RxMigrationStatus>;

    constructor(
        public readonly collection: RxCollection,
        public readonly migrationStrategies: NumberFunctionMap,
        public readonly statusDocKey = [
            collection.name,
            'v',
            collection.schema.version
        ].join('-'),
    ) {
        this.database = collection.database;
        this.oldCollectionMeta = getOldCollectionMeta(this);
        this.mustMigrate = mustMigrate(this);
        this.statusDocId = getPrimaryKeyOfInternalDocument(
            this.statusDocKey,
            INTERNAL_CONTEXT_MIGRATION_STATUS
        );
        addMigrationStateToDatabase(this);

        this.$ = observeSingle<RxMigrationStatusDocument>(
            this.database.internalStore,
            this.statusDocId
        ).pipe(
            filter(d => !!d),
            map(d => ensureNotFalsy(d).data),
            shareReplay(RXJS_SHARE_REPLAY_DEFAULTS)
        );
    }

    getStatus() {
        return firstValueFrom(this.$);
    }


    /**
     * Starts the migration.
     * Returns void so that people to not get the idea to await
     * this function.
     * Instead use migratePromise() if you want to await
     * the migration. This ensures it works even if the migration
     * is run on a different browser tab.
     */
    async startMigration(batchSize: number = MIGRATION_DEFAULT_BATCH_SIZE): Promise<void> {
        const must = await this.mustMigrate;
        if (!must) {
            return;
        }
        if (this.started) {
            throw newRxError('DM1');
        }
        this.started = true;


        let broadcastChannel: BroadcastChannel | undefined = undefined;
        /**
         * To ensure that multiple tabs do not migrate the same collection,
         * we use a new broadcastChannel/leaderElector for each collection.
         * This is required because collections can be added dynamically and
         * not all tabs might know about this collection.
         */
        if (this.database.multiInstance) {
            broadcastChannel = new BroadcastChannel([
                'rx-migration-state',
                this.database.name,
                this.collection.name,
                this.collection.schema.version
            ].join('|'));
            const leaderElector = createLeaderElection(broadcastChannel);
            await leaderElector.awaitLeadership();
        }

        /**
         * Instead of writing a custom migration protocol,
         * we do a push-only replication from the old collection data to the new one.
         * This also ensure that restarting the replication works without problems.
         */
        const oldCollectionMeta = await this.oldCollectionMeta;
        const oldStorageInstance = await this.database.storage.createStorageInstance({
            databaseName: this.database.name,
            collectionName: this.collection.name,
            databaseInstanceToken: this.database.token,
            multiInstance: this.database.multiInstance,
            options: {},
            schema: oldCollectionMeta.data.schema,
            password: this.database.password,
            devMode: overwritable.isDevMode()
        });


        const connectedInstances = await this.getConnectedStorageInstances();


        /**
         * Initially write the migration status into a meta document.
         */
        const totalCount = await this.countAllDoucments(
            [oldStorageInstance].concat(connectedInstances.map(r => r.oldStorage))
        );
        await this.updateStatus(s => {
            s.count.total = totalCount;
            return s;
        });


        try {
            /**
             * First migrate the connected storages,
             * afterwards migrate the normal collection.
             */
            await Promise.all(
                connectedInstances.map(async (connectedInstance) => {
                    await addConnectedStorageToCollection(
                        this.collection,
                        connectedInstance.newStorage.collectionName,
                        connectedInstance.newStorage.schema
                    );
                    await this.migrateStorage(
                        connectedInstance.oldStorage,
                        connectedInstance.newStorage,
                        batchSize
                    );
                    await connectedInstance.newStorage.close();
                })
            );

            await this.migrateStorage(
                oldStorageInstance,
                /**
                 * Use the originalStorageInstance here
                 * so that the _meta.lwt time keeps the same
                 * and our replication checkpoints still point to the
                 * correct checkpoint.
                 */
                this.collection.storageInstance.originalStorageInstance,
                batchSize
            );
        } catch (err) {
            await oldStorageInstance.close();
            await this.updateStatus(s => {
                s.status = 'ERROR';
                s.error = errorToPlainJson(err as Error);
                return s;
            });
            return;
        }


        // remove old collection meta doc
        await writeSingle(
            this.database.internalStore,
            {
                previous: oldCollectionMeta,
                document: Object.assign(
                    {},
                    oldCollectionMeta,
                    {
                        _deleted: true
                    }
                )
            },
            'rx-migration-remove-collection-meta'
        );

        await this.updateStatus(s => {
            s.status = 'DONE';
            return s;
        });
        if (broadcastChannel) {
            await broadcastChannel.close();
        }
    }

    public updateStatusHandlers: MigrationStatusUpdate[] = [];
    public updateStatusQueue: Promise<any> = PROMISE_RESOLVE_TRUE;
    public updateStatus(
        handler: MigrationStatusUpdate
    ) {
        this.updateStatusHandlers.push(handler);
        this.updateStatusQueue = this.updateStatusQueue.then(async () => {
            if (this.updateStatusHandlers.length === 0) {
                return;
            }
            // re-run until no conflict
            const useHandlers = this.updateStatusHandlers;
            this.updateStatusHandlers = [];
            while (true) {
                const previous = await getSingleDocument<RxMigrationStatusDocument>(
                    this.database.internalStore,
                    this.statusDocId
                );
                let newDoc = clone(previous);
                if (!previous) {
                    newDoc = {
                        id: this.statusDocId,
                        key: this.statusDocKey,
                        context: INTERNAL_CONTEXT_MIGRATION_STATUS,
                        data: {
                            collectionName: this.collection.name,
                            status: 'RUNNING',
                            count: {
                                total: 0,
                                handled: 0,
                                percent: 0
                            }
                        },
                        _deleted: false,
                        _meta: getDefaultRxDocumentMeta(),
                        _rev: getDefaultRevision(),
                        _attachments: {}
                    };
                }

                let status = ensureNotFalsy(newDoc).data;
                for (const oneHandler of useHandlers) {
                    status = oneHandler(status);
                }
                status.count.percent = Math.round((status.count.handled / status.count.total) * 100);

                if (
                    newDoc && previous &&
                    deepEqual(newDoc.data, previous.data)
                ) {
                    break;
                }


                try {
                    await writeSingle<RxMigrationStatusDocument>(
                        this.database.internalStore,
                        {
                            previous,
                            document: ensureNotFalsy(newDoc)
                        },
                        INTERNAL_CONTEXT_MIGRATION_STATUS
                    );

                    // write successful
                    break;
                } catch (err) {
                    // ignore conflicts
                    if (!isBulkWriteConflictError(err)) {
                        throw err;
                    }
                }
            }
        });
        return this.updateStatusQueue;
    }


    public async migrateStorage(
        oldStorage: RxStorageInstance<any, any, any>,
        newStorage: RxStorageInstance<any, any, any>,
        batchSize: number
    ) {
        const replicationMetaStorageInstance = await this.database.storage.createStorageInstance({
            databaseName: this.database.name,
            collectionName: 'rx-migration-state-meta-' + this.collection.name + '-' + this.collection.schema.version,
            databaseInstanceToken: this.database.token,
            multiInstance: this.database.multiInstance,
            options: {},
            schema: getRxReplicationMetaInstanceSchema(oldStorage.schema, hasEncryption(oldStorage.schema)),
            password: this.database.password,
            devMode: overwritable.isDevMode()
        });

        const replicationHandlerBase = rxStorageInstanceToReplicationHandler(
            newStorage,
            /**
             * Ignore push-conflicts.
             * If this happens we drop the 'old' document state.
             */
            defaultConflictHandler,
            this.database.token,
            true
        );

        const replicationState = replicateRxStorageInstance({
            keepMeta: true,
            identifier: [
                'rx-migration-state',
                this.collection.name,
                oldStorage.schema.version,
                this.collection.schema.version
            ].join('-'),
            replicationHandler: {
                masterChangesSince() {
                    return Promise.resolve({
                        checkpoint: null,
                        documents: []
                    });
                },
                masterWrite: async (rows) => {
                    rows = await Promise.all(
                        rows
                            .map(async (row) => {
                                let newDocData = row.newDocumentState;
                                if (newStorage.schema.title === META_INSTANCE_SCHEMA_TITLE) {
                                    newDocData = row.newDocumentState.docData;
                                    if (row.newDocumentState.isCheckpoint === '1') {
                                        return {
                                            assumedMasterState: undefined,
                                            newDocumentState: row.newDocumentState
                                        };
                                    }
                                }
                                const migratedDocData: RxReplicationWriteToMasterRow<any> = await migrateDocumentData(
                                    this.collection,
                                    oldStorage.schema.version,
                                    newDocData
                                );
                                const newRow: RxReplicationWriteToMasterRow<any> = {
                                    // drop the assumed master state, we do not have to care about conflicts here.
                                    assumedMasterState: undefined,
                                    newDocumentState: newStorage.schema.title === META_INSTANCE_SCHEMA_TITLE
                                        ? Object.assign({}, row.newDocumentState, { docData: migratedDocData })
                                        : migratedDocData
                                };
                                return newRow;
                            })
                    );

                    // filter out the documents where the migration strategy returned null
                    rows = rows.filter(row => !!row.newDocumentState);

                    const result = await replicationHandlerBase.masterWrite(rows);
                    return result;
                },
                masterChangeStream$: new Subject<any>().asObservable()
            },
            forkInstance: oldStorage,
            metaInstance: replicationMetaStorageInstance,
            pushBatchSize: batchSize,
            pullBatchSize: 0,
            conflictHandler: defaultConflictHandler,
            hashFunction: this.database.hashFunction
        });


        let hasError: RxError | RxTypeError | false = false;
        replicationState.events.error.subscribe(err => hasError = err);

        // update replication status on each change
        replicationState.events.processed.up.subscribe(() => {
            this.updateStatus(status => {
                status.count.handled = status.count.handled + 1;
                return status;
            });
        });

        await awaitRxStorageReplicationFirstInSync(replicationState);
        await cancelRxStorageReplication(replicationState);

        await this.updateStatusQueue;
        if (hasError) {
            await replicationMetaStorageInstance.close();
            throw hasError;
        }

        // cleanup old storages
        await Promise.all([
            oldStorage.remove(),
            replicationMetaStorageInstance.remove()
        ]);
    }

    public async countAllDoucments(
        storageInstances: RxStorageInstance<any, any, any>[]
    ): Promise<number> {
        let ret = 0;
        await Promise.all(
            storageInstances.map(async (instance) => {

                const preparedQuery = prepareQuery(
                    instance.schema,
                    normalizeMangoQuery(
                        instance.schema,
                        {
                            selector: {}
                        }
                    )
                );
                const countResult = await instance.count(preparedQuery);
                ret += countResult.count;
            })
        );
        return ret;
    }

    public async getConnectedStorageInstances() {
        const oldCollectionMeta = await this.oldCollectionMeta;
        const ret: {
            oldStorage: RxStorageInstance<any, any, any>;
            newStorage: RxStorageInstance<any, any, any>;
        }[] = [];

        await Promise.all(
            await Promise.all(
                oldCollectionMeta
                    .data
                    .connectedStorages
                    .map(async (connectedStorage) => {

                        // atm we can only migrate replication states.
                        if (connectedStorage.schema.title !== META_INSTANCE_SCHEMA_TITLE) {
                            throw new Error('unknown migration handling for schema');
                        }

                        const newSchema = getRxReplicationMetaInstanceSchema(
                            clone(this.collection.schema.jsonSchema),
                            hasEncryption(connectedStorage.schema)
                        );
                        newSchema.version = this.collection.schema.version;
                        const [oldStorage, newStorage] = await Promise.all([
                            this.database.storage.createStorageInstance({
                                databaseInstanceToken: this.database.token,
                                databaseName: this.database.name,
                                devMode: overwritable.isDevMode(),
                                multiInstance: this.database.multiInstance,
                                options: {},
                                schema: connectedStorage.schema,
                                password: this.database.password,
                                collectionName: connectedStorage.collectionName
                            }),
                            this.database.storage.createStorageInstance({
                                databaseInstanceToken: this.database.token,
                                databaseName: this.database.name,
                                devMode: overwritable.isDevMode(),
                                multiInstance: this.database.multiInstance,
                                options: {},
                                schema: newSchema,
                                password: this.database.password,
                                collectionName: connectedStorage.collectionName
                            })
                        ]);
                        ret.push({ oldStorage, newStorage });
                    })
            )
        );

        return ret;
    }



    async migratePromise(batchSize?: number): Promise<RxMigrationStatus> {
        this.startMigration(batchSize);
        const must = await this.mustMigrate;
        if (!must) {
            return {
                status: 'DONE',
                collectionName: this.collection.name,
                count: {
                    handled: 0,
                    percent: 0,
                    total: 0
                }
            };
        }

        const result = await Promise.race([
            firstValueFrom(
                this.$.pipe(
                    filter(d => d.status === 'DONE')
                )
            ),
            firstValueFrom(
                this.$.pipe(
                    filter(d => d.status === 'ERROR')
                )
            )
        ]);

        if (result.status === 'ERROR') {
            throw newRxError('DM4', {
                collection: this.collection.name,
                error: result.error
            });
        } else {
            return result;
        }

    }
}
