import { Batch, IHeaderValue } from "./models/types";
import { Eno } from "./models/Eno";
import { Observable, throwError } from "rxjs";
import got, { OptionsOfTextResponseBody, Response } from "got";
import { IEnSrvOptions } from "./IEnSrvOptions";
import { getToken, hasToken, setToken } from "./sessionTokenCache";
import { isArray, set, pick, get, omit, size, each, unset, reduce } from "lodash";
import jwtDecode from "jwt-decode";
import { map, tap } from "rxjs/operators";
import { Agent as HttpsAgent } from 'https';
import { Agent as HttpAgent } from 'http';
import { IQueryOption } from "./query";

// Name of the header that carries the session token, both on outgoing requests and on responses.
const SessionTokenHeader = "session-token";
// Operation types that, when sent as a single-item batch, are posted to their own URL
// (the type is appended to the EnSrv URL as a path segment).
const THIN_OPERATIONS = ["op/pull", "op/formula", "op/query"];
// Name of the request header that carries the session id extracted from the session token.
const SessionIdHeader = "Session-Id";
// Keep-alive agents created once at module load and passed to every request.
const httpAgent = new HttpAgent({ keepAlive: true });
const httpsAgent = new HttpsAgent({ keepAlive: true });
const retryIfFasterThanMs = 5000; // Only retry requests that fail faster than this number of millis

/**
 * Sends a batch to EnSrv and emits the parsed response as a batch of `Eno` objects.
 *
 * The HTTP POST is started as soon as this function is called, not on subscription;
 * the returned Observable only relays the outcome of that single request. The request
 * is cancelled when the last subscriber unsubscribes before a response has arrived,
 * or when `options.abortController` is aborted.
 *
 * Side effects on the arguments:
 * - If the response carries a new session token it is updated in place: stored in the
 *   shared token cache when `options.useSharedAnonymousSession` is set, otherwise
 *   written to `options.sessionToken`.
 * - `options.initialSessionToken` is filled in once when
 *   `options.maintainInitialSessionToken` is set.
 * - When `queryOptions` is given, `queryOptions.responseHeadersToInclude` is replaced
 *   by a list of `{ headerName: value }` entries read from the response.
 *
 * @param batch Batch to post as the JSON request body.
 * @param options Connection, session and header settings; mutated as described above.
 * @param queryOptions Optional query settings; mutated as described above.
 * @returns Observable emitting the response batch once and then completing. It errors
 *   with `Error('Request aborted')` on abort, with `{ message, code }` (response body
 *   and status code) for any non-200 response, or with whatever error the request
 *   or the JSON parsing raises.
 */
