import { Subject } from 'rxjs';
import { decode } from '../rxjs/decoder.js';
import type { DecodeOptions } from '../options.js';

/**
 * Streaming UBJSON decoder, usable as the transformer of a `TransformStream`.
 *
 * Every incoming binary chunk is pushed into an RxJS subject that is piped
 * through `decode`; whatever that pipeline emits is forwarded to the stream
 * controller.
 */
export class DecodeTransformer implements Transformer<BufferSource, unknown> {
    /** Subject fed with each chunk handed to {@link transform}. */
    private readonly buffer = new Subject<BufferSource>();

    /**
     * Controller most recently received by {@link transform} or {@link flush}.
     * It is assigned before the subject is driven, so the observer callbacks
     * registered in the constructor read it only after it has been set.
     */
    private controller!: TransformStreamDefaultController<unknown>;

    /**
     * Wires the chunk subject to the decoder and subscribes to its output.
     *
     * @param options - Decoder options, passed through to `decode` unchanged.
     */
    constructor(options?: DecodeOptions) {
        const decoded = this.buffer.pipe(decode(options));
        decoded.subscribe({
            next: (value) => {
                // null is not allowed in a stream, so null/undefined values are skipped
                if (value != null) {
                    this.controller.enqueue(value);
                }
            },
            error: (err: Error) => {
                // Forward pipeline failures to the stream
                this.controller.error(err);
            },
            complete: () => {
                // Pipeline finished: terminate the stream
                this.controller.terminate();
            },
        });
    }

    /**
     * Pushes one chunk into the decoding pipeline.
     *
     * @param obj - Chunk of binary input.
     * @param controller - Controller that receives decoded values; it is also
     * used to report anything thrown while pushing the chunk.
     */
    transform(obj: BufferSource, controller: TransformStreamDefaultController<unknown>): void {
        this.controller = controller;
        try {
            this.buffer.next(obj);
        } catch (ex) {
            controller.error(ex);
        }
    }

    /**
     * Signals end of input by completing the chunk subject.
     *
     * @param controller - Controller used by the observer callbacks that fire
     * in response to the completion.
     */
    flush(controller: TransformStreamDefaultController<unknown>): void {
        this.controller = controller;
        this.buffer.complete();
    }
}
