import {
    BehaviorSubject,
    Observable,
    merge
} from 'rxjs';
import {
    mergeMap,
    filter,
    map,
    startWith,
    distinctUntilChanged,
    shareReplay
} from 'rxjs/operators';
import {
    sortObject,
    pluginMissing,
    overwriteGetterForCaching,
    clone,
    PROMISE_RESOLVE_FALSE,
    RXJS_SHARE_REPLAY_DEFAULTS,
    ensureNotFalsy,
    areRxDocumentArraysEqual,
    promiseWait
} from './plugins/utils/index.ts';
import {
    newRxError,
    rxStorageWriteErrorToRxError
} from './rx-error.ts';
import {
    runPluginHooks
} from './hooks.ts';
import type {
    RxCollection,
    RxDocument,
    RxQueryOP,
    RxQuery,
    MangoQuery,
    MangoQuerySortPart,
    MangoQuerySelector,
    PreparedQuery,
    FilledMangoQuery,
    RxDocumentWriteData,
    RxDocumentData,
    QueryMatcher,
    ModifyFunction,
    RxStorageChangeEvent,
    Reactified
} from './types/index.d.ts';
import { calculateNewResults } from './event-reduce.ts';
import { triggerCacheReplacement } from './query-cache.ts';
import {
    getQueryMatcher,
    getSortComparator,
    normalizeMangoQuery,
    prepareQuery,
    runQueryUpdateFunction

} from './rx-query-helper.ts';
import { RxQuerySingleResult } from './rx-query-single-result.ts';

/**
 * Module-level counter that backs the ids handed out by newQueryID().
 */
let _queryCount = 0;

/**
 * Hands out the next query id.
 * The first call returns 1, each further call returns the previous value plus one.
 */
const newQueryID = (): number => {
    _queryCount += 1;
    return _queryCount;
};

/**
 * Counter that provides the values for _lastEnsureEqual.
 * The cache replacement only needs an ordering and a zero-check,
 * so an incrementing counter is cheaper than calling Date.now().
 */
let _ensureEqualCount = 0;

/**
 * Base implementation of a query on an RxCollection.
 * Holds the mango query, the current result state and the logic
 * to execute the query either once (exec) or as an observable ($).
 */
export class RxQueryBase<
    RxDocType,
    RxQueryResult,
    OrmMethods = {},
    Reactivity = unknown,
