{"version":3,"file":"drain-pipeline.cjs","names":[],"sources":["../src/drain-pipeline.ts"],"sourcesContent":["export interface DrainPipelineOptions<T = unknown> {\n  batch?: {\n    /** Maximum events per batch. @default 50 */\n    size?: number;\n    /** Max time an event can stay buffered before flush. @default 5000 */\n    intervalMs?: number;\n  };\n  retry?: {\n    /** Total attempts including first try. @default 3 */\n    maxAttempts?: number;\n    /** Delay strategy between attempts. @default 'exponential' */\n    backoff?: 'exponential' | 'linear' | 'fixed';\n    /** Base delay for first retry. @default 1000 */\n    initialDelayMs?: number;\n    /** Max delay cap. @default 30000 */\n    maxDelayMs?: number;\n    /** Add random jitter to delays. @default true */\n    jitter?: boolean;\n  };\n  /** Max buffered events before dropping. @default 1000 */\n  maxBufferSize?: number;\n  /** Overflow policy. @default 'oldest' */\n  dropPolicy?: 'oldest' | 'newest';\n  /** Called when events are dropped from overflow or exhausted retries. */\n  onDropped?: (events: T[], error?: Error) => void;\n}\n\nexport interface PipelineDrainFn<T> {\n  (ctx: T): void;\n  /** Flush all buffered events. */\n  flush: () => Promise<void>;\n  /** Flush and stop scheduling future timer work. */\n  shutdown: () => Promise<void>;\n  readonly pending: number;\n}\n\nfunction wait(ms: number): Promise<void> {\n  return new Promise((resolve) => {\n    const timer = setTimeout(resolve, ms);\n    timer.unref?.();\n  });\n}\n\nexport function createDrainPipeline<T = unknown>(\n  options?: DrainPipelineOptions<T>,\n): (drain: (batch: T[]) => void | Promise<void>) => PipelineDrainFn<T> {\n  const batchSize = options?.batch?.size ?? 50;\n  const intervalMs = options?.batch?.intervalMs ?? 5000;\n  const maxBufferSize = options?.maxBufferSize ?? 1000;\n  const maxAttempts = options?.retry?.maxAttempts ?? 3;\n  const backoff = options?.retry?.backoff ?? 'exponential';\n  const initialDelayMs = options?.retry?.initialDelayMs ?? 1000;\n  const maxDelayMs = options?.retry?.maxDelayMs ?? 30_000;\n  const jitter = options?.retry?.jitter ?? true;\n  const dropPolicy = options?.dropPolicy ?? 'oldest';\n  const onDropped = options?.onDropped;\n\n  if (!Number.isFinite(batchSize) || batchSize <= 0) {\n    throw new Error(\n      `[autotel/drain-pipeline] batch.size must be a positive finite number, got: ${batchSize}`,\n    );\n  }\n  if (!Number.isFinite(intervalMs) || intervalMs <= 0) {\n    throw new Error(\n      `[autotel/drain-pipeline] batch.intervalMs must be a positive finite number, got: ${intervalMs}`,\n    );\n  }\n  if (!Number.isFinite(maxBufferSize) || maxBufferSize <= 0) {\n    throw new Error(\n      `[autotel/drain-pipeline] maxBufferSize must be a positive finite number, got: ${maxBufferSize}`,\n    );\n  }\n  if (!Number.isFinite(maxAttempts) || maxAttempts <= 0) {\n    throw new Error(\n      `[autotel/drain-pipeline] retry.maxAttempts must be a positive finite number, got: ${maxAttempts}`,\n    );\n  }\n\n  return (drain: (batch: T[]) => void | Promise<void>): PipelineDrainFn<T> => {\n    const buffer: T[] = [];\n    let timer: ReturnType<typeof setTimeout> | null = null;\n    let activeFlush: Promise<void> | null = null;\n    let isShutdown = false;\n\n    const clearTimer = () => {\n      if (timer) {\n        clearTimeout(timer);\n        timer = null;\n      }\n    };\n\n    const computeDelay = (attempt: number): number => {\n      const base =\n        backoff === 'fixed'\n          ? initialDelayMs\n          : backoff === 'linear'\n            ? initialDelayMs * attempt\n            : initialDelayMs * 2 ** (attempt - 1);\n\n      const bounded = Math.min(base, maxDelayMs);\n      if (!jitter || bounded <= 0) return bounded;\n      const factor = 0.5 + Math.random(); // [0.5, 1.5)\n      return Math.max(0, Math.round(bounded * factor));\n    };\n\n    const sendWithRetry = async (batch: T[]): Promise<void> => {\n      let lastError: Error | undefined;\n      for (let attempt = 1; attempt <= maxAttempts; attempt++) {\n        try {\n          await drain(batch);\n          return;\n        } catch (error) {\n          lastError = error instanceof Error ? error : new Error(String(error));\n          if (attempt < maxAttempts) {\n            await wait(computeDelay(attempt));\n          }\n        }\n      }\n      onDropped?.(batch, lastError);\n    };\n\n    const drainBuffer = async (): Promise<void> => {\n      while (buffer.length > 0) {\n        const batch = buffer.splice(0, batchSize);\n        await sendWithRetry(batch);\n      }\n    };\n\n    const scheduleFlush = () => {\n      if (isShutdown || timer || activeFlush) return;\n      timer = setTimeout(() => {\n        timer = null;\n        startFlush();\n      }, intervalMs);\n      timer.unref?.();\n    };\n\n    const startFlush = () => {\n      if (activeFlush || isShutdown) return;\n      activeFlush = drainBuffer().finally(() => {\n        activeFlush = null;\n        if (isShutdown) return;\n        if (buffer.length >= batchSize) {\n          startFlush();\n        } else if (buffer.length > 0) {\n          scheduleFlush();\n        }\n      });\n    };\n\n    const push = (ctx: T) => {\n      if (isShutdown) return;\n\n      if (buffer.length >= maxBufferSize) {\n        if (dropPolicy === 'newest') {\n          onDropped?.([ctx]);\n          return;\n        }\n        const dropped = buffer.splice(0, 1);\n        onDropped?.(dropped);\n      }\n\n      buffer.push(ctx);\n      if (buffer.length >= batchSize) {\n        clearTimer();\n        startFlush();\n      } else {\n        scheduleFlush();\n      }\n    };\n\n    const flush = async (): Promise<void> => {\n      clearTimer();\n      if (activeFlush) await activeFlush;\n\n      const snapshot = buffer.length;\n      if (snapshot <= 0) return;\n      const toFlush = buffer.splice(0, snapshot);\n      while (toFlush.length > 0) {\n        const batch = toFlush.splice(0, batchSize);\n        await sendWithRetry(batch);\n      }\n    };\n\n    const shutdown = async (): Promise<void> => {\n      isShutdown = true;\n      await flush();\n    };\n\n    const fn = push as PipelineDrainFn<T>;\n    fn.flush = flush;\n    fn.shutdown = shutdown;\n    Object.defineProperty(fn, 'pending', {\n      enumerable: true,\n      get: () => buffer.length,\n    });\n    return fn;\n  };\n}\n"],"mappings":";;;AAoCA,SAAS,KAAK,IAA2B;CACvC,OAAO,IAAI,SAAS,YAAY;EAE9B,AADc,WAAW,SAAS,EAC9B,CAAC,CAAC,QAAQ;CAChB,CAAC;AACH;AAEA,SAAgB,oBACd,SACqE;CACrE,MAAM,YAAY,SAAS,OAAO,QAAQ;CAC1C,MAAM,aAAa,SAAS,OAAO,cAAc;CACjD,MAAM,gBAAgB,SAAS,iBAAiB;CAChD,MAAM,cAAc,SAAS,OAAO,eAAe;CACnD,MAAM,UAAU,SAAS,OAAO,WAAW;CAC3C,MAAM,iBAAiB,SAAS,OAAO,kBAAkB;CACzD,MAAM,aAAa,SAAS,OAAO,cAAc;CACjD,MAAM,SAAS,SAAS,OAAO,UAAU;CACzC,MAAM,aAAa,SAAS,cAAc;CAC1C,MAAM,YAAY,SAAS;CAE3B,IAAI,CAAC,OAAO,SAAS,SAAS,KAAK,aAAa,GAC9C,MAAM,IAAI,MACR,8EAA8E,WAChF;CAEF,IAAI,CAAC,OAAO,SAAS,UAAU,KAAK,cAAc,GAChD,MAAM,IAAI,MACR,oFAAoF,YACtF;CAEF,IAAI,CAAC,OAAO,SAAS,aAAa,KAAK,iBAAiB,GACtD,MAAM,IAAI,MACR,iFAAiF,eACnF;CAEF,IAAI,CAAC,OAAO,SAAS,WAAW,KAAK,eAAe,GAClD,MAAM,IAAI,MACR,qFAAqF,aACvF;CAGF,QAAQ,UAAoE;EAC1E,MAAM,SAAc,CAAC;EACrB,IAAI,QAA8C;EAClD,IAAI,cAAoC;EACxC,IAAI,aAAa;EAEjB,MAAM,mBAAmB;GACvB,IAAI,OAAO;IACT,aAAa,KAAK;IAClB,QAAQ;GACV;EACF;EAEA,MAAM,gBAAgB,YAA4B;GAChD,MAAM,OACJ,YAAY,UACR,iBACA,YAAY,WACV,iBAAiB,UACjB,iBAAiB,MAAM,UAAU;GAEzC,MAAM,UAAU,KAAK,IAAI,MAAM,UAAU;GACzC,IAAI,CAAC,UAAU,WAAW,GAAG,OAAO;GACpC,MAAM,SAAS,KAAM,KAAK,OAAO;GACjC,OAAO,KAAK,IAAI,GAAG,KAAK,MAAM,UAAU,MAAM,CAAC;EACjD;EAEA,MAAM,gBAAgB,OAAO,UAA8B;GACzD,IAAI;GACJ,KAAK,IAAI,UAAU,GAAG,WAAW,aAAa,WAC5C,IAAI;IACF,MAAM,MAAM,KAAK;IACjB;GACF,SAAS,OAAO;IACd,YAAY,iBAAiB,QAAQ,QAAQ,IAAI,MAAM,OAAO,KAAK,CAAC;IACpE,IAAI,UAAU,aACZ,MAAM,KAAK,aAAa,OAAO,CAAC;GAEpC;GAEF,YAAY,OAAO,SAAS;EAC9B;EAEA,MAAM,cAAc,YAA2B;GAC7C,OAAO,OAAO,SAAS,GAErB,MAAM,cADQ,OAAO,OAAO,GAAG,SACP,CAAC;EAE7B;EAEA,MAAM,sBAAsB;GAC1B,IAAI,cAAc,SAAS,aAAa;GACxC,QAAQ,iBAAiB;IACvB,QAAQ;IACR,WAAW;GACb,GAAG,UAAU;GACb,MAAM,QAAQ;EAChB;EAEA,MAAM,mBAAmB;GACvB,IAAI,eAAe,YAAY;GAC/B,cAAc,YAAY,CAAC,CAAC,cAAc;IACxC,cAAc;IACd,IAAI,YAAY;IAChB,IAAI,OAAO,UAAU,WACnB,WAAW;SACN,IAAI,OAAO,SAAS,GACzB,cAAc;GAElB,CAAC;EACH;EAEA,MAAM,QAAQ,QAAW;GACvB,IAAI,YAAY;GAEhB,IAAI,OAAO,UAAU,eAAe;IAClC,IAAI,eAAe,UAAU;KAC3B,YAAY,CAAC,GAAG,CAAC;KACjB;IACF;IACA,MAAM,UAAU,OAAO,OAAO,GAAG,CAAC;IAClC,YAAY,OAAO;GACrB;GAEA,OAAO,KAAK,GAAG;GACf,IAAI,OAAO,UAAU,WAAW;IAC9B,WAAW;IACX,WAAW;GACb,OACE,cAAc;EAElB;EAEA,MAAM,QAAQ,YAA2B;GACvC,WAAW;GACX,IAAI,aAAa,MAAM;GAEvB,MAAM,WAAW,OAAO;GACxB,IAAI,YAAY,GAAG;GACnB,MAAM,UAAU,OAAO,OAAO,GAAG,QAAQ;GACzC,OAAO,QAAQ,SAAS,GAEtB,MAAM,cADQ,QAAQ,OAAO,GAAG,SACR,CAAC;EAE7B;EAEA,MAAM,WAAW,YAA2B;GAC1C,aAAa;GACb,MAAM,MAAM;EACd;EAEA,MAAM,KAAK;EACX,GAAG,QAAQ;EACX,GAAG,WAAW;EACd,OAAO,eAAe,IAAI,WAAW;GACnC,YAAY;GACZ,WAAW,OAAO;EACpB,CAAC;EACD,OAAO;CACT;AACF"}