import * as Cause from "../Cause.js"
import type * as Channel from "../Channel.js"
import * as Chunk from "../Chunk.js"
import * as Deferred from "../Deferred.js"
import * as Effect from "../Effect.js"
import * as Effectable from "../Effectable.js"
import * as Exit from "../Exit.js"
import { dual, pipe } from "../Function.js"
import type * as GroupBy from "../GroupBy.js"
import * as Option from "../Option.js"
import { pipeArguments } from "../Pipeable.js"
import { hasProperty, type Predicate } from "../Predicate.js"
import * as Queue from "../Queue.js"
import * as Ref from "../Ref.js"
import * as Scope from "../Scope.js"
import type * as Stream from "../Stream.js"
import type * as Take from "../Take.js"
import type { NoInfer } from "../Types.js"
import * as channel from "./channel.js"
import * as channelExecutor from "./channel/channelExecutor.js"
import * as core from "./core-stream.js"
import * as stream from "./stream.js"
import * as take from "./take.js"

/**
 * Key under which the `GroupBy` type identifier is registered in the global
 * symbol registry.
 *
 * @internal
 */
const GroupBySymbolKey = "effect/GroupBy"

/**
 * Symbol used to brand `GroupBy` values. It is obtained through `Symbol.for`,
 * so every lookup of the same key yields the same symbol.
 *
 * @internal
 */
export const GroupByTypeId: GroupBy.GroupByTypeId = Symbol.for(GroupBySymbolKey) as GroupBy.GroupByTypeId

/**
 * Variance markers stored under `GroupByTypeId` on the objects built by
 * `make`. Each marker is an identity function over `never`, one per type
 * parameter of `GroupBy` (`R`, `E`, `K`, `V`).
 */
const groupByVariance = {
  /* c8 ignore next */
  _R: (marker: never) => marker,
  /* c8 ignore next */
  _E: (marker: never) => marker,
  /* c8 ignore next */
  _K: (marker: never) => marker,
  /* c8 ignore next */
  _V: (marker: never) => marker
}

/**
 * Type guard that reports whether `u` carries the `GroupByTypeId` property.
 *
 * @internal
 */
export const isGroupBy = (u: unknown): u is GroupBy.GroupBy<unknown, unknown, unknown, unknown> => {
  return hasProperty(u, GroupByTypeId)
}

/**
 * Runs `f` on every group of `self` and merges the resulting streams.
 *
 * Each `[key, queue]` pair emitted by `self.grouped` is converted into a
 * stream by reading the queue (with `shutdown: true`) and flattening its
 * `Take` values; that stream is handed to `f` together with the key. The
 * per-group streams are flat-mapped with `"unbounded"` concurrency and a
 * buffer of `options.bufferSize`, defaulting to `16`.
 *
 * Supports both the data-first and data-last calling conventions; the call is
 * treated as data-first when the first argument is a `GroupBy`.
 *
 * @internal
 */