> {

    /**
     * Id of this query instance,
     * taken from a module-level incrementing counter.
     */
    public id: number = newQueryID();

    /**
     * Some stats that are used for debugging and cache replacement policies.
     * Incremented on each call of _execOverDatabase(), including its re-runs.
     */
    public _execOverDatabaseCount: number = 0;
    /**
     * @performance
     * Use Date.now() instead of now() for creation time.
     * The monotonic uniqueness guarantee of now() is not needed here
     * since _creationTime is only used by the cache replacement policy
     * for rough lifetime comparisons.
     */
    public _creationTime = Date.now();

    // used in the query-cache to determine if the RxQuery can be cleaned up.
    // 0 means never executed. Updated to an incrementing counter on each _ensureEqual call.
    public _lastEnsureEqual = 0;

    // NOTE(review): neither read nor written in this file; presumably toggled by the query cache - confirm.
    public uncached = false;

    // used to count the subscribers to the query
    // Lazy-initialized to avoid BehaviorSubject overhead for .exec()-only queries
    public _refCount$: BehaviorSubject<null> | null = null;
    public get refCount$(): BehaviorSubject<null> {
        if (!this._refCount$) {
            this._refCount$ = new BehaviorSubject<null>(null);
        }
        return this._refCount$;
    }

    /**
     * Result of isFindOneByIdQuery() for this query:
     * false, a single document id or an array of document ids.
     * Always false for 'count' queries.
     */
    public isFindOneByIdQuery: false | string | string[];


    /**
     * Contains the current result state
     * or null if query has not run yet.
     */
    public _result: RxQuerySingleResult<RxDocType> | null = null;


    /**
     * @param op the operation of the query
     * @param mangoQuery the mango query, a falsy value is replaced by the default query
     * @param collection the collection on which the query runs
     * @param other additional data, used by some plugins
     */
    constructor(
        public op: RxQueryOP,
        public mangoQuery: Readonly<MangoQuery<RxDocType>>,
        public collection: RxCollection<RxDocType>,
        // used by some plugins
        public other: any = {}
    ) {
        if (!mangoQuery) {
            this.mangoQuery = _getDefaultQuery();
        }

        /**
         * @performance
         * isFindOneByIdQuery is only used by queryCollection()
         * which is not called for 'count' queries.
         * Skip the check for count queries to avoid unnecessary work.
         */
        if (op === 'count') {
            this.isFindOneByIdQuery = false;
        } else {
            /**
             * Must use this.mangoQuery and not the constructor parameter,
             * because the parameter can be falsy in which case
             * it has been replaced by the default query above.
             */
            this.isFindOneByIdQuery = isFindOneByIdQuery(
                this.collection.schema.primaryPath as string,
                this.mangoQuery
            );
        }
    }

    /**
     * Observable of the query results.
     * Created lazily on first access and then stored in this._$.
     */
    get $(): Observable<RxQueryResult> {
        if (!this._$) {
            const results$ = this.collection.eventBulks$.pipe(
                /**
                 * Performance shortcut.
                 * Changes to local documents are not relevant for the query.
                 */
                filter((bulk: any) => !bulk.isLocal),
                /**
                 * Start once to ensure the querying also starts
                 * when there were no changes.
                 */
                startWith(null),
                // ensure query results are up to date.
                mergeMap(() => _ensureEqual(this as any)),
                // use the current result set, written by _ensureEqual().
                map(() => this._result),
                // do not run stuff above for each new subscriber, only once.
                shareReplay(RXJS_SHARE_REPLAY_DEFAULTS),
                // do not proceed if result set has not changed.
                distinctUntilChanged((prev: RxQuerySingleResult<RxDocType> | null, curr: RxQuerySingleResult<RxDocType> | null) => {
                    // curr is only inspected when there is a previous result to compare with
                    return !!prev && prev.time === ensureNotFalsy(curr).time;
                }),
                // drop the null state of a query that has no result yet
                filter((result: RxQuerySingleResult<RxDocType> | null) => !!result),
                /**
                 * Map the result set to a single RxDocument or an array,
                 * depending on query type
                 */
                map((result: RxQuerySingleResult<RxDocType> | null) => {
                    return ensureNotFalsy(result).getValue();
                })
            );

            this._$ = merge<any>(
                results$,
                /**
                 * Also add the refCount$ to the query observable
                 * to allow us to count the amount of subscribers.
                 * The filter ensures that refCount$ never emits into the results.
                 */
                this.refCount$.pipe(
                    filter(() => false)
                )
            );
        }
        return this._$ as any;
    }

    /**
     * The results of the query, wrapped by the reactivity factory of the database.
     */
    get $$(): Reactified<Reactivity, RxQueryResult> {
        const reactivity = this.collection.database.getReactivityFactory();
        return reactivity.fromObservable(
            this.$,
            undefined,
            this.collection.database
        ) as any;
    }

    // stores the changeEvent-number of the last handled change-event
    // -1 means the query has not been executed yet
    public _latestChangeEvent: -1 | number = -1;

    /**
     * ensures that the exec-runs
     * are not run in parallel
     */
    public _ensureEqualQueue: Promise<boolean> = PROMISE_RESOLVE_FALSE;

    /**
     * Returns an observable that emits the results
     * This should behave like an rxjs-BehaviorSubject which means:
     * - Emit the current result-set on subscribe
     * - Emit the new result-set when an RxChangeEvent comes in
     * - Do not emit anything before the first result-set was created (no null)
     */
    public _$?: Observable<RxQueryResult>;

    /**
     * set the new result-data as result-docs of the query
     * @param newResultData json-docs that were received from the storage,
     * given as array or Map, or a number for count queries
     * @throws RxError QU18 if newResultData is undefined
     */
    _setResultData(newResultData: RxDocumentData<RxDocType>[] | number | Map<string, RxDocumentData<RxDocType>>): void {
        if (typeof newResultData === 'undefined') {
            throw newRxError('QU18', {
                database: this.collection.database.name,
                collection: this.collection.name
            });
        }
        if (typeof newResultData === 'number') {
            // count query: no documents, only the count
            this._result = new RxQuerySingleResult<RxDocType>(
                this as any,
                [],
                newResultData
            );
            return;
        } else if (newResultData instanceof Map) {
            newResultData = Array.from((newResultData as Map<string, RxDocumentData<RxDocType>>).values());
        }

        const newQueryResult = new RxQuerySingleResult<RxDocType>(
            this as any,
            newResultData,
            newResultData.length
        );
        this._result = newQueryResult;
    }

    /**
     * executes the query on the database
     * @param rerunCount how often the execution has already been re-run
     * because of concurrent writes, used to scale the wait before the next re-run
     * @return results-array with document-data (or the count for count queries)
     * together with the change event counter that belongs to that result
     * @throws RxError QU14 if the storage can only count in 'slow' mode
     * and allowSlowCount is not set on the database
     */
    async _execOverDatabase(rerunCount = 0): Promise<{
        result: RxDocumentData<RxDocType>[] | number;
        counter: number;
    }> {
        this._execOverDatabaseCount += 1;
        let result: {
            result: RxDocumentData<RxDocType>[] | number;
            counter: number;
        };

        /**
         * @performance
         * Instead of subscribing to eventBulks$ to detect concurrent writes,
         * we snapshot the change event counter before and after the query.
         * If the counter changed, a write happened during execution and
         * we must re-run the query to ensure correct results.
         * This avoids the overhead of RxJS Subject subscribe/unsubscribe per query.
         *
         * @link https://github.com/pubkey/rxdb/issues/7067
         */
        const counterBefore = this.collection._changeEventBuffer.getCounter();

        if (this.op === 'findByIds') {
            const ids: string[] = ensureNotFalsy(this.mangoQuery.selector as any)[this.collection.schema.primaryPath].$in;
            const docsData: RxDocumentData<RxDocType>[] = [];
            const mustBeQueried: string[] = [];
            // first try to fill from docCache
            for (let i = 0; i < ids.length; i++) {
                const id = ids[i];
                const docData = this.collection._docCache.getLatestDocumentDataIfExists(id);
                if (docData) {
                    // deleted documents that are known to the cache are not part of the result
                    if (!docData._deleted) {
                        docsData.push(docData);
                    }
                } else {
                    mustBeQueried.push(id);
                }
            }
            // everything which was not in docCache must be fetched from the storage
            if (mustBeQueried.length > 0) {
                const docs = await this.collection.storageInstance.findDocumentsById(mustBeQueried, false);
                for (let i = 0; i < docs.length; i++) {
                    docsData.push(docs[i]);
                }
            }
            result = {
                result: docsData,
                counter: this.collection._changeEventBuffer.getCounter()
            };
        } else if (this.op === 'count') {
            const preparedQuery = this.getPreparedQuery();
            const countResult = await this.collection.storageInstance.count(preparedQuery);
            if (countResult.mode === 'slow' && !this.collection.database.allowSlowCount) {
                throw newRxError('QU14', {
                    collection: this.collection,
                    queryObj: this.mangoQuery
                });
            } else {
                result = {
                    result: countResult.count,
                    counter: this.collection._changeEventBuffer.getCounter()
                };
            }
        } else {
            const queryResult = await queryCollection<RxDocType>(this as any);
            result = {
                result: queryResult.docs,
                counter: queryResult.counter
            };
        }

        // a write happened while the query was running -> results might be outdated, run again
        if (this.collection._changeEventBuffer.getCounter() !== counterBefore) {
            // the first re-run starts without waiting, later ones wait increasingly longer
            await promiseWait(rerunCount * 20);
            return this._execOverDatabase(rerunCount + 1);
        }

        return result;
    }

    /**
     * Execute the query once and resolve with its current result.
     * @param throwIfMissing only allowed on 'findOne' queries,
     * passed through to getValue() of the result.
     * @throws RxError QU9 if throwIfMissing is set on a query that is not 'findOne'
     */
    public exec(throwIfMissing: true): Promise<RxDocument<RxDocType, OrmMethods, Reactivity>>;
    public exec(): Promise<RxQueryResult>;
    public async exec(throwIfMissing?: boolean): Promise<any> {
        if (throwIfMissing && this.op !== 'findOne') {
            throw newRxError('QU9', {
                collection: this.collection.name,
                query: this.mangoQuery,
                op: this.op
            });
        }

        /**
         * run _ensureEqual() here,
         * this will make sure that errors in the query which throw inside of the RxStorage,
         * will be thrown at this execution context and not in the background.
         */
        await _ensureEqual(this as any);
        const useResult = ensureNotFalsy(this._result);
        return useResult.getValue(throwIfMissing);
    }



    /**
     * Returns the normalized query.
     * Caches the result so that multiple calls to
     * queryMatcher, toString() and getPreparedQuery()
     * do not have to run the normalization again.
     * @overwrites itself with the actual value.
     */
    get normalizedQuery(): FilledMangoQuery<RxDocType> {
        return overwriteGetterForCaching(
            this,
            'normalizedQuery',
            normalizeMangoQuery<RxDocType>(
                this.collection.schema.jsonSchema,
                this.mangoQuery,
                this.op === 'count'
            )
        );
    }

    /**
     * cached call to get the queryMatcher
     * @overwrites itself with the actual value
     */
    get queryMatcher(): QueryMatcher<RxDocumentWriteData<RxDocType>> {
        const schema = this.collection.schema.jsonSchema;
        return overwriteGetterForCaching(
            this,
            'queryMatcher',
            getQueryMatcher(
                schema,
                this.normalizedQuery
            ) as any
        );
    }

    /**
     * returns a string that is used for equal-comparisons
     * @overwrites itself with the actual value
     */
    toString(): string {
        /**
         * For findByIds queries, build the cache key directly from the IDs
         * to avoid the expensive normalizeMangoQuery + sortObject + JSON.stringify.
         * The selector structure is guaranteed by findByIds() which always creates
         * { [primaryPath]: { $in: ids } }
         */
        let value: string;
        if (this.op === 'findByIds') {
            const ids: string[] = (this.mangoQuery.selector as any)[this.collection.schema.primaryPath].$in;
            // slice() is needed because sort() mutates the array in-place
            const sortedIds = ids.slice().sort();
            value = '|findByIds|' + JSON.stringify(sortedIds);
        } else {
            const stringObj = sortObject({
                op: this.op,
                query: this.normalizedQuery,
                other: this.other
            }, true);
            value = JSON.stringify(stringObj);
        }
        // later calls directly return the cached string
        this.toString = () => value;
        return value;
    }

    /**
     * returns the prepared query
     * which can be sent to the storage instance to query for documents.
     * @overwrites itself with the actual value.
     */
    getPreparedQuery(): PreparedQuery<RxDocType> {
        const hookInput = {
            rxQuery: this,
            // can be mutated by the hooks so we have to deep clone first.
            mangoQuery: clone(this.normalizedQuery) as FilledMangoQuery<RxDocType>
        };
        // the prepared query must only match non-deleted documents
        (hookInput.mangoQuery.selector as any)._deleted = { $eq: false };
        if (hookInput.mangoQuery.index) {
            hookInput.mangoQuery.index.unshift('_deleted');
        }
        runPluginHooks('prePrepareQuery', hookInput);

        const value = prepareQuery(
            this.collection.schema.jsonSchema,
            hookInput.mangoQuery as any
        );

        // later calls directly return the cached prepared query
        this.getPreparedQuery = () => value;
        return value;
    }

    /**
     * returns true if the document matches the query,
     * does not use the 'skip' and 'limit'
     */
    doesDocumentDataMatch(docData: RxDocType | any): boolean {
        // if doc is deleted, it cannot match
        if (docData._deleted) {
            return false;
        }

        return this.queryMatcher(docData);
    }

    /**
     * deletes all found documents
     * @param throwIfMissing only allowed on 'findOne' queries,
     * throws if the query did not find a document
     * @return promise with deleted documents
     * @throws RxError QU9 if throwIfMissing is set on a query that is not 'findOne'
     * @throws RxError QU10 if throwIfMissing is set and no document was found
     */
    async remove(throwIfMissing?: boolean): Promise<RxQueryResult> {
        if (throwIfMissing && this.op !== 'findOne') {
            throw newRxError('QU9', {
                collection: this.collection.name,
                query: this.mangoQuery,
                op: this.op
            });
        }
        const docs = await this.exec();
        if (Array.isArray(docs)) {
            const result = await this.collection.bulkRemove(docs);
            if (result.error.length > 0) {
                // only the first write error is thrown
                throw rxStorageWriteErrorToRxError(result.error[0]);
            } else {
                return result.success as any;
            }
        } else {
            // findOne() can return null when no document matches
            if (!docs) {
                if (throwIfMissing) {
                    throw newRxError('QU10', {
                        collection: this.collection.name,
                        query: this.mangoQuery,
                        op: this.op
                    });
                }
                return null as any;
            }
            return (docs as any).remove();
        }
    }

    /**
     * Hands this query to runQueryUpdateFunction()
     * with a callback that calls incrementalRemove() on a document.
     */
    incrementalRemove(): Promise<RxQueryResult> {
        return runQueryUpdateFunction(
            this.asRxQuery,
            (doc) => doc.incrementalRemove(),
        );
    }


    /**
     * helper function to transform RxQueryBase to RxQuery type
     */
    get asRxQuery(): RxQuery<RxDocType, RxQueryResult> {
        return this as any;
    }

    /**
     * updates all found documents
     * @overwritten by plugin (optional)
     */
    update(_updateObj: any): Promise<RxQueryResult> {
        throw pluginMissing('update');
    }

    /**
     * Hands this query to runQueryUpdateFunction()
     * with a callback that calls patch() on a document.
     */
    patch(patch: Partial<RxDocType>): Promise<RxQueryResult> {
        return runQueryUpdateFunction(
            this.asRxQuery,
            (doc) => doc.patch(patch),
        );
    }

    /**
     * Hands this query to runQueryUpdateFunction()
     * with a callback that calls incrementalPatch() on a document.
     */
    incrementalPatch(patch: Partial<RxDocType>): Promise<RxQueryResult> {
        return runQueryUpdateFunction(
            this.asRxQuery,
            (doc) => doc.incrementalPatch(patch),
        );
    }

    /**
     * Hands this query to runQueryUpdateFunction()
     * with a callback that calls modify() on a document.
     */
    modify(mutationFunction: ModifyFunction<RxDocType>): Promise<RxQueryResult> {
        return runQueryUpdateFunction(
            this.asRxQuery,
            (doc) => doc.modify(mutationFunction),
        );
    }

    /**
     * Hands this query to runQueryUpdateFunction()
     * with a callback that calls incrementalModify() on a document.
     */
    incrementalModify(mutationFunction: ModifyFunction<RxDocType>): Promise<RxQueryResult> {
        return runQueryUpdateFunction(
            this.asRxQuery,
            (doc) => doc.incrementalModify(mutationFunction),
        );
    }


    // we only set some methods of query-builder here
    // because the others depend on these ones
    where(_queryObj: MangoQuerySelector<RxDocType> | keyof RxDocType | string): RxQuery<RxDocType, RxQueryResult> {
        throw pluginMissing('query-builder');
    }
    sort(_params: string | MangoQuerySortPart<RxDocType>): RxQuery<RxDocType, RxQueryResult> {
        throw pluginMissing('query-builder');
    }
    skip(_amount: number | null): RxQuery<RxDocType, RxQueryResult> {
        throw pluginMissing('query-builder');
    }
    limit(_amount: number | null): RxQuery<RxDocType, RxQueryResult> {
        throw pluginMissing('query-builder');
    }
}


