import * as plugins from './plugins.js';
import * as paths from './paths.js';
import * as interfaces from './interfaces/index.js';
import { DockerContainer } from './classes.container.js';
import { DockerNetwork } from './classes.network.js';
import { DockerService } from './classes.service.js';
import { DockerSecret } from './classes.secret.js';
import { logger } from './logger.js';
import { DockerImageStore } from './classes.imagestore.js';
import { DockerImage } from './classes.image.js';

/**
 * Registry credentials.
 * `DockerHost.auth` posts this object to the `/auth` route and, on success,
 * stores it base64-encoded for the `X-Registry-Auth` request header.
 */
export interface IAuthData {
  // registry the credentials belong to
  serveraddress: string;
  username: string;
  password: string;
}

/**
 * Options accepted by the `DockerHost` constructor. Both fields are optional.
 */
export interface IDockerHostConstructorOptions {
  // Docker endpoint; takes precedence over DOCKER_HOST and the built-in defaults
  socketPath?: string;
  // local directory handed to the image store; the constructor supplies a default
  imageStoreDir?: string;
}

/**
 * Result of `DockerHost.requestHijackedStreaming`: a bidirectional stream on
 * the connection plus the response metadata.
 */
export interface IHijackedStreamingResponse {
  // duplex stream for reading from / writing to the hijacked connection
  stream: plugins.stream.Duplex;
  // tears down the underlying connection
  close: () => Promise<void>;
  // HTTP status code of the response (0 when the response carried none)
  statusCode: number;
  // response headers
  headers: plugins.http.IncomingHttpHeaders;
}

export class DockerHost {
  // constructor options merged with the default image store directory
  public options: IDockerHostConstructorOptions;

  /**
   * the normalized Docker endpoint; request routes are appended directly to it
   */
  public socketPath: string;
  // base64-encoded auth data sent as the X-Registry-Auth header; empty until auth() succeeds
  private registryToken: string = '';
  // private - use storeImage/retrieveImage instead of touching the store directly
  private imageStore: DockerImageStore;
  // only assigned by addS3Storage(); stop() destroys its storage client when present
  public smartBucket!: plugins.smartbucket.SmartBucket;

  /**
   * Creates a DockerHost and resolves which Docker endpoint to talk to.
   *
   * Endpoint resolution order:
   * 1. `optionsArg.socketPath`
   * 2. the `DOCKER_HOST` environment variable
   * 3. `http://docker:2375/` when the `CI` environment variable is set
   * 4. `http://unix:/var/run/docker.sock:` otherwise
   *
   * A value starting with `unix:///` is rewritten to the `http://unix:` form,
   * and a value ending in `.sock` gets a `:` appended so that a route can be
   * concatenated directly onto `socketPath`.
   *
   * @param optionsArg optional socket path and image store directory
   */
  constructor(optionsArg: IDockerHostConstructorOptions = {}) {
    this.options = {
      ...optionsArg,
      // `??` rather than spread order: an explicit `imageStoreDir: undefined`
      // must not erase the default directory.
      imageStoreDir:
        optionsArg.imageStoreDir ??
        plugins.path.join(paths.nogitDir, 'temp-docker-image-store'),
    };

    let pathToUse: string;
    if (optionsArg.socketPath) {
      pathToUse = optionsArg.socketPath;
    } else if (process.env.DOCKER_HOST) {
      pathToUse = process.env.DOCKER_HOST;
    } else if (process.env.CI) {
      pathToUse = 'http://docker:2375/';
    } else {
      pathToUse = 'http://unix:/var/run/docker.sock:';
    }

    if (pathToUse.startsWith('unix:///')) {
      pathToUse = pathToUse.replace('unix://', 'http://unix:');
    }
    if (pathToUse.endsWith('.sock')) {
      // Append to the suffix only. String.replace would rewrite the FIRST
      // '.sock' in the path, which is wrong for e.g. '/run/my.sock.d/docker.sock'.
      pathToUse = `${pathToUse}:`;
    }

    console.log(`using docker sock at ${pathToUse}`);
    this.socketPath = pathToUse;
    this.imageStore = new DockerImageStore({
      // no bucket directory yet; addS3Storage() sets it later
      bucketDir: null!,
      localDirPath: this.options.imageStoreDir!,
    });
  }

  /**
   * Starts the image store.
   */
  public async start(): Promise<void> {
    const store = this.imageStore;
    await store.start();
  }
  /**
   * Stops the image store and, if S3 storage was added, destroys its storage client.
   */
  public async stop(): Promise<void> {
    await this.imageStore.stop();
    // smartBucket only exists after addS3Storage() was called
    this.smartBucket?.storageClient.destroy();
  }

  /**
   * Pings the Docker daemon via `GET /_ping`.
   * @returns Promise that resolves when the daemon answers with status 200
   * @throws Error if the answer has any other status code
   */
  public async ping(): Promise<void> {
    const { statusCode } = await this.request('GET', '/_ping');
    if (statusCode === 200) {
      return;
    }
    throw new Error(`Docker ping failed with status ${statusCode}`);
  }

  /**
   * Fetches the daemon's version information via `GET /version`.
   * @returns the response body as delivered by the daemon (not validated here)
   */
  public async getVersion(): Promise<{
    Version: string;
    ApiVersion: string;
    MinAPIVersion?: string;
    GitCommit: string;
    GoVersion: string;
    Os: string;
    Arch: string;
    KernelVersion: string;
    BuildTime?: string;
  }> {
    const { body: versionInfo } = await this.request('GET', '/version');
    return versionInfo;
  }