export const evaluate = dual<
  <K, V, E, A, E2, R2>(
    f: (key: K, stream: Stream.Stream<V, E>) => Stream.Stream<A, E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => <R>(self: GroupBy.GroupBy<K, V, E, R>) => Stream.Stream<A, E | E2, R2 | R>,
  <K, V, E, R, A, E2, R2>(
    self: GroupBy.GroupBy<K, V, E, R>,
    f: (key: K, stream: Stream.Stream<V, E>) => Stream.Stream<A, E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => Stream.Stream<A, E | E2, R2 | R>
>(
  (args) => isGroupBy(args[0]),
  <K, V, E, R, A, E2, R2>(
    self: GroupBy.GroupBy<K, V, E, R>,
    f: (key: K, stream: Stream.Stream<V, E>) => Stream.Stream<A, E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ): Stream.Stream<A, E | E2, R2 | R> => {
    const bufferSize = options?.bufferSize ?? 16
    return stream.flatMap(
      self.grouped,
      (group) => {
        // `group` is the `[key, queue]` pair emitted for one distinct key
        const values = stream.flattenTake(stream.fromQueue(group[1], { shutdown: true }))
        return f(group[0], values)
      },
      { concurrency: "unbounded", bufferSize }
    )
  }
)

/**
 * Keeps only the groups whose key satisfies `predicate`.
 *
 * The queue of every rejected group is shut down before the group is dropped
 * from the resulting `GroupBy`.
 *
 * @internal
 */
export const filter = dual<
  <K>(predicate: Predicate<NoInfer<K>>) => <V, E, R>(self: GroupBy.GroupBy<K, V, E, R>) => GroupBy.GroupBy<K, V, E, R>,
  <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>, predicate: Predicate<K>) => GroupBy.GroupBy<K, V, E, R>
>(2, <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>, predicate: Predicate<K>): GroupBy.GroupBy<K, V, E, R> => {
  const retained = stream.filterEffect(self.grouped, ([key, queue]) => {
    if (predicate(key)) {
      return Effect.succeed(true)
    }
    // Rejected group: shut its queue down, then report that it must be dropped
    return Effect.as(Queue.shutdown(queue), false)
  })
  return make(retained)
})

/**
 * Keeps only the first `n` groups emitted by `self`.
 *
 * Groups are numbered with `stream.zipWithIndex`; a group is kept when its
 * index is less than `n`. The queue of every other group is shut down before
 * the group is dropped.
 *
 * @internal
 */
export const first = dual<
  (n: number) => <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>) => GroupBy.GroupBy<K, V, E, R>,
  <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>, n: number) => GroupBy.GroupBy<K, V, E, R>
>(2, <K, V, E, R>(self: GroupBy.GroupBy<K, V, E, R>, n: number): GroupBy.GroupBy<K, V, E, R> => {
  const indexed = stream.zipWithIndex(self.grouped)
  const firstGroups = stream.filterEffect(indexed, ([[, queue], index]) => {
    if (index < n) {
      return Effect.succeed(true)
    }
    // Past the limit: shut the group's queue down, then drop the group
    return Effect.as(Queue.shutdown(queue), false)
  })
  // Strip the index again so that only the `[key, queue]` pairs remain
  return make(stream.map(firstGroups, ([group]) => group))
})

/**
 * Builds a `GroupBy` from a stream of `[key, dequeue]` pairs.
 *
 * The result is branded with `GroupByTypeId`, exposes the given stream as
 * `grouped`, and provides a `pipe` method backed by `pipeArguments`.
 *
 * @internal
 */
export const make = <K, V, E, R>(
  grouped: Stream.Stream<readonly [K, Queue.Dequeue<Take.Take<V, E>>], E, R>
): GroupBy.GroupBy<K, V, E, R> => {
  const groupBy: GroupBy.GroupBy<K, V, E, R> = {
    [GroupByTypeId]: groupByVariance,
    // Must be a method rather than an arrow function: it relies on `this` and `arguments`
    pipe() {
      return pipeArguments(this, arguments)
    },
    grouped
  }
  return groupBy
}

// Circular with Stream

/**
 * Groups the elements of `self` by a key computed effectfully: `f` maps each
 * element to a `[key, value]` pair.
 *
 * The resulting `GroupBy` emits a `[key, dequeue]` pair the first time a key
 * is seen; the dequeue yields the values of that key wrapped in `Take`s.
 * `options.bufferSize` (default `16`) is used both as the capacity of the
 * output queue and as the first argument of
 * `stream.distributedWithDynamicCallback`.
 *
 * Supports both the data-first and data-last calling conventions; the call is
 * treated as data-first when the first argument is a `Stream`.
 *
 * @internal
 */