/**
 * Creates the query that is used when no mango query was given.
 * Each call returns a new object with an empty selector.
 */
export function _getDefaultQuery<RxDocType>(): MangoQuery<RxDocType> {
    const defaultQuery: MangoQuery<RxDocType> = { selector: {} };
    return defaultQuery;
}

/**
 * Passes the given query to the query cache of its collection
 * and returns whatever getByQuery() of that cache returns.
 */
export function tunnelQueryCache<RxDocumentType, RxQueryResult>(
    rxQuery: RxQueryBase<RxDocumentType, RxQueryResult>
): RxQuery<RxDocumentType, RxQueryResult> {
    const { _queryCache } = rxQuery.collection;
    return _queryCache.getByQuery(rxQuery as any);
}

/**
 * Creates a query for the given collection.
 * Runs the 'preCreateRxQuery' plugin hooks, builds a new RxQueryBase,
 * passes it through the query cache and triggers the cache replacement.
 * @return the query instance that was returned by the query cache
 */
export function createRxQuery<RxDocType>(
    op: RxQueryOP,
    queryObj: MangoQuery<RxDocType>,
    collection: RxCollection<RxDocType>,
    other?: any
) {
    const hookInput = { op, queryObj, collection, other };
    runPluginHooks('preCreateRxQuery', hookInput);

    const freshQuery = new RxQueryBase<RxDocType, any>(op, queryObj, collection, other);

    // ensure when created with same params, only one is created
    const cachedQuery: RxQueryBase<RxDocType, any> = tunnelQueryCache(freshQuery);
    triggerCacheReplacement(collection);

    return cachedQuery;
}