  /**
   * Authenticates against a registry via `POST /auth`.
   *
   * On success the auth data is stored base64-encoded and sent as the
   * `X-Registry-Auth` header on subsequent requests.
   *
   * @param authData registry address, username and password
   * @throws Error when the response status is not `Login Succeeded`; the
   *   message is the reported status, or the body's `message` field or HTTP
   *   status code when no status was reported
   */
  public async auth(authData: IAuthData) {
    const response = await this.request('POST', '/auth', authData);
    const status: string | undefined = response.body?.Status;
    if (status !== 'Login Succeeded') {
      // Error responses carry `message` instead of `Status`; fall back so the
      // thrown error is never just "undefined".
      const reason: string =
        status ?? response.body?.message ?? `HTTP ${response.statusCode}`;
      console.log(`Login failed with ${reason}`);
      throw new Error(reason);
    }
    console.log(status);
    this.registryToken = plugins.smartstring.base64.encode(
      plugins.smartjson.stringify(authData),
    );
  }

  /**
   * Reads the credentials for a registry from `~/.docker/config.json` and
   * authenticates with them via `auth()`.
   *
   * The `auths[registryUrlArg].auth` entry is expected to be the base64 form
   * of `username:password`.
   *
   * @param registryUrlArg key of the registry inside the config's `auths` object
   * @throws Error when the config contains no `auth` entry for the registry
   */
  public async getAuthTokenFromDockerConfig(registryUrlArg: string) {
    const dockerConfigPath = plugins.smartpath.get.home(
      '~/.docker/config.json',
    );
    const configObject = JSON.parse(plugins.fs.readFileSync(dockerConfigPath, 'utf8'));
    const encodedAuth: unknown = configObject?.auths?.[registryUrlArg]?.auth;
    if (typeof encodedAuth !== 'string' || encodedAuth.length === 0) {
      throw new Error(
        `No auth entry for registry ${registryUrlArg} found in ${dockerConfigPath}`,
      );
    }

    const decodedAuth: string = plugins.smartstring.base64.decode(encodedAuth);
    // Split at the FIRST colon only: passwords and tokens may contain colons.
    const separatorIndex = decodedAuth.indexOf(':');
    const username =
      separatorIndex === -1 ? decodedAuth : decodedAuth.slice(0, separatorIndex);
    const password =
      separatorIndex === -1 ? '' : decodedAuth.slice(separatorIndex + 1);

    await this.auth({
      username,
      password,
      serveraddress: registryUrlArg,
    });
  }

  // ==============
  // NETWORKS - Public Factory API
  // ==============

  /**
   * Lists networks by delegating to `DockerNetwork._list` for this host.
   */
  public async listNetworks() {
    const networks = await DockerNetwork._list(this);
    return networks;
  }

  /**
   * Looks up a network by name via `DockerNetwork._fromName`.
   * @param networkNameArg name of the network to look up
   */
  public async getNetworkByName(networkNameArg: string) {
    const network = await DockerNetwork._fromName(this, networkNameArg);
    return network;
  }

  /**
   * Creates a network via `DockerNetwork._create`.
   * @param descriptor creation descriptor passed through unchanged
   */
  public async createNetwork(
    descriptor: interfaces.INetworkCreationDescriptor,
  ) {
    const createdNetwork = await DockerNetwork._create(this, descriptor);
    return createdNetwork;
  }

  // ==============
  // CONTAINERS - Public Factory API
  // ==============

  /**
   * Lists containers by delegating to `DockerContainer._list` for this host.
   */
  public async listContainers() {
    const containers = await DockerContainer._list(this);
    return containers;
  }

  /**
   * Looks up a container by ID via `DockerContainer._fromId`.
   * @param containerId ID of the container to look up
   * @returns the container, or undefined if the container does not exist
   */
  public async getContainerById(containerId: string): Promise<DockerContainer | undefined> {
    const container = await DockerContainer._fromId(this, containerId);
    return container;
  }

  /**
   * Creates a container via `DockerContainer._create`.
   * @param descriptor creation descriptor passed through unchanged
   */
  public async createContainer(
    descriptor: interfaces.IContainerCreationDescriptor,
  ) {
    const createdContainer = await DockerContainer._create(this, descriptor);
    return createdContainer;
  }

  // ==============
  // SERVICES - Public Factory API
  // ==============

  /**
   * Lists services by delegating to `DockerService._list` for this host.
   */
  public async listServices() {
    const services = await DockerService._list(this);
    return services;
  }

  /**
   * Looks up a service by name via `DockerService._fromName`.
   * @param serviceName name of the service to look up
   */
  public async getServiceByName(serviceName: string) {
    const service = await DockerService._fromName(this, serviceName);
    return service;
  }

  /**
   * Creates a service via `DockerService._create`.
   * @param descriptor creation descriptor passed through unchanged
   */
  public async createService(
    descriptor: interfaces.IServiceCreationDescriptor,
  ) {
    const createdService = await DockerService._create(this, descriptor);
    return createdService;
  }

  // ==============
  // IMAGES - Public Factory API
  // ==============

  /**
   * Lists images by delegating to `DockerImage._list` for this host.
   */
  public async listImages() {
    const images = await DockerImage._list(this);
    return images;
  }

  /**
   * Looks up an image by name via `DockerImage._fromName`.
   * @param imageNameArg name of the image to look up
   */
  public async getImageByName(imageNameArg: string) {
    const image = await DockerImage._fromName(this, imageNameArg);
    return image;
  }

  /**
   * Creates an image from a registry via `DockerImage._createFromRegistry`.
   * @param descriptor creation descriptor, forwarded as `creationObject`
   */
  public async createImageFromRegistry(
    descriptor: interfaces.IImageCreationDescriptor,
  ) {
    const creationOptions = { creationObject: descriptor };
    const image = await DockerImage._createFromRegistry(this, creationOptions);
    return image;
  }