export const groupBy = dual<
  <A, K, V, E2, R2>(
    f: (a: A) => Effect.Effect<readonly [K, V], E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => <E, R>(self: Stream.Stream<A, E, R>) => GroupBy.GroupBy<K, V, E2 | E, R2 | R>,
  <A, E, R, K, V, E2, R2>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => Effect.Effect<readonly [K, V], E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => GroupBy.GroupBy<K, V, E2 | E, R2 | R>
>(
  (args) => stream.isStream(args[0]),
  <A, E, R, K, V, E2, R2>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => Effect.Effect<readonly [K, V], E2, R2>,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ): GroupBy.GroupBy<K, V, E | E2, R | R2> =>
    make(
      stream.unwrapScoped(
        Effect.gen(function*() {
          // Holds the routing function. It is created empty here and only completed after
          // `add` exists, because the routing function itself needs `add`.
          const decider = yield* Deferred.make<(key: K, value: V) => Effect.Effect<Predicate<number>>>()
          // Queue of `[key, dequeue]` pairs (as `Exit`s) that backs the returned stream; it is
          // shut down when the surrounding scope releases it.
          const output = yield* Effect.acquireRelease(
            Queue.bounded<Exit.Exit<readonly [K, Queue.Dequeue<Take.Take<V, E | E2>>], Option.Option<E | E2>>>(
              options?.bufferSize ?? 16
            ),
            (queue) => Queue.shutdown(queue)
          )
          // Remembers, for every key seen so far, the index associated with that key's queue
          const ref = yield* Ref.make<Map<K, number>>(new Map())
          // `add` is an effect yielding an `[index, queue]` pair (see its use below).
          // NOTE(review): presumably each run of `add` registers a fresh downstream queue with
          // the distributor — confirm against `stream.distributedWithDynamicCallback`.
          const add = yield* pipe(
            stream.mapEffectSequential(self, f),
            stream.distributedWithDynamicCallback(
              options?.bufferSize ?? 16,
              // Routing is delegated to whatever function `decider` is eventually completed with
              ([key, value]) => Effect.flatMap(Deferred.await(decider), (f) => f(key, value)),
              // The final `Exit` passed to this callback is forwarded to the output queue
              (exit) => Queue.offer(output, exit)
            )
          )
          // Complete the decider: given a key, produce a predicate matching the index of the
          // queue that should receive the element.
          yield* Deferred.succeed(decider, (key, _) =>
            pipe(
              Ref.get(ref),
              Effect.map((map) => Option.fromNullable(map.get(key))),
              Effect.flatMap(Option.match({
                // Unknown key: obtain a new `[index, queue]` pair, record the index for the key,
                // publish `[key, dequeue]` on the output queue, then select that index.
                onNone: () =>
                  Effect.flatMap(add, ([index, queue]) =>
                    Effect.zipRight(
                      Ref.update(ref, (map) => map.set(key, index)),
                      pipe(
                        Queue.offer(
                          output,
                          Exit.succeed(
                            [
                              key,
                              // Re-wrap each `Exit` of a `[key, value]` pair as a `Take` holding
                              // a single-element chunk with just the value
                              mapDequeue(queue, (exit) =>
                                new take.TakeImpl(pipe(
                                  exit,
                                  Exit.map((tuple) => Chunk.of(tuple[1]))
                                )))
                            ] as const
                          )
                        ),
                        Effect.as<Predicate<number>>((n: number) => n === index)
                      )
                    )),
                // Known key: select the index recorded earlier
                onSome: (index) => Effect.succeed<Predicate<number>>((n: number) => n === index)
              }))
            ))
          // The resulting stream reads the output queue and unwraps its `Exit` values
          return stream.flattenExitOption(stream.fromQueue(output, { shutdown: true }))
        })
      )
    )
)

/**
 * Maps every element of `self` through the effectful function `f`, with the
 * execution strategy chosen by `options`:
 *
 * - with `options.key`, elements are grouped by that key (`groupByKey`, using
 *   `options.bufferSize`) and `f` is applied sequentially within each group;
 * - otherwise `stream.matchConcurrency` dispatches on `options.concurrency`:
 *   the sequential case uses `stream.mapEffectSequential`, and the bounded
 *   case with `n` uses `stream.flatMap` with `concurrency: n` when
 *   `options.unordered` is truthy, or `stream.mapEffectPar` otherwise.
 *
 * Supports both the data-first and data-last calling conventions; the call is
 * treated as data-last when the first argument is a function.
 *
 * @internal
 */