/**
 * Compares the number of the last change event that was handled by the query
 * with the current counter of the change event buffer of the collection.
 * @return true if the query has handled all change events,
 * false if newer events exist which means the results must be updated
 */
function _isResultsInSync(rxQuery: RxQueryBase<any, any>): boolean {
    const bufferCounter = rxQuery.asRxQuery.collection._changeEventBuffer.getCounter();
    return rxQuery._latestChangeEvent >= bufferCounter;
}


/**
 * Queues a run of __ensureEqual() behind the previous runs of the same query
 * so that they never run in parallel.
 * If the collection has awaitBeforeReads functions, all of them are called
 * and awaited before the run is added to the queue.
 * @return true if has changed, false if not
 *
 * @performance
 * No async function is used here, so that in the common case
 * of an empty awaitBeforeReads no additional Promise must be allocated
 * per query execution.
 */
function _ensureEqual(rxQuery: RxQueryBase<any, any>): Promise<boolean> {
    const beforeReads = rxQuery.collection.awaitBeforeReads;

    // common case: nothing to await, directly append to the queue
    if (beforeReads.size === 0) {
        const queued = rxQuery._ensureEqualQueue.then(() => __ensureEqual(rxQuery));
        rxQuery._ensureEqualQueue = queued;
        return queued;
    }

    const pendingReads = Array.from(beforeReads).map(fn => fn());
    return Promise.all(pendingReads).then(() => {
        const queued = rxQuery._ensureEqualQueue.then(() => __ensureEqual(rxQuery));
        rxQuery._ensureEqualQueue = queued;
        return queued;
    });
}

