// classes.watcher.ts

import * as plugins from './plugins.js';
import * as interfaces from './interfaces.js';
import type { Bucket } from './classes.bucket.js';
import { EventEmitter } from 'node:events';

/**
 * BucketWatcher monitors a storage bucket for changes (add/modify/delete)
 * using a polling-based approach. Designed to follow the SmartdataDbWatcher pattern.
 *
 * Every poll lists all objects under the configured prefix and compares the
 * listing (ETag, size, last-modified) with the listing from the previous poll.
 * Differences are published both on `changeSubject` and as `'change'` events;
 * polling failures are published as `'error'` events.
 *
 * @example
 * ```typescript
 * const watcher = bucket.createWatcher({ prefix: 'uploads/', pollIntervalMs: 3000 });
 *
 * // RxJS Observable pattern
 * watcher.changeSubject.subscribe((change) => {
 *   console.log('Change:', change);
 * });
 *
 * // EventEmitter pattern
 * watcher.on('change', (change) => console.log(change));
 * watcher.on('error', (err) => console.error(err));
 *
 * await watcher.start();
 * await watcher.readyDeferred.promise; // Wait for initial state
 *
 * // Later...
 * await watcher.stop();
 * ```
 */
export class BucketWatcher extends EventEmitter {
  /** Deferred that resolves when initial state is built and watcher is ready */
  public readyDeferred = plugins.smartpromise.defer();

  /**
   * Observable for receiving change events (supports RxJS operators).
   * Emits single events, or non-empty arrays of events when `bufferTimeMs` is set.
   */
  public changeSubject: plugins.smartrx.rxjs.Observable<interfaces.IStorageChangeEvent | interfaces.IStorageChangeEvent[]>;

  // Internal subjects and state
  private rawSubject: plugins.smartrx.rxjs.Subject<interfaces.IStorageChangeEvent>;
  /** Listing captured by the most recent completed poll (or the initial listing) */
  private previousState: Map<string, interfaces.IStorageObjectState>;
  private pollIntervalId: ReturnType<typeof setInterval> | null = null;
  /** Set while start() is building the initial state; shared by overlapping start() calls */
  private startupPromise: Promise<void> | null = null;
  private isPolling = false;
  private isStopped = false;

  // Configuration
  private readonly bucketRef: Bucket;
  private readonly prefix: string;
  private readonly pollIntervalMs: number;
  private readonly bufferTimeMs?: number;
  private readonly includeInitial: boolean;
  private readonly pageSize: number;

  /**
   * @param bucketRef - Bucket to watch; its name and storage client are used for listing
   * @param options - Optional settings. Defaults: `prefix` `''`, `pollIntervalMs` `5000`,
   *   `includeInitial` `false`, `pageSize` `1000`; buffering is off unless `bufferTimeMs > 0`.
   */
  constructor(bucketRef: Bucket, options: interfaces.IBucketWatcherOptions = {}) {
    super();

    this.bucketRef = bucketRef;
    this.prefix = options.prefix ?? '';
    this.pollIntervalMs = options.pollIntervalMs ?? 5000;
    this.bufferTimeMs = options.bufferTimeMs;
    this.includeInitial = options.includeInitial ?? false;
    this.pageSize = options.pageSize ?? 1000;

    // Initialize state tracking
    this.previousState = new Map();

    // Initialize raw subject for emitting changes
    this.rawSubject = new plugins.smartrx.rxjs.Subject<interfaces.IStorageChangeEvent>();

    // Configure the public observable with optional buffering
    if (this.bufferTimeMs && this.bufferTimeMs > 0) {
      this.changeSubject = this.rawSubject.pipe(
        plugins.smartrx.rxjs.ops.bufferTime(this.bufferTimeMs),
        // Drop the empty batches bufferTime produces for quiet windows
        plugins.smartrx.rxjs.ops.filter((events: interfaces.IStorageChangeEvent[]) => events.length > 0)
      );
    } else {
      this.changeSubject = this.rawSubject.asObservable();
    }
  }

  /**
   * Start watching the bucket for changes.
   *
   * Builds the initial state, optionally emits it as 'add' events, resolves
   * `readyDeferred` and then schedules the polling interval. Calling start()
   * again while a startup is still in progress returns that same startup
   * instead of registering a second interval.
   *
   * @throws Rejects with whatever error the initial listing raised.
   */
  public async start(): Promise<void> {
    if (this.pollIntervalId !== null) {
      console.log('BucketWatcher is already running');
      return;
    }

    // pollIntervalId is only assigned once the initial listing has finished,
    // so it cannot guard against overlapping start() calls on its own.
    if (this.startupPromise !== null) {
      return this.startupPromise;
    }

    this.isStopped = false;
    this.startupPromise = this.runStartup();

    try {
      await this.startupPromise;
    } finally {
      this.startupPromise = null;
    }
  }

  /**
   * Stop watching the bucket.
   *
   * Clears the polling interval, waits for an in-progress poll to finish and
   * completes the underlying subject (so `changeSubject` emits nothing further).
   */
  public async stop(): Promise<void> {
    this.isStopped = true;

    if (this.pollIntervalId !== null) {
      clearInterval(this.pollIntervalId);
      this.pollIntervalId = null;
    }

    // Wait for any in-progress poll to complete
    while (this.isPolling) {
      await new Promise<void>((resolve) => setTimeout(resolve, 50));
    }

    this.rawSubject.complete();
  }

  /**
   * Alias for stop() - for consistency with other APIs
   */
  public async close(): Promise<void> {
    return this.stop();
  }

