import { Buffer } from 'buffer';
import * as fs from 'fs';
import * as net from "net";
import * as tls from "tls";
import * as http from "http";
import * as http2 from "http2";

import * as _ from "lodash";
import { EventEmitter } from 'events';
import getPort, { portNumbers } from 'get-port';
import connect = require("connect");
import cors = require("cors");
import WebSocket = require("ws");

const now = () => performance.now();
import { Mutex } from 'async-mutex';
import { ErrorLike, isErrorLike, UnreachableCheck } from '@httptoolkit/util';

import {
    InitiatedRequest,
    OngoingRequest,
    CompletedRequest,
    OngoingResponse,
    CompletedResponse,
    TlsHandshakeFailure,
    ClientError,
    WebSocketMessage,
    WebSocketClose,
    TlsPassthroughEvent,
    RuleEvent,
    RawPassthroughEvent,
    RawPassthroughDataEvent,
    RawHeaders,
    InformationalResponse,
    InitiatedResponse,
    BodyData
} from "../types";
import { DestroyableServer } from "destroyable-server";
import {
    Mockttp,
    AbstractMockttp,
    MockttpOptions,
    MockttpHttpsOptions,
    PortRange
} from "../mockttp";
import { RequestRule, RequestRuleData } from "../rules/requests/request-rule";
import { ServerMockedEndpoint } from "./mocked-endpoint";
import { createComboServer } from "./http-combo-server";
import { filter } from "../util/promise";

import {
    buildRawSocketEventData,
    buildTlsSocketEventData,
    isSocketLoop,
    resetOrDestroy,
    buildSocketErrorRequestTimings
} from "../util/socket-util";
import {
    ClientErrorInProgress,
    LastHopEncrypted,
    TlsSetupCompleted,
    SocketMetadata,
    TlsMetadata,
    Expects100Continue,
} from '../util/socket-extensions';
import { getSocketMetadataTags, getSocketMetadataFromProxyAuth } from '../util/socket-metadata'
import {
    waitForCompletedRequest,
    trackResponse,
    waitForCompletedResponse,
    buildInitiatedRequest,
    tryToParseHttpRequest,
    buildBodyReader,
    parseRawHttpResponse,
    buildInitiatedResponse,
    preprocessRequest,
    isHttp2,
    ExtendedRawRequest
} from "../util/request-utils";
import { asBuffer } from "../util/buffer-utils";
import {
    pairFlatRawHeaders,
    rawHeadersToObject
} from "../util/header-utils";
import { AbortError } from "../rules/requests/request-step-impls";
import { WebSocketRuleData, WebSocketRule } from "../rules/websockets/websocket-rule";
import { SocksServerOptions } from "./socks-server";

const serverPortCheckMutex = new Mutex();

/**
 * A in-process Mockttp implementation. This starts servers on the local machine in the
 * current process, and exposes methods to directly manage them.
 *
 * This class does not work in browsers, as it expects to be able to start HTTP servers.
 */
export class MockttpServer extends AbstractMockttp implements Mockttp {

    private requestRuleSets: { [priority: number]: RequestRule[] } = {};
    private webSocketRuleSets: { [priority: number]: WebSocketRule[] } = {};

    private httpsOptions: MockttpHttpsOptions | undefined;
    private isHttp2Enabled: boolean | 'fallback';
    private socksOptions: boolean | SocksServerOptions;
    private passthroughUnknownProtocols: boolean;
    private maxBodySize: number;
    private keyLogFilePath: string | undefined;
    private keyLogStream: fs.WriteStream | undefined;

    private app: connect.Server;
    private server: DestroyableServer<net.Server> | undefined;

    private eventEmitter: EventEmitter;

    private readonly initialDebugSetting: boolean;

    constructor(options: MockttpOptions = {}) {
        super(options);

        this.initialDebugSetting = this.debug;

        this.httpsOptions = options.https;
        this.isHttp2Enabled = options.http2 ?? 'fallback';
        this.socksOptions = options.socks ?? false;
        this.passthroughUnknownProtocols = options.passthrough?.includes('unknown-protocol') ?? false;
        this.maxBodySize = options.maxBodySize ?? Infinity;
        this.keyLogFilePath = options.https?.keyLogFile;

        this.eventEmitter = new EventEmitter();

        this.app = connect();

        if (this.corsOptions) {
            if (this.debug) console.log('Enabling CORS');

            const corsOptions = this.corsOptions === true
                ? { methods: ['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS'] }
                : this.corsOptions;

            this.app.use(cors(corsOptions) as connect.HandleFunction);
        }

        this.app.use(this.handleRequest.bind(this));
    }

