// classes.bucket.ts

import * as plugins from './plugins.js';
import * as helpers from './helpers.js';
import * as interfaces from './interfaces.js';
import { SmartBucket } from './classes.smartbucket.js';
import { Directory } from './classes.directory.js';
import { File } from './classes.file.js';
import { Trash } from './classes.trash.js';
import { ListCursor, type IListCursorOptions } from './classes.listcursor.js';
import { BucketWatcher } from './classes.watcher.js';

/**
 * The bucket class exposes the basic functionality of a bucket.
 * The functions of the bucket alone are enough to
 * operate on blobs of data in an S3-compatible object store.
 */
export class Bucket {
  /**
   * Looks up an existing bucket by name and wraps it in a Bucket instance.
   *
   * @param smartbucketRef - parent SmartBucket whose storage client is used
   * @param bucketNameArg - exact name of the bucket to look up
   * @returns a Bucket instance bound to the found bucket
   * @throws Error if no bucket with that name is returned by ListBuckets
   */
  public static async getBucketByName(smartbucketRef: SmartBucket, bucketNameArg: string): Promise<Bucket> {
    const command = new plugins.s3.ListBucketsCommand({});
    const buckets = await smartbucketRef.storageClient.send(command);
    // `Buckets` is optional in the response; a missing list is treated as empty
    // so the caller gets the "not found" error instead of a TypeError.
    const foundBucket = (buckets.Buckets ?? []).find((bucket) => bucket.Name === bucketNameArg);

    if (!foundBucket) {
      throw new Error(`Bucket '${bucketNameArg}' not found.`);
    }

    console.log(`bucket with name ${bucketNameArg} exists.`);
    console.log(`Taking this as base for new Bucket instance`);
    return new this(smartbucketRef, bucketNameArg);
  }

  /**
   * Creates a bucket with the given name and returns a Bucket instance for it.
   *
   * @param smartbucketRef - parent SmartBucket whose storage client is used
   * @param bucketName - name of the bucket to create
   */
  public static async createBucketByName(smartbucketRef: SmartBucket, bucketName: string) {
    const command = new plugins.s3.CreateBucketCommand({ Bucket: bucketName });
    await smartbucketRef.storageClient.send(command);
    return new Bucket(smartbucketRef, bucketName);
  }

  /**
   * Deletes the bucket with the given name.
   *
   * @param smartbucketRef - parent SmartBucket whose storage client is used
   * @param bucketName - name of the bucket to delete
   */
  public static async removeBucketByName(smartbucketRef: SmartBucket, bucketName: string) {
    const command = new plugins.s3.DeleteBucketCommand({ Bucket: bucketName });
    await smartbucketRef.storageClient.send(command);
  }

  /** The SmartBucket this bucket belongs to; provides the storage client. */
  public smartbucketRef: SmartBucket;
  /** The bucket name used as `Bucket` in every command. */
  public name: string;

  constructor(smartbucketRef: SmartBucket, bucketName: string) {
    this.smartbucketRef = smartbucketRef;
    this.name = bucketName;
  }

  /**
   * Returns the underlying AWS SDK v3 S3Client for this bucket.
   *
   * Use this when you need to perform operations smartbucket doesn't
   * wrap directly (e.g. lifecycle policies, bucket tagging, multipart
   * upload control, object-lock, inventory config, etc.).
   *
   * The returned client is shared with the parent SmartBucket — do not
   * call `.destroy()` on it.
   */
  public getStorageClient(): plugins.s3.S3Client {
    return this.smartbucketRef.storageClient;
  }

  /**
   * gets the base directory of the bucket
   */
  public async getBaseDirectory(): Promise<Directory> {
    return new Directory(this, null!, '');
  }

  /**
   * gets the trash directory
   */
  public async getTrash(): Promise<Trash> {
    const trash = new Trash(this);
    return trash;
  }