  /**
   * Creates an image from a tar stream via `DockerImage._createFromTarStream`.
   * @param tarStream readable stream, forwarded as `tarStream`
   * @param descriptor creation descriptor, forwarded as `creationObject`
   */
  public async createImageFromTarStream(
    tarStream: plugins.smartstream.stream.Readable,
    descriptor: interfaces.IImageCreationDescriptor,
  ) {
    const creationOptions = { creationObject: descriptor, tarStream };
    const image = await DockerImage._createFromTarStream(this, creationOptions);
    return image;
  }

  /**
   * Prunes unused images via `POST /images/prune`.
   *
   * @param options Optional filters; `dangling` is merged into `filters` as a
   *   `dangling` entry. The caller's `filters` object is not modified.
   * @returns the response body (deleted images and reclaimed space)
   */
  public async pruneImages(options?: {
    dangling?: boolean;
    filters?: Record<string, string[]>;
  }): Promise<{
    ImagesDeleted: Array<{ Untagged?: string; Deleted?: string }>;
    SpaceReclaimed: number;
  }> {
    // Work on a copy: writing `dangling` into options.filters would leak the
    // entry into the caller's object and into any later reuse of it.
    const filters: Record<string, string[]> = { ...(options?.filters ?? {}) };

    if (options?.dangling !== undefined) {
      filters.dangling = [options.dangling.toString()];
    }

    let route = '/images/prune';
    if (Object.keys(filters).length > 0) {
      route += `?filters=${encodeURIComponent(JSON.stringify(filters))}`;
    }

    const response = await this.request('POST', route);

    return response.body;
  }

  /**
   * Builds an image by delegating to `DockerImage._build`.
   * @param imageTag tag passed through to the build
   */
  public async buildImage(imageTag: string) {
    const builtImage = await DockerImage._build(this, imageTag);
    return builtImage;
  }

  // ==============
  // SECRETS - Public Factory API
  // ==============

  /**
   * Lists secrets by delegating to `DockerSecret._list` for this host.
   */
  public async listSecrets() {
    const secrets = await DockerSecret._list(this);
    return secrets;
  }

  /**
   * Looks up a secret by name via `DockerSecret._fromName`.
   * @param secretName name of the secret to look up
   */
  public async getSecretByName(secretName: string) {
    const secret = await DockerSecret._fromName(this, secretName);
    return secret;
  }

  /**
   * Looks up a secret by ID via `DockerSecret._fromId`.
   * @param secretId ID of the secret to look up
   */
  public async getSecretById(secretId: string) {
    const secret = await DockerSecret._fromId(this, secretId);
    return secret;
  }

  /**
   * Creates a secret via `DockerSecret._create`.
   * @param descriptor creation descriptor passed through unchanged
   */
  public async createSecret(
    descriptor: interfaces.ISecretCreationDescriptor,
  ) {
    const createdSecret = await DockerSecret._create(this, descriptor);
    return createdSecret;
  }

  // ==============
  // IMAGE STORE - Public API
  // ==============

  /**
   * Hands an image tar stream to the image store under the given name.
   * @param imageName name to store the image under
   * @param tarStream readable stream with the image contents
   */
  public async storeImage(
    imageName: string,
    tarStream: plugins.smartstream.stream.Readable,
  ): Promise<void> {
    const storeResult = await this.imageStore.storeImage(imageName, tarStream);
    return storeResult;
  }

  /**
   * Fetches a previously stored image from the image store.
   * @param imageName name the image was stored under
   * @returns readable stream provided by the image store
   */
  public async retrieveImage(
    imageName: string,
  ): Promise<plugins.smartstream.stream.Readable> {
    const imageStream = await this.imageStore.getImage(imageName);
    return imageStream;
  }

  /**
   * Opens the Docker `/events` stream and exposes it as an observable of
   * parsed event objects.
   *
   * Incoming bytes are buffered and split on newlines, so an event that is
   * spread over several chunks, or several events arriving in one chunk, are
   * each emitted exactly once. Lines that are not valid JSON are logged and
   * skipped. The observable completes when the stream ends; unsubscribing
   * destroys the stream.
   *
   * @returns observable emitting one parsed object per event
   * @throws Error when the request did not yield a readable stream
   */
  public async getEventObservable(): Promise<plugins.rxjs.Observable<any>> {
    const response = await this.requestStreaming('GET', '/events');

    // requestStreaming falls back to a plain { statusCode, body, headers }
    // object when no response stream is available; that cannot be observed.
    if (typeof (response as any)?.on !== 'function') {
      throw new Error(
        `Docker events request did not return a stream (status ${(response as any)?.statusCode})`,
      );
    }
    const nodeStream = response as plugins.smartstream.stream.Readable;

    // The constructor replaces the deprecated Observable.create.
    return new plugins.rxjs.Observable<any>((observer) => {
      // Bytes received so far that do not yet form a complete event.
      let pending: Buffer = Buffer.alloc(0);

      const emitEvent = (eventString: string) => {
        if (eventString.trim().length === 0) {
          return;
        }
        try {
          const eventObject = JSON.parse(eventString);
          observer.next(eventObject);
        } catch (e) {
          console.log(e);
        }
      };

      nodeStream.on('data', (data) => {
        pending = Buffer.concat([
          pending,
          Buffer.isBuffer(data) ? data : Buffer.from(data),
        ]);

        // Emit every newline-terminated event contained in the buffer.
        let newlineIndex = pending.indexOf(0x0a);
        while (newlineIndex !== -1) {
          emitEvent(pending.subarray(0, newlineIndex).toString('utf8'));
          pending = pending.subarray(newlineIndex + 1);
          newlineIndex = pending.indexOf(0x0a);
        }

        // An unterminated remainder is emitted right away if it already is a
        // complete JSON document; otherwise it stays buffered for the next chunk.
        if (pending.length > 0) {
          let parsedRemainder: unknown;
          let remainderComplete = true;
          try {
            parsedRemainder = JSON.parse(pending.toString('utf8'));
          } catch {
            remainderComplete = false;
          }
          if (remainderComplete) {
            pending = Buffer.alloc(0);
            observer.next(parsedRemainder);
          }
        }
      });
      nodeStream.on('end', () => {
        // Flush whatever is left, then signal completion to subscribers.
        emitEvent(pending.toString('utf8'));
        pending = Buffer.alloc(0);
        observer.complete();
      });
      nodeStream.on('error', (err) => {
        // Connection resets are expected when the stream is destroyed
        if ((err as any).code !== 'ECONNRESET') {
          observer.error(err);
        }
      });
      return () => {
        nodeStream.destroy();
      };
    });
  }