    async start(portParam: number | PortRange = { startPort: 8000, endPort: 65535 }): Promise<void> {
        if (this.keyLogFilePath) {
            this.keyLogStream = fs.createWriteStream(this.keyLogFilePath, { flags: 'a' });
            this.keyLogStream.on('error', (err) => {
                console.warn(`Error writing TLS key log file ${this.keyLogFilePath}:`, err);
            });
        }

        this.server = await createComboServer({
            debug: this.debug,
            https: this.httpsOptions,
            http2: this.isHttp2Enabled,
            socks: this.socksOptions,
            passthroughUnknownProtocols: this.passthroughUnknownProtocols,
            keyLogStream: this.keyLogStream,

            requestListener: this.app,
            tlsClientErrorListener: this.announceTlsErrorAsync.bind(this),
            tlsPassthroughListener: this.passthroughSocket.bind(this, 'tls'),
            rawPassthroughListener: this.passthroughSocket.bind(this, 'raw')
        });

        // We use a mutex here to avoid contention on ports with parallel setup
        await serverPortCheckMutex.runExclusive(async () => {
            const port = typeof portParam === 'number'
                ? portParam
                : await getPort({
                    port: portNumbers(portParam.startPort, portParam.endPort)
                });

            if (this.debug) console.log(`Starting mock server on port ${port}`);
            this.server!.listen(port);
        });

        // Handle & report client request errors
        this.server!.on('clientError', this.handleInvalidHttp1Request.bind(this));
        this.server!.on('sessionError', this.handleInvalidHttp2Request.bind(this));

        // Track the socket of HTTP/2 sessions, for error reporting later:
        this.server!.on('session', (session) => {
            session.on('connect', (session: http2.Http2Session, socket: net.Socket) => {
                session.initialSocket = socket;
            });
        });

        this.server!.on('upgrade', this.handleWebSocket.bind(this));

        return new Promise<void>((resolve, reject) => {
            this.server!.on('listening', resolve);
            this.server!.on('error', (e: any) => {
                // Although we try to pick a free port, we may have race conditions, if something else
                // takes the same port at the same time. If you haven't explicitly picked a port, and
                // we do have a collision, simply try again.
                if (e.code === 'EADDRINUSE' && typeof portParam !== 'number') {
                    if (this.debug) console.log('Address in use, retrying...');

                    // Destroy just in case there is something that needs cleanup here. Catch because most
                    // of the time this will error with 'Server is not running'.
                    this.server!.destroy().catch(() => {});
                    resolve(this.start());
                } else {
                    reject(e);
                }
            });
        });
    }

    async stop(): Promise<void> {
        if (this.debug) console.log(`Stopping server at ${this.url}`);

        if (this.server) await this.server.destroy();

        if (this.keyLogStream) {
            const keyLogStream = this.keyLogStream;
            await new Promise<void>((resolve) => {
                keyLogStream.end();
                if (keyLogStream.writableFinished || keyLogStream.errored) {
                    resolve();
                    return;
                }
                // N.b. errors are already logged by setup logic
                keyLogStream.once('finish', resolve);
                keyLogStream.once('error', () => resolve());
            });
        }

        this.reset();
    }

    enableDebug() {
        this.debug = true;
    }

    reset() {
        Object.values(this.requestRuleSets).flat().forEach(r => r.dispose());
        this.requestRuleSets = [];

        Object.values(this.webSocketRuleSets).flat().forEach(r => r.dispose());
        this.webSocketRuleSets = [];

        this.debug = this.initialDebugSetting;

        this.eventEmitter.removeAllListeners();
        this.eventEmitter = new EventEmitter();
    }

    private get address() {
        if (!this.server) throw new Error('Cannot get address before server is started');

        return (this.server.address() as net.AddressInfo)
    }

    get url(): string {
        if (!this.server) throw new Error('Cannot get url before server is started');

        if (this.httpsOptions) {
            return "https://localhost:" + this.port;
        } else {
            return "http://localhost:" + this.port;
        }
    }

    get port(): number {
        if (!this.server) throw new Error('Cannot get port before server is started');

        return this.address.port;
    }

    private addToRuleSets<R extends RequestRule | WebSocketRule>(
        ruleSets: { [priority: number]: R[] },
        rule: R
    ) {
        ruleSets[rule.priority] ??= [];
        ruleSets[rule.priority].push(rule);
    }

    public setRequestRules = (...ruleData: RequestRuleData[]): Promise<ServerMockedEndpoint[]> => {
        Object.values(this.requestRuleSets).flat().forEach(r => r.dispose());

        const rules = ruleData.map((ruleDatum) => new RequestRule(ruleDatum));
        this.requestRuleSets = _.groupBy(rules, r => r.priority);

        return Promise.resolve(rules.map(r => new ServerMockedEndpoint(r)));
    }

    public addRequestRules = (...ruleData: RequestRuleData[]): Promise<ServerMockedEndpoint[]> => {
        return Promise.resolve(ruleData.map((ruleDatum) => {
            const rule = new RequestRule(ruleDatum);
            this.addToRuleSets(this.requestRuleSets, rule);
            return new ServerMockedEndpoint(rule);
        }));
    }

    public setWebSocketRules = (...ruleData: WebSocketRuleData[]): Promise<ServerMockedEndpoint[]> => {
        Object.values(this.webSocketRuleSets).flat().forEach(r => r.dispose());

        const rules = ruleData.map((ruleDatum) => new WebSocketRule(ruleDatum));
        this.webSocketRuleSets = _.groupBy(rules, r => r.priority);

        return Promise.resolve(rules.map(r => new ServerMockedEndpoint(r)));
    }

    public addWebSocketRules = (...ruleData: WebSocketRuleData[]): Promise<ServerMockedEndpoint[]> => {
        return Promise.resolve(ruleData.map((ruleDatum) => {
            const rule = new WebSocketRule(ruleDatum);
            (this.webSocketRuleSets[rule.priority] ??= []).push(rule);
            return new ServerMockedEndpoint(rule);
        }));
    }

    public async getMockedEndpoints(): Promise<ServerMockedEndpoint[]> {
        return [
            ...Object.values(this.requestRuleSets).flatMap(rules => rules.map(r => new ServerMockedEndpoint(r))),
            ...Object.values(this.webSocketRuleSets).flatMap(rules => rules.map(r => new ServerMockedEndpoint(r)))
        ];
    }

    public async getPendingEndpoints() {
        const withPendingPromises = (await this.getMockedEndpoints())
            .map(async (endpoint) => ({
                endpoint,
                isPending: await endpoint.isPending()
            }));

        const withPending = await Promise.all(withPendingPromises);
        return withPending.filter(wp => wp.isPending).map(wp => wp.endpoint);
    }

    public async getRuleParameterKeys() {
        return []; // Local servers never have rule parameters defined
    }