  /**
   * Resolves a path descriptor to a Directory.
   * A descriptor with neither `path` nor `directory` resolves to the base directory.
   *
   * @param pathDescriptorArg - descriptor identifying the directory
   */
  public async getDirectoryFromPath(
    pathDescriptorArg: interfaces.IPathDecriptor
  ): Promise<Directory> {
    if (!pathDescriptorArg.path && !pathDescriptorArg.directory) {
      return this.getBaseDirectory();
    }
    const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptorArg);
    const baseDirectory = await this.getBaseDirectory();
    return await baseDirectory.getSubDirectoryByName(checkPath, {
      getEmptyDirectory: true,
    });
  }

  // ===============
  // Fast Operations
  // ===============

  /**
   * Stores `contents` at the path described by the descriptor.
   *
   * @param optionsArg - path descriptor plus `contents`; `overwrite` must be
   *   true to replace an existing object
   * @returns a File referring to the stored object
   * @throws Error if the object already exists and `overwrite` is not set;
   *   storage errors are logged and rethrown
   */
  public async fastPut(
    optionsArg: interfaces.IPathDecriptor & {
      contents: string | Buffer;
      overwrite?: boolean;
    }
  ): Promise<File> {
    // Remembered outside the try block so the error log can show the resolved key
    // even when the descriptor was given via `directory` rather than `path`.
    let resolvedPath: string | undefined;
    try {
      const reducedPath = await helpers.reducePathDescriptorToPath(optionsArg);
      resolvedPath = reducedPath;
      const exists = await this.fastExists({ path: reducedPath });

      if (exists && !optionsArg.overwrite) {
        throw new Error(
          `Object already exists at path '${reducedPath}' in bucket '${this.name}'. ` +
          `Set overwrite:true to replace it.`
        );
      } else if (exists && optionsArg.overwrite) {
        console.log(
          `Overwriting existing object at path '${reducedPath}' in bucket '${this.name}'.`
        );
      } else {
        console.log(`Creating new object at path '${reducedPath}' in bucket '${this.name}'.`);
      }

      const command = new plugins.s3.PutObjectCommand({
        Bucket: this.name,
        Key: reducedPath,
        Body: optionsArg.contents,
      });
      await this.smartbucketRef.storageClient.send(command);

      console.log(`Object '${reducedPath}' has been successfully stored in bucket '${this.name}'.`);
      const parsedPath = plugins.path.parse(reducedPath);
      return new File({
        directoryRefArg: await this.getDirectoryFromPath({
          path: parsedPath.dir,
        }),
        fileName: parsedPath.base,
      });
    } catch (error) {
      console.error(
        `Error storing object at path '${resolvedPath ?? optionsArg.path}' in bucket '${this.name}':`,
        error
      );
      throw error;
    }
  }

  /**
   * Reads a whole object into memory.
   *
   * @param optionsArg - `path` is the object key
   * @returns the object contents; an empty Buffer if no chunks were emitted
   * @throws rejects with the stream error instead of waiting forever
   */
  public async fastGet(optionsArg: { path: string }): Promise<Buffer> {
    const replaySubject = await this.fastGetReplaySubject(optionsArg);
    // Collect chunks and concatenate once at the end (avoids re-copying the
    // accumulated buffer on every chunk).
    const chunks: Buffer[] = [];
    await new Promise<void>((resolve, reject) => {
      replaySubject.subscribe({
        next: (chunk) => {
          chunks.push(chunk);
        },
        error: (err) => {
          reject(err);
        },
        complete: () => {
          resolve();
        },
      });
    });
    return Buffer.concat(chunks);
  }

  /**
   * good when time to first byte is important
   * and multiple subscribers are expected
   *
   * The subject emits each chunk of the object body, completes when the body
   * ends, and errors when the source stream emits an error or the response
   * body cannot be piped.
   *
   * @param optionsArg - `path` is the object key
   * @returns a ReplaySubject emitting the object's chunks
   */
  public async fastGetReplaySubject(optionsArg: {
    path: string;
  }): Promise<plugins.smartrx.rxjs.ReplaySubject<Buffer>> {
    const command = new plugins.s3.GetObjectCommand({
      Bucket: this.name,
      Key: optionsArg.path,
    });
    const response = await this.smartbucketRef.storageClient.send(command);
    const replaySubject = new plugins.smartrx.rxjs.ReplaySubject<Buffer>();

    // Convert the stream to a format that supports piping
    const stream = response.Body as any; // SdkStreamMixin includes readable stream
    if (!stream || typeof stream.pipe !== 'function') {
      // Without a pipeable body nothing would ever be emitted; surface that to
      // subscribers instead of handing out a subject that never completes.
      replaySubject.error(
        new Error(
          `Response body for object '${optionsArg.path}' in bucket '${this.name}' is not a pipeable stream.`
        )
      );
      return replaySubject;
    }

    const duplexStream = new plugins.smartstream.SmartDuplex<Buffer, void>({
      writeFunction: async (chunk) => {
        replaySubject.next(chunk);
      },
      finalFunction: async () => {
        replaySubject.complete();
      },
    });

    // `pipe` does not forward errors from the source, so hand them to the subject explicitly.
    if (typeof stream.on === 'function') {
      stream.on('error', (err: unknown) => {
        replaySubject.error(err);
      });
    }
    stream.pipe(duplexStream);

    return replaySubject;
  }

  public fastGetStream(
    optionsArg: {
      path: string;
    },
    typeArg: 'webstream'
  ): Promise<ReadableStream>;
  public fastGetStream(
    optionsArg: {
      path: string;
    },
    typeArg: 'nodestream'
  ): Promise<plugins.stream.Readable>;

  /**
   * Streams an object either as a node stream or as a web ReadableStream.
   *
   * @param optionsArg - `path` is the object key
   * @param typeArg - 'nodestream' (default) or 'webstream'
   * @throws Error if the response body cannot be piped or `typeArg` is unknown
   */
  public async fastGetStream(
    optionsArg: { path: string },
    typeArg: 'webstream' | 'nodestream' = 'nodestream'
  ): Promise<ReadableStream | plugins.stream.Readable> {
    const command = new plugins.s3.GetObjectCommand({
      Bucket: this.name,
      Key: optionsArg.path,
    });
    const response = await this.smartbucketRef.storageClient.send(command);
    const stream = response.Body as any; // SdkStreamMixin includes readable stream

    if (!stream || typeof stream.pipe !== 'function') {
      // Returning the duplex without a source would give the caller a stream that never ends.
      throw new Error(
        `Response body for object '${optionsArg.path}' in bucket '${this.name}' is not a pipeable stream.`
      );
    }

    // Pass-through duplex: every chunk written is handed on unchanged.
    const duplexStream = new plugins.smartstream.SmartDuplex<Buffer, Buffer>({
      writeFunction: async (chunk) => {
        return chunk;
      },
      finalFunction: async () => {
        return null!;
      },
    });

    // `pipe` does not forward source errors; destroy the duplex so consumers see them.
    if (typeof stream.on === 'function') {
      stream.on('error', (err: Error) => {
        duplexStream.destroy(err);
      });
    }
    stream.pipe(duplexStream);

    if (typeArg === 'nodestream') {
      return duplexStream;
    }
    if (typeArg === 'webstream') {
      return (await duplexStream.getWebStreams()).readable;
    }
    throw new Error('unknown typeArg');
  }

  /**
   * Stores a stream at `path`.
   *
   * @param optionsArg - `path` is the object key, `readableStream` the body,
   *   `nativeMetadata` optional object metadata; `overwrite` must be true to
   *   replace an existing object
   * @throws Error if the object already exists and `overwrite` is not set;
   *   storage errors are logged and rethrown
   */
  public async fastPutStream(optionsArg: {
    path: string;
    readableStream: plugins.stream.Readable | ReadableStream;
    nativeMetadata?: { [key: string]: string };
    overwrite?: boolean;
  }): Promise<void> {
    try {
      const exists = await this.fastExists({ path: optionsArg.path });

      if (exists && !optionsArg.overwrite) {
        throw new Error(
          `Object already exists at path '${optionsArg.path}' in bucket '${this.name}'. ` +
          `Set overwrite:true to replace it.`
        );
      } else if (exists && optionsArg.overwrite) {
        console.log(
          `Overwriting existing object at path '${optionsArg.path}' in bucket '${this.name}'.`
        );
      } else {
        console.log(`Creating new object at path '${optionsArg.path}' in bucket '${this.name}'.`);
      }

      const command = new plugins.s3.PutObjectCommand({
        Bucket: this.name,
        Key: optionsArg.path,
        Body: optionsArg.readableStream,
        Metadata: optionsArg.nativeMetadata,
      });
      await this.smartbucketRef.storageClient.send(command);

      console.log(
        `Object '${optionsArg.path}' has been successfully stored in bucket '${this.name}'.`
      );
    } catch (error) {
      console.error(
        `Error storing object at path '${optionsArg.path}' in bucket '${this.name}':`,
        error
      );
      throw error;
    }
  }

  /**
   * Copies an object within this bucket or into `targetBucket`.
   *
   * Metadata handling:
   * - no `nativeMetadata` and no `deleteExistingNativeMetadata`: the source
   *   metadata is copied as-is (`MetadataDirective: 'COPY'`).
   * - otherwise the metadata is replaced with the existing metadata (unless
   *   `deleteExistingNativeMetadata` is set) merged with `nativeMetadata`.
   *   The source's content headers are carried over in that case.
   *
   * @param optionsArg - `sourcePath` is the key to copy; `destinationPath`
   *   defaults to `sourcePath`; `targetBucket` defaults to this bucket
   * @throws storage errors are logged and rethrown
   */
  public async fastCopy(optionsArg: {
    sourcePath: string;
    destinationPath?: string;
    targetBucket?: Bucket;
    nativeMetadata?: { [key: string]: string };
    deleteExistingNativeMetadata?: boolean;
  }): Promise<void> {
    try {
      const targetBucketName = optionsArg.targetBucket ? optionsArg.targetBucket.name : this.name;

      // Retrieve current object information to use in copy conditions
      const currentObjInfo = await this.smartbucketRef.storageClient.send(
        new plugins.s3.HeadObjectCommand({
          Bucket: this.name,
          Key: optionsArg.sourcePath,
        })
      );

      // Prepare new metadata
      const newNativeMetadata = {
        ...(optionsArg.deleteExistingNativeMetadata ? {} : currentObjInfo.Metadata),
        ...optionsArg.nativeMetadata,
      };

      // With 'COPY' the `Metadata` field is ignored by the store, so any
      // requested metadata change needs 'REPLACE' with the merged metadata.
      const hasNewMetadata =
        !!optionsArg.nativeMetadata && Object.keys(optionsArg.nativeMetadata).length > 0;
      const replaceMetadata = !!optionsArg.deleteExistingNativeMetadata || hasNewMetadata;

      // CopySource has to be URL-encoded; encode per segment so '/' separators survive.
      const encodedSourceKey = optionsArg.sourcePath
        .split('/')
        .map((segment) => encodeURIComponent(segment))
        .join('/');
      const copySource = `${encodeURIComponent(this.name)}/${encodedSourceKey}`;

      const command = new plugins.s3.CopyObjectCommand({
        Bucket: targetBucketName,
        CopySource: copySource,
        Key: optionsArg.destinationPath || optionsArg.sourcePath,
        MetadataDirective: replaceMetadata ? 'REPLACE' : 'COPY',
        ...(replaceMetadata
          ? {
              Metadata: newNativeMetadata,
              // 'REPLACE' also covers the content headers, so carry them over from the source.
              ContentType: currentObjInfo.ContentType,
              CacheControl: currentObjInfo.CacheControl,
              ContentDisposition: currentObjInfo.ContentDisposition,
              ContentEncoding: currentObjInfo.ContentEncoding,
              ContentLanguage: currentObjInfo.ContentLanguage,
            }
          : {}),
      });
      await this.smartbucketRef.storageClient.send(command);
    } catch (err) {
      console.error(
        `Error copying object '${optionsArg.sourcePath}' from bucket '${this.name}':`,
        err
      );
      throw err; // rethrow to allow caller to handle
    }
  }

  /**
   * Move object from one path to another within the same bucket or to another bucket
   *
   * Implemented as copy followed by removal of the source. Moving an object
   * onto itself is a no-op.
   *
   * @param optionsArg - `sourcePath`/`destinationPath` are object keys;
   *   `targetBucket` defaults to this bucket; `overwrite` must be true to
   *   replace an existing destination object
   * @throws Error if the destination exists and `overwrite` is not set;
   *   storage errors are logged and rethrown
   */
  public async fastMove(optionsArg: {
    sourcePath: string;
    destinationPath: string;
    targetBucket?: Bucket;
    overwrite?: boolean;
  }): Promise<void> {
    try {
      const destinationBucket = optionsArg.targetBucket || this;

      // Copy-then-delete onto the same key would risk deleting the only copy.
      if (
        destinationBucket.name === this.name &&
        optionsArg.destinationPath === optionsArg.sourcePath
      ) {
        console.log(
          `Source and destination are identical ('${optionsArg.sourcePath}' in bucket '${this.name}'). Nothing to move.`
        );
        return;
      }

      const exists = await destinationBucket.fastExists({
        path: optionsArg.destinationPath,
      });

      if (exists && !optionsArg.overwrite) {
        // Throw (like fastPut/fastPutStream) so the caller can tell the move did not happen.
        throw new Error(
          `Object already exists at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'. ` +
          `Set overwrite:true to replace it.`
        );
      } else if (exists && optionsArg.overwrite) {
        console.log(
          `Overwriting existing object at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
        );
      } else {
        console.log(
          `Moving object to path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
        );
      }

      await this.fastCopy({
        sourcePath: optionsArg.sourcePath,
        destinationPath: optionsArg.destinationPath,
        targetBucket: optionsArg.targetBucket,
      });
      // Only reached when the copy succeeded (fastCopy rethrows on failure).
      await this.fastRemove({ path: optionsArg.sourcePath });

      console.log(
        `Object '${optionsArg.sourcePath}' has been successfully moved to '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
      );
    } catch (error) {
      console.error(
        `Error moving object from '${optionsArg.sourcePath}' to '${optionsArg.destinationPath}':`,
        error
      );
      throw error;
    }
  }

  /**
   * removeObject
   *
   * @param optionsArg - `path` is the key of the object to delete
   */
  public async fastRemove(optionsArg: { path: string }) {
    const command = new plugins.s3.DeleteObjectCommand({
      Bucket: this.name,
      Key: optionsArg.path,
    });
    await this.smartbucketRef.storageClient.send(command);
  }

  /**
   * check whether file exists
   *
   * Uses a HEAD request; a "not found" answer (error name `NotFound` or
   * `NoSuchKey`, or HTTP status 404) yields false, any other error is rethrown.
   *
   * @param optionsArg - `path` is the object key
   * @returns true if the HEAD request succeeds, false if the object is not found
   */
  public async fastExists(optionsArg: { path: string }): Promise<boolean> {
    try {
      const command = new plugins.s3.HeadObjectCommand({
        Bucket: this.name,
        Key: optionsArg.path,
      });
      await this.smartbucketRef.storageClient.send(command);
      console.log(`Object '${optionsArg.path}' exists in bucket '${this.name}'.`);
      return true;
    } catch (error: any) {
      // Not every backend reports the same error name, so the HTTP status is checked too.
      const isNotFound =
        error?.name === 'NotFound' ||
        error?.name === 'NoSuchKey' ||
        error?.$metadata?.httpStatusCode === 404;
      if (isNotFound) {
        console.log(`Object '${optionsArg.path}' does not exist in bucket '${this.name}'.`);
        return false;
      }
      console.error('Error checking object existence:', error);
      throw error; // Rethrow if it's not a NotFound error to handle unexpected issues
    }
  }

  /**
   * deletes this bucket
   */
  public async delete() {
    await this.smartbucketRef.storageClient.send(
      new plugins.s3.DeleteBucketCommand({ Bucket: this.name })
    );
  }

  /**
   * Returns the HEAD response for the object described by the descriptor.
   *
   * @param pathDescriptor - descriptor identifying the object
   */
  public async fastStat(pathDescriptor: interfaces.IPathDecriptor) {
    const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
    const command = new plugins.s3.HeadObjectCommand({
      Bucket: this.name,
      Key: checkPath,
    });
    return this.smartbucketRef.storageClient.send(command);
  }

  /**
   * Checks whether at least one object key lies below the given path.
   *
   * @param pathDescriptor - descriptor identifying the directory
   * @returns true if any key starts with `<path>/` (or, for an empty path,
   *   if the bucket contains any object)
   */
  public async isDirectory(pathDescriptor: interfaces.IPathDecriptor): Promise<boolean> {
    const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
    // Terminate the prefix with '/' so that 'foo' does not match 'foobar/...'.
    const directoryPrefix =
      checkPath === '' || checkPath.endsWith('/') ? checkPath : `${checkPath}/`;
    const command = new plugins.s3.ListObjectsV2Command({
      Bucket: this.name,
      Prefix: directoryPrefix,
      MaxKeys: 1, // one hit is enough to answer the question
    });
    const { Contents } = await this.smartbucketRef.storageClient.send(command);
    return !!Contents && Contents.length > 0;
  }

  /**
   * Checks whether an object exists at exactly the given path.
   *
   * @param pathDescriptor - descriptor identifying the object
   * @returns true if an object with exactly that key exists
   */
  public async isFile(pathDescriptor: interfaces.IPathDecriptor): Promise<boolean> {
    const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
    if (!checkPath) {
      // An empty key cannot name an object.
      return false;
    }
    // Exact-key check; a prefix listing would also match 'foobar' when asked for 'foo'.
    return this.fastExists({ path: checkPath });
  }

  /**
   * Reads the first `length` bytes of an object via a ranged GET.
   *
   * @param optionsArg - `path` is the object key, `length` the number of bytes
   * @returns the leading bytes; an empty Buffer when `length` is not positive
   * @throws storage errors are logged and rethrown
   */
  public async getMagicBytes(optionsArg: { path: string; length: number }): Promise<Buffer> {
    const byteCount = Math.floor(optionsArg.length);
    if (!(byteCount > 0)) {
      // `bytes=0--1` (or NaN) would be an invalid range header.
      return Buffer.alloc(0);
    }
    try {
      const command = new plugins.s3.GetObjectCommand({
        Bucket: this.name,
        Key: optionsArg.path,
        Range: `bytes=0-${byteCount - 1}`,
      });
      const response = await this.smartbucketRef.storageClient.send(command);
      const chunks: Buffer[] = [];
      const stream = response.Body as any; // SdkStreamMixin includes readable stream

      for await (const chunk of stream) {
        chunks.push(chunk);
      }
      return Buffer.concat(chunks);
    } catch (error) {
      console.error(
        `Error retrieving magic bytes from object at path '${optionsArg.path}' in bucket '${this.name}':`,
        error
      );
      throw error;
    }
  }

  // ==========================================
  // Memory-Efficient Listing Methods (Phase 1)
  // ==========================================

  /**
   * List all objects with a given prefix using async generator (memory-efficient streaming)
   * @param prefix - Optional prefix to filter objects (default: '' for all objects)
   * @yields Object keys one at a time
   * @example
   * ```ts
   * for await (const key of bucket.listAllObjects('npm/')) {
   *   console.log(key);
   *   if (shouldStop) break; // Early exit supported
   * }
   * ```
   */
  public async *listAllObjects(prefix: string = ''): AsyncIterableIterator<string> {
    let continuationToken: string | undefined;

    do {
      const command = new plugins.s3.ListObjectsV2Command({
        Bucket: this.name,
        Prefix: prefix,
        ContinuationToken: continuationToken,
      });

      const response = await this.smartbucketRef.storageClient.send(command);

      for (const obj of response.Contents || []) {
        if (obj.Key) yield obj.Key;
      }

      continuationToken = response.NextContinuationToken;
    } while (continuationToken);
  }

  /**
   * List all objects as an RxJS Observable (for complex reactive pipelines)
   *
   * Paging stops as soon as the subscriber unsubscribes, so operators such as
   * `take` do not cause the rest of the bucket to be listed.
   *
   * @param prefix - Optional prefix to filter objects (default: '' for all objects)
   * @returns Observable that emits object keys
   * @example
   * ```ts
   * bucket.listAllObjectsObservable('npm/')
   *   .pipe(
   *     filter(key => key.endsWith('.json')),
   *     take(100)
   *   )
   *   .subscribe(key => console.log(key));
   * ```
   */
  public listAllObjectsObservable(prefix: string = ''): plugins.smartrx.rxjs.Observable<string> {
    return new plugins.smartrx.rxjs.Observable<string>((subscriber) => {
      // Set by the teardown below; checked between pages and between keys.
      let cancelled = false;

      const fetchAllPages = async () => {
        try {
          let token: string | undefined = undefined;
          do {
            const response: plugins.s3.ListObjectsV2Output =
              await this.smartbucketRef.storageClient.send(
                new plugins.s3.ListObjectsV2Command({
                  Bucket: this.name,
                  Prefix: prefix,
                  ContinuationToken: token,
                })
              );

            for (const obj of response.Contents || []) {
              if (cancelled || subscriber.closed) return;
              if (obj.Key) subscriber.next(obj.Key);
            }

            token = response.NextContinuationToken;
          } while (token && !cancelled && !subscriber.closed);

          if (!cancelled) {
            subscriber.complete();
          }
        } catch (error) {
          if (!cancelled) {
            subscriber.error(error);
          }
        }
      };

      // Errors are routed to the subscriber inside fetchAllPages, so the promise never rejects.
      void fetchAllPages();

      return () => {
        cancelled = true;
      };
    });
  }

  /**
   * Create a cursor for manual pagination control
   * @param prefix - Optional prefix to filter objects (default: '' for all objects)
   * @param options - Cursor options (pageSize, etc.)
   * @returns ListCursor instance
   * @example
   * ```ts
   * const cursor = bucket.createCursor('npm/', { pageSize: 500 });
   * while (cursor.hasMore()) {
   *   const { keys, done } = await cursor.next();
   *   console.log(`Processing ${keys.length} keys...`);
   * }
   * ```
   */
  public createCursor(prefix: string = '', options?: IListCursorOptions): ListCursor {
    return new ListCursor(this, prefix, options);
  }

  /**
   * Create a watcher for monitoring bucket changes (add/modify/delete)
   * @param options - Watcher options (prefix, pollIntervalMs, etc.)
   * @returns BucketWatcher instance
   * @example
   * ```ts
   * const watcher = bucket.createWatcher({ prefix: 'uploads/', pollIntervalMs: 3000 });
   * watcher.changeSubject.subscribe((change) => console.log('Change:', change));
   * await watcher.start();
   * // ... later
   * await watcher.stop();
   * ```
   */
  public createWatcher(options?: interfaces.IBucketWatcherOptions): BucketWatcher {
    return new BucketWatcher(this, options);
  }

  // ==========================================
  // High-Level Listing Helpers (Phase 2)
  // ==========================================

  /**
   * Find objects matching a glob pattern (memory-efficient)
   *
   * Every key of the bucket is listed and tested against the pattern.
   *
   * @param pattern - Glob pattern (e.g., "**\/*.json", "npm/packages/*\/index.json")
   * @yields Matching object keys
   * @example
   * ```ts
   * for await (const key of bucket.findByGlob('npm/packages/*\/index.json')) {
   *   console.log('Found package index:', key);
   * }
   * ```
   */
  public async *findByGlob(pattern: string): AsyncIterableIterator<string> {
    const matcher = new plugins.Minimatch(pattern);
    for await (const key of this.listAllObjects('')) {
      if (matcher.match(key)) yield key;
    }
  }

  /**
   * List all objects and collect into an array (convenience method)
   * WARNING: Loads entire result set into memory. Use listAllObjects() generator for large buckets.
   * @param prefix - Optional prefix to filter objects (default: '' for all objects)
   * @returns Array of all object keys
   * @example
   * ```ts
   * const allKeys = await bucket.listAllObjectsArray('npm/');
   * console.log(`Found ${allKeys.length} objects`);
   * ```
   */
  public async listAllObjectsArray(prefix: string = ''): Promise<string[]> {
    const keys: string[] = [];
    for await (const key of this.listAllObjects(prefix)) {
      keys.push(key);
    }
    return keys;
  }

  /**
   * Deletes every object in this bucket, one listing page at a time.
   *
   * @throws Error if the store reports objects that could not be deleted;
   *   storage errors are logged and rethrown
   */
  public async cleanAllContents(): Promise<void> {
    try {
      let isTruncated = true;
      let continuationToken: string | undefined = undefined;

      while (isTruncated) {
        // Explicitly type the response
        const response: plugins.s3.ListObjectsV2Output =
          await this.smartbucketRef.storageClient.send(
            new plugins.s3.ListObjectsV2Command({
              Bucket: this.name,
              ContinuationToken: continuationToken,
            })
          );

        // Map each listed item to { Key }, skipping entries without a key.
        const objectsToDelete = (response.Contents ?? []).flatMap((item) =>
          item.Key ? [{ Key: item.Key }] : []
        );

        console.log(
          `Cleaning contents of bucket '${this.name}': Now deleting ${objectsToDelete.length} items...`
        );

        if (objectsToDelete.length > 0) {
          const deleteResponse = await this.smartbucketRef.storageClient.send(
            new plugins.s3.DeleteObjectsCommand({
              Bucket: this.name,
              Delete: {
                Objects: objectsToDelete,
                Quiet: true,
              },
            })
          );

          // Quiet mode still reports per-object failures; do not claim success when some remain.
          const failures = deleteResponse.Errors ?? [];
          if (failures.length > 0) {
            const details = failures
              .map((failure) => `${failure.Key}: ${failure.Code} ${failure.Message}`)
              .join('; ');
            throw new Error(
              `Failed to delete ${failures.length} object(s) from bucket '${this.name}': ${details}`
            );
          }
        }

        // Update continuation token and truncation status
        isTruncated = response.IsTruncated || false;
        continuationToken = response.NextContinuationToken;
      }

      console.log(`All contents in bucket '${this.name}' have been deleted.`);
    } catch (error) {
      console.error(`Error cleaning contents of bucket '${this.name}':`, error);
      throw error;
    }
  }
}