/**
 * ensures that the results of this query is equal to the results which a query over the database would give
 *
 * Steps:
 * - Return early if the database is closed or no change event happened since the last run.
 * - If the query ran before, try to calculate the new results
 *   from the missed change events without asking the storage.
 * - Otherwise, or if that is not possible, run the whole query over the database.
 *
 * @return true if results have changed
 */
function __ensureEqual<RxDocType>(rxQuery: RxQueryBase<RxDocType, any>): Promise<boolean> {
    /**
     * @performance
     * Use a counter instead of Date.now() since _lastEnsureEqual
     * is only used by the cache replacement policy for sorting queries
     * by last usage and zero-check, not for time-based comparison.
     */
    rxQuery._lastEnsureEqual = ++_ensureEqualCount;

    /**
     * Optimisation shortcuts
     */
    if (
        // db is closed
        rxQuery.collection.database.closed ||
        // nothing happened since last run
        _isResultsInSync(rxQuery)
    ) {
        return PROMISE_RESOLVE_FALSE;
    }

    let ret = false;
    let mustReExec = false; // if this becomes true, a whole execution over the database is made
    if (rxQuery._latestChangeEvent === -1) {
        // have not executed yet -> must run
        mustReExec = true;
    }

    /**
     * try to use EventReduce to calculate the new results
     */
    if (!mustReExec) {
        const missedChangeEvents = rxQuery.asRxQuery.collection._changeEventBuffer.getFrom(rxQuery._latestChangeEvent + 1);
        if (missedChangeEvents === null) {
            // changeEventBuffer is out of bounds -> we must re-execute over the database
            mustReExec = true;
        } else {
            // remember the counter before the events are processed
            rxQuery._latestChangeEvent = rxQuery.asRxQuery.collection._changeEventBuffer.getCounter();
            const runChangeEvents: RxStorageChangeEvent<RxDocType>[] = rxQuery.asRxQuery.collection
                ._changeEventBuffer
                .reduceByLastOfDoc(missedChangeEvents);

            if (rxQuery.op === 'count') {
                // 'count' query
                const previousCount = ensureNotFalsy(rxQuery._result).count;
                let newCount = previousCount;
                runChangeEvents.forEach(cE => {
                    const didMatchBefore = cE.previousDocumentData && rxQuery.doesDocumentDataMatch(cE.previousDocumentData);
                    const doesMatchNow = rxQuery.doesDocumentDataMatch(cE.documentData);

                    // document newly matches the query
                    if (!didMatchBefore && doesMatchNow) {
                        newCount++;
                    }
                    // document no longer matches the query
                    if (didMatchBefore && !doesMatchNow) {
                        newCount--;
                    }
                });
                if (newCount !== previousCount) {
                    ret = true; // true because results changed
                    rxQuery._setResultData(newCount as any);
                }
            } else {
                // 'find' or 'findOne' query
                const eventReduceResult = calculateNewResults(
                    rxQuery as any,
                    runChangeEvents
                );
                if (eventReduceResult.runFullQueryAgain) {
                    // could not calculate the new results, execute must be done
                    mustReExec = true;
                } else if (eventReduceResult.changed) {
                    // we got the new results, we do not have to re-execute, mustReExec stays false
                    ret = true; // true because results changed
                    rxQuery._setResultData(eventReduceResult.newResults as any);
                }
            }
        }
    }

    // oh no we have to re-execute the whole query over the database
    if (mustReExec) {
        return rxQuery._execOverDatabase()
            .then(result => {
                const newResultData = result.result;

                /**
                 * The RxStorage is defined to always first emit events and then return
                 * on bulkWrite() calls. So here we have to use the counter AFTER the execOverDatabase()
                 * has been run, not the one from before.
                 */
                rxQuery._latestChangeEvent = result.counter;

                // A count query needs a different has-changed check.
                if (typeof newResultData === 'number') {
                    if (
                        !rxQuery._result ||
                        newResultData !== rxQuery._result.count
                    ) {
                        ret = true;
                        rxQuery._setResultData(newResultData as any);
                    }
                    return ret;
                }
                // only replace the result when there was none before or the documents differ
                if (
                    !rxQuery._result ||
                    !areRxDocumentArraysEqual(
                        rxQuery.collection.schema.primaryPath,
                        newResultData,
                        rxQuery._result.docsData
                    )
                ) {
                    ret = true; // true because results changed
                    rxQuery._setResultData(newResultData as any);
                }
                return ret;
            });
    }
    return Promise.resolve(ret); // true if results have changed
}


