{"version":3,"file":"connection.cjs","names":["WaitGroup","resolve: () => void","HeartbeatManager","StatusReporter","RequestProcessor","ConnectionState","ensureUnsharedArrayBuffer","ConnectMessage","GatewayMessageType","WAKE_REASON","resolveApiBaseUrl","ages: number[]","establishConnection","ReconnectError","AuthError","ConnectionLimitError","waitWithCancel","expBackoff","parseConnectMessage","gatewayMessageTypeToJSON"],"sources":["../../../../../src/components/connect/strategies/core/connection.ts"],"sourcesContent":["/**\n * Shared connection core logic used by both SameThreadStrategy and\n * WorkerThreadStrategy.\n *\n * This module uses a **reconcile loop** that continuously ensures a live\n * WebSocket connection is open. Reconnection, drain, and shutdown are\n * expressed as state changes that wake the loop rather than recursive\n * calls or callback-driven control flow.\n *\n * Domain-specific logic is delegated to focused sub-modules:\n * - {@link HeartbeatManager} — periodic heartbeat pings\n * - {@link RequestProcessor} — executor requests, lease extensions, reply ACKs\n * - {@link establishConnection} — HTTP start + WebSocket handshake\n */\n\nimport { WaitGroup } from \"@jpwilliams/waitgroup\";\nimport { resolveApiBaseUrl } from \"../../../../helpers/url.ts\";\nimport type { Logger } from \"../../../../middleware/logger.ts\";\nimport type { GatewayExecutorRequestData } from \"../../../../proto/src/components/connect/protobuf/connect.ts\";\nimport {\n  ConnectMessage,\n  GatewayMessageType,\n  gatewayMessageTypeToJSON,\n} from \"../../../../proto/src/components/connect/protobuf/connect.ts\";\nimport { ensureUnsharedArrayBuffer } from \"../../buffer.ts\";\nimport { parseConnectMessage } from \"../../messages.ts\";\nimport {\n  type ConnectDebugState,\n  ConnectionState,\n  type InFlightRequest,\n} from \"../../types.ts\";\nimport {\n  AuthError,\n  ConnectionLimitError,\n  expBackoff,\n  ReconnectError,\n  waitWithCancel,\n} from \"../../util.ts\";\nimport { establishConnection } from \"./handshake.ts\";\nimport { HeartbeatManager } from \"./heartbeat.ts\";\nimport { RequestProcessor } from \"./requestProcessor.ts\";\nimport { StatusReporter } from \"./statusReporter.ts\";\nimport type { BaseConnectionConfig } from \"./types.ts\";\nimport { WAKE_REASON, type WakeReason } from \"./types.ts\";\n\n/**\n * Connection object representing an active WebSocket connection.\n */\nexport interface Connection {\n  id: string;\n  ws: WebSocket;\n  pendingHeartbeats: number;\n  /** When true the connection is considered unusable and the reconcile loop\n   *  will establish a replacement. */\n  dead: boolean;\n  heartbeatIntervalMs: number;\n  extendLeaseIntervalMs: number;\n  statusIntervalMs: number;\n  /** Timestamp (ms) when the connection was established. */\n  connectedAt: number;\n  /** Disable all handlers and close the underlying WebSocket. */\n  close(): void;\n}\n\n/**\n * Configuration for the connection core.\n * Extends BaseConnectionConfig with connection-specific options.\n */\nexport interface ConnectionCoreConfig extends BaseConnectionConfig {\n  instanceId?: string;\n  maxWorkerConcurrency?: number;\n  gatewayUrl?: string;\n  appIds: string[];\n}\n\n/**\n * Callbacks for connection core events.\n */\nexport interface ConnectionCoreCallbacks {\n  logger: Logger;\n  onStateChange: (state: ConnectionState) => void;\n  getState: () => ConnectionState;\n  handleExecutionRequest: (\n    request: GatewayExecutorRequestData,\n  ) => Promise<Uint8Array>;\n  onReplyAck?: (requestId: string) => void;\n  onBufferResponse?: (requestId: string, responseBytes: Uint8Array) => void;\n  onConnectionActive?: (signingKey: string | undefined) => void;\n}\n\n/**\n * Core connection manager that handles WebSocket connection lifecycle,\n * handshake, heartbeat, lease extension, and reconnection.\n *\n * Uses a reconcile loop that:\n * - Ensures a WebSocket connection is always open\n * - Manages a single heartbeat interval targeting the active connection\n * - Handles reconnection, drain, and shutdown as state changes\n */\nexport class ConnectionCore {\n  private config: ConnectionCoreConfig;\n  private callbacks: ConnectionCoreCallbacks;\n\n  // Exposed via ConnectionAccessor for sub-modules\n  private _activeConnection: Connection | undefined;\n  private _drainingConnection: Connection | undefined;\n  private _shutdownRequested = false;\n  private _inProgressRequests: {\n    wg: WaitGroup;\n    requestLeases: Record<string, string>;\n    requestMeta: Record<string, InFlightRequest>;\n  } = {\n    wg: new WaitGroup(),\n    requestLeases: {},\n    requestMeta: {},\n  };\n\n  private _lastHeartbeatSentAt: number | undefined;\n  private _lastHeartbeatReceivedAt: number | undefined;\n  private _lastMessageReceivedAt: number | undefined;\n\n  private excludeGateways: Set<string> = new Set();\n\n  // Wake signal for the reconcile loop\n  private wakeSignal: { promise: Promise<void>; resolve: () => void };\n  // Reasons accumulated since the last loop wake. Read + cleared by the loop.\n  private pendingWakeReasons: WakeReason[] = [];\n\n  // Whether we've ever successfully connected (used to distinguish\n  // CONNECTING from RECONNECTING state transitions).\n  private hasConnectedBefore = false;\n\n  // Shutdown diagnostics: periodic \"still draining\" dump logged while the\n  // drain is outstanding. Started in close(), cleared in teardown.\n  private shutdownDumpInterval: ReturnType<typeof setInterval> | undefined;\n\n  // Cadence for the periodic \"still draining\" debug dump.\n  private static readonly SHUTDOWN_DUMP_INTERVAL_MS = 60_000;\n\n  // Loop promise — resolved when the reconcile loop exits\n  private loopPromise: Promise<void> | undefined;\n  private closePromise: Promise<void> | undefined;\n\n  // First-ready resolution — resolves start() when first connection is ready\n  private resolveFirstReady: (() => void) | undefined;\n  private rejectFirstReady: ((err: unknown) => void) | undefined;\n\n  // Signing key management\n  private useSigningKey: string | undefined;\n\n  // Sub-modules\n  private readonly heartbeatManager: HeartbeatManager;\n  private readonly statusReporter: StatusReporter;\n  private readonly requestProcessor: RequestProcessor;\n\n  constructor(\n    config: ConnectionCoreConfig,\n    callbacks: ConnectionCoreCallbacks,\n  ) {\n    this.config = config;\n    this.callbacks = callbacks;\n    this.useSigningKey = config.hashedSigningKey;\n\n    // Initialize the wake signal\n    let resolve: () => void;\n    const promise = new Promise<void>((r) => {\n      resolve = r;\n    });\n    this.wakeSignal = { promise, resolve: resolve! };\n\n    // Build a ConnectionAccessor view for sub-modules\n    const accessor = {\n      get activeConnection() {\n        return self._activeConnection;\n      },\n      get drainingConnection() {\n        return self._drainingConnection;\n      },\n      get shutdownRequested() {\n        return self._shutdownRequested;\n      },\n      get inProgressRequests() {\n        return self._inProgressRequests;\n      },\n      get appIds() {\n        return self.config.appIds;\n      },\n    };\n\n    const wakeSignalRef = { wake: (reason?: WakeReason) => this.wake(reason) };\n\n    const self = this;\n\n    this.heartbeatManager = new HeartbeatManager(\n      accessor,\n      wakeSignalRef,\n      callbacks.logger,\n    );\n    this.heartbeatManager.onHeartbeatSent = () => {\n      this._lastHeartbeatSentAt = Date.now();\n    };\n\n    this.statusReporter = new StatusReporter(accessor, callbacks.logger);\n\n    this.requestProcessor = new RequestProcessor(\n      accessor,\n      wakeSignalRef,\n      callbacks,\n      callbacks.logger,\n    );\n  }\n\n  get connectionId(): string | undefined {\n    return this._activeConnection?.id;\n  }\n\n  /**\n   * Wait for all in-progress requests to complete.\n   */\n  async waitForInProgress(): Promise<void> {\n    await this._inProgressRequests.wg.wait();\n  }\n\n  /**\n   * Return a snapshot of debug/health information for this connection.\n   */\n  getDebugState(): ConnectDebugState {\n    return {\n      state: this.callbacks.getState(),\n      activeConnectionId: this._activeConnection?.id,\n      drainingConnectionId: this._drainingConnection?.id,\n      lastHeartbeatSentAt: this._lastHeartbeatSentAt,\n      lastHeartbeatReceivedAt: this._lastHeartbeatReceivedAt,\n      lastMessageReceivedAt: this._lastMessageReceivedAt,\n      shutdownRequested: this._shutdownRequested,\n      inFlightRequestCount: Object.keys(this._inProgressRequests.requestLeases)\n        .length,\n      inFlightRequests: Object.values(this._inProgressRequests.requestMeta),\n    };\n  }\n\n  /**\n   * Start the reconcile loop. Resolves when the first connection is active.\n   * The loop continues running in the background.\n   */\n  async start(attempt = 0): Promise<void> {\n    if (typeof WebSocket === \"undefined\") {\n      throw new Error(\"WebSockets not supported in current environment\");\n    }\n\n    const state = this.callbacks.getState();\n    if (state === ConnectionState.CLOSED) {\n      throw new Error(\"Connection already closed\");\n    }\n\n    const firstReadyPromise = new Promise<void>((resolve, reject) => {\n      this.resolveFirstReady = resolve;\n      this.rejectFirstReady = reject;\n    });\n\n    this.loopPromise = this.reconcileLoop(attempt);\n\n    // If the loop ends before firstReady resolves, propagate any error\n    this.loopPromise.catch((err) => {\n      this.rejectFirstReady?.(err);\n    });\n\n    await firstReadyPromise;\n  }\n\n  /**\n   * Request graceful shutdown. Resolves when fully closed (in-flight done,\n   * connection closed).\n   */\n  async close(): Promise<void> {\n    if (this.closePromise) return this.closePromise;\n\n    this.closePromise = this.closeOnce();\n    return this.closePromise;\n  }\n\n  private async closeOnce(): Promise<void> {\n    const inFlightCount = Object.keys(\n      this._inProgressRequests.requestLeases,\n    ).length;\n    this.callbacks.logger.info(\n      { inFlightCount },\n      \"Shutting down, waiting for in-flight requests\",\n    );\n    // Flip the shutdown flag before starting any timers so the periodic\n    // dump guard (`if (!this._shutdownRequested) return`) cannot observe a\n    // stale `false` on its first tick.\n    this._shutdownRequested = true;\n    // Verbose per-request dump (debug-only) at drain start so operators can\n    // immediately see which runs are holding the shutdown.\n    this.dumpInFlightForShutdown(\"drain-start\");\n    this.startShutdownInFlightDumpTimer();\n\n    if (this._activeConnection?.ws.readyState === WebSocket.OPEN) {\n      this._activeConnection.ws.send(\n        ensureUnsharedArrayBuffer(\n          ConnectMessage.encode(\n            ConnectMessage.create({\n              kind: GatewayMessageType.WORKER_PAUSE,\n            }),\n          ).finish(),\n        ),\n      );\n      this.callbacks.logger.info(\n        { connectionId: this._activeConnection.id },\n        \"Sent WORKER_PAUSE, draining\",\n      );\n    }\n\n    this.wake(WAKE_REASON.ShutdownRequested);\n\n    if (this.loopPromise) {\n      await this.loopPromise;\n    }\n\n    this.callbacks.logger.info(\"Connection closed\");\n  }\n\n  async getApiBaseUrl(): Promise<string> {\n    return resolveApiBaseUrl({\n      apiBaseUrl: this.config.apiBaseUrl,\n      mode: this.config.mode,\n    });\n  }\n\n  // ---------------------------------------------------------------------------\n  // Wake signal\n  // ---------------------------------------------------------------------------\n\n  private resetWakeSignal(): void {\n    let resolve: () => void;\n    const promise = new Promise<void>((r) => {\n      resolve = r;\n    });\n    this.wakeSignal = { promise, resolve: resolve! };\n  }\n\n  private wake(reason: WakeReason = WAKE_REASON.Unknown): void {\n    // Only the first pending wake needs to resolve the parked loop; later\n    // wakes are accumulated and consumed together on the next iteration.\n    const shouldResolve = this.pendingWakeReasons.length === 0;\n    this.pendingWakeReasons.push(reason);\n    if (shouldResolve) {\n      this.wakeSignal.resolve();\n    }\n  }\n\n  // ---------------------------------------------------------------------------\n  // Signing key management\n  // ---------------------------------------------------------------------------\n\n  private switchAuthKey(): void {\n    const switchToFallback =\n      this.useSigningKey === this.config.hashedSigningKey;\n    if (switchToFallback) {\n      this.callbacks.logger.debug(\"Switching to fallback signing key\");\n    }\n    this.useSigningKey = switchToFallback\n      ? this.config.hashedFallbackKey\n      : this.config.hashedSigningKey;\n  }\n\n  // ---------------------------------------------------------------------------\n  // In-flight helpers\n  // ---------------------------------------------------------------------------\n\n  private hasInFlightRequests(): boolean {\n    return Object.keys(this._inProgressRequests.requestLeases).length > 0;\n  }\n\n  /**\n   * Debug-level \"still draining\" dump emitted at drain start and periodically\n   * thereafter while in-flight requests are holding the shutdown. One summary\n   * line plus one line per request carrying `requestId`, `runId`, `stepId`,\n   * `functionSlug`, `ageMs`, and `sinceLastLeaseExtendMs`. Does not affect\n   * info/warn logs.\n   *\n   * `requestLeases` drives the reconcile-loop exit gate, so use it as the\n   * single source of truth for the in-flight set; `requestMeta` carries the\n   * enrichment fields and is kept in sync alongside the lease map.\n   */\n  private dumpInFlightForShutdown(reason: string): void {\n    const leaseIds = Object.keys(this._inProgressRequests.requestLeases);\n    if (leaseIds.length === 0) return;\n    const now = Date.now();\n    const ages: number[] = [];\n    for (const id of leaseIds) {\n      const m = this._inProgressRequests.requestMeta[id];\n      if (m?.leaseAcquiredAt) ages.push(now - m.leaseAcquiredAt);\n    }\n\n    this.callbacks.logger.debug(\n      {\n        reason,\n        inFlightCount: leaseIds.length,\n        oldestAgeMs: ages.length > 0 ? Math.max(...ages) : undefined,\n      },\n      \"Shutdown: still draining\",\n    );\n\n    for (const id of leaseIds) {\n      const m = this._inProgressRequests.requestMeta[id];\n      if (!m) continue;\n      this.callbacks.logger.debug(\n        {\n          reason,\n          requestId: m.requestId,\n          runId: m.runId,\n          stepId: m.stepId,\n          functionSlug: m.functionSlug,\n          appId: m.appId,\n          ageMs: m.leaseAcquiredAt ? now - m.leaseAcquiredAt : undefined,\n          sinceLastLeaseExtendMs: m.leaseLastExtendedAt\n            ? now - m.leaseLastExtendedAt\n            : undefined,\n        },\n        \"Shutdown: still draining in-flight request\",\n      );\n    }\n  }\n\n  private startShutdownInFlightDumpTimer(): void {\n    if (this.shutdownDumpInterval) return;\n    this.shutdownDumpInterval = setInterval(() => {\n      if (!this._shutdownRequested) return;\n      this.dumpInFlightForShutdown(\"periodic\");\n      // Wake the loop so its \"Reconcile loop woken\" line emits a fresh\n      // state snapshot alongside the in-flight dump. Loop will park again\n      // immediately if nothing has changed.\n      this.wake(WAKE_REASON.ShutdownStillPending);\n    }, ConnectionCore.SHUTDOWN_DUMP_INTERVAL_MS);\n  }\n\n  private stopShutdownInFlightDumpTimer(): void {\n    if (this.shutdownDumpInterval) {\n      clearInterval(this.shutdownDumpInterval);\n      this.shutdownDumpInterval = undefined;\n    }\n  }\n\n  // ---------------------------------------------------------------------------\n  // Reconcile loop\n  // ---------------------------------------------------------------------------\n\n  private async reconcileLoop(initialAttempt: number): Promise<void> {\n    let attempt = initialAttempt;\n\n    this.callbacks.logger.debug({ initialAttempt }, \"Reconcile loop entered\");\n\n    while (true) {\n      // Exit condition: shutdown requested + no in-flight requests\n      if (this._shutdownRequested && !this.hasInFlightRequests()) {\n        break;\n      }\n\n      // Ensure we have a live connection\n      if (!this._activeConnection || this._activeConnection.dead) {\n        this.callbacks.logger.debug(\n          {\n            hasActiveConnection: !!this._activeConnection,\n            activeConnectionDead: this._activeConnection?.dead,\n            hasDrainingConnection: !!this._drainingConnection,\n            drainingConnectionId: this._drainingConnection?.id,\n          },\n          \"No active connection\",\n        );\n\n        if (this.hasConnectedBefore) {\n          this.callbacks.logger.info({ attempt }, \"Reconnecting\");\n        } else {\n          this.callbacks.logger.info(\"Connecting\");\n        }\n\n        this.callbacks.onStateChange(\n          this.hasConnectedBefore\n            ? ConnectionState.RECONNECTING\n            : ConnectionState.CONNECTING,\n        );\n\n        try {\n          const { conn, gatewayGroup } = await establishConnection(\n            this.config,\n            this.useSigningKey,\n            attempt,\n            this.excludeGateways,\n            this.callbacks.logger,\n          );\n\n          // Attach post-handshake handlers\n          this.attachHandlers(conn, gatewayGroup);\n\n          // Clean up draining connection after new one is ready\n          if (this._drainingConnection) {\n            this.callbacks.logger.info(\n              {\n                oldConnectionId: this._drainingConnection.id,\n                newConnectionId: conn.id,\n              },\n              \"Replaced draining connection\",\n            );\n            this._drainingConnection.close();\n            this._drainingConnection = undefined;\n          }\n\n          this._activeConnection = conn;\n          this.heartbeatManager.updateInterval(conn.heartbeatIntervalMs);\n          this.statusReporter.updateInterval(conn.statusIntervalMs);\n          attempt = 0;\n          this.hasConnectedBefore = true;\n          this.callbacks.logger.info(\n            { connectionId: conn.id, gatewayGroup },\n            \"Connection active\",\n          );\n          this.callbacks.onStateChange(ConnectionState.ACTIVE);\n\n          if (this._shutdownRequested) {\n            // Reconnected during shutdown to keep in-flight requests alive.\n            // Send WORKER_PAUSE instead of WORKER_READY so no new work is routed.\n            conn.ws.send(\n              ensureUnsharedArrayBuffer(\n                ConnectMessage.encode(\n                  ConnectMessage.create({\n                    kind: GatewayMessageType.WORKER_PAUSE,\n                  }),\n                ).finish(),\n              ),\n            );\n            this.callbacks.logger.info(\n              { connectionId: conn.id },\n              \"Sent WORKER_PAUSE on reconnect during shutdown\",\n            );\n          } else {\n            // Signal the gateway that we're ready to receive requests.\n            // This must happen after ACTIVE so the gateway doesn't route\n            // requests before handlers are fully attached.\n            conn.ws.send(\n              ensureUnsharedArrayBuffer(\n                ConnectMessage.encode(\n                  ConnectMessage.create({\n                    kind: GatewayMessageType.WORKER_READY,\n                  }),\n                ).finish(),\n              ),\n            );\n            this.callbacks.logger.info(\n              { connectionId: conn.id },\n              \"Sent WORKER_READY\",\n            );\n          }\n\n          // Flush any buffered responses via HTTP now that we're active.\n          this.callbacks.onConnectionActive?.(this.useSigningKey);\n\n          this.resolveFirstReady?.();\n          this.resolveFirstReady = undefined;\n          this.rejectFirstReady = undefined;\n        } catch (err) {\n          if (!(err instanceof ReconnectError)) throw err;\n\n          attempt = err.attempt + 1;\n          if (err instanceof AuthError) this.switchAuthKey();\n          if (err instanceof ConnectionLimitError) {\n            this.callbacks.logger.error(\"Max concurrent connections reached\");\n          }\n\n          // Gateway is draining, we should retry much faster\n          if (err.message?.includes(\"connect_gateway_closing\")) {\n            const jitter = 500 + Math.random() * 1000;\n            this.callbacks.logger.info(\n              { attempt, delay: Math.round(jitter), error: err.message },\n              \"Gateway draining, retrying\",\n            );\n            const cancelled = await waitWithCancel(jitter, () => {\n              return this._shutdownRequested && !this.hasInFlightRequests();\n            });\n            if (cancelled) break;\n            continue;\n          }\n\n          const delay = expBackoff(attempt);\n          this.callbacks.logger.info(\n            { attempt, delay },\n            \"Reconnecting after failure\",\n          );\n\n          const cancelled = await waitWithCancel(delay, () => {\n            return this._shutdownRequested && !this.hasInFlightRequests();\n          });\n          if (cancelled) break;\n          continue;\n        }\n      }\n\n      // Wait for something to change. If a wake fired while this loop was\n      // doing async work above, pendingWakeReasons is already populated; don't\n      // wait on the replacement wakeSignal or the wake can be missed.\n      if (this.pendingWakeReasons.length === 0) {\n        await this.wakeSignal.promise;\n      }\n      const reasons = this.pendingWakeReasons;\n      this.pendingWakeReasons = [];\n      this.resetWakeSignal();\n      this.callbacks.logger.debug(\n        {\n          reasons,\n          shutdownRequested: this._shutdownRequested,\n          hasActiveConnection: !!this._activeConnection,\n          activeConnectionDead: this._activeConnection?.dead,\n        },\n        \"Reconcile loop woken\",\n      );\n    }\n\n    this.callbacks.logger.debug(\n      {\n        shutdownRequested: this._shutdownRequested,\n        inFlightCount: Object.keys(this._inProgressRequests.requestLeases)\n          .length,\n      },\n      \"Reconcile loop exiting\",\n    );\n\n    // Teardown\n    this.heartbeatManager.stop();\n    this.statusReporter.stop();\n    this.stopShutdownInFlightDumpTimer();\n    this._activeConnection?.close();\n    this._activeConnection = undefined;\n    this._drainingConnection?.close();\n    this._drainingConnection = undefined;\n  }\n\n  // ---------------------------------------------------------------------------\n  // Post-handshake handler attachment\n  // ---------------------------------------------------------------------------\n\n  /**\n   * Wire up error, close, and message handlers on a newly-handshaked connection.\n   */\n  private attachHandlers(conn: Connection, gatewayGroup: string): void {\n    const { ws } = conn;\n    const connectionId = conn.id;\n\n    // Error/close handlers: mark connection as dead and wake the loop\n    ws.onerror = (ev) => {\n      if (conn.dead) return;\n      const uptimeMs = Date.now() - conn.connectedAt;\n      this.callbacks.logger.warn(\n        {\n          connectionId,\n          gatewayGroup,\n          uptimeMs,\n          error: (ev as ErrorEvent)?.message,\n        },\n        \"Connection lost (error)\",\n      );\n      conn.dead = true;\n      this.excludeGateways.add(gatewayGroup);\n      if (this._activeConnection?.id === connectionId) {\n        this._activeConnection = undefined;\n      }\n      this.wake(WAKE_REASON.WsError);\n    };\n\n    ws.onclose = (ev) => {\n      if (conn.dead) return;\n      const uptimeMs = Date.now() - conn.connectedAt;\n      this.callbacks.logger.warn(\n        {\n          connectionId,\n          gatewayGroup,\n          uptimeMs,\n          code: ev.code,\n          reason: ev.reason,\n        },\n        \"Connection lost (close)\",\n      );\n      conn.dead = true;\n      this.excludeGateways.add(gatewayGroup);\n      if (this._activeConnection?.id === connectionId) {\n        this._activeConnection = undefined;\n      }\n      this.wake(WAKE_REASON.WsClose);\n    };\n\n    // Message handler for post-handshake messages\n    ws.onmessage = async (event) => {\n      this._lastMessageReceivedAt = Date.now();\n\n      const messageBytes = new Uint8Array(event.data as ArrayBuffer);\n      const connectMessage = parseConnectMessage(messageBytes);\n\n      if (connectMessage.kind === GatewayMessageType.GATEWAY_CLOSING) {\n        const uptimeMs = Date.now() - conn.connectedAt;\n        this.callbacks.logger.info(\n          { connectionId: conn.id, gatewayGroup, uptimeMs },\n          \"Gateway draining, opening new connection\",\n        );\n        // Move current connection to draining, clear active so the loop\n        // establishes a replacement.\n        this._drainingConnection = this._activeConnection;\n        this._activeConnection = undefined;\n        this.wake(WAKE_REASON.GatewayClosing);\n        return;\n      }\n\n      if (connectMessage.kind === GatewayMessageType.GATEWAY_HEARTBEAT) {\n        this._lastHeartbeatReceivedAt = Date.now();\n        conn.pendingHeartbeats = 0;\n        this.callbacks.logger.debug(\n          { connectionId },\n          \"Handled gateway heartbeat\",\n        );\n        return;\n      }\n\n      if (connectMessage.kind === GatewayMessageType.GATEWAY_EXECUTOR_REQUEST) {\n        await this.requestProcessor.handleExecutorRequest(connectMessage, conn);\n        return;\n      }\n\n      if (connectMessage.kind === GatewayMessageType.WORKER_REPLY_ACK) {\n        this.requestProcessor.handleReplyAck(connectMessage, connectionId);\n        return;\n      }\n\n      if (\n        connectMessage.kind ===\n        GatewayMessageType.WORKER_REQUEST_EXTEND_LEASE_ACK\n      ) {\n        this.requestProcessor.handleExtendLeaseAck(\n          connectMessage,\n          connectionId,\n        );\n        return;\n      }\n\n      this.callbacks.logger.warn(\n        {\n          kind: gatewayMessageTypeToJSON(connectMessage.kind),\n          rawKind: connectMessage.kind,\n          state: this.callbacks.getState(),\n          connectionId,\n        },\n        \"Unexpected message type\",\n      );\n    };\n  }\n}\n"],"mappings":";;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;AAmGA,IAAa,iBAAb,MAAa,eAAe;CAC1B,AAAQ;CACR,AAAQ;CAGR,AAAQ;CACR,AAAQ;CACR,AAAQ,qBAAqB;CAC7B,AAAQ,sBAIJ;EACF,IAAI,IAAIA,kCAAW;EACnB,eAAe,EAAE;EACjB,aAAa,EAAE;EAChB;CAED,AAAQ;CACR,AAAQ;CACR,AAAQ;CAER,AAAQ,kCAA+B,IAAI,KAAK;CAGhD,AAAQ;CAER,AAAQ,qBAAmC,EAAE;CAI7C,AAAQ,qBAAqB;CAI7B,AAAQ;CAGR,OAAwB,4BAA4B;CAGpD,AAAQ;CACR,AAAQ;CAGR,AAAQ;CACR,AAAQ;CAGR,AAAQ;CAGR,AAAiB;CACjB,AAAiB;CACjB,AAAiB;CAEjB,YACE,QACA,WACA;AACA,OAAK,SAAS;AACd,OAAK,YAAY;AACjB,OAAK,gBAAgB,OAAO;EAG5B,IAAIC;AAIJ,OAAK,aAAa;GAAE,SAHJ,IAAI,SAAe,MAAM;AACvC,cAAU;KACV;GACoC;GAAU;EAGhD,MAAM,WAAW;GACf,IAAI,mBAAmB;AACrB,WAAO,KAAK;;GAEd,IAAI,qBAAqB;AACvB,WAAO,KAAK;;GAEd,IAAI,oBAAoB;AACtB,WAAO,KAAK;;GAEd,IAAI,qBAAqB;AACvB,WAAO,KAAK;;GAEd,IAAI,SAAS;AACX,WAAO,KAAK,OAAO;;GAEtB;EAED,MAAM,gBAAgB,EAAE,OAAO,WAAwB,KAAK,KAAK,OAAO,EAAE;EAE1E,MAAM,OAAO;AAEb,OAAK,mBAAmB,IAAIC,mCAC1B,UACA,eACA,UAAU,OACX;AACD,OAAK,iBAAiB,wBAAwB;AAC5C,QAAK,uBAAuB,KAAK,KAAK;;AAGxC,OAAK,iBAAiB,IAAIC,sCAAe,UAAU,UAAU,OAAO;AAEpE,OAAK,mBAAmB,IAAIC,0CAC1B,UACA,eACA,WACA,UAAU,OACX;;CAGH,IAAI,eAAmC;AACrC,SAAO,KAAK,mBAAmB;;;;;CAMjC,MAAM,oBAAmC;AACvC,QAAM,KAAK,oBAAoB,GAAG,MAAM;;;;;CAM1C,gBAAmC;AACjC,SAAO;GACL,OAAO,KAAK,UAAU,UAAU;GAChC,oBAAoB,KAAK,mBAAmB;GAC5C,sBAAsB,KAAK,qBAAqB;GAChD,qBAAqB,KAAK;GAC1B,yBAAyB,KAAK;GAC9B,uBAAuB,KAAK;GAC5B,mBAAmB,KAAK;GACxB,sBAAsB,OAAO,KAAK,KAAK,oBAAoB,cAAc,CACtE;GACH,kBAAkB,OAAO,OAAO,KAAK,oBAAoB,YAAY;GACtE;;;;;;CAOH,MAAM,MAAM,UAAU,GAAkB;AACtC,MAAI,OAAO,cAAc,YACvB,OAAM,IAAI,MAAM,kDAAkD;AAIpE,MADc,KAAK,UAAU,UAAU,KACzBC,8BAAgB,OAC5B,OAAM,IAAI,MAAM,4BAA4B;EAG9C,MAAM,oBAAoB,IAAI,SAAe,SAAS,WAAW;AAC/D,QAAK,oBAAoB;AACzB,QAAK,mBAAmB;IACxB;AAEF,OAAK,cAAc,KAAK,cAAc,QAAQ;AAG9C,OAAK,YAAY,OAAO,QAAQ;AAC9B,QAAK,mBAAmB,IAAI;IAC5B;AAEF,QAAM;;;;;;CAOR,MAAM,QAAuB;AAC3B,MAAI,KAAK,aAAc,QAAO,KAAK;AAEnC,OAAK,eAAe,KAAK,WAAW;AACpC,SAAO,KAAK;;CAGd,MAAc,YAA2B;EACvC,MAAM,gBAAgB,OAAO,KAC3B,KAAK,oBAAoB,cAC1B,CAAC;AACF,OAAK,UAAU,OAAO,KACpB,EAAE,eAAe,EACjB,gDACD;AAID,OAAK,qBAAqB;AAG1B,OAAK,wBAAwB,cAAc;AAC3C,OAAK,gCAAgC;AAErC,MAAI,KAAK,mBAAmB,GAAG,eAAe,UAAU,MAAM;AAC5D,QAAK,kBAAkB,GAAG,KACxBC,yCACEC,+BAAe,OACbA,+BAAe,OAAO,EACpB,MAAMC,mCAAmB,cAC1B,CAAC,CACH,CAAC,QAAQ,CACX,CACF;AACD,QAAK,UAAU,OAAO,KACpB,EAAE,cAAc,KAAK,kBAAkB,IAAI,EAC3C,8BACD;;AAGH,OAAK,KAAKC,4BAAY,kBAAkB;AAExC,MAAI,KAAK,YACP,OAAM,KAAK;AAGb,OAAK,UAAU,OAAO,KAAK,oBAAoB;;CAGjD,MAAM,gBAAiC;AACrC,SAAOC,8BAAkB;GACvB,YAAY,KAAK,OAAO;GACxB,MAAM,KAAK,OAAO;GACnB,CAAC;;CAOJ,AAAQ,kBAAwB;EAC9B,IAAIT;AAIJ,OAAK,aAAa;GAAE,SAHJ,IAAI,SAAe,MAAM;AACvC,cAAU;KACV;GACoC;GAAU;;CAGlD,AAAQ,KAAK,SAAqBQ,4BAAY,SAAe;EAG3D,MAAM,gBAAgB,KAAK,mBAAmB,WAAW;AACzD,OAAK,mBAAmB,KAAK,OAAO;AACpC,MAAI,cACF,MAAK,WAAW,SAAS;;CAQ7B,AAAQ,gBAAsB;EAC5B,MAAM,mBACJ,KAAK,kBAAkB,KAAK,OAAO;AACrC,MAAI,iBACF,MAAK,UAAU,OAAO,MAAM,oCAAoC;AAElE,OAAK,gBAAgB,mBACjB,KAAK,OAAO,oBACZ,KAAK,OAAO;;CAOlB,AAAQ,sBAA+B;AACrC,SAAO,OAAO,KAAK,KAAK,oBAAoB,cAAc,CAAC,SAAS;;;;;;;;;;;;;CActE,AAAQ,wBAAwB,QAAsB;EACpD,MAAM,WAAW,OAAO,KAAK,KAAK,oBAAoB,cAAc;AACpE,MAAI,SAAS,WAAW,EAAG;EAC3B,MAAM,MAAM,KAAK,KAAK;EACtB,MAAME,OAAiB,EAAE;AACzB,OAAK,MAAM,MAAM,UAAU;GACzB,MAAM,IAAI,KAAK,oBAAoB,YAAY;AAC/C,OAAI,GAAG,gBAAiB,MAAK,KAAK,MAAM,EAAE,gBAAgB;;AAG5D,OAAK,UAAU,OAAO,MACpB;GACE;GACA,eAAe,SAAS;GACxB,aAAa,KAAK,SAAS,IAAI,KAAK,IAAI,GAAG,KAAK,GAAG;GACpD,EACD,2BACD;AAED,OAAK,MAAM,MAAM,UAAU;GACzB,MAAM,IAAI,KAAK,oBAAoB,YAAY;AAC/C,OAAI,CAAC,EAAG;AACR,QAAK,UAAU,OAAO,MACpB;IACE;IACA,WAAW,EAAE;IACb,OAAO,EAAE;IACT,QAAQ,EAAE;IACV,cAAc,EAAE;IAChB,OAAO,EAAE;IACT,OAAO,EAAE,kBAAkB,MAAM,EAAE,kBAAkB;IACrD,wBAAwB,EAAE,sBACtB,MAAM,EAAE,sBACR;IACL,EACD,6CACD;;;CAIL,AAAQ,iCAAuC;AAC7C,MAAI,KAAK,qBAAsB;AAC/B,OAAK,uBAAuB,kBAAkB;AAC5C,OAAI,CAAC,KAAK,mBAAoB;AAC9B,QAAK,wBAAwB,WAAW;AAIxC,QAAK,KAAKF,4BAAY,qBAAqB;KAC1C,eAAe,0BAA0B;;CAG9C,AAAQ,gCAAsC;AAC5C,MAAI,KAAK,sBAAsB;AAC7B,iBAAc,KAAK,qBAAqB;AACxC,QAAK,uBAAuB;;;CAQhC,MAAc,cAAc,gBAAuC;EACjE,IAAI,UAAU;AAEd,OAAK,UAAU,OAAO,MAAM,EAAE,gBAAgB,EAAE,yBAAyB;AAEzE,SAAO,MAAM;AAEX,OAAI,KAAK,sBAAsB,CAAC,KAAK,qBAAqB,CACxD;AAIF,OAAI,CAAC,KAAK,qBAAqB,KAAK,kBAAkB,MAAM;AAC1D,SAAK,UAAU,OAAO,MACpB;KACE,qBAAqB,CAAC,CAAC,KAAK;KAC5B,sBAAsB,KAAK,mBAAmB;KAC9C,uBAAuB,CAAC,CAAC,KAAK;KAC9B,sBAAsB,KAAK,qBAAqB;KACjD,EACD,uBACD;AAED,QAAI,KAAK,mBACP,MAAK,UAAU,OAAO,KAAK,EAAE,SAAS,EAAE,eAAe;QAEvD,MAAK,UAAU,OAAO,KAAK,aAAa;AAG1C,SAAK,UAAU,cACb,KAAK,qBACDJ,8BAAgB,eAChBA,8BAAgB,WACrB;AAED,QAAI;KACF,MAAM,EAAE,MAAM,iBAAiB,MAAMO,sCACnC,KAAK,QACL,KAAK,eACL,SACA,KAAK,iBACL,KAAK,UAAU,OAChB;AAGD,UAAK,eAAe,MAAM,aAAa;AAGvC,SAAI,KAAK,qBAAqB;AAC5B,WAAK,UAAU,OAAO,KACpB;OACE,iBAAiB,KAAK,oBAAoB;OAC1C,iBAAiB,KAAK;OACvB,EACD,+BACD;AACD,WAAK,oBAAoB,OAAO;AAChC,WAAK,sBAAsB;;AAG7B,UAAK,oBAAoB;AACzB,UAAK,iBAAiB,eAAe,KAAK,oBAAoB;AAC9D,UAAK,eAAe,eAAe,KAAK,iBAAiB;AACzD,eAAU;AACV,UAAK,qBAAqB;AAC1B,UAAK,UAAU,OAAO,KACpB;MAAE,cAAc,KAAK;MAAI;MAAc,EACvC,oBACD;AACD,UAAK,UAAU,cAAcP,8BAAgB,OAAO;AAEpD,SAAI,KAAK,oBAAoB;AAG3B,WAAK,GAAG,KACNC,yCACEC,+BAAe,OACbA,+BAAe,OAAO,EACpB,MAAMC,mCAAmB,cAC1B,CAAC,CACH,CAAC,QAAQ,CACX,CACF;AACD,WAAK,UAAU,OAAO,KACpB,EAAE,cAAc,KAAK,IAAI,EACzB,iDACD;YACI;AAIL,WAAK,GAAG,KACNF,yCACEC,+BAAe,OACbA,+BAAe,OAAO,EACpB,MAAMC,mCAAmB,cAC1B,CAAC,CACH,CAAC,QAAQ,CACX,CACF;AACD,WAAK,UAAU,OAAO,KACpB,EAAE,cAAc,KAAK,IAAI,EACzB,oBACD;;AAIH,UAAK,UAAU,qBAAqB,KAAK,cAAc;AAEvD,UAAK,qBAAqB;AAC1B,UAAK,oBAAoB;AACzB,UAAK,mBAAmB;aACjB,KAAK;AACZ,SAAI,EAAE,eAAeK,6BAAiB,OAAM;AAE5C,eAAU,IAAI,UAAU;AACxB,SAAI,eAAeC,uBAAW,MAAK,eAAe;AAClD,SAAI,eAAeC,kCACjB,MAAK,UAAU,OAAO,MAAM,qCAAqC;AAInE,SAAI,IAAI,SAAS,SAAS,0BAA0B,EAAE;MACpD,MAAM,SAAS,MAAM,KAAK,QAAQ,GAAG;AACrC,WAAK,UAAU,OAAO,KACpB;OAAE;OAAS,OAAO,KAAK,MAAM,OAAO;OAAE,OAAO,IAAI;OAAS,EAC1D,6BACD;AAID,UAHkB,MAAMC,4BAAe,cAAc;AACnD,cAAO,KAAK,sBAAsB,CAAC,KAAK,qBAAqB;QAC7D,CACa;AACf;;KAGF,MAAM,QAAQC,wBAAW,QAAQ;AACjC,UAAK,UAAU,OAAO,KACpB;MAAE;MAAS;MAAO,EAClB,6BACD;AAKD,SAHkB,MAAMD,4BAAe,aAAa;AAClD,aAAO,KAAK,sBAAsB,CAAC,KAAK,qBAAqB;OAC7D,CACa;AACf;;;AAOJ,OAAI,KAAK,mBAAmB,WAAW,EACrC,OAAM,KAAK,WAAW;GAExB,MAAM,UAAU,KAAK;AACrB,QAAK,qBAAqB,EAAE;AAC5B,QAAK,iBAAiB;AACtB,QAAK,UAAU,OAAO,MACpB;IACE;IACA,mBAAmB,KAAK;IACxB,qBAAqB,CAAC,CAAC,KAAK;IAC5B,sBAAsB,KAAK,mBAAmB;IAC/C,EACD,uBACD;;AAGH,OAAK,UAAU,OAAO,MACpB;GACE,mBAAmB,KAAK;GACxB,eAAe,OAAO,KAAK,KAAK,oBAAoB,cAAc,CAC/D;GACJ,EACD,yBACD;AAGD,OAAK,iBAAiB,MAAM;AAC5B,OAAK,eAAe,MAAM;AAC1B,OAAK,+BAA+B;AACpC,OAAK,mBAAmB,OAAO;AAC/B,OAAK,oBAAoB;AACzB,OAAK,qBAAqB,OAAO;AACjC,OAAK,sBAAsB;;;;;CAU7B,AAAQ,eAAe,MAAkB,cAA4B;EACnE,MAAM,EAAE,OAAO;EACf,MAAM,eAAe,KAAK;AAG1B,KAAG,WAAW,OAAO;AACnB,OAAI,KAAK,KAAM;GACf,MAAM,WAAW,KAAK,KAAK,GAAG,KAAK;AACnC,QAAK,UAAU,OAAO,KACpB;IACE;IACA;IACA;IACA,OAAQ,IAAmB;IAC5B,EACD,0BACD;AACD,QAAK,OAAO;AACZ,QAAK,gBAAgB,IAAI,aAAa;AACtC,OAAI,KAAK,mBAAmB,OAAO,aACjC,MAAK,oBAAoB;AAE3B,QAAK,KAAKP,4BAAY,QAAQ;;AAGhC,KAAG,WAAW,OAAO;AACnB,OAAI,KAAK,KAAM;GACf,MAAM,WAAW,KAAK,KAAK,GAAG,KAAK;AACnC,QAAK,UAAU,OAAO,KACpB;IACE;IACA;IACA;IACA,MAAM,GAAG;IACT,QAAQ,GAAG;IACZ,EACD,0BACD;AACD,QAAK,OAAO;AACZ,QAAK,gBAAgB,IAAI,aAAa;AACtC,OAAI,KAAK,mBAAmB,OAAO,aACjC,MAAK,oBAAoB;AAE3B,QAAK,KAAKA,4BAAY,QAAQ;;AAIhC,KAAG,YAAY,OAAO,UAAU;AAC9B,QAAK,yBAAyB,KAAK,KAAK;GAGxC,MAAM,iBAAiBS,qCADF,IAAI,WAAW,MAAM,KAAoB,CACN;AAExD,OAAI,eAAe,SAASV,mCAAmB,iBAAiB;IAC9D,MAAM,WAAW,KAAK,KAAK,GAAG,KAAK;AACnC,SAAK,UAAU,OAAO,KACpB;KAAE,cAAc,KAAK;KAAI;KAAc;KAAU,EACjD,2CACD;AAGD,SAAK,sBAAsB,KAAK;AAChC,SAAK,oBAAoB;AACzB,SAAK,KAAKC,4BAAY,eAAe;AACrC;;AAGF,OAAI,eAAe,SAASD,mCAAmB,mBAAmB;AAChE,SAAK,2BAA2B,KAAK,KAAK;AAC1C,SAAK,oBAAoB;AACzB,SAAK,UAAU,OAAO,MACpB,EAAE,cAAc,EAChB,4BACD;AACD;;AAGF,OAAI,eAAe,SAASA,mCAAmB,0BAA0B;AACvE,UAAM,KAAK,iBAAiB,sBAAsB,gBAAgB,KAAK;AACvE;;AAGF,OAAI,eAAe,SAASA,mCAAmB,kBAAkB;AAC/D,SAAK,iBAAiB,eAAe,gBAAgB,aAAa;AAClE;;AAGF,OACE,eAAe,SACfA,mCAAmB,iCACnB;AACA,SAAK,iBAAiB,qBACpB,gBACA,aACD;AACD;;AAGF,QAAK,UAAU,OAAO,KACpB;IACE,MAAMW,yCAAyB,eAAe,KAAK;IACnD,SAAS,eAAe;IACxB,OAAO,KAAK,UAAU,UAAU;IAChC;IACD,EACD,0BACD"}