export const mapEffectOptions = dual<
  {
    <A, A2, E2, R2>(
      f: (a: A) => Effect.Effect<A2, E2, R2>,
      options?: {
        readonly concurrency?: number | "unbounded" | undefined
        readonly unordered?: boolean | undefined
      }
    ): <E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A2, E2 | E, R2 | R>
    <A, A2, E2, R2, K>(
      f: (a: A) => Effect.Effect<A2, E2, R2>,
      options: {
        readonly key: (a: A) => K
        readonly bufferSize?: number | undefined
      }
    ): <E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A2, E2 | E, R2 | R>
  },
  {
    <A, E, R, A2, E2, R2>(
      self: Stream.Stream<A, E, R>,
      f: (a: A) => Effect.Effect<A2, E2, R2>,
      options?: {
        readonly concurrency?: number | "unbounded" | undefined
        readonly unordered?: boolean | undefined
      }
    ): Stream.Stream<A2, E2 | E, R2 | R>
    <A, E, R, A2, E2, R2, K>(
      self: Stream.Stream<A, E, R>,
      f: (a: A) => Effect.Effect<A2, E2, R2>,
      options: {
        readonly key: (a: A) => K
        readonly bufferSize?: number | undefined
      }
    ): Stream.Stream<A2, E2 | E, R2 | R>
  }
>(
  (args) => typeof args[0] !== "function",
  (<A, E, R, A2, E2, R2, K>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => Effect.Effect<A2, E2, R2>,
    options?: {
      readonly key?: ((a: A) => K) | undefined
      readonly concurrency?: number | "unbounded" | undefined
      readonly unordered?: boolean | undefined
      readonly bufferSize?: number | undefined
    }
  ): Stream.Stream<A2, E2 | E, R2 | R> => {
    // No key: choose the strategy from `concurrency` / `unordered`
    if (!options?.key) {
      return stream.matchConcurrency(
        options?.concurrency,
        () => stream.mapEffectSequential(self, f),
        (n) => {
          if (options?.unordered) {
            return stream.flatMap(self, (a) => stream.fromEffect(f(a)), { concurrency: n })
          }
          return stream.mapEffectPar(self, n, f)
        }
      )
    }

    // Keyed: group by key, then map each group's stream sequentially
    const grouped = groupByKey(self, options.key, { bufferSize: options.bufferSize })
    return evaluate(grouped, (_, s) => stream.mapEffectSequential(s, f))
  }) as any
)

/**
 * For every element of `self`, runs the effect `f` on it and emits a copy of
 * the element with the effect's result stored under the property `tag`.
 *
 * `options` is forwarded unchanged to `mapEffectOptions`, so `concurrency`
 * and `unordered` select the execution strategy there. Both calling
 * conventions accept the same strategy options. `bufferSize` remains in the
 * data-last signature for backward compatibility only: `mapEffectOptions`
 * reads it solely when a `key` option is present, and neither signature here
 * declares one.
 *
 * Supports both the data-first and data-last calling conventions; the call is
 * treated as data-last when the first argument is a string (the tag).
 *
 * @internal
 */
export const bindEffect = dual<
  <N extends string, A, B, E2, R2>(
    tag: Exclude<N, keyof A>,
    f: (_: NoInfer<A>) => Effect.Effect<B, E2, R2>,
    options?: {
      readonly concurrency?: number | "unbounded" | undefined
      readonly unordered?: boolean | undefined
      /** Accepted for backward compatibility; has no effect without a `key` option. */
      readonly bufferSize?: number | undefined
    }
  ) => <E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<
    { [K in keyof A | N]: K extends keyof A ? A[K] : B },
    E | E2,
    R | R2
  >,
  <A, E, R, N extends string, B, E2, R2>(
    self: Stream.Stream<A, E, R>,
    tag: Exclude<N, keyof A>,
    f: (_: NoInfer<A>) => Effect.Effect<B, E2, R2>,
    options?: {
      readonly concurrency?: number | "unbounded" | undefined
      readonly unordered?: boolean | undefined
    }
  ) => Stream.Stream<
    { [K in keyof A | N]: K extends keyof A ? A[K] : B },
    E | E2,
    R | R2
  >