    public on(event: 'request-initiated', callback: (req: InitiatedRequest) => void): Promise<void>;
    public on(event: 'request-body-data', callback: (req: BodyData) => void): Promise<void>;
    public on(event: 'request', callback: (req: CompletedRequest) => void): Promise<void>;
    public on(event: 'response-initiated', callback: (req: InitiatedResponse) => void): Promise<void>;
    public on(event: 'response-information', callback: (info: InformationalResponse) => void): Promise<void>;
    public on(event: 'response-body-data', callback: (req: BodyData) => void): Promise<void>;
    public on(event: 'response', callback: (req: CompletedResponse) => void): Promise<void>;
    public on(event: 'abort', callback: (req: InitiatedRequest) => void): Promise<void>;
    public on(event: 'websocket-request', callback: (req: CompletedRequest) => void): Promise<void>;
    public on(event: 'websocket-accepted', callback: (req: CompletedResponse) => void): Promise<void>;
    public on(event: 'websocket-message-received', callback: (req: WebSocketMessage) => void): Promise<void>;
    public on(event: 'websocket-message-sent', callback: (req: WebSocketMessage) => void): Promise<void>;
    public on(event: 'websocket-close', callback: (close: WebSocketClose) => void): Promise<void>;
    public on(event: 'tls-passthrough-opened', callback: (req: TlsPassthroughEvent) => void): Promise<void>;
    public on(event: 'tls-passthrough-closed', callback: (req: TlsPassthroughEvent) => void): Promise<void>;
    public on(event: 'tls-client-error', callback: (req: TlsHandshakeFailure) => void): Promise<void>;
    public on(event: 'client-error', callback: (error: ClientError) => void): Promise<void>;
    public on(event: 'raw-passthrough-opened', callback: (req: RawPassthroughEvent) => void): Promise<void>;
    public on(event: 'raw-passthrough-closed', callback: (req: RawPassthroughEvent) => void): Promise<void>;
    public on(event: 'raw-passthrough-data', callback: (req: RawPassthroughDataEvent) => void): Promise<void>;
    public on<T = unknown>(event: 'rule-event', callback: (event: RuleEvent<T>) => void): Promise<void>;
    public on(event: string, callback: (...args: any[]) => void): Promise<void> {
        this.eventEmitter.on(event, callback);
        return Promise.resolve();
    }

    public listenerCount(event: string, listener?: ((...args: any[]) => void)): number {
        return this.eventEmitter.listenerCount(event, listener);
    }

    private announceBodyDataAsync(
        emitter: EventEmitter,
        type: 'request' | 'response',
        id: string,
        eventTimestamp: number,
        content: Uint8Array,
        isEnded: boolean
    ) {
        setImmediate(() => {
            emitter.emit(`${type}-body-data`, {
                id,
                content,
                isEnded,
                eventTimestamp
            });
        });
    }

    private announceInitialRequestAsync(emitter: EventEmitter, request: OngoingRequest) {
        if (emitter.listenerCount('request-initiated') === 0) return;

        setImmediate(() => {
            const initiatedReq = buildInitiatedRequest(request);
            initiatedReq.timingEvents = { ...initiatedReq.timingEvents };
            initiatedReq.tags = initiatedReq.tags.slice();
            emitter.emit('request-initiated', initiatedReq);
        });
    }

    private announceCompletedRequestAsync(emitter: EventEmitter, request: OngoingRequest) {
        if (emitter.listenerCount('request') === 0) return;

        waitForCompletedRequest(request)
        .then((completedReq: CompletedRequest) => {
            setImmediate(() => {
                completedReq.timingEvents = { ...completedReq.timingEvents };
                completedReq.tags = completedReq.tags.slice();
                emitter.emit('request', completedReq);
            });
        })
        .catch(console.error);
    }

    private announceInitialResponseAsync(emitter: EventEmitter, response: OngoingResponse) {
        if (emitter.listenerCount('response-initiated') === 0) return;

        setImmediate(() => {
            const initiatedRes = buildInitiatedResponse(response);
            initiatedRes.timingEvents = { ...initiatedRes.timingEvents };
            initiatedRes.tags = initiatedRes.tags.slice();
            emitter.emit('response-initiated', initiatedRes);
        });
    }

    private announceResponseInformationAsync(
        emitter: EventEmitter,
        response: OngoingResponse,
        status: number,
        flatHeaders: string[]
    ) {
        if (emitter.listenerCount('response-information') === 0) return;

        setImmediate(() => {
            // This matches & extends initiatedResponse, but with tweaks for the info
            // that is otherwise not stored since we haven't called writeHead etc yet.
            const rawHeaders = pairFlatRawHeaders(flatHeaders);
            const info: InformationalResponse = {
                ...buildInitiatedResponse(response),
                statusCode: status,
                statusMessage: isHttp2(response) ? '' : (http.STATUS_CODES[status] ?? ''),
                headers: rawHeadersToObject(rawHeaders),
                rawHeaders,
                eventTimestamp: now()
            };
            info.timingEvents = { ...info.timingEvents };
            info.tags = info.tags.slice();
            emitter.emit('response-information', info);
        });
    }

    private announceResponseAsync(emitter: EventEmitter, response: OngoingResponse | CompletedResponse) {
        if (emitter.listenerCount('response') === 0) return;

        waitForCompletedResponse(response)
        .then((res: CompletedResponse) => {
            setImmediate(() => {
                res.timingEvents = { ...res.timingEvents };
                res.tags = res.tags.slice();
                emitter.emit('response', res);
            });
        })
        .catch(console.error);
    }

    private announceWebSocketRequestAsync(emitter: EventEmitter, request: OngoingRequest) {
        if (emitter.listenerCount('websocket-request') === 0) return;

        waitForCompletedRequest(request)
        .then((completedReq: CompletedRequest) => {
            setImmediate(() => {
                completedReq.timingEvents = { ...completedReq.timingEvents };
                completedReq.tags = completedReq.tags.slice();
                emitter.emit('websocket-request', completedReq);
            });
        })
        .catch(console.error);
    }