/**
 * Fetches the documents of the given query
 * from the storage instance of its collection.
 * Queries that select by primary key take a fast path which reads
 * from the document cache and findDocumentsById()
 * instead of running the slower query() method of the storage.
 */
export async function queryCollection<RxDocType>(
    rxQuery: RxQuery<RxDocType> | RxQueryBase<RxDocType, any>
): Promise<{
    docs: RxDocumentData<RxDocType>[];
    /**
     * The counter must be read directly here.
     * Reading it after the returned Promise has resolved
     * could give a value that no longer matches the result set.
     */
    counter: number;
}> {
    const collection = rxQuery.collection;
    const idOrIds = rxQuery.isFindOneByIdQuery;
    let docs: RxDocumentData<RxDocType>[] = [];

    if (!idOrIds) {
        // normal path: let the storage run the prepared query
        const preparedQuery = rxQuery.getPreparedQuery();
        const storageResult = await collection.storageInstance.query(preparedQuery);
        docs = storageResult.documents;
    } else {
        /**
         * Fast path for queries by primary key.
         */
        const selector = rxQuery.mangoQuery.selector;
        const primaryPath = collection.schema.primaryPath as string;
        // isFindOneByIdQuery guarantees the primary key is in the selector
        const primaryCondition = selector ? (selector as any)[primaryPath] : undefined;

        // more than one operator on the primary key (e.g. $ne alongside $in)
        const hasExtraOperators = typeof primaryCondition === 'object' &&
            primaryCondition !== null &&
            Object.keys(primaryCondition).length > 1;

        // selector contains fields in addition to the primary key
        const hasOtherSelectors = selector ? Object.keys(selector).length > 1 : false;

        // a single id becomes an array, duplicates are removed so no document is returned twice
        const uniqueIds = Array.from(
            new Set(Array.isArray(idOrIds) ? idOrIds : [idOrIds])
        );

        // take what the document cache knows, remember the rest for the storage
        const cacheMisses: string[] = [];
        for (const docId of uniqueIds) {
            const cached = collection._docCache.getLatestDocumentDataIfExists(docId);
            if (!cached) {
                cacheMisses.push(docId);
                continue;
            }
            // documents that are cached as deleted are left out and not fetched again
            if (!cached._deleted) {
                docs.push(cached);
            }
        }

        // only the ids that were not in the cache are requested from the storage
        if (cacheMisses.length > 0) {
            const fetched = await collection.storageInstance.findDocumentsById(cacheMisses, false);
            docs = docs.concat(fetched);
        }

        // the fast path ignores all other conditions, so they are checked here
        if (hasExtraOperators || hasOtherSelectors) {
            docs = docs.filter(doc => rxQuery.queryMatcher(doc));
        }

        /**
         * findDocumentsById() knows nothing about sort, skip and limit.
         * To return the same as storageInstance.query() would,
         * they are applied afterwards, in that order.
         */
        if (docs.length > 1) {
            const preparedQuery = rxQuery.getPreparedQuery();
            const comparator = getSortComparator(collection.schema.jsonSchema, preparedQuery.query);
            docs = docs.sort(comparator);
        }

        const skipAmount = rxQuery.mangoQuery.skip;
        if (typeof skipAmount === 'number' && skipAmount > 0) {
            docs = docs.slice(skipAmount);
        }

        const limitAmount = rxQuery.mangoQuery.limit;
        if (typeof limitAmount === 'number' && limitAmount > 0) {
            docs = docs.slice(0, limitAmount);
        }
    }

    return {
        docs,
        counter: collection._changeEventBuffer.getCounter()
    };
}

