/**
 * Copyright 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import {
  afterAll,
  beforeAll,
  beforeEach,
  describe,
  expect,
  it,
  jest,
} from '@jest/globals';
import { ReadableSpan } from '@opentelemetry/sdk-trace-base';
import * as assert from 'assert';
import { Genkit, z } from 'genkit';
import { GenkitBeta, genkit } from 'genkit/beta';
import { appendSpan } from 'genkit/tracing';
import {
  __forceFlushSpansForTesting,
  __getSpanExporterForTesting,
} from '../src/gcpOpenTelemetry.js';
import { enableGoogleCloudTelemetry } from '../src/index.js';

// Stub the two credential lookups in the auth module with fixed values; every
// other export of the real module is passed through untouched.
jest.mock('../src/auth.js', () => {
  const actualAuth = jest.requireActual('../src/auth.js');

  // Always resolves to a fixed principal for project 'test'.
  const resolveCurrentPrincipal = jest.fn(async () => ({
    projectId: 'test',
    serviceAccountEmail: 'test@test.com',
  }));

  // Always resolves to fixed placeholder credentials for project 'test'.
  const credentialsFromEnvironment = jest.fn(async () => ({
    projectId: 'test',
    credentials: {
      client_email: 'test@genkit.com',
      private_key: '-----BEGIN PRIVATE KEY-----',
    },
  }));

  return {
    ...(actualAuth || {}),
    resolveCurrentPrincipal,
    credentialsFromEnvironment,
  };
});

describe('GoogleCloudTracing', () => {
  // Genkit instance shared by all tests in this suite; assigned in beforeAll.
  let ai: GenkitBeta;

  // One-time setup: set the project/environment variables, enable Google
  // Cloud telemetry for project 'test', then create the Genkit instance.
  beforeAll(async () => {
    process.env.GCLOUD_PROJECT = 'test';
    process.env.GENKIT_ENV = 'dev';

    const telemetryOptions = { projectId: 'test', forceDevExport: false };
    await enableGoogleCloudTelemetry(telemetryOptions);

    ai = genkit({});
  });

  // Reset the test span exporter so each test only sees its own spans.
  beforeEach(() => {
    __getSpanExporterForTesting().reset();
  });

  // Stop the servers held by the shared Genkit instance once the suite ends.
  afterAll(async () => {
    await ai.stopServers();
  });

  // Running a flow with the default (empty) body is expected to export
  // exactly one span, named after the flow.
  it('writes traces', async () => {
    const flow = createFlow(ai, 'testFlow');
    await flow();

    const exported = await getExportedSpans();
    assert.equal(exported.length, 1);

    const [onlySpan] = exported;
    assert.equal(onlySpan.name, 'testFlow');
  });

  // Verifies the attribute names on the exported flow span: a few well-known
  // 'genkit/...' attributes are present, and no attribute key contains ':'.
  it('Adjusts attributes to support GCP trace filtering', async () => {
    const testFlow = createFlow(ai, 'testFlow');

    await testFlow();

    const spans = await getExportedSpans();
    const attributes = spans[0].attributes;
    // Check some common attributes
    assert.equal(attributes['genkit/name'], 'testFlow');
    assert.equal(attributes['genkit/type'], 'action');
    assert.equal(attributes['genkit/metadata/subtype'], 'flow');
    // Ensure we have no attributes with ':' because these are awkward to use in
    // Cloud Trace.
    // Iterate with for...of over the keys: for...in over an array yields its
    // indices ("0", "1", ...), which would make this check vacuous.
    for (const key of Object.keys(attributes)) {
      assert.equal(
        key.includes(':'),
        false,
        `attribute key "${key}" must not contain ':'`
      );
    }
  });

  // A flow running 'subAction', which in turn runs 'subAction2', should export
  // three spans (innermost first) chained together through parentSpanId.
  it('sub actions are contained within flows', async () => {
    const innerStep = async () => 'done';
    const outerStep = async () => ai.run('subAction2', innerStep);
    const testFlow = createFlow(ai, 'testFlow', async () =>
      ai.run('subAction', outerStep)
    );

    await testFlow();

    const spans = await getExportedSpans();
    assert.equal(spans.length, 3);

    const [innerSpan, outerSpan, flowSpan] = spans;
    // The flow span is the root: it has no parent.
    assert.equal(flowSpan.name, 'testFlow');
    assert.equal(flowSpan.parentSpanId, undefined);
    // Each sub action is parented by the span that ran it.
    assert.equal(outerSpan.name, 'subAction');
    assert.equal(outerSpan.parentSpanId, flowSpan.spanContext().spanId);
    assert.equal(innerSpan.name, 'subAction2');
    assert.equal(innerSpan.parentSpanId, outerSpan.spanContext().spanId);
  });

  // Two flows run one after the other should each export their own root span
  // (a span without a parent).
  it('different flows run independently', async () => {
    const flows = ['testFlow1', 'testFlow2'].map((name) =>
      createFlow(ai, name)
    );

    for (const flow of flows) {
      await flow();
    }

    const spans = await getExportedSpans();
    assert.equal(spans.length, 2);
    for (const span of spans) {
      assert.equal(span.parentSpanId, undefined);
    }
  });

  // When a step inside a flow throws, the step span should carry the
  // failedSpan/failedPath attributes and the root span should be in state
  // 'error'.
  it('labels failed spans', async () => {
    const failingStep = async () => {
      throw new Error('oh no!');
    };
    const testFlow = createFlow(ai, 'badFlow', async () =>
      ai.run('badStep', failingStep)
    );

    try {
      await testFlow();
    } catch {
      // Any error from the flow is ignored on purpose; only the exported
      // spans are inspected below.
    }

    const spans = await getExportedSpans();
    assert.equal(spans.length, 2);

    const [stepSpan, rootSpan] = spans;
    assert.equal(stepSpan.name, 'badStep');
    assert.equal(stepSpan.attributes['genkit/failedSpan'], 'badStep');
    assert.equal(
      stepSpan.attributes['genkit/failedPath'],
      '/{badFlow,t:flow}/{badStep,t:flowStep}'
    );
    assert.equal(rootSpan.attributes['genkit/isRoot'], true);
    assert.equal(rootSpan.attributes['genkit/rootState'], 'error');
  });

  // Only the flow span (not the step inside it) should carry the
  // 'genkit/feature' attribute, and a successful run should be marked
  // 'success'.
  it('labels the root feature', async () => {
    const noop = async () => {};
    const testFlow = createFlow(ai, 'niceFlow', async () =>
      ai.run('niceStep', noop)
    );
    await testFlow();

    const [stepSpan, flowSpan] = await getExportedSpans();

    assert.equal(stepSpan.name, 'niceStep');
    assert.equal(stepSpan.attributes['genkit/feature'], undefined);

    assert.equal(flowSpan.name, 'niceFlow');
    assert.equal(flowSpan.attributes['genkit/feature'], 'niceFlow');
    assert.equal(flowSpan.attributes['genkit/rootState'], 'success');
  });

  // When the flow body itself throws (after a successful step), the flow span
  // should be both the failed span and the root span in state 'error'.
  it('marks the root feature failed when it is the failure', async () => {
    const flowBody = async () => {
      await ai.run('good step', async () => {});
      throw new Error('oops!');
    };
    const testFlow = createFlow(ai, 'failingFlow', flowBody);

    try {
      await testFlow();
    } catch {
      // Any error from the flow is ignored on purpose; only the exported
      // spans are inspected below.
    }

    const spans = await getExportedSpans();
    assert.equal(spans.length, 2);

    const rootSpan = spans[1];
    const rootAttributes = rootSpan.attributes;
    assert.equal(rootSpan.name, 'failingFlow');
    assert.equal(rootAttributes['genkit/failedSpan'], 'failingFlow');
    assert.equal(rootAttributes['genkit/failedPath'], '/{failingFlow,t:flow}');
    assert.equal(rootAttributes['genkit/isRoot'], true);
    assert.equal(rootAttributes['genkit/rootState'], 'error');
  });

  // Calling a model from inside a flow step should export a span for the
  // model carrying a 'genkit/model' attribute, followed by the 'generate',
  // step and flow spans.
  it('adds the genkit/model label for model actions', async () => {
    // Test model that answers with the text of every incoming message part,
    // prefixed with 'Echo: '.
    const echoModel = ai.defineModel({ name: 'echoModel' }, async (request) => {
      const echoedText = request.messages
        .map((message) => message.content.map((part) => part.text).join())
        .join();
      return {
        message: {
          role: 'model',
          content: [{ text: 'Echo: ' + echoedText }],
        },
        finishReason: 'stop',
      };
    });

    const callModel = async () => {
      await ai.generate({
        model: echoModel,
        prompt: 'Testing model telemetry',
      });
    };
    const testFlow = createFlow(ai, 'modelFlow', async () =>
      ai.run('runFlow', callModel)
    );

    await testFlow();

    const [modelSpan, generateSpan, stepSpan, flowSpan] =
      await getExportedSpans();

    assert.equal(modelSpan.name, 'echoModel');
    assert.equal(modelSpan.attributes['genkit/model'], 'echoModel');
    assert.equal(generateSpan.name, 'generate');
    assert.equal(stepSpan.name, 'runFlow');
    assert.equal(flowSpan.name, 'modelFlow');
  });

  // A span added through appendSpan should be exported with exactly three
  // attributes: its name, the extra key passed in the last argument, and the
  // metadata entry under the 'genkit/metadata/' prefix.
  it('attaches additional span', async () => {
    await appendSpan(
      'trace1',
      'parent1',
      { name: 'span-name', metadata: { metadata_key: 'metadata_value' } },
      { ['label_key']: 'label_value' }
    );

    const spans = await getExportedSpans();
    // The callback parameter is deliberately not called `it`, which would
    // shadow the Jest `it` imported at the top of the file.
    const span = spans.find((candidate) => candidate.name === 'span-name');
    // Fail with a clear message if the span was never exported, instead of
    // reporting a confusing attribute-count mismatch.
    assert.ok(span, "expected an exported span named 'span-name'");

    assert.equal(Object.keys(span.attributes).length, 3);
    assert.equal(span.attributes['genkit/name'], 'span-name');
    assert.equal(span.attributes['label_key'], 'label_value');
    assert.equal(
      span.attributes['genkit/metadata/metadata_key'],
      'metadata_value'
    );
  });

  // Sending a chat message should export three spans, of which only the
  // top-level 'send' span carries the session id and thread name.
  it('writes sessionId and threadName for chats', async () => {
    // Fixed model output, including made-up usage numbers.
    const testModel = ai.defineModel({ name: 'testModel' }, async () => ({
      message: {
        role: 'user',
        content: [{ text: 'response' }],
      },
      finishReason: 'stop',
      usage: {
        inputTokens: 10,
        outputTokens: 14,
        inputCharacters: 8,
        outputCharacters: 16,
        inputImages: 1,
        outputImages: 3,
      },
    }));

    const chat = ai.chat();
    await chat.send({ model: testModel, prompt: 'Sending test prompt' });

    const spans = await getExportedSpans();
    // Three spans are expected from this chat: "send", which delegates to
    // "generate", which delegates to our "testModel".
    expect(spans).toHaveLength(3);

    for (const span of spans) {
      const sessionId = span.attributes['genkit/sessionId'];
      const threadName = span.attributes['genkit/threadName'];

      if (span.name === 'send') {
        // Only the top-level span has the sessionId and threadName until
        // sessionId is made more ubiquitous.
        expect(sessionId).toBeDefined();
        expect(threadName).toBeDefined();
      } else {
        // Once sessionId is written on all relevant spans, these should
        // verify that the values are populated instead.
        expect(sessionId).toBeUndefined();
        expect(threadName).toBeUndefined();
      }
    }
  });

  /**
   * Defines a flow on `ai` whose input and output schemas are both `z.void()`.
   *
   * @param ai Genkit instance the flow is defined on.
   * @param name Name given to the flow.
   * @param fn Flow body; defaults to an async function that does nothing.
   * @returns The value returned by `ai.defineFlow` for the new flow.
   */
  function createFlow(
    ai: Genkit,
    name: string,
    fn: () => Promise<any> = async () => {}
  ) {
    const flowConfig = {
      name,
      inputSchema: z.void(),
      outputSchema: z.void(),
    };
    return ai.defineFlow(flowConfig, fn);
  }

  /**
   * Triggers a flush of pending spans, then polls the test span exporter
   * until it reports at least one finished span.
   *
   * Note that this returns on the first non-empty poll; it does not wait for
   * any particular number of spans.
   *
   * @param maxAttempts Maximum number of polls (50 ms apart) before giving up.
   * @returns The finished spans reported by the exporter on the first
   *     non-empty poll.
   * @throws Fails via `assert.fail` when no span shows up within
   *     `maxAttempts` polls.
   */
  async function getExportedSpans(
    maxAttempts: number = 200
  ): Promise<ReadableSpan[]> {
    const pollIntervalMs = 50;

    __forceFlushSpansForTesting();
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      // Wait before each poll so the exporter has time to receive the spans.
      await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
      const found = __getSpanExporterForTesting().getFinishedSpans();
      if (found.length > 0) {
        return found;
      }
    }
    assert.fail(`Timed out while waiting for spans to be exported.`);
  }
});