    private announceWebSocketUpgradeAsync(emitter: EventEmitter, response: CompletedResponse) {
        if (emitter.listenerCount('websocket-accepted') === 0) return;

        setImmediate(() => {
            emitter.emit('websocket-accepted', {
                ...response,
                timingEvents: { ...response.timingEvents },
                tags: response.tags.slice()
            });
        });
    }

    private announceWebSocketMessageAsync(
        emitter: EventEmitter,
        request: OngoingRequest,
        direction: 'sent' | 'received',
        content: Buffer,
        isBinary: boolean
    ) {
        const eventName = `websocket-message-${direction}`;
        if (emitter.listenerCount(eventName) === 0) return;

        setImmediate(() => {
            emitter.emit(eventName, {
                streamId: request.id,

                direction,
                content,
                isBinary,

                eventTimestamp: now(),
                timingEvents: request.timingEvents,
                tags: request.tags
            } as WebSocketMessage);
        });
    }

    private announceWebSocketCloseAsync(
        emitter: EventEmitter,
        request: OngoingRequest,
        closeCode: number | undefined,
        closeReason?: string
    ) {
        if (emitter.listenerCount('websocket-close') === 0) return;

        setImmediate(() => {
            emitter.emit('websocket-close', {
                streamId: request.id,

                closeCode,
                closeReason,

                timingEvents: request.timingEvents,
                tags: request.tags
            } as WebSocketClose);
        });
    }

    // Hook the request and socket to announce all WebSocket events after the initial request:
    private trackWebSocketEvents(emitter: EventEmitter, request: OngoingRequest, socket: net.Socket) {
        const originalWrite = socket._write;
        const originalWriteV = socket._writev;

        // Hook the socket to capture our upgrade response:
        let data = Buffer.from([]);
        socket._writev = undefined;
        socket._write = function (): any {
            data = Buffer.concat([data, asBuffer(arguments[0])]);
            return originalWrite.apply(this, arguments as any);
        };

        let upgradeCompleted = false;

        socket.once('close', () => {
            if (upgradeCompleted) return;

            if (data.length) {
                request.timingEvents.responseSentTimestamp = now();

                const httpResponse = parseRawHttpResponse(data, request);
                this.announceResponseAsync(emitter, httpResponse);
            } else {
                // Connect closed during upgrade, before we responded:
                request.timingEvents.abortedTimestamp = now();
                this.announceAbortAsync(emitter, request);
            }
        });

        socket.once('ws-upgrade', (ws: WebSocket) => {
            upgradeCompleted = true;

            // Undo our write hook setup:
            socket._write = originalWrite;
            socket._writev = originalWriteV;

            request.timingEvents.wsAcceptedTimestamp = now();

            const httpResponse = parseRawHttpResponse(data, request);
            this.announceWebSocketUpgradeAsync(emitter, httpResponse);

            ws.on('message', (data: Buffer, isBinary) => {
                this.announceWebSocketMessageAsync(emitter, request, 'received', data, isBinary);
            });

            // Wrap ws.send() to report all sent data:
            const _send = ws.send;
            const self = this;
            ws.send = function (data: any, options: any): any {
                const isBinary = options.binary
                    ?? typeof data !== 'string';

                _send.apply(this, arguments as any);
                self.announceWebSocketMessageAsync(emitter, request, 'sent', asBuffer(data), isBinary);
            };

            ws.on('close', (closeCode, closeReason) => {
                if (closeCode === 1006) {
                    // Not a clean close!
                    request.timingEvents.abortedTimestamp = now();
                    this.announceAbortAsync(emitter, request);
                } else {
                    request.timingEvents.wsClosedTimestamp = now();

                    this.announceWebSocketCloseAsync(
                        emitter,
                        request,
                        closeCode === 1005
                            ? undefined // Clean close, but with a close frame with no status
                            : closeCode,
                        closeReason.toString('utf8')
                    );
                }
            });
        });
    }

    private async announceAbortAsync(emitter: EventEmitter, request: OngoingRequest, abortError?: ErrorLike) {
        setImmediate(() => {
            const req = buildInitiatedRequest(request);
            req.timingEvents = { ...req.timingEvents };
            req.tags = req.tags.slice();
            emitter.emit('abort', Object.assign(req, {
                error: abortError ? {
                    name: abortError.name,
                    code: abortError.code,
                    message: abortError.message,
                    stack: abortError.stack
                } : undefined
            }));
        });
    }

    private async announceTlsErrorAsync(socket: net.Socket, request: TlsHandshakeFailure) {
        // Ignore errors after TLS is setup, those are client errors
        if (socket instanceof tls.TLSSocket && socket[TlsSetupCompleted]) return;

        const emitter = this.eventEmitter;
        setImmediate(() => {
            if (this.debug) console.warn(`TLS client error: ${JSON.stringify(request)}`);
            emitter.emit('tls-client-error', request);
        });
    }

    private async announceClientErrorAsync(emitter: EventEmitter, socket: net.Socket | undefined, error: ClientError) {
        // Ignore errors before TLS is setup, those are TLS errors
        if (
            socket instanceof tls.TLSSocket &&
            !socket[TlsSetupCompleted] &&
            error.errorCode !== 'ERR_HTTP2_ERROR' // Initial HTTP/2 errors are considered post-TLS
        ) return;

        setImmediate(() => {
            if (this.debug) console.warn(`Client error: ${JSON.stringify(error)}`);
            emitter.emit('client-error', error);
        });
    }

    private async announceRuleEventAsync(emitter: EventEmitter, requestId: string, ruleId: string, eventType: string, eventData: unknown) {
        setImmediate(() => {
            emitter.emit('rule-event', {
                requestId,
                ruleId,
                eventType,
                eventData
            });
        });
    }