  /**
   * Initializes a swarm via `POST /swarm/init`.
   *
   * The advertise address is the given argument or, if none is given, the
   * IPv4 address reported for the default gateway. If that lookup fails the
   * address is sent as an empty string. The outcome is only logged; a
   * non-200 status does not throw.
   *
   * @param addvertisementIpArg optional advertise address
   */
  public async activateSwarm(addvertisementIpArg?: string) {
    // Resolves the advertise address; never throws.
    const resolveAdvertiseAddr = async (): Promise<string> => {
      if (addvertisementIpArg) {
        return addvertisementIpArg;
      }
      try {
        const network = new plugins.smartnetwork.SmartNetwork();
        const gateway = await network.getDefaultGateway();
        return gateway ? gateway.ipv4.address : '';
      } catch {
        // Gateway lookup failed (e.g. in Deno without --allow-run);
        // an empty address is sent instead.
        return '';
      }
    };

    const swarmInitBody = {
      ListenAddr: '0.0.0.0:2377',
      AdvertiseAddr: await resolveAdvertiseAddr(),
      DataPathPort: 4789,
      DefaultAddrPool: ['10.10.0.0/8', '20.20.0.0/8'],
      SubnetSize: 24,
      ForceNewCluster: false,
    };
    const { statusCode } = await this.request('POST', '/swarm/init', swarmInitBody);

    if (statusCode !== 200) {
      logger.log('error', 'could not initiate swarm');
      return;
    }
    logger.log('info', 'created Swam succesfully');
  }

  /**
   * Sends one non-streaming request to the Docker endpoint.
   *
   * The body is JSON-parsed when the response declares `application/json` or
   * its text starts with `{` or `[`. Routes containing `/images/create`,
   * `/images/load` or `/build` are always returned as raw text, because they
   * answer with newline-delimited JSON. Bodies of non-200 responses are
   * logged; no error is thrown for them.
   *
   * @param methodArg HTTP method (GET, POST, PUT or DELETE, case-insensitive)
   * @param routeArg route appended to `socketPath`
   * @param dataArg optional JSON payload; only sent when it has at least one key
   * @returns object with `statusCode`, `body` and `headers`
   * @throws Error for any other HTTP method
   */
  public async request(methodArg: string, routeArg: string, dataArg = {}) {
    const smartRequest = plugins.smartrequest.SmartRequest.create()
      .url(`${this.socketPath}${routeArg}`)
      .header('Content-Type', 'application/json')
      .header('X-Registry-Auth', this.registryToken)
      .header('Host', 'docker.sock')
      .options({ keepAlive: false });

    // Attach a payload only when there is something to send
    if (dataArg && Object.keys(dataArg).length > 0) {
      smartRequest.json(dataArg);
    }

    // Pick the request verb
    const send = () => {
      switch (methodArg.toUpperCase()) {
        case 'GET':
          return smartRequest.get();
        case 'POST':
          return smartRequest.post();
        case 'PUT':
          return smartRequest.put();
        case 'DELETE':
          return smartRequest.delete();
        default:
          throw new Error(`Unsupported HTTP method: ${methodArg}`);
      }
    };
    const response = await send();

    // These routes answer with newline-delimited JSON, which cannot be
    // parsed as one JSON document
    const isStreamingEndpoint = ['/images/create', '/images/load', '/build'].some(
      (routeFragment) => routeArg.includes(routeFragment),
    );
    const contentType = response.headers['content-type'] || '';

    let body: any;
    if (!isStreamingEndpoint && contentType.includes('application/json')) {
      body = await response.json();
    } else {
      body = await response.text();
      const looksLikeJson =
        !isStreamingEndpoint &&
        body &&
        (body.startsWith('{') || body.startsWith('['));
      if (looksLikeJson) {
        try {
          body = JSON.parse(body);
        } catch {
          // Not valid JSON after all: keep the text
        }
      }
    }

    if (response.status !== 200) {
      console.log(body);
    }

    // Response shape expected by existing callers
    return {
      statusCode: response.status,
      body: body,
      headers: response.headers,
    };
  }

