{"version":3,"file":"InngestGroupTools.cjs","names":["options: ParallelOptions","getAsyncCtxSync","getAsyncLocalStorage","isALSFallback","nestedCtx: AsyncContext","experiment: GroupExperiment","getStepOptions","experimentStepHashedId: string | undefined","selectedVariant: string","currentCtx","selectCtx: AsyncContext","result","NonRetriableError","result: unknown"],"sources":["../../src/components/InngestGroupTools.ts"],"sourcesContent":["import type { IsNever } from \"../helpers/types.ts\";\nimport type { StepOptionsOrId } from \"../types.ts\";\nimport {\n  type AsyncContext,\n  getAsyncCtxSync,\n  getAsyncLocalStorage,\n  isALSFallback,\n} from \"./execution/als.ts\";\nimport { getStepOptions } from \"./InngestStepTools.ts\";\nimport { NonRetriableError } from \"./NonRetriableError.ts\";\n\n/**\n * Options for the `group.parallel()` helper.\n */\nexport interface ParallelOptions {\n  /**\n   * The parallel mode to apply to all steps created within the callback.\n   *\n   * - `\"race\"`: Steps will be executed with race semantics, meaning the first\n   *   step to complete will \"win\" and remaining steps may be cancelled.\n   */\n  mode?: \"race\";\n}\n\n/**\n * A helper that sets the parallel mode for all steps created within the\n * callback. This allows you to use native `Promise.race()` with cleaner syntax.\n *\n * @example\n * ```ts\n * // Defaults to \"race\" mode\n * const winner = await group.parallel(async () => {\n *   return Promise.race([\n *     step.run(\"a\", () => \"a\"),\n *     step.run(\"b\", () => \"b\"),\n *     step.run(\"c\", () => \"c\"),\n *   ]);\n * });\n *\n * // Or explicitly specify the mode\n * const winner = await group.parallel({ mode: \"race\" }, async () => {\n *   return Promise.race([\n *     step.run(\"a\", () => \"a\"),\n *     step.run(\"b\", () => \"b\"),\n *   ]);\n * });\n * ```\n */\nconst parallel = async <T>(\n  optionsOrCallback: ParallelOptions | (() => Promise<T>),\n  maybeCallback?: () => Promise<T>,\n): Promise<T> => {\n  const options: ParallelOptions =\n    typeof optionsOrCallback === \"function\" ? {} : optionsOrCallback;\n  const callback =\n    typeof optionsOrCallback === \"function\" ? optionsOrCallback : maybeCallback;\n\n  if (!callback) {\n    throw new Error(\"`group.parallel()` requires a callback function\");\n  }\n\n  const currentCtx = getAsyncCtxSync();\n\n  if (!currentCtx?.execution) {\n    throw new Error(\n      \"`group.parallel()` must be called within an Inngest function execution\",\n    );\n  }\n\n  const als = await getAsyncLocalStorage();\n\n  if (isALSFallback()) {\n    throw new Error(\n      \"`group.parallel()` requires AsyncLocalStorage support, which is not available in this runtime. \" +\n        \"Workaround: Pass `parallelMode` directly to each step:\\n\" +\n        '  step.run({ id: \"my-step\", parallelMode: \"race\" }, fn)',\n    );\n  }\n\n  // Create a new context with the parallelMode set\n  const nestedCtx: AsyncContext = {\n    ...currentCtx,\n    execution: {\n      ...currentCtx.execution,\n      parallelMode: options.mode ?? \"race\",\n    },\n  };\n\n  // Run the callback inside the nested context\n  return als.run(nestedCtx, callback);\n};\n\n/**\n * Configuration for how the experiment selects a variant.\n */\nexport interface ExperimentStrategyConfig {\n  strategy: string;\n  weights?: Record<string, number>;\n  nullishBucket?: boolean;\n}\n\n/**\n * A callable selection function that also carries strategy metadata.\n */\nexport interface ExperimentSelectFn {\n  (variantNames?: string[]): Promise<string> | string;\n  __experimentConfig: ExperimentStrategyConfig;\n}\n\n/**\n * Options for `group.experiment()`.\n */\nexport interface ExperimentOptions<\n  TVariants extends Record<string, () => unknown>,\n> {\n  /**\n   * A map of variant names to callbacks. The selected variant's callback will\n   * be executed at the top level so that any `step.*` calls inside it go\n   * through normal step discovery.\n   */\n  variants: TVariants;\n\n  /**\n   * A selection function that returns the name of the variant to execute.\n   * The result is memoized via a step so the same variant is used on retries.\n   */\n  select: ExperimentSelectFn;\n}\n\n/**\n * Options for `group.experiment()` when `withVariant` is true, which causes\n * the return type to include both the result and the selected variant name.\n */\nexport interface ExperimentOptionsWithVariant<\n  TVariants extends Record<string, () => unknown>,\n> extends ExperimentOptions<TVariants> {\n  /**\n   * When true, the return value includes the variant name alongside the result.\n   */\n  withVariant: true;\n}\n\n/**\n * Computes the return type of an experiment based on variant callbacks.\n *\n * When `TConstraint` is `never`, the return type is inferred as the union of\n * all variant callback return types. Otherwise `TConstraint` is used directly.\n */\nexport type VariantResult<\n  TConstraint,\n  TVariants extends Record<string, () => unknown>,\n> = IsNever<TConstraint> extends true\n  ? Awaited<ReturnType<TVariants[keyof TVariants]>>\n  : TConstraint;\n\n/**\n * Metadata values stored alongside the experiment step for UI rendering.\n */\nexport interface ExperimentMetadataValues {\n  experiment_name: string;\n  variant: string;\n  selection_strategy: string;\n  available_variants: string[];\n  variant_weights?: Record<string, number>;\n}\n\n/**\n * Overloaded interface for `group.experiment()`.\n */\nexport interface GroupExperiment {\n  /**\n   * Run an A/B experiment that selects and executes a variant. Returns both\n   * the result and the selected variant name.\n   */\n  <TVariants extends Record<string, () => unknown>>(\n    idOrOptions: StepOptionsOrId,\n    options: ExperimentOptionsWithVariant<TVariants>,\n  ): Promise<{\n    result: VariantResult<never, TVariants>;\n    variant: string;\n  }>;\n\n  /**\n   * Run an A/B experiment that selects and executes a variant. Returns only\n   * the variant callback's result.\n   */\n  <TVariants extends Record<string, () => unknown>>(\n    idOrOptions: StepOptionsOrId,\n    options: ExperimentOptions<TVariants>,\n  ): Promise<VariantResult<never, TVariants>>;\n}\n\n/**\n * Tools for grouping and coordinating steps.\n *\n * @public\n */\nexport interface GroupTools {\n  /**\n   * Run a callback where all steps automatically receive a `parallelMode`\n   * option, removing the need to tag each step individually. Defaults to\n   * `\"race\"` mode.\n   *\n   * @example\n   * ```ts\n   * // Defaults to \"race\" mode\n   * const winner = await group.parallel(async () => {\n   *   return Promise.race([\n   *     step.run(\"a\", () => \"a\"),\n   *     step.run(\"b\", () => \"b\"),\n   *     step.run(\"c\", () => \"c\"),\n   *   ]);\n   * });\n   *\n   * // Or explicitly specify the mode\n   * const winner = await group.parallel({ mode: \"race\" }, async () => {\n   *   return Promise.race([\n   *     step.run(\"a\", () => \"a\"),\n   *     step.run(\"b\", () => \"b\"),\n   *   ]);\n   * });\n   * ```\n   */\n  parallel: <T>(\n    optionsOrCallback: ParallelOptions | (() => Promise<T>),\n    maybeCallback?: () => Promise<T>,\n  ) => Promise<T>;\n\n  /**\n   * Run an A/B experiment within a function. Selects a variant via a memoized\n   * step, then executes the selected variant's callback at the top level so\n   * its `step.*` calls go through normal step discovery.\n   *\n   * @example\n   * ```ts\n   * const result = await group.experiment(\"checkout-flow\", {\n   *   variants: {\n   *     control: () => step.run(\"control-checkout\", () => oldCheckout()),\n   *     new_flow: () => step.run(\"new-checkout\", () => newCheckout()),\n   *   },\n   *   select: Object.assign(() => \"control\", {\n   *     __experimentConfig: { strategy: \"weighted\", weights: { control: 80, new_flow: 20 } },\n   *   }),\n   * });\n   * ```\n   */\n  experiment: GroupExperiment;\n}\n\n/**\n * Dependencies injected into `createGroupTools` from the execution engine.\n */\nexport interface GroupToolsDeps {\n  /**\n   * A `step.run` variant with `opts.type = \"group.experiment\"`, extracted from\n   * step tools via the experiment symbol. Undefined when not available.\n   */\n  // biome-ignore lint/suspicious/noExplicitAny: internal plumbing\n  experimentStepRun?: (...args: any[]) => Promise<any>;\n}\n\n/**\n * Create the `group` tools object provided on the function execution context.\n *\n * @public\n */\nexport const createGroupTools = (deps?: GroupToolsDeps): GroupTools => {\n  const experiment: GroupExperiment = (async (\n    idOrOptions: StepOptionsOrId,\n    // biome-ignore lint/suspicious/noExplicitAny: implementation signature for overloaded interface\n    options: any,\n  ) => {\n    if (!deps?.experimentStepRun) {\n      throw new Error(\n        \"`group.experiment()` requires step tools to be available. \" +\n          \"Ensure you are calling this within an Inngest function execution.\",\n      );\n    }\n\n    const { variants, select, withVariant } = options;\n    const variantNames = Object.keys(variants);\n\n    if (variantNames.length === 0) {\n      throw new Error(\n        \"`group.experiment()` requires at least one variant to be defined.\",\n      );\n    }\n\n    if (isALSFallback()) {\n      throw new Error(\n        \"`group.experiment()` requires AsyncLocalStorage support, which is not available in this runtime.\",\n      );\n    }\n\n    const stepOpts = getStepOptions(idOrOptions);\n\n    // Use the experiment step run to memoize the variant selection.\n    // This creates a StepPlanned opcode with opts.type = \"group.experiment\".\n    let experimentStepHashedId: string | undefined;\n\n    const selectedVariant: string = await deps.experimentStepRun(\n      idOrOptions,\n      async () => {\n        // Capture the hashed step ID so we can propagate it to variant sub-steps.\n        experimentStepHashedId =\n          getAsyncCtxSync()?.execution?.executingStep?.id;\n\n        const alsInstance = await getAsyncLocalStorage();\n        const currentCtx = getAsyncCtxSync()!;\n        const selectCtx: AsyncContext = {\n          ...currentCtx,\n          execution: {\n            ...currentCtx.execution!,\n            insideExperimentSelect: true,\n          },\n        };\n        const result = await alsInstance.run(selectCtx, () =>\n          select(variantNames),\n        );\n\n        if (!variantNames.includes(result)) {\n          throw new NonRetriableError(\n            `group.experiment(\"${stepOpts.id}\"): select() returned \"${result}\" ` +\n              `which is not a known variant. Available variants: ${variantNames.join(\", \")}`,\n          );\n        }\n\n        // Attach experiment metadata to this step's OutgoingOp.\n        const ctx = getAsyncCtxSync();\n        const execInstance = ctx?.execution?.instance;\n\n        if (execInstance && experimentStepHashedId) {\n          execInstance.addMetadata(\n            experimentStepHashedId,\n            \"inngest.experiment\",\n            \"step\",\n            \"merge\",\n            {\n              experiment_name: stepOpts.id,\n              variant: result,\n              selection_strategy: select.__experimentConfig.strategy,\n              available_variants: variantNames,\n              ...(select.__experimentConfig.weights && {\n                variant_weights: select.__experimentConfig.weights,\n              }),\n            } satisfies ExperimentMetadataValues,\n          );\n\n          if (select.__experimentConfig.nullishBucket) {\n            execInstance.addMetadata(\n              experimentStepHashedId,\n              \"inngest.warnings\",\n              \"step\",\n              \"merge\",\n              {\n                message:\n                  \"experiment.bucket() received a null/undefined value; \" +\n                  'hashing empty string \"\" for variant selection',\n              },\n            );\n          }\n        }\n\n        return result;\n      },\n    );\n\n    // Look up and execute the selected variant's callback at the top level\n    // so its step.* calls go through normal step discovery.\n    const variantFn = variants[selectedVariant];\n\n    if (!variantFn) {\n      throw new Error(\n        `group.experiment(\"${stepOpts.id}\"): variant \"${selectedVariant}\" ` +\n          `was selected but is not defined. Available variants: ${variantNames.join(\", \")}`,\n      );\n    }\n\n    // Propagate experiment context via ALS so variant sub-steps include\n    // experiment fields in their OutgoingOp.opts. The executor reads these\n    // fields from opts and emits the step-scoped `inngest.experiment`\n    // metadata span itself — the SDK does not need to call addMetadata()\n    // for variant steps. See the companion executor change in inngest/inngest\n    // for the server-side emission path.\n    //\n    // Also track whether any step tool is invoked to detect zero-step\n    // variants.\n    //\n    // NOTE: experimentStepHashedId may be undefined on replay because it\n    // is captured inside the selection step callback, which doesn't run\n    // when memoized. We still set experimentContext (with an empty string\n    // for the hashed ID fallback) so that variant sub-steps discovered on\n    // replay still carry experiment fields in their opts and the executor\n    // can attach metadata to their ClickHouse rows.\n    const currentCtx = getAsyncCtxSync();\n    const stepTracker = { found: false };\n    let result: unknown;\n\n    if (currentCtx?.execution && !isALSFallback()) {\n      const als = await getAsyncLocalStorage();\n      const nestedCtx: AsyncContext = {\n        ...currentCtx,\n        execution: {\n          ...currentCtx.execution,\n          experimentContext: {\n            experimentStepID: experimentStepHashedId ?? \"\",\n            experimentName: stepOpts.id,\n            variant: selectedVariant,\n            selectionStrategy: select.__experimentConfig.strategy,\n          },\n          experimentStepTracker: stepTracker,\n        },\n      };\n      result = await als.run(nestedCtx, () => variantFn());\n    } else {\n      result = await variantFn();\n    }\n\n    // If the variant returned without invoking any step tools, it will\n    // silently re-execute on every replay. Throw a non-retriable error\n    // to prevent this.\n    if (!stepTracker.found && !isALSFallback()) {\n      throw new NonRetriableError(\n        `group.experiment(\"${stepOpts.id}\"): variant \"${selectedVariant}\" ` +\n          \"did not invoke any step tools. Wrap your variant logic in \" +\n          \"step.run() to ensure it is memoized and not re-executed on replay.\",\n      );\n    }\n\n    if (withVariant) {\n      return { result, variant: selectedVariant };\n    }\n\n    return result;\n  }) as GroupExperiment;\n\n  return { parallel, experiment };\n};\n"],"mappings":";;;;;;;;;;;;;;;;;;;;;;;;;;;;;AAgDA,MAAM,WAAW,OACf,mBACA,kBACe;CACf,MAAMA,UACJ,OAAO,sBAAsB,aAAa,EAAE,GAAG;CACjD,MAAM,WACJ,OAAO,sBAAsB,aAAa,oBAAoB;AAEhE,KAAI,CAAC,SACH,OAAM,IAAI,MAAM,kDAAkD;CAGpE,MAAM,aAAaC,6BAAiB;AAEpC,KAAI,CAAC,YAAY,UACf,OAAM,IAAI,MACR,yEACD;CAGH,MAAM,MAAM,MAAMC,kCAAsB;AAExC,KAAIC,2BAAe,CACjB,OAAM,IAAI,MACR,qNAGD;CAIH,MAAMC,YAA0B;EAC9B,GAAG;EACH,WAAW;GACT,GAAG,WAAW;GACd,cAAc,QAAQ,QAAQ;GAC/B;EACF;AAGD,QAAO,IAAI,IAAI,WAAW,SAAS;;;;;;;AAiLrC,MAAa,oBAAoB,SAAsC;CACrE,MAAMC,cAA+B,OACnC,aAEA,YACG;AACH,MAAI,CAAC,MAAM,kBACT,OAAM,IAAI,MACR,8HAED;EAGH,MAAM,EAAE,UAAU,QAAQ,gBAAgB;EAC1C,MAAM,eAAe,OAAO,KAAK,SAAS;AAE1C,MAAI,aAAa,WAAW,EAC1B,OAAM,IAAI,MACR,oEACD;AAGH,MAAIF,2BAAe,CACjB,OAAM,IAAI,MACR,mGACD;EAGH,MAAM,WAAWG,wCAAe,YAAY;EAI5C,IAAIC;EAEJ,MAAMC,kBAA0B,MAAM,KAAK,kBACzC,aACA,YAAY;AAEV,4BACEP,6BAAiB,EAAE,WAAW,eAAe;GAE/C,MAAM,cAAc,MAAMC,kCAAsB;GAChD,MAAMO,eAAaR,6BAAiB;GACpC,MAAMS,YAA0B;IAC9B,GAAGD;IACH,WAAW;KACT,GAAGA,aAAW;KACd,wBAAwB;KACzB;IACF;GACD,MAAME,WAAS,MAAM,YAAY,IAAI,iBACnC,OAAO,aAAa,CACrB;AAED,OAAI,CAAC,aAAa,SAASA,SAAO,CAChC,OAAM,IAAIC,4CACR,qBAAqB,SAAS,GAAG,yBAAyBD,SAAO,sDACV,aAAa,KAAK,KAAK,GAC/E;GAKH,MAAM,eADMV,6BAAiB,EACH,WAAW;AAErC,OAAI,gBAAgB,wBAAwB;AAC1C,iBAAa,YACX,wBACA,sBACA,QACA,SACA;KACE,iBAAiB,SAAS;KAC1B,SAASU;KACT,oBAAoB,OAAO,mBAAmB;KAC9C,oBAAoB;KACpB,GAAI,OAAO,mBAAmB,WAAW,EACvC,iBAAiB,OAAO,mBAAmB,SAC5C;KACF,CACF;AAED,QAAI,OAAO,mBAAmB,cAC5B,cAAa,YACX,wBACA,oBACA,QACA,SACA,EACE,SACE,wGAEH,CACF;;AAIL,UAAOA;IAEV;EAID,MAAM,YAAY,SAAS;AAE3B,MAAI,CAAC,UACH,OAAM,IAAI,MACR,qBAAqB,SAAS,GAAG,eAAe,gBAAgB,yDACN,aAAa,KAAK,KAAK,GAClF;EAmBH,MAAM,aAAaV,6BAAiB;EACpC,MAAM,cAAc,EAAE,OAAO,OAAO;EACpC,IAAIY;AAEJ,MAAI,YAAY,aAAa,CAACV,2BAAe,EAAE;GAC7C,MAAM,MAAM,MAAMD,kCAAsB;GACxC,MAAME,YAA0B;IAC9B,GAAG;IACH,WAAW;KACT,GAAG,WAAW;KACd,mBAAmB;MACjB,kBAAkB,0BAA0B;MAC5C,gBAAgB,SAAS;MACzB,SAAS;MACT,mBAAmB,OAAO,mBAAmB;MAC9C;KACD,uBAAuB;KACxB;IACF;AACD,YAAS,MAAM,IAAI,IAAI,iBAAiB,WAAW,CAAC;QAEpD,UAAS,MAAM,WAAW;AAM5B,MAAI,CAAC,YAAY,SAAS,CAACD,2BAAe,CACxC,OAAM,IAAIS,4CACR,qBAAqB,SAAS,GAAG,eAAe,gBAAgB,gIAGjE;AAGH,MAAI,YACF,QAAO;GAAE;GAAQ,SAAS;GAAiB;AAG7C,SAAO;;AAGT,QAAO;EAAE;EAAU;EAAY"}