    private preprocessRequest(req: ExtendedRawRequest, type: 'request' | 'websocket', emitter: EventEmitter): OngoingRequest | null {
        try {
            return preprocessRequest(req, {
                type,
                maxBodySize: this.maxBodySize,
                serverPort: this.port,
                onBodyData: emitter.listenerCount('request-body-data') > 0
                    ? this.announceBodyDataAsync.bind(this, emitter, 'request')
                    : undefined

            })
        } catch (e: any) {
            const error: Error = Object.assign(e, {
                code: e.code ?? 'PREPROCESSING_FAILED',
                badRequest: req
            });

            const h2Session = req.httpVersionMajor > 1 &&
                (req as any).stream?.session;

            if (h2Session) {
                this.handleInvalidHttp2Request(error, h2Session);
            } else {
                this.handleInvalidHttp1Request(error, req.socket)
            }

            return null; // Null -> preprocessing failed, error already handled here
        }
    }

    private async handleRequest(rawRequest: ExtendedRawRequest, rawResponse: http.ServerResponse) {
        // Capture the event emitter for this request's lifecycle to avoid races where events
        // fire on servers after they're closed/reset and the emitter has been changed.
        const emitter = this.eventEmitter;

        const request = this.preprocessRequest(rawRequest, 'request', emitter);
        if (request === null) return; // Preprocessing failed - don't handle this

        if (this.debug) console.log(`Handling request for ${rawRequest.url}`);

        let result: 'responded' | 'aborted' | null = null;
        const abort = (error?: Error) => {
            if (result === null) {
                result = 'aborted';
                request.timingEvents.abortedTimestamp = now();
                this.announceAbortAsync(emitter, request, error);
            }
        }
        request.once('aborted', abort);
        // In Node 16+ we don't get an abort event in many cases, just closes, but we know
        // it's aborted because the response is closed with no other result being set.
        rawResponse.once('close', () => setImmediate(abort));
        request.once('error', (error) => setImmediate(() => abort(error)));

        this.announceInitialRequestAsync(emitter, request);

        const response = trackResponse(
            rawResponse,
            request.timingEvents,
            request.tags,
            {
                maxSize: this.maxBodySize,
                onWriteHead: () => this.announceInitialResponseAsync(emitter, response),
                onBodyData: emitter.listenerCount('response-body-data') > 0
                    ? this.announceBodyDataAsync.bind(this, emitter, 'response')
                    : undefined,
                onInformationalResponse: (status, flatHeaders) =>
                    this.announceResponseInformationAsync(emitter, response, status, flatHeaders)
            }
        );

        // If the request had Expect: 100-continue, we trigger the auto response now.
        // Standard Node behaviour, reproduced manually for event visibility.
        if (rawRequest[Expects100Continue]) {
            response.sendInformationalResponse(100, []);
        }

        const hasResponseListener = emitter.listenerCount('response') > 0;
        if (hasResponseListener) {
            // Start buffering response body if there's somebody who
            // might want to hear about it later
            response.body.asBuffer().catch(() => {});
        }

        response.id = request.id;
        response.on('error', (error) => {
            console.log('Response error:', this.debug ? error : error.message);
            abort(error);
        });

        try {
            let nextRulePromise = this.findMatchingRule(this.requestRuleSets, request);

            // Async: once we know what the next rule is, ping a request event
            nextRulePromise
                .then((rule) => rule ? rule.id : undefined)
                .catch(() => undefined)
                .then((ruleId) => {
                    request.matchedRuleId = ruleId;
                    this.announceCompletedRequestAsync(emitter, request);
                });

            let nextRule = await nextRulePromise;
            if (nextRule) {
                if (this.debug) console.log(`Request matched rule: ${nextRule.explain()}`);
                await nextRule.handle(request, response, {
                    record: this.recordTraffic,
                    debug: this.debug,
                    keyLogStream: this.keyLogStream,
                    emitEventCallback: (emitter.listenerCount('rule-event') !== 0)
                        ? (type, event) => this.announceRuleEventAsync(emitter, request.id, nextRule!.id, type, event)
                        : undefined
                });
            } else {
                await this.sendUnmatchedRequestError(request, response);
            }

            if (!response.writableEnded && !response.destroyed) {
                throw new Error("Request handler finished successfully without ending the response");
            }

            result ||= 'responded';
        } catch (e) {
            if (e instanceof AbortError) {
                abort(e);
                response.destroy(e);

                if (this.debug) {
                    console.error("Failed to handle request due to abort:", e);
                }
            } else {
                console.error("Failed to handle request:",
                    this.debug
                        ? e
                        : (isErrorLike(e) && e.message) || e
                );

                // Do whatever we can to tell the client we broke
                try {
                    response.writeHead(
                        (isErrorLike(e) && e.statusCode) || 500,
                        (isErrorLike(e) && e.statusMessage) || 'Server error'
                    );
                } catch (e) {}

                try {
                    response.end((isErrorLike(e) && e.toString()) || e);
                    result ||= 'responded';
                } catch (e) {
                    abort(e as Error);
                }
            }
        }

        if (result === 'responded' && hasResponseListener) {
            this.announceResponseAsync(emitter, response);
        }
    }