>((args) => typeof args[0] !== "string", <A, E, R, N extends string, B, E2, R2>(
  self: Stream.Stream<A, E, R>,
  tag: Exclude<N, keyof A>,
  f: (_: A) => Effect.Effect<B, E2, R2>,
  options?: {
    readonly concurrency?: number | "unbounded" | undefined
    readonly unordered?: boolean | undefined
  }
) =>
  mapEffectOptions(self, (k) =>
    Effect.map(
      f(k),
      // Spread the incoming element and attach the effect's result under `tag`
      (a) => ({ ...k, [tag]: a } as { [K in keyof A | N]: K extends keyof A ? A[K] : B })
    ), options))

/**
 * Wraps `dequeue` in a `MapDequeue`, so that every element taken through the
 * returned dequeue has been transformed by `f`.
 */
const mapDequeue = <A, B>(dequeue: Queue.Dequeue<A>, f: (a: A) => B): Queue.Dequeue<B> => {
  return new MapDequeue(dequeue, f)
}

/**
 * A `Dequeue<B>` view over a `Dequeue<A>`.
 *
 * State queries and shutdown are delegated to the wrapped dequeue unchanged;
 * every operation that removes elements applies `f` to each element it
 * returns. Used as an effect, an instance behaves like its `take` operation
 * (see `commit`).
 */
class MapDequeue<in out A, out B> extends Effectable.Class<B> implements Queue.Dequeue<B> {
  readonly [Queue.DequeueTypeId] = {
    _Out: (_: never) => _
  }

  /**
   * @param dequeue - the underlying dequeue that elements are taken from
   * @param f - transformation applied to every element taken
   */
  constructor(
    readonly dequeue: Queue.Dequeue<A>,
    readonly f: (a: A) => B
  ) {
    super()
  }

  /** Capacity of the wrapped dequeue. */
  capacity(): number {
    return Queue.capacity(this.dequeue)
  }

  /** Size of the wrapped dequeue. */
  get size(): Effect.Effect<number> {
    return Queue.size(this.dequeue)
  }

  /** Result of `unsafeSize()` on the wrapped dequeue. */
  unsafeSize(): Option.Option<number> {
    return this.dequeue.unsafeSize()
  }

  /** Delegates to `Queue.awaitShutdown` on the wrapped dequeue. */
  get awaitShutdown(): Effect.Effect<void> {
    return Queue.awaitShutdown(this.dequeue)
  }

  /** Result of `isActive()` on the wrapped dequeue. */
  isActive(): boolean {
    return this.dequeue.isActive()
  }

  /** Delegates to `Queue.isShutdown` on the wrapped dequeue. */
  get isShutdown(): Effect.Effect<boolean> {
    return Queue.isShutdown(this.dequeue)
  }

  /** Shuts down the wrapped dequeue. */
  get shutdown(): Effect.Effect<void> {
    return Queue.shutdown(this.dequeue)
  }

  /** Delegates to `Queue.isFull` on the wrapped dequeue. */
  get isFull(): Effect.Effect<boolean> {
    return Queue.isFull(this.dequeue)
  }

  /** Delegates to `Queue.isEmpty` on the wrapped dequeue. */
  get isEmpty(): Effect.Effect<boolean> {
    return Queue.isEmpty(this.dequeue)
  }

  /** Takes one element from the wrapped dequeue and transforms it with `f`. */
  get take(): Effect.Effect<B> {
    return Effect.map(Queue.take(this.dequeue), (element) => this.f(element))
  }

  /** `Queue.takeAll` on the wrapped dequeue, with `f` applied to each element. */
  get takeAll(): Effect.Effect<Chunk.Chunk<B>> {
    return Effect.map(
      Queue.takeAll(this.dequeue),
      (elements) => Chunk.map(elements, (element) => this.f(element))
    )
  }

