/**
 * Copyright 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import {
  ExporterOptions,
  MetricExporter,
} from '@google-cloud/opentelemetry-cloud-monitoring-exporter';
import { TraceExporter } from '@google-cloud/opentelemetry-cloud-trace-exporter';
import { GcpDetectorSync } from '@google-cloud/opentelemetry-resource-util';
import { Span, SpanStatusCode, TraceFlags } from '@opentelemetry/api';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import {
  ExportResult,
  hrTimeDuration,
  hrTimeToMilliseconds,
} from '@opentelemetry/core';
import { Instrumentation } from '@opentelemetry/instrumentation';
import { PinoInstrumentation } from '@opentelemetry/instrumentation-pino';
import { WinstonInstrumentation } from '@opentelemetry/instrumentation-winston';
import { Resource } from '@opentelemetry/resources';
import {
  AggregationTemporality,
  DefaultAggregation,
  ExponentialHistogramAggregation,
  InMemoryMetricExporter,
  InstrumentType,
  PeriodicExportingMetricReader,
  PushMetricExporter,
  ResourceMetrics,
} from '@opentelemetry/sdk-metrics';
import { NodeSDKConfiguration } from '@opentelemetry/sdk-node';
import {
  BatchSpanProcessor,
  InMemorySpanExporter,
  ReadableSpan,
  SpanExporter,
} from '@opentelemetry/sdk-trace-base';
import { GENKIT_VERSION } from 'genkit';
import { logger } from 'genkit/logging';
import { PathMetadata } from 'genkit/tracing';
import { actionTelemetry } from './telemetry/action.js';
import { engagementTelemetry } from './telemetry/engagement.js';
import { featuresTelemetry } from './telemetry/feature.js';
import { generateTelemetry } from './telemetry/generate.js';
import { pathsTelemetry } from './telemetry/path.js';
import { GcpTelemetryConfig } from './types.js';
import {
  extractErrorName,
  metricsDenied,
  metricsDeniedHelpText,
  tracingDenied,
  tracingDeniedHelpText,
} from './utils.js';

let metricExporter: PushMetricExporter;
let spanProcessor: BatchSpanProcessor;
let spanExporter: AdjustingTraceExporter;

/**
 * Provides a {TelemetryConfig} for exporting OpenTelemetry data (Traces,
 * Metrics, and Logs) to the Google Cloud Operations Suite.
 */
export class GcpOpenTelemetry {
  private readonly config: GcpTelemetryConfig;
  private readonly resource: Resource;

  constructor(config: GcpTelemetryConfig) {
    this.config = config;
    this.resource = new Resource({ type: 'global' }).merge(
      new GcpDetectorSync().detect()
    );
  }

  /**
   * Log hook for writing trace and span metadata to log messages in the format
   * required by GCP.
   */
  private gcpTraceLogHook = (span: Span, record: any) => {
    const spanContext = span.spanContext();
    const isSampled = !!(spanContext.traceFlags & TraceFlags.SAMPLED);
    const projectId = this.config.projectId;

    record['logging.googleapis.com/trace'] ??=
      `projects/${projectId}/traces/${spanContext.traceId}`;
    record['logging.googleapis.com/trace_sampled'] ??= isSampled ? '1' : '0';
    record['logging.googleapis.com/spanId'] ??= spanContext.spanId;

    // Clear out the duplicate trace and span information in the log metadata.
    // These will be incorrect for logs written during span export time since
    // the logs are written after the span has fully executed. Those logs are
    // explicitly tied to the correct span in createCommonLogAttributes in
    // utils.ts.
    delete record['span_id'];
    delete record['trace_id'];
    delete record['trace_flags'];
  };

  async getConfig(): Promise<Partial<NodeSDKConfiguration>> {
    spanProcessor = new BatchSpanProcessor(await this.createSpanExporter());
    return {
      resource: this.resource,
      spanProcessor: spanProcessor,
      sampler: this.config.sampler,
      instrumentations: this.getInstrumentations(),
      metricReader: await this.createMetricReader(),
    };
  }