    private async handleWebSocket(rawRequest: ExtendedRawRequest, socket: net.Socket, head: Buffer) {
        const emitter = this.eventEmitter;

        const request = this.preprocessRequest(rawRequest, 'websocket', emitter);
        if (request === null) return; // Preprocessing failed - don't handle this

        if (this.debug) console.log(`Handling websocket for ${rawRequest.url}`);

        socket.on('error', (error) => {
            console.log('Response error:', this.debug ? error : error.message);
            socket.destroy();
        });

        try {
            let nextRulePromise = this.findMatchingRule(this.webSocketRuleSets, request);

            // Async: once we know what the next rule is, ping a websocket-request event
            nextRulePromise
                .then((rule) => rule ? rule.id : undefined)
                .catch(() => undefined)
                .then((ruleId) => {
                    request.matchedRuleId = ruleId;
                    this.announceWebSocketRequestAsync(emitter, request);
                });

            this.trackWebSocketEvents(emitter, request, socket);

            let nextRule = await nextRulePromise;
            if (nextRule) {
                if (this.debug) console.log(`Websocket matched rule: ${nextRule.explain()}`);
                await nextRule.handle(request, socket, head, {
                    record: this.recordTraffic,
                    debug: this.debug,
                    keyLogStream: this.keyLogStream,
                    emitEventCallback: (emitter.listenerCount('rule-event') !== 0)
                        ? (type, event) => this.announceRuleEventAsync(emitter, request.id, nextRule!.id, type, event)
                        : undefined
                });
            } else {
                await this.sendUnmatchedWebSocketError(request, socket, head);
            }
        } catch (e) {
            if (e instanceof AbortError) {
                if (this.debug) {
                    console.error("Failed to handle websocket due to abort:", e);
                }
            } else {
                console.error("Failed to handle websocket:",
                    this.debug
                    ? e
                    : (isErrorLike(e) && e.message) || e
                );
                this.sendWebSocketErrorResponse(socket, e);
            }
        }
    }

    /**
     * To match rules, we find the first rule (by priority then by set order) which matches and which is
     * either not complete (has a completion check that's false) or which has no completion check defined
     * and is the last option at that priority (i.e. by the last option at each priority repeats indefinitely.
     *
     * We move down the priority list only when either no rules match at all, or when all matching rules
     * have explicit completion checks defined that are completed.
     */
    private async findMatchingRule<R extends WebSocketRule | RequestRule>(
        ruleSets: { [priority: number]: Array<R> },
        request: OngoingRequest
    ): Promise<R | undefined> {
        for (let ruleSet of Object.values(ruleSets).reverse()) { // Obj.values returns numeric keys in ascending order
            // Start all rules matching immediately
            const rulesMatches = ruleSet
                .filter((r) => r.isComplete() !== true) // Skip all rules that are definitely completed
                .map((r) => ({ rule: r, match: r.matches(request) }));

            // Evaluate the matches one by one, and immediately use the first
            for (let { rule, match } of rulesMatches) {
                if (await match && rule.isComplete() === false) {
                    // The first matching incomplete rule we find is the one we should use
                    return rule;
                }
            }

            // There are no incomplete & matching rules! One last option: if the last matching rule is
            // maybe-incomplete (i.e. default completion status but has seen >0 requests) then it should
            // match anyway. This allows us to add rules and have the last repeat indefinitely.
            const matchingRules = await filter(rulesMatches, m => m.match);
            const lastMatchingRule = matchingRules[matchingRules.length - 1]?.rule;
            if (!lastMatchingRule || lastMatchingRule.isComplete()) continue; // On to lower priority matches
            // Otherwise, must be a rule with isComplete === null, i.e. no specific completion check:
            else return lastMatchingRule;
        }

        return undefined; // There are zero valid matching rules at any priority, give up.
    }

    private async getUnmatchedRequestExplanation(request: OngoingRequest) {
        let requestExplanation = await this.explainRequest(request);
        if (this.debug) console.warn(`Unmatched request received: ${requestExplanation}`);

        const requestRules = Object.values(this.requestRuleSets).flat();
        const webSocketRules = Object.values(this.webSocketRuleSets).flat();

        return `No rules were found matching this request.
This request was: ${requestExplanation}

${(requestRules.length > 0 || webSocketRules.length > 0)
    ? `The configured rules are:
${requestRules.map((rule) => rule.explain()).join("\n")}
${webSocketRules.map((rule) => rule.explain()).join("\n")}
`
    : "There are no rules configured."
}
${await this.suggestRule(request)}`
    }

    private async sendUnmatchedRequestError(request: OngoingRequest, response: http.ServerResponse) {
        response.setHeader('Content-Type', 'text/plain');
        response.writeHead(503, "Request for unmocked endpoint");
        response.end(await this.getUnmatchedRequestExplanation(request));
    }

    private async sendUnmatchedWebSocketError(
        request: OngoingRequest,
        socket: net.Socket,
        head: Buffer
    ) {
        const errorBody = await this.getUnmatchedRequestExplanation(request);
        socket.on('error', () => {}); // Best efforts, we don't care about failures here.
        socket.end([
            'HTTP/1.1 503 Request for unmocked endpoint',
            'Connection: close',
            'Content-Type: text/plain'
        ].join('\r\n') +
        '\r\n\r\n' +
        errorBody);
        socket.destroy();
    }

    private async sendWebSocketErrorResponse(socket: net.Socket, error: unknown) {
        if (socket.writable) {
            socket.end(
                'HTTP/1.1 500 Internal Server Error\r\n' +
                '\r\n' +
                (isErrorLike(error)
                    ? error.message ?? error.toString()
                    : ''
                )
            );
        }

        socket.destroy(error as Error);
    }

    private async explainRequest(request: OngoingRequest): Promise<string> {
        let msg = `${request.method} request to ${request.url}`;

        let bodyText = await request.body.asText();
        if (bodyText) msg += ` with body \`${bodyText}\``;

        if (!_.isEmpty(request.headers)) {
            msg += ` with headers:\n${JSON.stringify(request.headers, null, 2)}`;
        }

        return msg;
    }

    private async suggestRule(request: OngoingRequest): Promise<string> {
        if (!this.suggestChanges) return '';

        let msg = "You can fix this by adding a rule to match this request, for example:\n"

        msg += `mockServer.for${_.startCase(request.method.toLowerCase())}("${request.path}")`;

        const contentType = request.headers['content-type'];
        let isFormRequest = !!contentType && contentType.indexOf("application/x-www-form-urlencoded") > -1;
        let formBody = await request.body.asFormData().catch(() => undefined);

        if (isFormRequest && !!formBody) {
            msg += `.withForm(${JSON.stringify(formBody)})`;
        }

        msg += '.thenReply(200, "your response");';

        return msg;
    }