  /** `Queue.takeUpTo` on the wrapped dequeue, with `f` applied to each element. */
  takeUpTo(max: number): Effect.Effect<Chunk.Chunk<B>> {
    return Effect.map(
      Queue.takeUpTo(this.dequeue, max),
      (elements) => Chunk.map(elements, (element) => this.f(element))
    )
  }

  /** `Queue.takeBetween` on the wrapped dequeue, with `f` applied to each element. */
  takeBetween(min: number, max: number): Effect.Effect<Chunk.Chunk<B>> {
    return Effect.map(
      Queue.takeBetween(this.dequeue, min, max),
      (elements) => Chunk.map(elements, (element) => this.f(element))
    )
  }

  /** `Queue.takeN` on the wrapped dequeue, with `f` applied to each element. */
  takeN(n: number): Effect.Effect<Chunk.Chunk<B>> {
    return Effect.map(
      Queue.takeN(this.dequeue, n),
      (elements) => Chunk.map(elements, (element) => this.f(element))
    )
  }

  /** `Queue.poll` on the wrapped dequeue, with `f` applied to the element if there is one. */
  poll(): Effect.Effect<Option.Option<B>> {
    return Effect.map(
      Queue.poll(this.dequeue),
      (polled) => Option.map(polled, (element) => this.f(element))
    )
  }

  pipe() {
    return pipeArguments(this, arguments)
  }

  /** Evaluating the dequeue as an effect performs a `take`. */
  commit() {
    return this.take
  }
}

/**
 * Groups the elements of `self` by the key that the pure function `f`
 * computes for each element.
 *
 * Every incoming chunk is partitioned by key with `groupByIterable`. The
 * first time a key is seen, a bounded queue (capacity `options.bufferSize`,
 * default `16`) is created for it and the `[key, queue]` pair is published on
 * an unbounded outer queue; the values are then offered to the key's queue as
 * a chunk `Take`. When the upstream is done, an end `Take` is offered to
 * every group queue and then to the outer queue; an upstream failure is
 * offered to the outer queue as a failure `Take`.
 *
 * Supports both the data-first and data-last calling conventions; the call is
 * treated as data-last when the first argument is a function.
 *
 * @internal
 */
export const groupByKey = dual<
  <A, K>(
    f: (a: A) => K,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => <E, R>(self: Stream.Stream<A, E, R>) => GroupBy.GroupBy<K, A, E, R>,
  <A, E, R, K>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => K,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ) => GroupBy.GroupBy<K, A, E, R>