  private async createSpanExporter(): Promise<SpanExporter> {
    spanExporter = new AdjustingTraceExporter(
      this.shouldExportTraces()
        ? new TraceExporter({
            // provided projectId should take precedence over env vars, etc
            projectId: this.config.projectId,
            // creds for non-GCP environments, in lieu of using ADC.
            credentials: this.config.credentials,
          })
        : new InMemorySpanExporter(),
      this.config.exportInputAndOutput,
      this.config.projectId,
      getErrorHandler(
        (err) => {
          return tracingDenied(err);
        },
        await tracingDeniedHelpText()
      )
    );
    return spanExporter;
  }

  /**
   * Creates a {MetricReader} for pushing metrics out to GCP via OpenTelemetry.
   */
  private async createMetricReader(): Promise<PeriodicExportingMetricReader> {
    metricExporter = await this.buildMetricExporter();
    return new PeriodicExportingMetricReader({
      exportIntervalMillis: this.config.metricExportIntervalMillis,
      exportTimeoutMillis: this.config.metricExportTimeoutMillis,
      exporter: metricExporter,
    });
  }

  /** Gets all open telemetry instrumentations as configured by the plugin. */
  private getInstrumentations() {
    let instrumentations: Instrumentation[] = [];

    if (this.config.autoInstrumentation) {
      instrumentations = getNodeAutoInstrumentations(
        this.config.autoInstrumentationConfig
      );
    }

    return instrumentations
      .concat(this.getDefaultLoggingInstrumentations())
      .concat(this.config.instrumentations ?? []);
  }

  private shouldExportTraces(): boolean {
    return this.config.export && !this.config.disableTraces;
  }

  private shouldExportMetrics(): boolean {
    return this.config.export && !this.config.disableMetrics;
  }

  /** Always configure the Pino and Winston instrumentations */
  private getDefaultLoggingInstrumentations(): Instrumentation[] {
    return [
      new WinstonInstrumentation({ logHook: this.gcpTraceLogHook }),
      new PinoInstrumentation({ logHook: this.gcpTraceLogHook }),
    ];
  }

  private async buildMetricExporter(): Promise<PushMetricExporter> {
    const exporter: PushMetricExporter = this.shouldExportMetrics()
      ? new MetricExporterWrapper(
          {
            userAgent: {
              product: 'genkit',
              version: GENKIT_VERSION,
            },
            // provided projectId should take precedence over env vars, etc
            projectId: this.config.projectId,
            // creds for non-GCP environments, in lieu of using ADC.
            credentials: this.config.credentials,
          },
          getErrorHandler(
            (err) => {
              return metricsDenied(err);
            },
            await metricsDeniedHelpText()
          )
        )
      : new InMemoryMetricExporter(AggregationTemporality.DELTA);
    return exporter;
  }
}

/**
 * Rewrites the export method to include an error handler which logs
 * helpful information about how to set up metrics/telemetry in GCP.
 */
class MetricExporterWrapper extends MetricExporter {
  private promise = new Promise<void>((resolve) => resolve());

  constructor(
    options?: ExporterOptions,
    private errorHandler?: (error: Error) => void
  ) {
    super(options);
  }

  async export(
    metrics: ResourceMetrics,
    resultCallback: (result: ExportResult) => void
  ): Promise<void> {
    await this.promise;
    this.modifyStartTimes(metrics);
    this.promise = new Promise<void>((resolve) => {
      super.export(metrics, (result) => {
        try {
          if (this.errorHandler && result.error) {
            this.errorHandler(result.error);
          }
          resultCallback(result);
        } finally {
          resolve();
        }
      });
    });
  }

  selectAggregation(instrumentType: InstrumentType) {
    if (instrumentType === InstrumentType.HISTOGRAM) {
      return new ExponentialHistogramAggregation();
    }
    return new DefaultAggregation();
  }

  selectAggregationTemporality(instrumentType: InstrumentType) {
    return AggregationTemporality.DELTA;
  }

  /**
   * Modify the start times of each data point to ensure no
   * overlap with previous exports.
   *
   * Cloud metrics do not support delta metrics for custom metrics
   * and will convert any DELTA aggregations to CUMULATIVE ones on
   * export. There is implicit overlap in the start/end times that
   * the Metric reader is sending -- the end_time of the previous
   * export will become the start_time of the current export. The
   * overlap in times means that only one of those records will
   * persist and the other will be overwritten. This
   * method adds a thousandth of a second to ensure discrete export
   * timeframes.
   */
  private modifyStartTimes(metrics: ResourceMetrics): void {
    metrics.scopeMetrics.forEach((scopeMetric) => {
      scopeMetric.metrics.forEach((metric) => {
        metric.dataPoints.forEach((dataPoint) => {
          dataPoint.startTime[1] = dataPoint.startTime[1] + 1_000_000;
        });
      });
    });
  }