/**
 * Returns the document id (string) or ids (string[]) if the given query
 * selects documents by primary key using a plain string, $eq or $in.
 * Used to optimize performance: these queries use get-by-id
 * instead of a full index scan. Additional operators beyond
 * $eq/$in are handled via the queryMatcher after fetching.
 * Skip, limit, and sort are also applied after fetching.
 * Returns false if no such optimization is possible.
 *
 * @param primaryPath name of the primary key field
 * @param query the mango query to inspect
 * @return false, the document id or the array of document ids
 */
export function isFindOneByIdQuery(
    primaryPath: string,
    query: MangoQuery<any>
): false | string | string[] {
    // primary key constraint can coexist with other selectors, skip, limit, and sort
    // The optimization will fetch by ID, then apply queryMatcher, sort, skip, and limit
    // Use hasOwnProperty to avoid prototype pollution from user-controlled input
    if (
        !query.selector ||
        !Object.prototype.hasOwnProperty.call(query.selector, primaryPath)
    ) {
        return false;
    }

    const value: any = (query.selector as any)[primaryPath];
    if (typeof value === 'string') {
        return value;
    }

    /**
     * The key can exist with a value of null or undefined, or with a
     * non-string primitive. These have no operators to inspect, and reading
     * a property of null/undefined would throw a TypeError.
     */
    if (typeof value !== 'object' || value === null) {
        return false;
    }

    if (typeof value.$eq === 'string') {
        return value.$eq;
    }

    // same with $in string arrays
    if (
        Array.isArray(value.$in) &&
        /**
         * Must only contain strings.
         * every() is used instead of negating find(), because find() returns
         * the found element itself, so falsy non-strings like 0 or null
         * would wrongly count as "nothing found".
         */
        (value.$in as unknown[]).every(entry => typeof entry === 'string')
    ) {
        return value.$in;
    }

    return false;
}



/**
 * Checks if the given value is an instance of RxQueryBase.
 */
export function isRxQuery(obj: any): boolean {
    const isQueryInstance = obj instanceof RxQueryBase;
    return isQueryInstance;
}