    // Called on server clientError, e.g. if the client disconnects during initial
    // request data, or sends totally invalid gibberish. Only called for HTTP/1.1 errors.
    private handleInvalidHttp1Request(
        error: Error & { code?: string, rawPacket?: Buffer, badRequest?: ExtendedRawRequest },
        socket: net.Socket
    ) {
        const emitter = this.eventEmitter;
        if (socket[ClientErrorInProgress]) {
            // For subsequent errors on the same socket, accumulate packet data (linked to the socket)
            // so that the error (probably delayed until next tick) has it all to work with
            const previousPacket = socket[ClientErrorInProgress].rawPacket;
            const newPacket = error.rawPacket;
            if (!newPacket || newPacket === previousPacket) return;

            if (previousPacket && previousPacket.length > 0) {
                if (previousPacket.equals(newPacket.slice(0, previousPacket.length))) {
                    // This is the same data, but more - update the client error data
                    socket[ClientErrorInProgress].rawPacket = newPacket;
                } else {
                    // This is different data for the same socket, probably an overflow, append it
                    socket[ClientErrorInProgress].rawPacket = Buffer.concat([
                        previousPacket,
                        newPacket
                    ]);
                }
            } else {
                // The first error had no data, we have data - use our data
                socket[ClientErrorInProgress]!.rawPacket = newPacket;
            }
            return;
        }

        // We can get multiple errors for the same socket in rapid succession as the parser works,
        // so we store the initial buffer, wait a tick, and then reply/report the accumulated
        // buffer from all errors together.
        socket[ClientErrorInProgress] = {
            // We use HTTP peeked data to catch extra data the parser sees due to httpolyglot peeking,
            // but which gets lost from the raw packet. If that data alone causes an error though
            // (e.g. Q as first char) then this packet data does get thrown! Eugh. In that case,
            // we need to avoid using both by accident, so we use just the non-peeked data instead
            // if the initial data is _exactly_ identical.
            rawPacket: error.rawPacket
        };

        setImmediate(async () => {
            const errorCode = error.code;
            const isHeaderOverflow = errorCode === "HPE_HEADER_OVERFLOW";

            const commonParams = {
                id: crypto.randomUUID(),
                tags: [
                    `client-error:${error.code || 'UNKNOWN'}`,
                    ...getSocketMetadataTags(socket[SocketMetadata])
                ],
                timingEvents: buildSocketErrorRequestTimings(socket)
            };

            const rawPacket = socket[ClientErrorInProgress]?.rawPacket
                ?? Buffer.from([]);

            // For packets where we get more than just httpolyglot-peeked data, guess-parse them:
            const parsedRequest = error.badRequest ??
                (rawPacket.byteLength > 1
                    ? tryToParseHttpRequest(rawPacket, socket)
                    : {}
                );

            if (isHeaderOverflow) commonParams.tags.push('header-overflow');

            const rawHeaders = parsedRequest.rawHeaders?.[0] && typeof parsedRequest.rawHeaders[0] === 'string'
                ? pairFlatRawHeaders(parsedRequest.rawHeaders as string[])
                : parsedRequest.rawHeaders as RawHeaders | undefined;

            const request: ClientError['request'] = {
                ...commonParams,
                httpVersion: parsedRequest.httpVersion || '1.1',
                method: parsedRequest.method,
                protocol: parsedRequest.protocol,
                url: parsedRequest.url,
                path: parsedRequest.path,
                headers: parsedRequest.headers || {},
                rawHeaders: rawHeaders || [],
                remoteIpAddress: socket.remoteAddress,
                remotePort: socket.remotePort,
                destination: parsedRequest.destination
            };

            let response: ClientError['response'];

            if (socket.writable) {
                response = {
                    ...commonParams,
                    headers: { 'connection': 'close' },
                    rawHeaders: [['Connection', 'close']],
                    trailers: {},
                    rawTrailers: [],
                    statusCode:
                        isHeaderOverflow
                            ? 431
                        : 400,
                    statusMessage:
                        isHeaderOverflow
                            ? "Request Header Fields Too Large"
                        : "Bad Request",
                    body: buildBodyReader(Buffer.from([]), {})
                };

                const responseBuffer = Buffer.from(
                    `HTTP/1.1 ${response.statusCode} ${response.statusMessage}\r\n` +
                    "Connection: close\r\n\r\n",
                    'ascii'
                );

                // Wait for the write to complete before we destroy() below
                await new Promise((resolve) => socket.write(responseBuffer, resolve));

                commonParams.timingEvents.headersSentTimestamp = now();
                commonParams.timingEvents.responseSentTimestamp = now();
            } else {
                response = 'aborted';
                commonParams.timingEvents.abortedTimestamp = now();
            }

            this.announceClientErrorAsync(emitter, socket, { errorCode, request, response });

            socket.on('error', () => {}); // Just announce the error to listeners, don't actually die from it
            socket.destroy(error);
        });
    }