  /**
   * Sends a request whose response is consumed as a stream.
   *
   * @param methodArg HTTP method (GET, POST, PUT or DELETE, case-insensitive)
   * @param routeArg route appended to `socketPath`
   * @param readStream optional request body stream, sent as `application/octet-stream`
   * @param jsonData optional JSON payload; only applied when it has at least one key
   * @returns a Node.js readable stream carrying extra `statusCode` and `body`
   *   properties, or a plain `{ statusCode, body, headers }` object when the
   *   response offers no stream
   * @throws Error for any other HTTP method
   */
  public async requestStreaming(
    methodArg: string,
    routeArg: string,
    readStream?: plugins.smartstream.stream.Readable,
    jsonData?: any,
  ): Promise<plugins.smartstream.stream.Readable | { statusCode: number; body: string; headers: any }> {
    // NOTE(review): the comment previously attached to these options said
    // auto-drain is disabled, yet `autoDrain: true` is what gets passed.
    // Confirm which of the two is intended.
    const smartRequest = plugins.smartrequest.SmartRequest.create()
      .url(`${this.socketPath}${routeArg}`)
      .header('Content-Type', 'application/json')
      .header('X-Registry-Auth', this.registryToken)
      .header('Host', 'docker.sock')
      .timeout(30000)
      .options({ keepAlive: false, autoDrain: true });

    if (jsonData && Object.keys(jsonData).length > 0) {
      smartRequest.json(jsonData);
    }

    if (readStream) {
      // Pass-through stage that logs every 1000th chunk on its way out
      let chunkCount = 0;
      const progressLogger = new plugins.smartstream.SmartDuplex({
        writeFunction: async (chunkArg) => {
          if (chunkCount % 1000 === 0) {
            console.log(`posting chunk ${chunkCount}`);
          }
          chunkCount++;
          return chunkArg;
        },
      });
      smartRequest.stream(readStream.pipe(progressLogger), 'application/octet-stream');
    }

    // Pick the request verb
    const send = () => {
      switch (methodArg.toUpperCase()) {
        case 'GET':
          return smartRequest.get();
        case 'POST':
          return smartRequest.post();
        case 'PUT':
          return smartRequest.put();
        case 'DELETE':
          return smartRequest.delete();
        default:
          throw new Error(`Unsupported HTTP method: ${methodArg}`);
      }
    };
    const response: plugins.smartrequest.ICoreResponse = await send();

    console.log(response.status);

    const webStream = response.stream();
    if (!webStream) {
      // No stream on offer: read the body as text and return the plain response shape
      const textBody = await response.text();
      console.log(textBody);
      return {
        statusCode: response.status,
        body: textBody,
        headers: response.headers,
      };
    }

    // Callers expect a Node.js stream rather than a web ReadableStream
    const nodeStream = plugins.smartstream.nodewebhelpers.convertWebReadableToNodeReadable(webStream);

    // A no-op listener keeps an 'error' event from crashing the process when
    // the caller attached none; callers' own listeners still fire.
    nodeStream.on('error', () => {});

    // Extra properties kept for compatibility with the plain response shape
    Object.assign(nodeStream, { statusCode: response.status, body: '' });

    return nodeStream;
  }

  /**
   * Sends a request asking for a connection upgrade (`Connection: Upgrade`,
   * `Upgrade: tcp`) and resolves with a duplex stream on that connection.
   *
   * For `http://unix:` endpoints the request is written to a raw socket by
   * requestHijackedStreamingOverRawSocket; otherwise Node's http/https
   * request API is used.
   *
   * @param methodArg HTTP method
   * @param routeArg route to request
   * @param jsonData JSON payload; always serialized, `{}` by default
   * @returns stream, close function, status code and headers
   * @throws Error when the response status is 400 or higher, or when the
   *   response has no socket
   */
  public async requestHijackedStreaming(
    methodArg: string,
    routeArg: string,
    jsonData: Record<string, unknown> = {},
  ): Promise<IHijackedStreamingResponse> {
    const body = JSON.stringify(jsonData);
    const headers: Record<string, string | number> = {
      'Content-Type': 'application/json',
      // byte length, not string length, so multi-byte characters are counted correctly
      'Content-Length': Buffer.byteLength(body),
      'Connection': 'Upgrade',
      'Upgrade': 'tcp',
      'X-Registry-Auth': this.registryToken,
      'Host': 'docker.sock',
    };

    // Unix socket endpoints take the raw socket route
    if (this.socketPath.startsWith('http://unix:')) {
      return await this.requestHijackedStreamingOverRawSocket(methodArg, routeArg, body, headers);
    }

    const { requestModule, options } = this.getNodeRequestOptions(methodArg, routeArg, headers);

    return await new Promise<IHijackedStreamingResponse>((resolve, reject) => {
      // Response callback: runs when an ordinary (non-upgrade) response arrives
      const request = requestModule.request(options, (response) => {
        if ((response.statusCode || 0) >= 400) {
          // Read the error body so it can be included in the rejection message
          this.collectErrorResponse(response).then((bodyText) => {
            reject(new Error(`Docker hijack request failed with HTTP ${response.statusCode}: ${bodyText}`));
          }).catch(reject);
          return;
        }

        if (!response.socket) {
          reject(new Error('Docker hijack response did not include a socket'));
          return;
        }

        resolve({
          // reads come from the response, writes go to its socket
          stream: this.createDuplexForHijackedResponse(response, response.socket),
          close: async () => {
            response.destroy();
            response.socket?.destroy();
          },
          statusCode: response.statusCode || 0,
          headers: response.headers,
        });
      });

      // 'upgrade' event: the socket itself becomes the stream
      request.on('upgrade', (response, socket, head) => {
        // Push back bytes that arrived together with the headers so readers see them first
        if (head.length > 0) {
          socket.unshift(head);
        }
        resolve({
          stream: socket,
          close: async () => {
            socket.destroy();
          },
          statusCode: response.statusCode || 0,
          headers: response.headers,
        });
      });

      request.on('error', reject);
      request.end(body);
    });
  }