export function send(batch: Batch, options: IEnSrvOptions, queryOptions?: IQueryOption): Observable<Batch> {
  const requestOptions: OptionsOfTextResponseBody = {
    json: batch,
    responseType: "text",
    resolveBodyOnly: false,
    throwHttpErrors: false,
    headers: {
      "Content-type": ["application/json"],
      "Conn-Isolation": ["true"],
      "User-Agent": "ElasticNogginSDK",
    },
    agent: {
      http: httpAgent,
      https: httpsAgent,
    },
    retry: {
      methods: ['GET', 'POST'], // By default Got will not retry POST requests
      limit: 2,                 // Retry twice (a total of 3 times)
    },
    hooks: {
      beforeRetry: [
        // NOTE(review): `timings` is read from the second hook argument, so despite its
        // name it has to be an object carrying request timings rather than a plain
        // number - confirm against the beforeRetry signature of the installed got version.
        (error, retryCount) => {
          const elapsedMs = retryCount.timings?.end-retryCount.timings?.start;
          // Missing timings yield NaN, which is treated like a fast failure (retry).
          if (isNaN(elapsedMs) || elapsedMs < retryIfFasterThanMs) {
            console.warn('[ElasticNogginSDK/Send] Retrying failed request', { elapsedMs, ...retryCount, namespace: options.namespace });
          } else {
            console.warn('[ElasticNogginSDK/Send] Not retrying failed request', { elapsedMs, ...retryCount, namespace: options.namespace });
            // Throw from the hook so that a slow failure is not retried.
            throw new Error('Send failed');
          }
        }
      ]
    }
  };
  if (options.clientIp) {
    set(requestOptions, ["headers", "encloud-clientip"], [options.clientIp]);
  }
  if (options.clientVia) {
    set(requestOptions, ["headers", "encloud-via"], [options.clientVia]);
  }
  if (options.bulk) {
    set(requestOptions, ["headers", "encloud-bulk"], ["true"]);
  }
  // An explicit session token wins over the shared anonymous token from the cache.
  if (options.sessionToken) {
    requestOptions.headers[SessionTokenHeader] = options.sessionToken;
  } else if (options.useSharedAnonymousSession && hasToken(options.namespace)) {
    requestOptions.headers[SessionTokenHeader] = getToken(options.namespace);
  }
  if (options.useCurrentSession !== false) {
    try {
      requestOptions.headers[SessionIdHeader] = getSessionId(
        options.sessionToken
      );
    } catch (err) {
      // Deliberate best effort: when the session id cannot be read from the token
      // (missing or undecodable token) the header is simply not sent, so that a new
      // session is generated.
    }
  }
  if (requestOptions.headers[SessionTokenHeader] && options.maintainInitialSessionToken && !options.initialSessionToken) {
    let sessionToken = requestOptions.headers[SessionTokenHeader];
    if (isArray(sessionToken)) {
      sessionToken = sessionToken[0];
    }
    options.initialSessionToken = sessionToken;
  }
  if (options.additionalHeaders) {
    each(options.additionalHeaders, (value, key) =>
      set(requestOptions, ["headers", key], value)
    );
  }

  updateRequestOptions(batch, options, requestOptions);
  if (options.additionalQueryString) {
    each(options.additionalQueryString, (value, key) => {
      // The query-service URL has no query string yet, so the first extra parameter
      // has to start one with "?" instead of continuing one with "&".
      const separator = String(requestOptions.url).indexOf("?") === -1 ? "?" : "&";
      requestOptions.url += separator + encodeURIComponent(key) + "=" + encodeURIComponent(value);
    });
  }

  // Request summary for debug logs; the session token and Authorization header are
  // left out so that credentials are not written to the log.
  const describeRequest = () => ({
    url: requestOptions.url,
    requestHeaders: omit(get(requestOptions, "headers"), [SessionTokenHeader, 'Authorization']),
    namespace: options.namespace,
  });

  // The abort signal has already been fired
  if (options.abortController?.signal.aborted) {
    if (options.debug) {
      console.debug("[ElasticNogginSDK/Send] Already aborted", describeRequest());
    }
    return throwError(() => new Error('Request aborted'));
  }

  // Started eagerly: every subscriber of post$ shares this one request.
  const postPromise = got.post(requestOptions);
  let subscriberCount = 0;
  const post$ = new Observable<Response<string>>(observer => {
    subscriberCount++;
    const abortSignal = options.abortController?.signal;
    const onAbort = () => {
      postPromise.cancel();
      if (options.debug) {
        console.debug("[ElasticNogginSDK/Send] Abort signal", describeRequest());
      }
      observer.error(new Error('Request aborted'))
    };
    if (abortSignal) {
      abortSignal.addEventListener('abort', onAbort);
    }
    let isDone = false;
    postPromise.then(response => {
      isDone = true;
      observer.next(response)
      observer.complete();
    }).catch(err => {
      // The request is settled on failure as well; there is nothing left to cancel.
      isDone = true;
      observer.error(err);
    });
    return () => {
      subscriberCount--;
      // Detach the listener so that a reused AbortController does not accumulate
      // handlers for requests that have already finished.
      if (abortSignal) {
        abortSignal.removeEventListener('abort', onAbort);
      }
      if (subscriberCount === 0 && !isDone && !postPromise.isCanceled) {
        if (options.debug) {
          console.debug("[ElasticNogginSDK/Send] All unsubscribed before response", describeRequest());
        }
        postPromise.cancel();
      }
    }
  });

  return post$.pipe(
    tap((response) => {
      if (options.debug) {
        console.debug("[ElasticNogginSDK/Send] Response", {
          url: requestOptions.url,
          requestHeaders: omit(get(requestOptions, "headers"), [SessionTokenHeader, 'Authorization']),
          responseHeaders: omit(get(response, "headers"), [SessionTokenHeader, 'Authorization']),
          time: pick(response, [
            "elapsedTime",
            "timingStart",
            "timings",
            "timingPhases",
          ]),
          namespace: options.namespace,
          statusCode: get(response, "statusCode"),
          requestBodySize: size(requestOptions.body),
          // Measured on the response body (previously this repeated the request body).
          responseBodySize: size(get(response, "body")),
        });
      }
    }),
    map((response) => {
      if (response.statusCode === 200) {
        if (queryOptions) {
          // Turn the requested header names into { name: value } entries; a header
          // that is absent from the response is reported as null. With no names
          // requested the result stays null.
          const responseHeadersValues = reduce(
            queryOptions.responseHeadersToInclude as string[],
            (headerValues: IHeaderValue[], currentHeader: string) => {
              if (!headerValues) {
                headerValues = [];
              }
              const currentHeaderValue = {
                [currentHeader]: get(
                  response,
                  ["headers", currentHeader],
                  null
                ),
              } as IHeaderValue;
              headerValues.push(currentHeaderValue);
              return headerValues;
            },
            null
          );
          queryOptions.responseHeadersToInclude = responseHeadersValues;
        }
        let newSessionToken = response.headers[SessionTokenHeader];
        if (isArray(newSessionToken)) {
          newSessionToken = newSessionToken[0];
        }
        if (newSessionToken) {
          // Set initialSessionToken if it hasn't been set yet. The initialSessionToken
          // is used when invoking a op/process/status
          if (options.maintainInitialSessionToken && !options.initialSessionToken) {
            options.initialSessionToken = newSessionToken;
          }

          if (options.useSharedAnonymousSession) {
            setToken(options.namespace, newSessionToken);
          } else {
            options.sessionToken = newSessionToken;
          }
        }
        const parsedBody = JSON.parse(response.body);
        return parsedBody.map((obj: any) => new Eno(obj));
      } else {
        // Plain object kept as-is: callers may rely on its `message`/`code` shape.
        throw { message: response.body, code: response.statusCode };
      }
    })
  );
}