    // Handle HTTP/2 client errors. This is a work in progress, but usefully reports
    // some of the most obvious cases.
    private handleInvalidHttp2Request(
        error: Error & { code?: string, errno?: number, badRequest?: ExtendedRawRequest },
        session: http2.Http2Session
    ) {
        // Unlike with HTTP/1.1, we have no control of the actual handling of
        // the error here, so this is just a matter of announcing the error to subscribers.

        const socket = session.initialSocket;
        const isTLS = socket instanceof tls.TLSSocket;

        const isBadPreface = (error.errno === -903);

        const rawHeaders = error.badRequest?.rawHeaders?.[0] && typeof error.badRequest?.rawHeaders[0] === 'string'
            ? pairFlatRawHeaders(error.badRequest?.rawHeaders as string[])
            : error.badRequest?.rawHeaders as RawHeaders | undefined;

        const timingEvents = buildSocketErrorRequestTimings(socket);
        timingEvents.abortedTimestamp = now();

        this.announceClientErrorAsync(this.eventEmitter, session.initialSocket, {
            errorCode: error.code,
            request: {
                id: crypto.randomUUID(),
                tags: [
                    `client-error:${error.code || 'UNKNOWN'}`,
                    ...(isBadPreface ? ['client-error:bad-preface'] : []),
                    ...getSocketMetadataTags(socket?.[SocketMetadata])
                ],
                httpVersion: error.badRequest?.httpVersion ?? '2',
                timingEvents,

                // Best guesses:
                protocol: error.badRequest?.protocol || (isTLS ? "https" : "http"),
                url: error.badRequest?.url ||
                    (isTLS ? `https://${(socket as tls.TLSSocket).servername}/` : undefined),

                path: error.badRequest?.path,
                headers: error.badRequest?.headers || {},
                rawHeaders: rawHeaders || [],
                destination: error.badRequest?.destination
            },
            response: 'aborted' // These h2 errors get no app-level response, just a shutdown.
        });
    }

    private outgoingPassthroughSockets: Set<net.Socket> = new Set();

    private passthroughSocket(
        type: 'raw' | 'tls',
        socket: net.Socket,
        hostname: string,
        port?: number
    ) {
        const emitter = this.eventEmitter;
        const targetPort = port ?? 443; // Should only be undefined on SNI-only TLS passthrough

        if (isSocketLoop(this.outgoingPassthroughSockets, socket)) {
            // Hard to reproduce: loops can only happen if a) SNI triggers this (because tunnels
            // require a repeated client request at each step) and b) the hostname points back to
            // us, and c) we're running on the default port. Still good to guard against though.
            console.warn(`Socket bypass loop for ${hostname}:${targetPort}`);
            resetOrDestroy(socket);
            return;
        }

        if (socket.closed) return; // Nothing to do

        let eventData: TlsPassthroughEvent | RawPassthroughEvent = Object.assign(
            type === 'raw'
                ? buildRawSocketEventData(socket)
                : buildTlsSocketEventData(socket as tls.TLSSocket),
            {
                id: crypto.randomUUID(),
                hostname: hostname, // Deprecated, but kept here for backward compat
                destination: { hostname, port: targetPort }
            }
        );

        setImmediate(() => emitter.emit(`${type}-passthrough-opened`, eventData));

        let upstreamSocket;
        if (type === 'raw' && socket[LastHopEncrypted]) {
            // Awkward edge case. If we are passing through raw data, but we've already unwrapped TLS beforehand,
            // we need to recreate the TLS for the passthrough. This is more art than science but we can
            // get pretty close to simulating the original incoming configuration:
            upstreamSocket = tls.connect({
                host: hostname,
                port: targetPort,
                servername: socket[TlsMetadata]?.sniHostname,
                // We have to mirror the ALPN protocols, which might be messy since we've actually already
                // negotiated one. Here we blindly hope (!) that we end up on the same page:
                ALPNProtocols: socket[TlsMetadata]?.clientAlpn,

                // We have no way to know what certs the client trusts and no config options for this yet, so
                // we just make do with default trust settings here in all cases.
            });
        } else if (type === 'tls' || type === 'raw') {
            // For raw traffic we pass through raw - not surprising. For TLS traffic this might be surprising,
            // but it's because a TLS tunnel is when we _don't_ terminate TLS ourselves, so we can't get inside
            // the tunnel at all here.
            upstreamSocket = net.connect({ host: hostname, port: targetPort });
        } else {
            throw new UnreachableCheck(type);
        }

        upstreamSocket.setNoDelay(true);

        socket.pipe(upstreamSocket);
        upstreamSocket.pipe(socket);

        if (type === 'raw') {
            socket.on('data', (data: Buffer) => {
                const eventTimestamp = now();
                setImmediate(() => {
                    emitter.emit('raw-passthrough-data', {
                        id: eventData.id,
                        direction: 'received',
                        content: data,
                        eventTimestamp
                    } satisfies RawPassthroughDataEvent);
                });
            });
            upstreamSocket.on('data', (data: Buffer) => {
                const eventTimestamp = now();
                setImmediate(() => {
                    emitter.emit('raw-passthrough-data', {
                        id: eventData.id,
                        direction: 'sent',
                        content: data,
                        eventTimestamp
                    } satisfies RawPassthroughDataEvent);
                });
            });
        }

        socket.on('error', (e: any) => {
            if (this.debug) console.warn(`Downstream ${type} passthrough error to ${hostname}:${targetPort}:`, e);
            eventData.tags.push(`${type}-passthrough-error:${e.code || 'UNKNOWN'}`);
            upstreamSocket.destroy();
        });

        upstreamSocket.on('error', (e: ErrorLike) => {
            if (this.debug) console.warn(`Upstream ${type} passthrough error to ${hostname}:${targetPort}:`, e);
            eventData.tags.push(`${type}-passthrough-error:${e.code || 'UNKNOWN'}`);
            socket.destroy()
        });

        upstreamSocket.on('close', () => socket.destroy());
        socket.on('close', () => {
            upstreamSocket.destroy();
            setImmediate(() => {
                emitter.emit(`${type}-passthrough-closed`, {
                    ...eventData,
                    timingEvents: {
                        ...eventData.timingEvents,
                        disconnectTimestamp: now()
                    }
                });
            });
        });

        upstreamSocket.once('connect', () => this.outgoingPassthroughSockets.add(upstreamSocket));
        upstreamSocket.once('close', () => this.outgoingPassthroughSockets.delete(upstreamSocket));

        if (this.debug) console.log(`Passing through bypassed ${type} connection to ${hostname}:${targetPort}${
            !port ? ' (assumed port)' : ''
        }`);
    }
}