/**
 * Copyright 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import { GenkitError } from './error.js';

/**
 * Error thrown when a stream cannot be found.
 */
export class StreamNotFoundError extends GenkitError {
  constructor(message: string) {
    super({ status: 'NOT_FOUND', message });
    this.name = 'StreamNotFoundError';
  }
}

/**
 * Interface for writing content to a stream.
 * @template S The type of the stream chunks.
 * @template O The type of the final output.
 */
export interface ActionStreamInput<S, O> {
  /**
   * Writes a chunk to the stream.
   * @param chunk The chunk data to write.
   */
  write(chunk: S): Promise<void>;
  /**
   * Closes the stream with a final output.
   * @param output The final output data.
   */
  done(output: O): Promise<void>;
  /**
   * Closes the stream with an error.
   * @param err The error that occurred.
   */
  error(err: any): Promise<void>;
}

/**
 * Subscriber callbacks for receiving stream events.
 * @template S The type of the stream chunks.
 * @template O The type of the final output.
 */
export type ActionStreamSubscriber<S, O> = {
  /** Called when a chunk is received. */
  onChunk: (chunk: S) => void;
  /** Called when the stream completes successfully. */
  onDone: (output: O) => void;
  /** Called when the stream encounters an error. */
  onError: (error: any) => void;
};

/**
 * Interface for managing streaming actions, allowing creation and subscription to streams.
 * Implementations can provide different storage backends (e.g., in-memory, database, cache).
 */
export interface StreamManager {
  /**
   * Opens a stream for writing.
   * @param streamId The unique identifier for the stream.
   * @returns An object to write to the stream.
   */
  open<S, O>(streamId: string): Promise<ActionStreamInput<S, O>>;
  /**
   * Subscribes to a stream to receive its events.
   * @param streamId The unique identifier for the stream.
   * @param options The subscriber callbacks.
   * @returns A promise resolving to an object containing an unsubscribe function.
   */
  subscribe<S, O>(
    streamId: string,
    options: ActionStreamSubscriber<S, O>
  ): Promise<{ unsubscribe: () => void }>;
}

type StreamState<S, O> =
  | {
      status: 'open';
      chunks: S[];
      subscribers: ActionStreamSubscriber<S, O>[];
      lastTouched: number;
    }
  | { status: 'done'; chunks: S[]; output: O; lastTouched: number }
  | { status: 'error'; chunks: S[]; error: any; lastTouched: number };

/**
 * An in-memory implementation of StreamManager.
 * Useful for testing or single-instance deployments where persistence is not required.
 */
export class InMemoryStreamManager implements StreamManager {
  private streams: Map<string, StreamState<any, any>> = new Map();

  /**
   * @param options Configuration options.
   * @param options.ttlSeconds Time-to-live for streams in seconds. Defaults to 5 minutes.
   */
  constructor(private options: { ttlSeconds?: number } = {}) {}

  private _cleanup() {
    const ttl = (this.options.ttlSeconds ?? 5 * 60) * 1000;
    const now = Date.now();
    for (const [streamId, stream] of this.streams.entries()) {
      if (stream.status !== 'open' && now - stream.lastTouched > ttl) {
        this.streams.delete(streamId);
      }
    }
  }

  async open<S, O>(streamId: string): Promise<ActionStreamInput<S, O>> {
    this._cleanup();
    if (this.streams.has(streamId)) {
      throw new Error(`Stream with id ${streamId} already exists.`);
    }
    this.streams.set(streamId, {
      status: 'open',
      chunks: [],
      subscribers: [],
      lastTouched: Date.now(),
    });

    return {
      write: async (chunk: S) => {
        const stream = this.streams.get(streamId);
        if (stream?.status === 'open') {
          stream.chunks.push(chunk);
          stream.subscribers.forEach((s) => s.onChunk(chunk));
          stream.lastTouched = Date.now();
        }
      },
      done: async (output: O) => {
        const stream = this.streams.get(streamId);
        if (stream?.status === 'open') {
          this.streams.set(streamId, {
            status: 'done',
            chunks: stream.chunks,
            output,
            lastTouched: Date.now(),
          });
          stream.subscribers.forEach((s) => s.onDone(output));
        }
      },
      error: async (err: any) => {
        const stream = this.streams.get(streamId);
        if (stream?.status === 'open') {
          stream.subscribers.forEach((s) => s.onError(err));
          this.streams.set(streamId, {
            status: 'error',
            chunks: stream.chunks,
            error: err,
            lastTouched: Date.now(),
          });
        }
      },
    };
  }

  async subscribe<S, O>(
    streamId: string,
    subscriber: ActionStreamSubscriber<S, O>
  ): Promise<{ unsubscribe: () => void }> {
    const stream = this.streams.get(streamId);
    if (!stream) {
      throw new StreamNotFoundError(`Stream with id ${streamId} not found.`);
    }

    if (stream.status === 'done') {
      for (const chunk of stream.chunks) {
        subscriber.onChunk(chunk);
      }
      subscriber.onDone(stream.output);
    } else if (stream.status === 'error') {
      for (const chunk of stream.chunks) {
        subscriber.onChunk(chunk);
      }
      subscriber.onError(stream.error);
    } else {
      stream.chunks.forEach((chunk) => subscriber.onChunk(chunk));
      stream.subscribers.push(subscriber);
    }

    return {
      unsubscribe: () => {
        const currentStream = this.streams.get(streamId);
        if (currentStream?.status === 'open') {
          const index = currentStream.subscribers.indexOf(subscriber);
          if (index > -1) {
            currentStream.subscribers.splice(index, 1);
          }
        }
      },
    };
  }
}