/**
 * Picks the endpoint URL for a batch and writes it to `requestOptions.url`; for the
 * query service it also rewrites the authentication headers. `requestOptions` is
 * mutated in place.
 *
 * Routing, in order:
 * 1. A single `op/query` with `options.useQueryService` set goes to the query service:
 *    the trailing `/ensrv` of `options.enSrvUrl` becomes `/query/ensrv`, the session
 *    token (if any) is moved into an `Authorization: Bearer` header, the session id
 *    header is dropped, and the namespace is sent in the `en-namespace` header
 *    instead of the query string.
 * 2. A single operation listed in THIN_OPERATIONS goes to `<enSrvUrl>/<type>?ns=<namespace>`.
 * 3. Anything else goes to `<enSrvUrl>?ns=<namespace>`.
 *
 * @param batch Batch about to be sent; only its length and first item's type are read.
 * @param options Supplies `enSrvUrl`, `namespace` and `useQueryService`.
 * @param requestOptions Request options to update.
 */
export function updateRequestOptions(batch: Batch, options: IEnSrvOptions, requestOptions: OptionsOfTextResponseBody ) {
  if (batch.length === 1) {
    const type = batch[0].getType();
    if (options.useQueryService && type === 'op/query') {
      // `\/*` accepts the base URL with or without trailing slashes. Requiring a slash
      // would leave a URL such as ".../ensrv" unchanged and send the query, with
      // Bearer authentication and no namespace parameter, to the plain EnSrv endpoint.
      requestOptions.url = options.enSrvUrl.replace(/\/ensrv\/*$/, "/query/ensrv");
      const sessionToken = get(requestOptions, ['headers', SessionTokenHeader]);
      if (sessionToken) {
        set(requestOptions, ['headers', 'Authorization'], 'Bearer ' + sessionToken);
        unset(requestOptions, ['headers', SessionTokenHeader]);
        unset(requestOptions, ['headers', SessionIdHeader]);
      }
      set(requestOptions, ['headers', 'en-namespace'], options.namespace);
      return;
    }
    if (THIN_OPERATIONS.indexOf(type) > -1) {
      // Strip trailing slashes so the type is joined with exactly one "/".
      requestOptions.url = options.enSrvUrl.replace(/\/+$/, "") + "/" + type + "?ns=" + encodeURIComponent(options.namespace);
      return;
    }
  }
  requestOptions.url = options.enSrvUrl + "?ns=" + encodeURIComponent(options.namespace);
}

/**
 * Reads the `sessionId` field from a session token.
 *
 * The token is base64-decoded first and the decoded text is then passed to
 * `jwtDecode`. Any error raised while decoding propagates to the caller.
 *
 * @param sessionToken Base64-encoded session token.
 * @returns The `sessionId` value of the decoded payload (undefined when absent).
 */
function getSessionId(sessionToken: string): string {
  const decodedToken = Buffer.from(sessionToken, "base64").toString();
  const claims = jwtDecode(decodedToken);
  return get(claims, "sessionId");
}