  /**
   * Performs the hijack request over a unix socket by writing the HTTP/1.1
   * request by hand and parsing the response head manually.
   *
   * When a `Deno.connect` global exists, the work is delegated to
   * requestHijackedStreamingOverDenoUnixSocket instead.
   *
   * @param methodArg HTTP method
   * @param routeArg route to request
   * @param bodyArg serialized request body
   * @param headersArg request headers, written one per line
   * @returns stream, close function, status code and headers
   * @throws Error when the socket errors, when it closes before the response
   *   headers were received, or when the status is 400 or higher
   */
  private async requestHijackedStreamingOverRawSocket(
    methodArg: string,
    routeArg: string,
    bodyArg: string,
    headersArg: Record<string, string | number>,
  ): Promise<IHijackedStreamingResponse> {
    // socketPath has the form 'http://unix:<path>:' - strip prefix and trailing colon
    const socketPath = this.socketPath.slice('http://unix:'.length, -1);
    const denoGlobal = (globalThis as any).Deno;
    if (denoGlobal?.connect) {
      return await this.requestHijackedStreamingOverDenoUnixSocket(
        socketPath,
        methodArg,
        routeArg,
        bodyArg,
        headersArg,
      );
    }

    const socket = plugins.net.connect(socketPath);
    // request line, header lines, blank line, body
    const requestHead = [
      `${methodArg.toUpperCase()} ${routeArg} HTTP/1.1`,
      ...Object.entries(headersArg).map(([key, value]) => `${key}: ${value}`),
      '',
      bodyArg,
    ].join('\r\n');

    return await new Promise<IHijackedStreamingResponse>((resolve, reject) => {
      let settled = false;
      let responseBuffer: Buffer = Buffer.alloc(0);

      const handleConnectError = (errorArg: Error) => {
        if (!settled) {
          settled = true;
          reject(errorArg);
        }
      };

      // Without this the promise would stay pending forever when the peer
      // closes the connection before sending a complete response head.
      const handlePrematureClose = () => {
        if (!settled) {
          settled = true;
          reject(new Error('Docker hijack connection closed before response headers were received'));
        }
      };

      // Detaches the handshake listeners once the response head is complete
      const cleanupBeforeResolve = () => {
        socket.off('data', handleData);
        socket.off('error', handleConnectError);
        socket.off('close', handlePrematureClose);
      };

      // Collects the rest of the error body, then rejects with status and body
      const rejectWithDockerError = (statusCodeArg: number, bodyHeadArg: Buffer) => {
        const chunks: Buffer[] = [];
        if (bodyHeadArg.length > 0) {
          chunks.push(bodyHeadArg);
        }
        const rejectWithBody = () => {
          reject(new Error(`Docker hijack request failed with HTTP ${statusCodeArg}: ${Buffer.concat(chunks).toString('utf8')}`));
        };
        socket.on('data', (chunkArg) => chunks.push(Buffer.isBuffer(chunkArg) ? chunkArg : Buffer.from(chunkArg)));
        socket.on('end', rejectWithBody);
        // Reject on 'error' and 'close' as well: the handshake listeners are
        // already detached, so a socket error here would otherwise be an
        // unhandled 'error' event. Repeated reject calls are no-ops.
        socket.on('error', rejectWithBody);
        socket.on('close', rejectWithBody);
        socket.end();
      };

      const handleData = (chunkArg: Buffer) => {
        responseBuffer = Buffer.concat([responseBuffer, chunkArg]);
        const headerEndIndex = responseBuffer.indexOf('\r\n\r\n');
        if (headerEndIndex === -1) {
          // response head still incomplete - wait for more data
          return;
        }

        cleanupBeforeResolve();
        const headerBuffer = responseBuffer.subarray(0, headerEndIndex);
        // bytes after the blank line already belong to the stream payload
        const bodyHead = responseBuffer.subarray(headerEndIndex + 4);
        const { statusCode, headers } = this.parseRawHttpResponseHeaders(headerBuffer.toString('utf8'));

        if (statusCode >= 400) {
          if (!settled) {
            settled = true;
            rejectWithDockerError(statusCode, bodyHead);
          }
          return;
        }

        if (!settled) {
          settled = true;
          resolve({
            stream: this.createDuplexForRawSocket(socket, bodyHead),
            close: async () => {
              socket.destroy();
            },
            statusCode,
            headers,
          });
        }
      };

      socket.once('connect', () => {
        socket.write(requestHead);
      });
      socket.on('data', handleData);
      socket.on('error', handleConnectError);
      socket.on('close', handlePrematureClose);
    });
  }