  /**
   * Perform the actual startup work for start(): initial listing, optional
   * initial 'add' events, readiness signal and interval registration.
   */
  private async runStartup(): Promise<void> {
    // Build initial state
    await this.buildInitialState();

    // stop() may have been called while the listing was in flight. In that case
    // nothing further may be emitted and no interval may be registered, because
    // stop() has already run and would never clear it.
    const stoppedDuringStartup = this.isStopped;

    // Emit initial state as 'add' events if configured
    if (this.includeInitial && !stoppedDuringStartup) {
      for (const state of this.previousState.values()) {
        this.emitChange(this.toChangeEvent('add', state));
      }
    }

    // Mark as ready (also when stopped, so awaiters of readyDeferred do not hang)
    this.readyDeferred.resolve();

    if (stoppedDuringStartup) {
      return;
    }

    // Start polling loop. poll() reports listing failures itself; this catch only
    // sees a rejection when emitting the 'error' event inside poll() threw.
    this.pollIntervalId = setInterval(() => {
      this.poll().catch((err) => {
        this.emit('error', err);
      });
    }, this.pollIntervalMs);
  }

  /**
   * Build the initial state by listing all objects with metadata
   */
  private async buildInitialState(): Promise<void> {
    this.previousState.clear();

    for await (const obj of this.listObjectsWithMetadata()) {
      const state = this.toObjectState(obj);
      if (state) {
        this.previousState.set(state.key, state);
      }
    }
  }

  /**
   * Poll for changes by comparing current state against previous state.
   * Errors raised while listing are emitted as 'error' events.
   */
  private async poll(): Promise<void> {
    // Guard against overlapping polls
    if (this.isPolling || this.isStopped) {
      return;
    }

    this.isPolling = true;

    try {
      // Build current state
      const currentState = new Map<string, interfaces.IStorageObjectState>();

      for await (const obj of this.listObjectsWithMetadata()) {
        if (this.isStopped) {
          break;
        }

        const state = this.toObjectState(obj);
        if (state) {
          currentState.set(state.key, state);
        }
      }

      // A listing interrupted by stop() is incomplete; diffing it would report
      // objects that were simply not listed yet as deleted.
      if (!this.isStopped) {
        this.detectChanges(currentState);
        this.previousState = currentState;
      }
    } catch (err) {
      this.emit('error', err);
    } finally {
      this.isPolling = false;
    }
  }

  /**
   * Detect changes between current and previous state and emit one event per change.
   *
   * @param currentState - Listing gathered by the current poll, keyed by object key
   */
  private detectChanges(currentState: Map<string, interfaces.IStorageObjectState>): void {
    // Detect added and modified objects
    for (const [key, current] of currentState) {
      const previous = this.previousState.get(key);

      if (!previous) {
        // New object - emit 'add' event
        this.emitChange(this.toChangeEvent('add', current));
      } else if (
        previous.etag !== current.etag ||
        previous.size !== current.size ||
        previous.lastModified.getTime() !== current.lastModified.getTime()
      ) {
        // Object modified - emit 'modify' event
        this.emitChange(this.toChangeEvent('modify', current));
      }
    }

    // Detect deleted objects
    for (const [key, previous] of this.previousState) {
      if (!currentState.has(key)) {
        // Object deleted - emit 'delete' event (no metadata, the object is gone)
        this.emitChange({
          type: 'delete',
          key: previous.key,
          bucket: this.bucketRef.name,
        });
      }
    }
  }

  /**
   * Convert a listed object into the tracked state record.
   *
   * @returns The state record, or `null` when the listing entry has no key.
   *   Missing metadata falls back to an empty ETag, size 0 and the epoch date.
   */
  private toObjectState(obj: plugins.s3._Object): interfaces.IStorageObjectState | null {
    if (!obj.Key) {
      return null;
    }

    return {
      key: obj.Key,
      etag: obj.ETag ?? '',
      size: obj.Size ?? 0,
      lastModified: obj.LastModified ?? new Date(0),
    };
  }

  /**
   * Build an 'add' or 'modify' change event carrying the object's metadata.
   */
  private toChangeEvent(
    type: 'add' | 'modify',
    state: interfaces.IStorageObjectState
  ): interfaces.IStorageChangeEvent {
    return {
      type,
      key: state.key,
      size: state.size,
      etag: state.etag,
      lastModified: state.lastModified,
      bucket: this.bucketRef.name,
    };
  }

  /**
   * Emit a change event via both RxJS Subject and EventEmitter
   */
  private emitChange(event: interfaces.IStorageChangeEvent): void {
    this.rawSubject.next(event);
    this.emit('change', event);
  }

  /**
   * List objects with full metadata (ETag, Size, LastModified)
   * This is a private method that yields full _Object data, not just keys.
   * Pages through the listing with continuation tokens and stops requesting
   * further pages once the watcher has been stopped.
   */
  private async *listObjectsWithMetadata(): AsyncIterableIterator<plugins.s3._Object> {
    let continuationToken: string | undefined;

    do {
      if (this.isStopped) {
        return;
      }

      const command = new plugins.s3.ListObjectsV2Command({
        Bucket: this.bucketRef.name,
        Prefix: this.prefix,
        MaxKeys: this.pageSize,
        ContinuationToken: continuationToken,
      });

      const response = await this.bucketRef.smartbucketRef.storageClient.send(command);

      for (const obj of response.Contents || []) {
        yield obj;
      }

      continuationToken = response.NextContinuationToken;
    } while (continuationToken);
  }
}