>(
  (args) => typeof args[0] !== "function",
  <A, E, R, K>(
    self: Stream.Stream<A, E, R>,
    f: (a: A) => K,
    options?: {
      readonly bufferSize?: number | undefined
    }
  ): GroupBy.GroupBy<K, A, E, R> => {
    type GroupQueue = Queue.Queue<Take.Take<A, E>>
    type GroupsQueue = Queue.Queue<Take.Take<readonly [K, GroupQueue], E>>

    // Offers `item` to a group's queue; a failure whose cause consists only of
    // interruption is recovered from, any other failure is left untouched.
    const offerToGroup = (groupQueue: GroupQueue, item: Take.Take<A, E>) =>
      Effect.catchSomeCause(
        Queue.offer(groupQueue, item),
        (cause) => Cause.isInterruptedOnly(cause) ? Option.some(Effect.void) : Option.none()
      )

    // Creates the queue for a key seen for the first time, registers it, publishes
    // the `[key, queue]` pair on the outer queue and offers the first values to it.
    const openGroup = (
      groups: Map<K, GroupQueue>,
      outerQueue: GroupsQueue,
      key: K,
      values: Chunk.Chunk<A>
    ) =>
      Effect.flatMap(
        Queue.bounded<Take.Take<A, E>>(options?.bufferSize ?? 16),
        (groupQueue) =>
          pipe(
            Effect.sync(() => {
              groups.set(key, groupQueue)
            }),
            Effect.zipRight(Queue.offer(outerQueue, take.of([key, groupQueue] as const))),
            Effect.zipRight(offerToGroup(groupQueue, take.chunk(values)))
          )
      )

    const loop = (
      map: Map<K, GroupQueue>,
      outerQueue: GroupsQueue
    ): Channel.Channel<never, Chunk.Chunk<A>, E, E, unknown, unknown, R> =>
      core.readWithCause({
        onInput: (input: Chunk.Chunk<A>) => {
          // Route each `[key, values]` partition of the chunk to its group queue
          const distribute = Effect.forEach(
            groupByIterable(input, f),
            ([key, values]) => {
              const existing = map.get(key)
              if (existing === undefined) {
                return openGroup(map, outerQueue, key, values)
              }
              return offerToGroup(existing, take.chunk(values))
            },
            { discard: true }
          )
          return core.flatMap(core.fromEffect(distribute), () => loop(map, outerQueue))
        },
        onFailure: (cause) => core.fromEffect(Queue.offer(outerQueue, take.failCause(cause))),
        onDone: () => {
          // Signal the end to every group first, then to the outer queue
          const endGroups = Effect.forEach(
            map.values(),
            (groupQueue) => offerToGroup(groupQueue, take.end),
            { discard: true }
          )
          return core.fromEffect(Effect.zipRight(endGroups, Queue.offer(outerQueue, take.end)))
        }
      })

    const grouped = stream.unwrapScopedWith((scope) =>
      Effect.gen(function*() {
        const groups = new Map<K, GroupQueue>()
        const outerQueue = yield* Queue.unbounded<Take.Take<readonly [K, GroupQueue], E>>()
        yield* Scope.addFinalizer(scope, Queue.shutdown(outerQueue))
        const output = stream.flattenTake(stream.fromQueue(outerQueue, { shutdown: true }))
        // Drain the upstream through `loop`, running in `scope`
        const producer = stream.toChannel(self).pipe(
          core.pipeTo(loop(groups, outerQueue)),
          channel.drain,
          channelExecutor.runIn(scope)
        )
        // Fork the producer into `scope`, then hand back the stream that reads the outer queue
        yield* Effect.forkIn(producer, scope)
        return output
      })
    )
    return make(grouped)
  }
)

/**
 * Partitions the values of `iterable` by the key `f` computes for each one.
 *
 * The result holds one `[key, values]` entry per distinct key. Entries appear
 * in the order in which their key was first encountered, and the values of an
 * entry keep their iteration order. Keys are compared with `Map` semantics.
 *
 * @internal
 */
const groupByIterable = dual<
  <V, K>(f: (value: V) => K) => (iterable: Iterable<V>) => Chunk.Chunk<[K, Chunk.Chunk<V>]>,
  <V, K>(iterable: Iterable<V>, f: (value: V) => K) => Chunk.Chunk<[K, Chunk.Chunk<V>]>
>(2, <V, K>(iterable: Iterable<V>, f: (value: V) => K): Chunk.Chunk<[K, Chunk.Chunk<V>]> => {
  // `groups` fixes the output order; `bucketByKey` indexes the same arrays by key
  const groups: Array<[K, Array<V>]> = []
  const iterator = iterable[Symbol.iterator]()
  const bucketByKey = new Map<K, Array<V>>()
  for (let step = iterator.next(); !step.done; step = iterator.next()) {
    const value = step.value
    const key = f(value)
    const bucket = bucketByKey.get(key)
    if (bucket !== undefined) {
      bucket.push(value)
      continue
    }
    // First occurrence of this key: start a new bucket and record its position
    const created: Array<V> = [value]
    groups.push([key, created])
    bucketByKey.set(key, created)
  }
  const entries = groups.map(([key, values]): [K, Chunk.Chunk<V>] => [key, Chunk.unsafeFromArray(values)])
  return Chunk.unsafeFromArray(entries)
})