  /**
   * Performs the hijack request over a unix socket opened with `Deno.connect`.
   *
   * The request is written by hand; the response is read until the blank
   * line that ends the headers, and everything after it becomes the start of
   * the returned stream. The connection is closed whenever this method throws.
   *
   * @param socketPathArg filesystem path of the unix socket
   * @param methodArg HTTP method
   * @param routeArg route to request
   * @param bodyArg serialized request body
   * @param headersArg request headers, written one per line
   * @returns stream, close function, status code and headers
   * @throws Error when the connection closes before the headers were
   *   received, or when the status is 400 or higher
   */
  private async requestHijackedStreamingOverDenoUnixSocket(
    socketPathArg: string,
    methodArg: string,
    routeArg: string,
    bodyArg: string,
    headersArg: Record<string, string | number>,
  ): Promise<IHijackedStreamingResponse> {
    const denoGlobal = (globalThis as any).Deno;
    const conn = await denoGlobal.connect({ transport: 'unix', path: socketPathArg });
    try {
      // request line, header lines, blank line, body
      const requestHead = [
        `${methodArg.toUpperCase()} ${routeArg} HTTP/1.1`,
        ...Object.entries(headersArg).map(([key, value]) => `${key}: ${value}`),
        '',
        bodyArg,
      ].join('\r\n');

      // A single write() may accept only part of the buffer, so keep writing
      // until every byte of the request is out.
      const requestBytes = new TextEncoder().encode(requestHead);
      let bytesWritten = 0;
      while (bytesWritten < requestBytes.length) {
        const written: number = await conn.write(requestBytes.subarray(bytesWritten));
        if (!(written > 0)) {
          throw new Error('Docker hijack connection did not accept the request');
        }
        bytesWritten += written;
      }

      let responseBuffer: Buffer = Buffer.alloc(0);
      const readBuffer = new Uint8Array(65536);
      while (true) {
        const bytesRead = await conn.read(readBuffer);
        if (bytesRead === null) {
          throw new Error('Docker hijack connection closed before response headers were received');
        }
        // copy: readBuffer is reused by the next read
        responseBuffer = Buffer.concat([
          responseBuffer,
          Buffer.from(readBuffer.subarray(0, bytesRead)),
        ]);
        const headerEndIndex = responseBuffer.indexOf('\r\n\r\n');
        if (headerEndIndex === -1) {
          continue;
        }

        const headerBuffer = responseBuffer.subarray(0, headerEndIndex);
        // bytes after the blank line already belong to the stream payload
        const bodyHead = responseBuffer.subarray(headerEndIndex + 4);
        const { statusCode, headers } = this.parseRawHttpResponseHeaders(headerBuffer.toString('utf8'));
        if (statusCode >= 400) {
          throw new Error(`Docker hijack request failed with HTTP ${statusCode}: ${bodyHead.toString('utf8')}`);
        }

        // From here on the returned stream owns the connection
        const stream = this.createDuplexForDenoConn(conn, bodyHead);
        return {
          stream,
          close: async () => {
            stream.destroy();
          },
          statusCode,
          headers,
        };
      }
    } catch (error) {
      // Release the connection on every failure path, including errors
      // thrown by read() or header parsing.
      try {
        conn.close();
      } catch {
        // already closed
      }
      throw error;
    }
  }

  /**
   * Translates `socketPath` plus a route into options for Node's http/https
   * `request` function.
   *
   * @param methodArg HTTP method (upper-cased here)
   * @param routeArg route to request
   * @param headersArg request headers
   * @returns the module to call (`https` for https endpoints, otherwise
   *   `http`) and the matching request options
   */
  private getNodeRequestOptions(
    methodArg: string,
    routeArg: string,
    headersArg: Record<string, string | number>,
  ): {
    requestModule: typeof plugins.http | typeof plugins.https;
    options: plugins.http.RequestOptions | plugins.https.RequestOptions;
  } {
    const unixPrefix = 'http://unix:';
    const options: plugins.http.RequestOptions | plugins.https.RequestOptions = {
      method: methodArg.toUpperCase(),
      headers: headersArg,
    };

    // Unix socket endpoint: 'http://unix:<path>:' - strip prefix and trailing colon
    if (this.socketPath.startsWith(unixPrefix)) {
      options.socketPath = this.socketPath.slice(unixPrefix.length, -1);
      options.path = routeArg;
      return { requestModule: plugins.http, options };
    }

    // TCP endpoint: resolve the route against the base URL
    const { protocol, hostname, port, pathname, search } = new URL(routeArg, this.socketPath);
    options.protocol = protocol;
    options.hostname = hostname;
    options.port = port;
    options.path = `${pathname}${search}`;

    const requestModule = protocol === 'https:' ? plugins.https : plugins.http;
    return { requestModule, options };
  }

  /**
   * Joins a response (read side) and a writable stream (write side) into one
   * duplex stream.
   *
   * @param responseArg source of the data pushed to the duplex's readable side
   * @param writableArg target of everything written to the duplex
   * @returns the combined duplex stream
   */
  private createDuplexForHijackedResponse(
    responseArg: plugins.http.IncomingMessage,
    writableArg: plugins.stream.Duplex,
  ): plugins.stream.Duplex {
    const bridge = new plugins.stream.Duplex({
      // data is pushed from the response listeners below, so read() is a no-op
      read: () => {},
      write: (chunk, encoding, done) => {
        writableArg.write(chunk, encoding, done);
      },
    });

    // write side: finishing the duplex ends the writable; its errors destroy the duplex
    bridge.on('finish', () => {
      writableArg.end();
    });
    writableArg.on('error', (writeError) => {
      bridge.destroy(writeError);
    });

    // read side: forward data, end and errors from the response
    responseArg.on('data', (chunk) => {
      bridge.push(chunk);
    });
    responseArg.on('end', () => {
      bridge.push(null);
    });
    responseArg.on('error', (readError) => {
      bridge.destroy(readError);
    });

    return bridge;
  }

  /**
   * Wraps a raw socket in a duplex stream whose readable side starts with
   * the bytes that were already read along with the response head.
   *
   * @param socketArg socket to read from and write to
   * @param bodyHeadArg payload bytes received together with the headers
   * @returns the wrapping duplex stream
   */
  private createDuplexForRawSocket(
    socketArg: plugins.net.Socket,
    bodyHeadArg: Buffer,
  ): plugins.stream.Duplex {
    const bridge = new plugins.stream.Duplex({
      // data is pushed from the socket listeners below, so read() is a no-op
      read: () => {},
      write: (chunk, encoding, done) => {
        socketArg.write(chunk, encoding, done);
      },
    });

    // Early payload goes first so readers see the bytes in arrival order
    const hasEarlyPayload = bodyHeadArg.length > 0;
    if (hasEarlyPayload) {
      bridge.push(bodyHeadArg);
    }

    socketArg.on('data', (chunk) => {
      bridge.push(chunk);
    });
    socketArg.on('end', () => {
      bridge.push(null);
    });
    socketArg.on('error', (socketError) => {
      bridge.destroy(socketError);
    });
    bridge.on('finish', () => {
      socketArg.end();
    });

    return bridge;
  }

