{"version":3,"file":"index.cjs","names":["BaseStrategy","internalLoggerSymbol","ConnectionState","execArgv: string[]","Worker","GatewayExecutorRequestData","SDKResponse","error: Error | undefined"],"sources":["../../../../../src/components/connect/strategies/workerThread/index.ts"],"sourcesContent":["/**\n * Worker thread connection strategy.\n *\n * This strategy runs the WebSocket connection, heartbeater, and lease extender\n * in a separate worker thread. Userland code execution still happens in the\n * main thread.\n */\n\nimport { createRequire } from \"node:module\";\nimport { dirname, extname, join } from \"node:path\";\nimport { fileURLToPath } from \"node:url\";\nimport { Worker } from \"node:worker_threads\";\nimport type { Logger } from \"../../../../middleware/logger.ts\";\nimport {\n  GatewayExecutorRequestData,\n  SDKResponse,\n} from \"../../../../proto/src/components/connect/protobuf/connect.ts\";\nimport { internalLoggerSymbol } from \"../../../Inngest.ts\";\nimport { type ConnectDebugState, ConnectionState } from \"../../types.ts\";\nimport { BaseStrategy } from \"../core/BaseStrategy.ts\";\nimport type { StrategyConfig } from \"../core/types.ts\";\nimport type {\n  MainToWorkerMessage,\n  SerializableConfig,\n  WorkerToMainMessage,\n} from \"./protocol.ts\";\n\nconst maxConsecutiveCrashes = 10;\nconst baseBackoffMs = 500;\nconst maxBackoffMs = 30_000;\n\n/**\n * Worker thread connection strategy.\n *\n * This strategy runs the WebSocket connection, heartbeater, and lease extender\n * in a separate Node.js worker thread. This prevents blocked user code from\n * interfering with connection health.\n */\nexport class WorkerThreadStrategy extends BaseStrategy {\n  private readonly config: StrategyConfig;\n  private worker: Worker | undefined;\n  private consecutiveCrashes = 0;\n  private _connectionId: string | undefined;\n  private _cachedDebugState: ConnectDebugState | undefined;\n\n  constructor(config: StrategyConfig) {\n    const primaryApp = config.options.apps[0];\n    if (!primaryApp) {\n      // Unreachable\n      throw new Error(\"No apps\");\n    }\n\n    super({ logger: primaryApp.client[internalLoggerSymbol] });\n    this.config = config;\n  }\n\n  get connectionId(): string | undefined {\n    return this._connectionId;\n  }\n\n  getDebugState(): ConnectDebugState {\n    if (this._cachedDebugState) {\n      return this._cachedDebugState;\n    }\n\n    return {\n      state: this._state,\n      activeConnectionId: this._connectionId,\n      drainingConnectionId: undefined,\n      lastHeartbeatSentAt: undefined,\n      lastHeartbeatReceivedAt: undefined,\n      lastMessageReceivedAt: undefined,\n      shutdownRequested:\n        this._state === ConnectionState.CLOSING ||\n        this._state === ConnectionState.CLOSED,\n      inFlightRequestCount: 0,\n      inFlightRequests: [],\n    };\n  }\n\n  async close(): Promise<void> {\n    this.cleanupShutdown();\n    this.setClosing();\n    this.internalLogger.debug(\"Closing worker thread connection\");\n\n    if (this.worker) {\n      // Send close message to worker\n      this.sendToWorker({ type: \"CLOSE\" });\n\n      // Wait for worker to finish\n      await new Promise<void>((resolve) => {\n        if (!this.worker) {\n          resolve();\n          return;\n        }\n\n        this.worker.once(\"exit\", () => {\n          resolve();\n        });\n      });\n\n      this.worker = undefined;\n    }\n\n    this.setClosed();\n    this.internalLogger.debug(\"Worker thread connection closed\");\n  }\n\n  async connect(attempt = 0): Promise<void> {\n    this.throwIfClosingOrClosed();\n    this.internalLogger.debug({ attempt }, \"Starting worker thread connection\");\n\n    this.setupShutdownSignalIfConfigured(\n      this.config.options.handleShutdownSignals,\n    );\n\n    // Create the worker thread\n    await this.createWorker();\n\n    // Initialize the worker with config\n    const serializableConfig = await this.buildSerializableConfig();\n    this.sendToWorker({ type: \"INIT\", config: serializableConfig });\n\n    // Wait for connection to be ready\n    await new Promise<void>((resolve, reject) => {\n      if (!this.worker) {\n        reject(new Error(\"Worker not created\"));\n        return;\n      }\n\n      const cleanup = () => {\n        this.worker?.off(\"message\", handleMessage);\n        this.worker?.off(\"exit\", handleExit);\n      };\n\n      const handleMessage = (msg: WorkerToMainMessage) => {\n        if (msg.type === \"CONNECTION_READY\") {\n          this._connectionId = msg.connectionId;\n          cleanup();\n          resolve();\n        } else if (msg.type === \"ERROR\" && msg.fatal) {\n          cleanup();\n          reject(new Error(msg.error));\n        }\n      };\n\n      const handleExit = (code: number) => {\n        cleanup();\n        reject(\n          new Error(`Worker thread exited with code ${code} during connect`),\n        );\n      };\n\n      this.worker.on(\"message\", handleMessage);\n      this.worker.on(\"exit\", handleExit);\n\n      // Send connect command\n      this.sendToWorker({ type: \"CONNECT\", attempt });\n    });\n  }\n\n  private async createWorker(): Promise<void> {\n    // Get the path to the runner file\n    // Use the same extension as the current file (.ts in dev, .js in prod)\n    const currentFilePath = fileURLToPath(import.meta.url);\n    const ext = extname(currentFilePath);\n    const runnerPath = join(dirname(currentFilePath), `runner${ext}`);\n\n    this.internalLogger.debug({ runnerPath }, \"Creating worker thread\");\n    let execArgv: string[] = [];\n    if (ext === \".ts\") {\n      // In dev/test (.ts), Node's native strip-only TS mode can't handle\n      // constructs like `enum` used throughout the SDK. Use tsx as a loader so\n      // the worker gets full TypeScript support. In production (.js) no loader\n      // is needed.\n      //\n      // It might be a good idea to stop using enums in our codebase for this\n      // reason.\n\n      const require = createRequire(import.meta.url);\n      const tsxPath = require.resolve(\"tsx\");\n      execArgv = [\"--no-experimental-strip-types\", \"--loader\", tsxPath];\n    }\n\n    this.worker = new Worker(runnerPath, {\n      env: process.env as NodeJS.ProcessEnv,\n      execArgv,\n      // Capture stdout/stderr to prevent tsx loader warnings from leaking to\n      // the parent process. The worker uses postMessage for all communication\n      // so nothing meaningful is lost.\n      stdout: true,\n      stderr: true,\n    });\n\n    // Set up worker event handlers\n    this.worker.on(\"message\", (msg: WorkerToMainMessage) => {\n      this.handleWorkerMessage(msg);\n    });\n\n    this.worker.on(\"error\", (err) => {\n      this.internalLogger.debug({ err }, \"Worker error\");\n      this._state = ConnectionState.RECONNECTING;\n    });\n\n    this.worker.on(\"exit\", (code) => {\n      this.internalLogger.debug({ code }, \"Worker exited\");\n      if (\n        this._state === ConnectionState.CLOSING ||\n        this._state === ConnectionState.CLOSED\n      ) {\n        return;\n      }\n\n      // Assume the worker crashed due to an unhandled exception, because the\n      // connection state isn't CLOSING or CLOSED. We'll try to respawn the\n      // worker after a backoff.\n\n      this.consecutiveCrashes++;\n      this._state = ConnectionState.RECONNECTING;\n\n      if (this.consecutiveCrashes > maxConsecutiveCrashes) {\n        this.internalLogger.error(\n          {\n            consecutiveCrashes: this.consecutiveCrashes,\n          },\n          \"Worker thread crashed consecutively, giving up\",\n        );\n        return;\n      }\n\n      const backoff = Math.min(\n        baseBackoffMs * 2 ** (this.consecutiveCrashes - 1),\n        maxBackoffMs,\n      );\n\n      this.internalLogger.warn(\n        {\n          consecutiveCrashes: this.consecutiveCrashes,\n          backoffMs: backoff,\n        },\n        \"Respawning worker after backoff\",\n      );\n\n      setTimeout(() => {\n        if (\n          this._state === ConnectionState.CLOSING ||\n          this._state === ConnectionState.CLOSED\n        ) {\n          return;\n        }\n\n        this.createWorker()\n          .then(async () => {\n            const config = await this.buildSerializableConfig();\n            this.sendToWorker({ type: \"INIT\", config });\n            this.sendToWorker({ type: \"CONNECT\", attempt: 0 });\n          })\n          .catch((err) => {\n            this.internalLogger.debug({ err }, \"Failed to recreate worker\");\n          });\n      }, backoff);\n    });\n  }\n\n  private handleWorkerMessage(msg: WorkerToMainMessage): void {\n    switch (msg.type) {\n      case \"STATE_CHANGE\":\n        this._state = msg.state;\n        this.internalLogger.debug({ state: msg.state }, \"State changed\");\n        break;\n\n      case \"CONNECTION_READY\":\n        this._connectionId = msg.connectionId;\n        this.consecutiveCrashes = 0;\n        this.internalLogger.debug(\n          { connectionId: msg.connectionId },\n          \"Connection ready\",\n        );\n        break;\n\n      case \"ERROR\":\n        if (msg.fatal) {\n          this.internalLogger.error(\n            { errorMessage: msg.error },\n            \"Fatal error from worker\",\n          );\n        } else {\n          this.internalLogger.error(\n            { errorMessage: msg.error },\n            \"Worker error\",\n          );\n        }\n        break;\n\n      case \"EXECUTION_REQUEST\":\n        this.handleExecutionRequest(msg.requestId, msg.request);\n        break;\n\n      case \"DEBUG_STATE\":\n        this._cachedDebugState = msg.state;\n        break;\n\n      case \"CLOSED\":\n        this._state = ConnectionState.CLOSED;\n        this.resolveClosingPromise?.();\n        break;\n\n      case \"LOG\":\n        this.handleWorkerLog(msg.level, msg.message, msg.data);\n        break;\n    }\n  }\n\n  private handleWorkerLog(\n    level: \"debug\" | \"info\" | \"warn\" | \"error\",\n    message: string,\n    data?: Record<string, unknown>,\n  ): void {\n    if (data) {\n      this.internalLogger[level](data, message);\n    } else {\n      this.internalLogger[level](message);\n    }\n  }\n\n  private async handleExecutionRequest(\n    requestId: string,\n    requestBytes: Uint8Array,\n  ): Promise<void> {\n    try {\n      // Decode the request\n      const gatewayExecutorRequest =\n        GatewayExecutorRequestData.decode(requestBytes);\n\n      // Find the request handler\n      const requestHandler =\n        this.config.requestHandlers[gatewayExecutorRequest.appName];\n\n      if (!requestHandler) {\n        this.internalLogger.debug(\n          { appName: gatewayExecutorRequest.appName },\n          \"No handler for app\",\n        );\n        this.sendToWorker({\n          type: \"EXECUTION_ERROR\",\n          requestId,\n          error: `No handler for app: ${gatewayExecutorRequest.appName}`,\n        });\n        return;\n      }\n\n      // Execute the handler in the main thread\n      const response = await requestHandler(gatewayExecutorRequest);\n\n      // Encode and send response back to worker\n      const responseBytes = SDKResponse.encode(response).finish();\n      this.sendToWorker({\n        type: \"EXECUTION_RESPONSE\",\n        requestId,\n        response: responseBytes,\n      });\n    } catch (err) {\n      let error: Error | undefined;\n      if (err instanceof Error) {\n        error = err;\n      } else {\n        error = new Error(String(err));\n      }\n      this.internalLogger.debug({ err: error, requestId }, \"Execution error\");\n      this.sendToWorker({\n        type: \"EXECUTION_ERROR\",\n        requestId,\n        error: err instanceof Error ? err.message : \"Unknown error\",\n      });\n    }\n  }\n\n  private sendToWorker(msg: MainToWorkerMessage): void {\n    if (!this.worker) {\n      this.internalLogger.error(\"Cannot send message, no worker\");\n      return;\n    }\n    this.worker.postMessage(msg);\n  }\n\n  private async buildSerializableConfig(): Promise<SerializableConfig> {\n    return {\n      apiBaseUrl: this.config.apiBaseUrl,\n      appIds: Object.keys(this.config.requestHandlers),\n      connectionData: this.config.connectionData,\n      envName: this.config.envName,\n      gatewayUrl: this.config.options.gatewayUrl,\n      handleShutdownSignals: this.config.options.handleShutdownSignals,\n      hashedFallbackKey: this.config.hashedFallbackKey,\n      hashedSigningKey: this.config.hashedSigningKey,\n      instanceId: this.config.options.instanceId,\n      maxWorkerConcurrency: this.config.options.maxWorkerConcurrency,\n      mode: this.config.mode,\n    };\n  }\n}\n"],"mappings":";;;;;;;;;;;;;;;;;;AA2BA,MAAM,wBAAwB;AAC9B,MAAM,gBAAgB;AACtB,MAAM,eAAe;;;;;;;;AASrB,IAAa,uBAAb,cAA0CA,kCAAa;CACrD,AAAiB;CACjB,AAAQ;CACR,AAAQ,qBAAqB;CAC7B,AAAQ;CACR,AAAQ;CAER,YAAY,QAAwB;EAClC,MAAM,aAAa,OAAO,QAAQ,KAAK;AACvC,MAAI,CAAC,WAEH,OAAM,IAAI,MAAM,UAAU;AAG5B,QAAM,EAAE,QAAQ,WAAW,OAAOC,uCAAuB,CAAC;AAC1D,OAAK,SAAS;;CAGhB,IAAI,eAAmC;AACrC,SAAO,KAAK;;CAGd,gBAAmC;AACjC,MAAI,KAAK,kBACP,QAAO,KAAK;AAGd,SAAO;GACL,OAAO,KAAK;GACZ,oBAAoB,KAAK;GACzB,sBAAsB;GACtB,qBAAqB;GACrB,yBAAyB;GACzB,uBAAuB;GACvB,mBACE,KAAK,WAAWC,8BAAgB,WAChC,KAAK,WAAWA,8BAAgB;GAClC,sBAAsB;GACtB,kBAAkB,EAAE;GACrB;;CAGH,MAAM,QAAuB;AAC3B,OAAK,iBAAiB;AACtB,OAAK,YAAY;AACjB,OAAK,eAAe,MAAM,mCAAmC;AAE7D,MAAI,KAAK,QAAQ;AAEf,QAAK,aAAa,EAAE,MAAM,SAAS,CAAC;AAGpC,SAAM,IAAI,SAAe,YAAY;AACnC,QAAI,CAAC,KAAK,QAAQ;AAChB,cAAS;AACT;;AAGF,SAAK,OAAO,KAAK,cAAc;AAC7B,cAAS;MACT;KACF;AAEF,QAAK,SAAS;;AAGhB,OAAK,WAAW;AAChB,OAAK,eAAe,MAAM,kCAAkC;;CAG9D,MAAM,QAAQ,UAAU,GAAkB;AACxC,OAAK,wBAAwB;AAC7B,OAAK,eAAe,MAAM,EAAE,SAAS,EAAE,oCAAoC;AAE3E,OAAK,gCACH,KAAK,OAAO,QAAQ,sBACrB;AAGD,QAAM,KAAK,cAAc;EAGzB,MAAM,qBAAqB,MAAM,KAAK,yBAAyB;AAC/D,OAAK,aAAa;GAAE,MAAM;GAAQ,QAAQ;GAAoB,CAAC;AAG/D,QAAM,IAAI,SAAe,SAAS,WAAW;AAC3C,OAAI,CAAC,KAAK,QAAQ;AAChB,2BAAO,IAAI,MAAM,qBAAqB,CAAC;AACvC;;GAGF,MAAM,gBAAgB;AACpB,SAAK,QAAQ,IAAI,WAAW,cAAc;AAC1C,SAAK,QAAQ,IAAI,QAAQ,WAAW;;GAGtC,MAAM,iBAAiB,QAA6B;AAClD,QAAI,IAAI,SAAS,oBAAoB;AACnC,UAAK,gBAAgB,IAAI;AACzB,cAAS;AACT,cAAS;eACA,IAAI,SAAS,WAAW,IAAI,OAAO;AAC5C,cAAS;AACT,YAAO,IAAI,MAAM,IAAI,MAAM,CAAC;;;GAIhC,MAAM,cAAc,SAAiB;AACnC,aAAS;AACT,2BACE,IAAI,MAAM,kCAAkC,KAAK,iBAAiB,CACnE;;AAGH,QAAK,OAAO,GAAG,WAAW,cAAc;AACxC,QAAK,OAAO,GAAG,QAAQ,WAAW;AAGlC,QAAK,aAAa;IAAE,MAAM;IAAW;IAAS,CAAC;IAC/C;;CAGJ,MAAc,eAA8B;EAG1C,MAAM,4FAAgD;EACtD,MAAM,6BAAc,gBAAgB;EACpC,MAAM,wDAA0B,gBAAgB,EAAE,SAAS,MAAM;AAEjE,OAAK,eAAe,MAAM,EAAE,YAAY,EAAE,yBAAyB;EACnE,IAAIC,WAAqB,EAAE;AAC3B,MAAI,QAAQ,MAWV,YAAW;GAAC;GAAiC;gFAFC,CACtB,QAAQ,MAAM;GAC2B;AAGnE,OAAK,SAAS,IAAIC,2BAAO,YAAY;GACnC,KAAK,QAAQ;GACb;GAIA,QAAQ;GACR,QAAQ;GACT,CAAC;AAGF,OAAK,OAAO,GAAG,YAAY,QAA6B;AACtD,QAAK,oBAAoB,IAAI;IAC7B;AAEF,OAAK,OAAO,GAAG,UAAU,QAAQ;AAC/B,QAAK,eAAe,MAAM,EAAE,KAAK,EAAE,eAAe;AAClD,QAAK,SAASF,8BAAgB;IAC9B;AAEF,OAAK,OAAO,GAAG,SAAS,SAAS;AAC/B,QAAK,eAAe,MAAM,EAAE,MAAM,EAAE,gBAAgB;AACpD,OACE,KAAK,WAAWA,8BAAgB,WAChC,KAAK,WAAWA,8BAAgB,OAEhC;AAOF,QAAK;AACL,QAAK,SAASA,8BAAgB;AAE9B,OAAI,KAAK,qBAAqB,uBAAuB;AACnD,SAAK,eAAe,MAClB,EACE,oBAAoB,KAAK,oBAC1B,EACD,iDACD;AACD;;GAGF,MAAM,UAAU,KAAK,IACnB,gBAAgB,MAAM,KAAK,qBAAqB,IAChD,aACD;AAED,QAAK,eAAe,KAClB;IACE,oBAAoB,KAAK;IACzB,WAAW;IACZ,EACD,kCACD;AAED,oBAAiB;AACf,QACE,KAAK,WAAWA,8BAAgB,WAChC,KAAK,WAAWA,8BAAgB,OAEhC;AAGF,SAAK,cAAc,CAChB,KAAK,YAAY;KAChB,MAAM,SAAS,MAAM,KAAK,yBAAyB;AACnD,UAAK,aAAa;MAAE,MAAM;MAAQ;MAAQ,CAAC;AAC3C,UAAK,aAAa;MAAE,MAAM;MAAW,SAAS;MAAG,CAAC;MAClD,CACD,OAAO,QAAQ;AACd,UAAK,eAAe,MAAM,EAAE,KAAK,EAAE,4BAA4B;MAC/D;MACH,QAAQ;IACX;;CAGJ,AAAQ,oBAAoB,KAAgC;AAC1D,UAAQ,IAAI,MAAZ;GACE,KAAK;AACH,SAAK,SAAS,IAAI;AAClB,SAAK,eAAe,MAAM,EAAE,OAAO,IAAI,OAAO,EAAE,gBAAgB;AAChE;GAEF,KAAK;AACH,SAAK,gBAAgB,IAAI;AACzB,SAAK,qBAAqB;AAC1B,SAAK,eAAe,MAClB,EAAE,cAAc,IAAI,cAAc,EAClC,mBACD;AACD;GAEF,KAAK;AACH,QAAI,IAAI,MACN,MAAK,eAAe,MAClB,EAAE,cAAc,IAAI,OAAO,EAC3B,0BACD;QAED,MAAK,eAAe,MAClB,EAAE,cAAc,IAAI,OAAO,EAC3B,eACD;AAEH;GAEF,KAAK;AACH,SAAK,uBAAuB,IAAI,WAAW,IAAI,QAAQ;AACvD;GAEF,KAAK;AACH,SAAK,oBAAoB,IAAI;AAC7B;GAEF,KAAK;AACH,SAAK,SAASA,8BAAgB;AAC9B,SAAK,yBAAyB;AAC9B;GAEF,KAAK;AACH,SAAK,gBAAgB,IAAI,OAAO,IAAI,SAAS,IAAI,KAAK;AACtD;;;CAIN,AAAQ,gBACN,OACA,SACA,MACM;AACN,MAAI,KACF,MAAK,eAAe,OAAO,MAAM,QAAQ;MAEzC,MAAK,eAAe,OAAO,QAAQ;;CAIvC,MAAc,uBACZ,WACA,cACe;AACf,MAAI;GAEF,MAAM,yBACJG,2CAA2B,OAAO,aAAa;GAGjD,MAAM,iBACJ,KAAK,OAAO,gBAAgB,uBAAuB;AAErD,OAAI,CAAC,gBAAgB;AACnB,SAAK,eAAe,MAClB,EAAE,SAAS,uBAAuB,SAAS,EAC3C,qBACD;AACD,SAAK,aAAa;KAChB,MAAM;KACN;KACA,OAAO,uBAAuB,uBAAuB;KACtD,CAAC;AACF;;GAIF,MAAM,WAAW,MAAM,eAAe,uBAAuB;GAG7D,MAAM,gBAAgBC,4BAAY,OAAO,SAAS,CAAC,QAAQ;AAC3D,QAAK,aAAa;IAChB,MAAM;IACN;IACA,UAAU;IACX,CAAC;WACK,KAAK;GACZ,IAAIC;AACJ,OAAI,eAAe,MACjB,SAAQ;OAER,SAAQ,IAAI,MAAM,OAAO,IAAI,CAAC;AAEhC,QAAK,eAAe,MAAM;IAAE,KAAK;IAAO;IAAW,EAAE,kBAAkB;AACvE,QAAK,aAAa;IAChB,MAAM;IACN;IACA,OAAO,eAAe,QAAQ,IAAI,UAAU;IAC7C,CAAC;;;CAIN,AAAQ,aAAa,KAAgC;AACnD,MAAI,CAAC,KAAK,QAAQ;AAChB,QAAK,eAAe,MAAM,iCAAiC;AAC3D;;AAEF,OAAK,OAAO,YAAY,IAAI;;CAG9B,MAAc,0BAAuD;AACnE,SAAO;GACL,YAAY,KAAK,OAAO;GACxB,QAAQ,OAAO,KAAK,KAAK,OAAO,gBAAgB;GAChD,gBAAgB,KAAK,OAAO;GAC5B,SAAS,KAAK,OAAO;GACrB,YAAY,KAAK,OAAO,QAAQ;GAChC,uBAAuB,KAAK,OAAO,QAAQ;GAC3C,mBAAmB,KAAK,OAAO;GAC/B,kBAAkB,KAAK,OAAO;GAC9B,YAAY,KAAK,OAAO,QAAQ;GAChC,sBAAsB,KAAK,OAAO,QAAQ;GAC1C,MAAM,KAAK,OAAO;GACnB"}