  async shutdown(): Promise<void> {
    return await this.forceFlush();
  }

  async forceFlush(): Promise<void> {
    await this.promise;
  }
}

/**
 * Adjusts spans before exporting to GCP. Redacts model input
 * and output content, and augments span attributes before sending to GCP.
 */
class AdjustingTraceExporter implements SpanExporter {
  constructor(
    private exporter: SpanExporter,
    private logInputAndOutput: boolean,
    private projectId?: string,
    private errorHandler?: (error: Error) => void
  ) {}

  export(
    spans: ReadableSpan[],
    resultCallback: (result: ExportResult) => void
  ): void {
    this.exporter?.export(this.adjust(spans), (result) => {
      if (this.errorHandler && result.error) {
        this.errorHandler(result.error);
      }
      resultCallback(result);
    });
  }

  shutdown(): Promise<void> {
    return this.exporter?.shutdown();
  }

  getExporter(): SpanExporter {
    return this.exporter;
  }

  forceFlush(): Promise<void> {
    if (this.exporter?.forceFlush) {
      return this.exporter.forceFlush();
    }
    return Promise.resolve();
  }

  private adjust(spans: ReadableSpan[]): ReadableSpan[] {
    const allPaths = spans
      .filter((span) => span.attributes['genkit:path'])
      .map(
        (span) =>
          ({
            path: span.attributes['genkit:path'] as string,
            status:
              (span.attributes['genkit:state'] as string) === 'error'
                ? 'failure'
                : 'success',
            error: extractErrorName(span.events),
            latency: hrTimeToMilliseconds(
              hrTimeDuration(span.startTime, span.endTime)
            ),
          }) as PathMetadata
      );

    const allLeafPaths = new Set<PathMetadata>(
      allPaths.filter((leafPath) =>
        allPaths.every(
          (path) =>
            path.path === leafPath.path ||
            !path.path.startsWith(leafPath.path) ||
            (path.path.startsWith(leafPath.path) &&
              path.status !== leafPath.status)
        )
      )
    );

    // Check if we have a failure in the root span that requires special handling
    const rootSpan = spans.find((s) =>
      Object.keys(s.attributes).includes('genkit:isRoot')
    );
    if (rootSpan) {
      const rootSpanFailed =
        (rootSpan.attributes['genkit:state'] as string) === 'error';
      const anotherFailedSpan = spans.find(
        (s) =>
          !Object.keys(s.attributes).includes('genkit:isRoot') &&
          s.attributes['genkit:state'] === 'error'
      );
      if (rootSpanFailed && !anotherFailedSpan) {
        rootSpan.attributes['genkit:failedSpan'] =
          rootSpan.attributes['genkit:name'];
        rootSpan.attributes['genkit:failedPath'] =
          rootSpan.attributes['genkit:path'];
      }
    }

    return spans.map((span) => {
      this.tickTelemetry(span, allLeafPaths);

      span = this.redactInputOutput(span);
      span = this.markErrorSpanAsError(span);
      span = this.markFailedSpan(span);
      span = this.markGenkitFeature(span);
      span = this.markGenkitModel(span);
      span = this.normalizeLabels(span);
      return span;
    });
  }