  /**
   * Wraps a Deno connection in a Node.js duplex stream.
   *
   * Writes are forwarded to the connection. A background loop reads from the
   * connection and pushes the data to the readable side, starting with the
   * bytes that were already read along with the response head. Destroying or
   * finishing the duplex closes the connection.
   *
   * @param connArg connection object offering `read`, `write` and `close`
   * @param bodyHeadArg payload bytes received together with the headers
   * @returns the wrapping duplex stream
   */
  private createDuplexForDenoConn(
    connArg: any,
    bodyHeadArg: Buffer,
  ): plugins.stream.Duplex {
    let closed = false;
    // Closes the connection at most once; close errors are ignored
    const closeConn = () => {
      if (closed) {
        return;
      }
      closed = true;
      try {
        connArg.close();
      } catch {}
    };

    const duplex = new plugins.stream.Duplex({
      async write(chunkArg, encodingArg, callbackArg) {
        try {
          const chunkBuffer = Buffer.isBuffer(chunkArg)
            ? chunkArg
            : Buffer.from(chunkArg, encodingArg);
          // A single write() may accept only part of the buffer, so keep
          // writing until the whole chunk is out.
          const chunkBytes = new Uint8Array(chunkBuffer);
          let bytesWritten = 0;
          while (bytesWritten < chunkBytes.length) {
            const written: number = await connArg.write(chunkBytes.subarray(bytesWritten));
            if (!(written > 0)) {
              throw new Error('Docker hijack connection did not accept the data');
            }
            bytesWritten += written;
          }
          callbackArg();
        } catch (error) {
          callbackArg(error as Error);
        }
      },
      // data is pushed by the read loop below, so read() is a no-op
      read() {},
      destroy(errorArg, callbackArg) {
        closeConn();
        callbackArg(errorArg || null);
      },
    });

    if (bodyHeadArg.length > 0) {
      duplex.push(bodyHeadArg);
    }

    const readLoop = async () => {
      const readBuffer = new Uint8Array(65536);
      try {
        while (!duplex.destroyed) {
          const bytesRead = await connArg.read(readBuffer);
          if (bytesRead === null) {
            // end of stream
            break;
          }
          if (bytesRead > 0) {
            // copy: readBuffer is reused by the next read
            duplex.push(Buffer.from(readBuffer.subarray(0, bytesRead)));
          }
        }
        duplex.push(null);
      } catch (error) {
        // A read failing because the connection was closed on purpose is not an error
        if (!closed && !duplex.destroyed) {
          duplex.destroy(error as Error);
        }
      }
    };
    void readLoop();
    duplex.on('finish', () => {
      closeConn();
    });
    return duplex;
  }

  /**
   * Parses the head of a raw HTTP response: status line plus header lines.
   *
   * Header names are lower-cased; names and values are trimmed. Lines
   * without a colon are skipped. When a header name repeats, the last
   * occurrence wins. The status code is the second space-separated token of
   * the status line, converted with `Number`.
   *
   * @param headerTextArg response head with lines separated by CRLF
   * @returns status code and headers
   */
  private parseRawHttpResponseHeaders(headerTextArg: string): {
    statusCode: number;
    headers: plugins.http.IncomingHttpHeaders;
  } {
    const lines = headerTextArg.split('\r\n');
    const statusTokens = lines[0].split(' ');
    const statusCode = Number(statusTokens[1]);

    const headers: plugins.http.IncomingHttpHeaders = {};
    lines.slice(1).forEach((line) => {
      const colonAt = line.indexOf(':');
      if (colonAt < 0) {
        return;
      }
      const name = line.slice(0, colonAt).trim().toLowerCase();
      headers[name] = line.slice(colonAt + 1).trim();
    });

    return { statusCode, headers };
  }

  /**
   * Reads a response to its end and returns the body decoded as UTF-8.
   * @param responseArg response to read
   * @returns the complete body text
   */
  private async collectErrorResponse(responseArg: plugins.http.IncomingMessage): Promise<string> {
    const parts: Buffer[] = [];
    for await (const piece of responseArg) {
      if (Buffer.isBuffer(piece)) {
        parts.push(piece);
      } else {
        parts.push(Buffer.from(piece));
      }
    }
    const wholeBody = Buffer.concat(parts);
    return wholeBody.toString('utf8');
  }

  /**
   * Adds S3 storage: creates the SmartBucket client, resolves the target
   * directory inside the bucket and hands it to the image store.
   *
   * @param optionsArg S3 descriptor; `bucketName` is required,
   *   `directoryPath` optionally selects a sub directory of the bucket's
   *   base directory
   * @throws Error when `bucketName` is missing
   */
  public async addS3Storage(optionsArg: plugins.tsclass.storage.IS3Descriptor) {
    // Validate first, so a bad descriptor does not leave a half-configured
    // client behind on this instance.
    if (!optionsArg.bucketName) {
      throw new Error('bucketName is required');
    }
    this.smartBucket = new plugins.smartbucket.SmartBucket(optionsArg);
    const bucket = await this.smartBucket.getBucketByName(
      optionsArg.bucketName,
    );
    let wantedDirectory = await bucket.getBaseDirectory();
    if (optionsArg.directoryPath) {
      wantedDirectory = await wantedDirectory.getSubDirectoryByName(
        optionsArg.directoryPath,
      );
    }
    this.imageStore.options.bucketDir = wantedDirectory;
  }
}
