{"version":3,"file":"handshake.cjs","names":["createStartRequest","headers: Record<string, string>","headerKeys","resolveApiBaseUrl","ReconnectError","AuthError","ConnectionLimitError","resolveWsConnected: (() => void) | undefined","rejectWsConnected: ((reason?: unknown) => void) | undefined","workerDisconnectReasonToJSON","WorkerDisconnectReason","heartbeatIntervalMs: number | undefined","extendLeaseIntervalMs: number | undefined","statusIntervalMs: number | undefined","parseConnectMessage","gatewayMessageTypeToJSON","GatewayMessageType","WorkerConnectRequestData","getPlatformName","getProcessEnv","version","retrieveSystemAttributes","getHostname","ensureUnsharedArrayBuffer","ConnectMessage","GatewayConnectionReadyData","conn: Connection"],"sources":["../../../../../src/components/connect/strategies/core/handshake.ts"],"sourcesContent":["/**\n * Connection establishment logic: HTTP start request + WebSocket handshake.\n *\n * This module is stateless — callers pass in configuration and receive a\n * fully-handshaked {@link Connection} object. Post-handshake handler wiring\n * (error/close/message) is the caller's responsibility.\n */\n\nimport ms from \"ms\";\nimport { headerKeys } from \"../../../../helpers/consts.ts\";\nimport { getPlatformName, getProcessEnv } from \"../../../../helpers/env.ts\";\nimport { resolveApiBaseUrl } from \"../../../../helpers/url.ts\";\nimport type { Logger } from \"../../../../middleware/logger.ts\";\nimport {\n  ConnectMessage,\n  GatewayConnectionReadyData,\n  GatewayMessageType,\n  gatewayMessageTypeToJSON,\n  WorkerConnectRequestData,\n  WorkerDisconnectReason,\n  workerDisconnectReasonToJSON,\n} from \"../../../../proto/src/components/connect/protobuf/connect.ts\";\nimport { version } from \"../../../../version.ts\";\nimport { ensureUnsharedArrayBuffer } from \"../../buffer.ts\";\nimport { createStartRequest, parseConnectMessage } from \"../../messages.ts\";\nimport { getHostname, retrieveSystemAttributes } from \"../../os.ts\";\nimport { AuthError, ConnectionLimitError, ReconnectError } from \"../../util.ts\";\nimport type { Connection, ConnectionCoreConfig } from \"./connection.ts\";\n\nconst ConnectWebSocketProtocol = \"v0.connect.inngest.com\";\n\nexport interface EstablishConnectionResult {\n  conn: Connection;\n  gatewayGroup: string;\n}\n\n/**\n * Send the HTTP start request to the Inngest API to obtain a gateway endpoint\n * and session tokens.\n */\nexport async function sendStartRequest(\n  config: ConnectionCoreConfig,\n  hashedSigningKey: string | undefined,\n  attempt: number,\n  excludeGateways: Set<string>,\n  logger: Logger,\n) {\n  const msg = createStartRequest(Array.from(excludeGateways));\n\n  const headers: Record<string, string> = {\n    \"Content-Type\": \"application/protobuf\",\n    ...(hashedSigningKey\n      ? { Authorization: `Bearer ${hashedSigningKey}` }\n      : {}),\n  };\n\n  if (config.envName) {\n    headers[headerKeys.Environment] = config.envName;\n  }\n\n  const targetUrl = new URL(\n    \"/v0/connect/start\",\n    await resolveApiBaseUrl({\n      apiBaseUrl: config.apiBaseUrl,\n      mode: config.mode,\n    }),\n  );\n\n  let resp;\n  try {\n    resp = await fetch(targetUrl, {\n      method: \"POST\",\n      body: new Uint8Array(msg),\n      headers: headers,\n    });\n  } catch (err) {\n    const errMsg = err instanceof Error ? err.message : \"Unknown error\";\n    throw new ReconnectError(\n      `Failed initial API handshake request to ${targetUrl.toString()}, ${errMsg}`,\n      attempt,\n    );\n  }\n\n  if (!resp.ok) {\n    if (resp.status === 401) {\n      throw new AuthError(\n        `Failed initial API handshake request to ${targetUrl.toString()}${\n          config.envName ? ` (env: ${config.envName})` : \"\"\n        }, ${await resp.text()}`,\n        attempt,\n      );\n    }\n\n    if (resp.status === 429) {\n      throw new ConnectionLimitError(attempt);\n    }\n\n    throw new ReconnectError(\n      `Failed initial API handshake request to ${targetUrl.toString()}, ${await resp.text()}`,\n      attempt,\n    );\n  }\n\n  const { parseStartResponse } = await import(\"../../messages.ts\");\n  return parseStartResponse(resp);\n}\n\n/**\n * Establish a WebSocket connection to the gateway.\n *\n * Performs the full handshake sequence (HTTP start → WS open → HELLO →\n * WORKER_CONNECT → CONNECTION_READY) and returns a {@link Connection} with\n * post-handshake handlers left unset — the caller must wire `ws.onerror`,\n * `ws.onclose`, and `ws.onmessage`.\n */\nexport async function establishConnection(\n  config: ConnectionCoreConfig,\n  hashedSigningKey: string | undefined,\n  attempt: number,\n  excludeGateways: Set<string>,\n  logger: Logger,\n): Promise<EstablishConnectionResult> {\n  logger.debug({ attempt }, \"Preparing connection\");\n\n  const startedAt = new Date();\n  const startResp = await sendStartRequest(\n    config,\n    hashedSigningKey,\n    attempt,\n    excludeGateways,\n    logger,\n  );\n\n  const connectionId = startResp.connectionId;\n\n  let resolveWsConnected: (() => void) | undefined;\n  let rejectWsConnected: ((reason?: unknown) => void) | undefined;\n  const wsConnectedPromise = new Promise<void>((resolve, reject) => {\n    resolveWsConnected = resolve;\n    rejectWsConnected = reject;\n  });\n\n  const connectTimeout = setTimeout(() => {\n    excludeGateways.add(startResp.gatewayGroup);\n    rejectWsConnected?.(\n      new ReconnectError(`Connection ${connectionId} timed out`, attempt),\n    );\n  }, 10_000);\n\n  const finalEndpoint = config.gatewayUrl || startResp.gatewayEndpoint;\n  if (finalEndpoint !== startResp.gatewayEndpoint) {\n    logger.debug(\n      { original: startResp.gatewayEndpoint, override: finalEndpoint },\n      \"Overriding gateway endpoint\",\n    );\n  }\n\n  logger.debug(\n    {\n      endpoint: finalEndpoint,\n      gatewayGroup: startResp.gatewayGroup,\n      connectionId,\n    },\n    \"Connecting to gateway\",\n  );\n\n  const ws = new WebSocket(finalEndpoint, [ConnectWebSocketProtocol]);\n  ws.binaryType = \"arraybuffer\";\n\n  // Track whether we've rejected/resolved the handshake promise so we\n  // don't double-settle from concurrent error/close events.\n  let settled = false;\n\n  const rejectHandshake = (error: unknown) => {\n    if (settled) return;\n    settled = true;\n\n    excludeGateways.add(startResp.gatewayGroup);\n    clearTimeout(connectTimeout);\n\n    ws.onerror = () => {};\n    ws.onclose = () => {};\n    ws.close(\n      4001,\n      workerDisconnectReasonToJSON(WorkerDisconnectReason.UNEXPECTED),\n    );\n\n    rejectWsConnected?.(\n      new ReconnectError(\n        `Error while connecting (${connectionId}): ${\n          error instanceof Error ? error.message : \"Unknown error\"\n        }`,\n        attempt,\n      ),\n    );\n  };\n\n  ws.onerror = (err) => rejectHandshake(err);\n  ws.onclose = (ev) => {\n    rejectHandshake(\n      new ReconnectError(\n        `Connection ${connectionId} closed: ${ev.reason}`,\n        attempt,\n      ),\n    );\n  };\n\n  const setupState = {\n    receivedGatewayHello: false,\n    sentWorkerConnect: false,\n    receivedConnectionReady: false,\n  };\n\n  let heartbeatIntervalMs: number | undefined;\n  let extendLeaseIntervalMs: number | undefined;\n  let statusIntervalMs: number | undefined;\n\n  ws.onmessage = async (event) => {\n    const messageBytes = new Uint8Array(event.data as ArrayBuffer);\n    const connectMessage = parseConnectMessage(messageBytes);\n\n    logger.debug(\n      { kind: gatewayMessageTypeToJSON(connectMessage.kind), connectionId },\n      \"Received message\",\n    );\n\n    if (!setupState.receivedGatewayHello) {\n      if (connectMessage.kind !== GatewayMessageType.GATEWAY_HELLO) {\n        rejectHandshake(\n          new ReconnectError(\n            `Expected hello message, got ${gatewayMessageTypeToJSON(\n              connectMessage.kind,\n            )}`,\n            attempt,\n          ),\n        );\n        return;\n      }\n      setupState.receivedGatewayHello = true;\n    }\n\n    if (!setupState.sentWorkerConnect) {\n      const workerConnectRequestMsg = WorkerConnectRequestData.create({\n        connectionId: startResp.connectionId,\n        environment: config.envName,\n        platform: getPlatformName({ ...getProcessEnv() }),\n        sdkVersion: `v${version}`,\n        sdkLanguage: \"typescript\",\n        framework: \"connect\",\n        workerManualReadinessAck: config.connectionData.manualReadinessAck,\n        systemAttributes: await retrieveSystemAttributes(),\n        authData: {\n          sessionToken: startResp.sessionToken,\n          syncToken: startResp.syncToken,\n        },\n        apps: config.connectionData.apps,\n        capabilities: new TextEncoder().encode(\n          config.connectionData.marshaledCapabilities,\n        ),\n        startedAt: startedAt,\n        instanceId: config.instanceId || (await getHostname()),\n        maxWorkerConcurrency: config.maxWorkerConcurrency,\n      });\n\n      const workerConnectRequestMsgBytes = WorkerConnectRequestData.encode(\n        workerConnectRequestMsg,\n      ).finish();\n\n      ws.send(\n        ensureUnsharedArrayBuffer(\n          ConnectMessage.encode(\n            ConnectMessage.create({\n              kind: GatewayMessageType.WORKER_CONNECT,\n              payload: workerConnectRequestMsgBytes,\n            }),\n          ).finish(),\n        ),\n      );\n\n      setupState.sentWorkerConnect = true;\n      return;\n    }\n\n    if (!setupState.receivedConnectionReady) {\n      if (connectMessage.kind !== GatewayMessageType.GATEWAY_CONNECTION_READY) {\n        rejectHandshake(\n          new ReconnectError(\n            `Expected ready message, got ${gatewayMessageTypeToJSON(\n              connectMessage.kind,\n            )}`,\n            attempt,\n          ),\n        );\n        return;\n      }\n\n      const readyPayload = GatewayConnectionReadyData.decode(\n        connectMessage.payload,\n      );\n\n      setupState.receivedConnectionReady = true;\n\n      heartbeatIntervalMs =\n        readyPayload.heartbeatInterval.length > 0\n          ? ms(readyPayload.heartbeatInterval as ms.StringValue)\n          : 10_000;\n      extendLeaseIntervalMs =\n        readyPayload.extendLeaseInterval.length > 0\n          ? ms(readyPayload.extendLeaseInterval as ms.StringValue)\n          : 5_000;\n\n      statusIntervalMs =\n        readyPayload.statusInterval.length > 0\n          ? ms(readyPayload.statusInterval as ms.StringValue)\n          : 0;\n\n      resolveWsConnected?.();\n      return;\n    }\n\n    logger.warn(\n      {\n        kind: gatewayMessageTypeToJSON(connectMessage.kind),\n        rawKind: connectMessage.kind,\n        attempt,\n        setupState,\n        connectionId,\n      },\n      \"Unexpected message type during setup\",\n    );\n  };\n\n  await wsConnectedPromise;\n\n  clearTimeout(connectTimeout);\n  excludeGateways.delete(startResp.gatewayGroup);\n\n  // Build the Connection object\n  const conn: Connection = {\n    id: connectionId,\n    ws,\n    pendingHeartbeats: 0,\n    dead: false,\n    heartbeatIntervalMs: heartbeatIntervalMs ?? 10_000,\n    extendLeaseIntervalMs: extendLeaseIntervalMs ?? 5_000,\n    statusIntervalMs: statusIntervalMs ?? 0,\n    connectedAt: Date.now(),\n    close: () => {\n      if (conn.dead) return;\n      conn.dead = true;\n      ws.onerror = () => {};\n      ws.onclose = () => {};\n      ws.close();\n    },\n  };\n\n  logger.info(\n    { connectionId, gatewayGroup: startResp.gatewayGroup },\n    \"Connection established\",\n  );\n\n  return { conn, gatewayGroup: startResp.gatewayGroup };\n}\n"],"mappings":";;;;;;;;;;;;;;;;;;;;;AA6BA,MAAM,2BAA2B;;;;;AAWjC,eAAsB,iBACpB,QACA,kBACA,SACA,iBACA,QACA;CACA,MAAM,MAAMA,oCAAmB,MAAM,KAAK,gBAAgB,CAAC;CAE3D,MAAMC,UAAkC;EACtC,gBAAgB;EAChB,GAAI,mBACA,EAAE,eAAe,UAAU,oBAAoB,GAC/C,EAAE;EACP;AAED,KAAI,OAAO,QACT,SAAQC,0BAAW,eAAe,OAAO;CAG3C,MAAM,YAAY,IAAI,IACpB,qBACA,MAAMC,8BAAkB;EACtB,YAAY,OAAO;EACnB,MAAM,OAAO;EACd,CAAC,CACH;CAED,IAAI;AACJ,KAAI;AACF,SAAO,MAAM,MAAM,WAAW;GAC5B,QAAQ;GACR,MAAM,IAAI,WAAW,IAAI;GAChB;GACV,CAAC;UACK,KAAK;EACZ,MAAM,SAAS,eAAe,QAAQ,IAAI,UAAU;AACpD,QAAM,IAAIC,4BACR,2CAA2C,UAAU,UAAU,CAAC,IAAI,UACpE,QACD;;AAGH,KAAI,CAAC,KAAK,IAAI;AACZ,MAAI,KAAK,WAAW,IAClB,OAAM,IAAIC,uBACR,2CAA2C,UAAU,UAAU,GAC7D,OAAO,UAAU,UAAU,OAAO,QAAQ,KAAK,GAChD,IAAI,MAAM,KAAK,MAAM,IACtB,QACD;AAGH,MAAI,KAAK,WAAW,IAClB,OAAM,IAAIC,kCAAqB,QAAQ;AAGzC,QAAM,IAAIF,4BACR,2CAA2C,UAAU,UAAU,CAAC,IAAI,MAAM,KAAK,MAAM,IACrF,QACD;;CAGH,MAAM,EAAE,uBAAuB,2CAAM;AACrC,QAAO,mBAAmB,KAAK;;;;;;;;;;AAWjC,eAAsB,oBACpB,QACA,kBACA,SACA,iBACA,QACoC;AACpC,QAAO,MAAM,EAAE,SAAS,EAAE,uBAAuB;CAEjD,MAAM,4BAAY,IAAI,MAAM;CAC5B,MAAM,YAAY,MAAM,iBACtB,QACA,kBACA,SACA,iBACA,OACD;CAED,MAAM,eAAe,UAAU;CAE/B,IAAIG;CACJ,IAAIC;CACJ,MAAM,qBAAqB,IAAI,SAAe,SAAS,WAAW;AAChE,uBAAqB;AACrB,sBAAoB;GACpB;CAEF,MAAM,iBAAiB,iBAAiB;AACtC,kBAAgB,IAAI,UAAU,aAAa;AAC3C,sBACE,IAAIJ,4BAAe,cAAc,aAAa,aAAa,QAAQ,CACpE;IACA,IAAO;CAEV,MAAM,gBAAgB,OAAO,cAAc,UAAU;AACrD,KAAI,kBAAkB,UAAU,gBAC9B,QAAO,MACL;EAAE,UAAU,UAAU;EAAiB,UAAU;EAAe,EAChE,8BACD;AAGH,QAAO,MACL;EACE,UAAU;EACV,cAAc,UAAU;EACxB;EACD,EACD,wBACD;CAED,MAAM,KAAK,IAAI,UAAU,eAAe,CAAC,yBAAyB,CAAC;AACnE,IAAG,aAAa;CAIhB,IAAI,UAAU;CAEd,MAAM,mBAAmB,UAAmB;AAC1C,MAAI,QAAS;AACb,YAAU;AAEV,kBAAgB,IAAI,UAAU,aAAa;AAC3C,eAAa,eAAe;AAE5B,KAAG,gBAAgB;AACnB,KAAG,gBAAgB;AACnB,KAAG,MACD,MACAK,6CAA6BC,uCAAuB,WAAW,CAChE;AAED,sBACE,IAAIN,4BACF,2BAA2B,aAAa,KACtC,iBAAiB,QAAQ,MAAM,UAAU,mBAE3C,QACD,CACF;;AAGH,IAAG,WAAW,QAAQ,gBAAgB,IAAI;AAC1C,IAAG,WAAW,OAAO;AACnB,kBACE,IAAIA,4BACF,cAAc,aAAa,WAAW,GAAG,UACzC,QACD,CACF;;CAGH,MAAM,aAAa;EACjB,sBAAsB;EACtB,mBAAmB;EACnB,yBAAyB;EAC1B;CAED,IAAIO;CACJ,IAAIC;CACJ,IAAIC;AAEJ,IAAG,YAAY,OAAO,UAAU;EAE9B,MAAM,iBAAiBC,qCADF,IAAI,WAAW,MAAM,KAAoB,CACN;AAExD,SAAO,MACL;GAAE,MAAMC,yCAAyB,eAAe,KAAK;GAAE;GAAc,EACrE,mBACD;AAED,MAAI,CAAC,WAAW,sBAAsB;AACpC,OAAI,eAAe,SAASC,mCAAmB,eAAe;AAC5D,oBACE,IAAIZ,4BACF,+BAA+BW,yCAC7B,eAAe,KAChB,IACD,QACD,CACF;AACD;;AAEF,cAAW,uBAAuB;;AAGpC,MAAI,CAAC,WAAW,mBAAmB;GACjC,MAAM,0BAA0BE,yCAAyB,OAAO;IAC9D,cAAc,UAAU;IACxB,aAAa,OAAO;IACpB,UAAUC,4BAAgB,EAAE,GAAGC,2BAAe,EAAE,CAAC;IACjD,YAAY,IAAIC;IAChB,aAAa;IACb,WAAW;IACX,0BAA0B,OAAO,eAAe;IAChD,kBAAkB,MAAMC,qCAA0B;IAClD,UAAU;KACR,cAAc,UAAU;KACxB,WAAW,UAAU;KACtB;IACD,MAAM,OAAO,eAAe;IAC5B,cAAc,IAAI,aAAa,CAAC,OAC9B,OAAO,eAAe,sBACvB;IACU;IACX,YAAY,OAAO,cAAe,MAAMC,wBAAa;IACrD,sBAAsB,OAAO;IAC9B,CAAC;GAEF,MAAM,+BAA+BL,yCAAyB,OAC5D,wBACD,CAAC,QAAQ;AAEV,MAAG,KACDM,yCACEC,+BAAe,OACbA,+BAAe,OAAO;IACpB,MAAMR,mCAAmB;IACzB,SAAS;IACV,CAAC,CACH,CAAC,QAAQ,CACX,CACF;AAED,cAAW,oBAAoB;AAC/B;;AAGF,MAAI,CAAC,WAAW,yBAAyB;AACvC,OAAI,eAAe,SAASA,mCAAmB,0BAA0B;AACvE,oBACE,IAAIZ,4BACF,+BAA+BW,yCAC7B,eAAe,KAChB,IACD,QACD,CACF;AACD;;GAGF,MAAM,eAAeU,2CAA2B,OAC9C,eAAe,QAChB;AAED,cAAW,0BAA0B;AAErC,yBACE,aAAa,kBAAkB,SAAS,oBACjC,aAAa,kBAAoC,GACpD;AACN,2BACE,aAAa,oBAAoB,SAAS,oBACnC,aAAa,oBAAsC,GACtD;AAEN,sBACE,aAAa,eAAe,SAAS,oBAC9B,aAAa,eAAiC,GACjD;AAEN,yBAAsB;AACtB;;AAGF,SAAO,KACL;GACE,MAAMV,yCAAyB,eAAe,KAAK;GACnD,SAAS,eAAe;GACxB;GACA;GACA;GACD,EACD,uCACD;;AAGH,OAAM;AAEN,cAAa,eAAe;AAC5B,iBAAgB,OAAO,UAAU,aAAa;CAG9C,MAAMW,OAAmB;EACvB,IAAI;EACJ;EACA,mBAAmB;EACnB,MAAM;EACN,qBAAqB,uBAAuB;EAC5C,uBAAuB,yBAAyB;EAChD,kBAAkB,oBAAoB;EACtC,aAAa,KAAK,KAAK;EACvB,aAAa;AACX,OAAI,KAAK,KAAM;AACf,QAAK,OAAO;AACZ,MAAG,gBAAgB;AACnB,MAAG,gBAAgB;AACnB,MAAG,OAAO;;EAEb;AAED,QAAO,KACL;EAAE;EAAc,cAAc,UAAU;EAAc,EACtD,yBACD;AAED,QAAO;EAAE;EAAM,cAAc,UAAU;EAAc"}