  private tickTelemetry(span: ReadableSpan, paths: Set<PathMetadata>) {
    const attributes = span.attributes;
    if (!Object.keys(attributes).includes('genkit:type')) {
      return;
    }

    const type = attributes['genkit:type'] as string;
    const subtype = attributes['genkit:metadata:subtype'] as string;
    const isRoot = !!span.attributes['genkit:isRoot'];
    const unused: Set<PathMetadata> = new Set();

    if (isRoot) {
      // Report top level feature request and latency only for root spans
      // Log input to and output from to the feature
      featuresTelemetry.tick(
        span,
        unused,
        this.logInputAndOutput,
        this.projectId
      );
      // Report executions and latency for all flow paths only on the root span
      pathsTelemetry.tick(span, paths, this.logInputAndOutput, this.projectId);
      // Set root status explicitly
      span.attributes['genkit:rootState'] = span.attributes['genkit:state'];
    }
    if (type === 'action' && subtype === 'model') {
      // Report generate metrics () for all model actions
      generateTelemetry.tick(
        span,
        unused,
        this.logInputAndOutput,
        this.projectId
      );
    }
    if (type === 'action' && subtype === 'tool') {
      // TODO: Report input and output for tool actions
    }
    if (type === 'action' || type === 'flow' || type == 'flowStep') {
      // Report request and latency metrics for all actions
      actionTelemetry.tick(
        span,
        unused,
        this.logInputAndOutput,
        this.projectId
      );
    }
    if (type === 'userEngagement') {
      // Report user acceptance and feedback metrics
      engagementTelemetry.tick(
        span,
        unused,
        this.logInputAndOutput,
        this.projectId
      );
    }
  }

  private redactInputOutput(span: ReadableSpan): ReadableSpan {
    const hasInput = 'genkit:input' in span.attributes;
    const hasOutput = 'genkit:output' in span.attributes;

    return !hasInput && !hasOutput
      ? span
      : {
          ...span,
          spanContext: span.spanContext,
          attributes: {
            ...span.attributes,
            'genkit:input': '<redacted>',
            'genkit:output': '<redacted>',
          },
        };
  }

  // This is a workaround for GCP Trace to mark a span with a red
  // exclamation mark indicating that it is an error.
  private markErrorSpanAsError(span: ReadableSpan): ReadableSpan {
    return span.status.code !== SpanStatusCode.ERROR
      ? span
      : {
          ...span,
          spanContext: span.spanContext,
          attributes: {
            ...span.attributes,
            '/http/status_code': '599',
          },
        };
  }

  private normalizeLabels(span: ReadableSpan): ReadableSpan {
    const normalized = {} as Record<string, any>;
    for (const [key, value] of Object.entries(span.attributes)) {
      normalized[key.replace(/\:/g, '/')] = value;
    }
    return {
      ...span,
      spanContext: span.spanContext,
      attributes: normalized,
    };
  }

  private markFailedSpan(span: ReadableSpan): ReadableSpan {
    if (
      span.attributes['genkit:state'] === 'error' &&
      !span.attributes['genkit:isRoot']
    ) {
      if (!!span.attributes['genkit:name']) {
        span.attributes['genkit:failedSpan'] = span.attributes['genkit:name'];
      }
      if (!!span.attributes['genkit:path']) {
        span.attributes['genkit:failedPath'] = span.attributes['genkit:path'];
      }
    }
    return span;
  }

  private markGenkitFeature(span: ReadableSpan): ReadableSpan {
    if (span.attributes['genkit:isRoot'] && !!span.attributes['genkit:name']) {
      span.attributes['genkit:feature'] = span.attributes['genkit:name'];
    }
    return span;
  }

  private markGenkitModel(span: ReadableSpan): ReadableSpan {
    if (
      span.attributes['genkit:metadata:subtype'] === 'model' &&
      !!span.attributes['genkit:name']
    ) {
      span.attributes['genkit:model'] = span.attributes['genkit:name'];
    }
    return span;
  }
}

function getErrorHandler(
  shouldLogFn: (err: Error) => boolean,
  helpText: string
): (err: Error) => void {
  // only log the first time
  let instructionsLogged = false;

  return (err) => {
    // Use the defaultLogger so that logs don't get swallowed by the open
    // telemetry exporter
    const defaultLogger = logger.defaultLogger;
    if (err && shouldLogFn(err)) {
      if (!instructionsLogged) {
        instructionsLogged = true;
        defaultLogger.error(
          `Unable to send telemetry to Google Cloud: ${err.message}\n\n${helpText}\n`
        );
      }
    } else if (err) {
      defaultLogger.error(`Unable to send telemetry to Google Cloud: ${err}`);
    }
  };
}

/** @hidden */
export function __getMetricExporterForTesting(): InMemoryMetricExporter {
  return metricExporter as InMemoryMetricExporter;
}

/** @hidden */
export function __getSpanExporterForTesting(): InMemorySpanExporter {
  return spanExporter.getExporter() as InMemorySpanExporter;
}

/** @hidden */
export function __forceFlushSpansForTesting() {
  spanProcessor.forceFlush();
}
