/**
 * Copyright 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import {
  afterAll,
  beforeAll,
  beforeEach,
  describe,
  expect,
  it,
  jest,
} from '@jest/globals';
import type { ReadableSpan } from '@opentelemetry/sdk-trace-base';
import * as assert from 'assert';
import { genkit, z, type GenerateResponseData, type Genkit } from 'genkit';
import { SPAN_TYPE_ATTR, appendSpan } from 'genkit/tracing';
import { Writable } from 'stream';
import {
  __addTransportStreamForTesting,
  __forceFlushSpansForTesting,
  __getSpanExporterForTesting,
  __useJsonFormatForTesting,
  enableGoogleCloudTelemetry,
} from '../src/index.js';

jest.mock('../src/auth.js', () => {
  const original = jest.requireActual('../src/auth.js');
  return {
    ...(original || {}),
    resolveCurrentPrincipal: jest.fn().mockImplementation(() => {
      return Promise.resolve({
        projectId: 'test',
        serviceAccountEmail: 'test@test.com',
      });
    }),
    credentialsFromEnvironment: jest.fn().mockImplementation(() => {
      return Promise.resolve({
        projectId: 'test',
        credentials: {
          client_email: 'test@genkit.com',
          private_key: '-----BEGIN PRIVATE KEY-----',
        },
      });
    }),
  };
});

describe('GoogleCloudLogs', () => {
  let logLines = '';
  const logStream = new Writable();
  logStream._write = (chunk, encoding, next) => {
    logLines = logLines += chunk.toString();
    next();
  };

  let ai: Genkit;

  beforeAll(async () => {
    process.env.GCLOUD_PROJECT = 'test';
    process.env.GENKIT_ENV = 'dev';
    __useJsonFormatForTesting();
    __addTransportStreamForTesting(logStream);
    await enableGoogleCloudTelemetry({
      projectId: 'test',
      forceDevExport: false,
      metricExportIntervalMillis: 100,
      metricExportTimeoutMillis: 100,
    });
    ai = genkit({
      // Force GCP Plugin to use in-memory metrics exporter
      plugins: [],
    });
    await waitForLogsInit(ai, logLines);
  });
  beforeEach(async () => {
    logLines = '';
    __getSpanExporterForTesting().reset();
  });
  afterAll(async () => {
    await ai.stopServers();
  });

  describe('with truncation', () => {
    it('truncates large output logs', async () => {
      const testModel = createModel(ai, 'testModel', async () => {
        return {
          message: {
            role: 'user',
            content: [
              {
                text: 'r'.repeat(130_000),
              },
            ],
          },
          finishReason: 'stop',
          usage: {
            inputTokens: 10,
            outputTokens: 14,
            inputCharacters: 8,
            outputCharacters: 16,
            inputImages: 1,
            outputImages: 3,
          },
        };
      });
      const testFlow = createFlowWithInput(ai, 'testFlow', async (input) => {
        return await ai.run('sub1', async () => {
          return await ai.run('sub2', async () => {
            return await ai.generate({
              model: testModel,
              prompt: `${input} prompt`,
              config: {
                temperature: 1.0,
                topK: 3,
                topP: 5,
                maxOutputTokens: 7,
              },
            });
          });
        });
      });

      await testFlow('test');
      await getExportedSpans();

      const logMessages = await getLogs(1, 100, logLines);
      const logObjects = logMessages.map((l) => JSON.parse(l as string));
      const logObjectMessages = logObjects.map(
        (structuredLog) => structuredLog.message
      );

      expect(logObjectMessages).toContain('Output[testFlow, testFlow]');

      logObjects.map((structuredLog) => {
        if (structuredLog.message === 'Output[testFlow, testFlow]') {
          expect(structuredLog.content.length).toBe(128_000);
        }
      });
    });

    it('truncates large input logs', async () => {
      const testModel = createModel(ai, 'testModel', async () => {
        return {
          message: {
            role: 'user',
            content: [
              {
                text: 'response',
              },
            ],
          },
          finishReason: 'stop',
          usage: {
            inputTokens: 10,
            outputTokens: 14,
            inputCharacters: 8,
            outputCharacters: 16,
            inputImages: 1,
            outputImages: 3,
          },
        };
      });
      const testFlow = createFlowWithInput(ai, 'testFlow', async (input) => {
        return await ai.run('sub1', async () => {
          return await ai.run('sub2', async () => {
            return await ai.generate({
              model: testModel,
              prompt: `${input} prompt`,
              config: {
                temperature: 1.0,
                topK: 3,
                topP: 5,
                maxOutputTokens: 7,
              },
            });
          });
        });
      });

      await testFlow('t'.repeat(130_000));
      await getExportedSpans();

      const logMessages = await getLogs(1, 100, logLines);
      const logObjects = logMessages.map((l) => JSON.parse(l as string));
      const logObjectMessages = logObjects.map(
        (structuredLog) => structuredLog.message
      );

      expect(logObjectMessages).toContain('Input[testFlow, testFlow]');

      logObjects.map((structuredLog) => {
        if (structuredLog.message === 'Input[testFlow, testFlow]') {
          expect(structuredLog.content.length).toBe(128_000);
        }
      });
    });

    it('truncates large model names', async () => {
      const testModel = createModel(ai, 'm'.repeat(2046), async () => {
        return {
          message: {
            role: 'user',
            content: [
              {
                text: 'response',
              },
            ],
          },
          finishReason: 'stop',
          usage: {
            inputTokens: 10,
            outputTokens: 14,
            inputCharacters: 8,
            outputCharacters: 16,
            inputImages: 1,
            outputImages: 3,
          },
        };
      });
      const testFlow = createFlowWithInput(ai, 'testFlow', async (input) => {
        return await ai.run('sub1', async () => {
          return await ai.run('sub2', async () => {
            return await ai.generate({
              model: testModel,
              prompt: `${input} prompt`,
              config: {
                temperature: 1.0,
                topK: 3,
                topP: 5,
                maxOutputTokens: 7,
              },
            });
          });
        });
      });

      await testFlow('test');
      await getExportedSpans();

      const logMessages = await getLogs(1, 100, logLines);
      const logObjects = logMessages.map((l) => JSON.parse(l as string));
      const logObjectModels = logObjects.map(
        (structuredLog) => structuredLog.model
      );

      expect(logObjectModels).toContain('m'.repeat(1024));
    });
  });

  describe('path logs', () => {
    it('writes error log for failed path', async () => {
      const testFlow = createFlow(ai, 'testFlow', async () => {
        await ai.run('sub1', async () => {
          return 'not failing';
        });
        await ai.run('sub2', async () => {
          return explode();
        });
        return 'never reached';
      });

      await assert.rejects(async () => {
        await testFlow();
      });

      await getExportedSpans();

      const logs = await getLogs(1, 100, logLines);
      const logObjectMessages = getStructuredLogMessages(logs);
      expect(logObjectMessages).toContain(
        "Error[testFlow > sub2, TypeError] Cannot read properties of undefined (reading 'explode')"
      );
      const errorLogs = logObjectMessages.filter(
        (m) => m.indexOf('Error[') >= 0
      );
      expect(errorLogs).toHaveLength(1); // Only 1 error log
    }, 10000); //timeout

    it('writes error log for failed root', async () => {
      const testFlow = createFlow(ai, 'testFlow', async () => {
        await ai.run('sub1', async () => {
          return 'not failing';
        });
        await ai.run('sub2', async () => {
          return 'not failing';
        });
        return explode();
      });

      await assert.rejects(async () => {
        await testFlow();
      });

      await getExportedSpans();

      const logs = await getLogs(1, 100, logLines);
      const logObjectMessages = getStructuredLogMessages(logs);
      expect(logObjectMessages).toContain(
        "Error[testFlow, TypeError] Cannot read properties of undefined (reading 'explode')"
      );
      const errorLogs = logObjectMessages.filter(
        (m) => m.indexOf('Error[') >= 0
      );
      expect(errorLogs).toHaveLength(1); // Only 1 error log
    }, 10000); //timeout

    it('writes error log for multiple failing spans', async () => {
      const testFlow = createFlow(ai, 'testFlow', async () => {
        await Promise.all([
          ai.run('sub1', async () => {
            return explode();
          }),
          ai.run('sub2', async () => {
            return explode();
          }),
        ]);
        return 'not failing';
      });

      await assert.rejects(async () => {
        await testFlow();
      });

      await getExportedSpans();

      const logs = await getLogs(1, 100, logLines);
      const logObjectMessages = getStructuredLogMessages(logs);
      expect(logObjectMessages).toContain(
        "Error[testFlow > sub1, TypeError] Cannot read properties of undefined (reading 'explode')"
      );
      expect(logObjectMessages).toContain(
        "Error[testFlow > sub2, TypeError] Cannot read properties of undefined (reading 'explode')"
      );
      const errorLogs = logObjectMessages.filter(
        (m) => m.indexOf('Error[') >= 0
      );
      expect(errorLogs).toHaveLength(2); // Only 2 error log
    }, 10000); //timeout
  });

  it('writes generate logs', async () => {
    const testModel = createModel(ai, 'testModel', async () => {
      return {
        message: {
          role: 'model',
          content: [
            {
              text: 'response',
            },
          ],
        },
        finishReason: 'stop',
        usage: {
          inputTokens: 10,
          outputTokens: 14,
          inputCharacters: 8,
          outputCharacters: 16,
          inputImages: 1,
          outputImages: 3,
        },
      };
    });
    const testFlow = createFlowWithInput(ai, 'testFlow', async (input) => {
      return await ai.run('sub1', async () => {
        return await ai.run('sub2', async () => {
          return await ai.generate({
            model: testModel,
            prompt: `${input} prompt`,
            config: {
              temperature: 1.0,
              topK: 3,
              topP: 5,
              maxOutputTokens: 7,
            },
          });
        });
      });
    });

    await testFlow('test');

    await getExportedSpans();

    const logs = await getLogs(1, 100, logLines);
    expect(logs.length).toEqual(9);
    const logObjectMessages = getStructuredLogMessages(logs);
    expect(logObjectMessages).toContain(
      'Config[testFlow > sub1 > sub2 > generate > testModel, testModel]'
    );
    expect(logObjectMessages).toContain(
      'Input[testFlow > sub1 > sub2 > generate > testModel, testModel] '
    );
    expect(logObjectMessages).toContain(
      'Output[testFlow > sub1 > sub2 > generate > testModel, testModel] '
    );
    expect(logObjectMessages).toContain('Input[testFlow, testFlow]');
    expect(logObjectMessages).toContain('Output[testFlow, testFlow]');
    expect(logObjectMessages).toContain(
      'Input[testFlow > sub1 > sub2 > generate, testFlow]'
    );
    expect(logObjectMessages).toContain(
      'Output[testFlow > sub1 > sub2 > generate, testFlow]'
    );
    // Ensure the model input/output has an associated role
    logs.forEach((log) => {
      const structuredLog = JSON.parse(log as string);
      if (
        structuredLog.message ===
        'Input[testFlow > sub1 > sub2 > generate > testModel, testModel] '
      ) {
        expect(structuredLog.role).toBe('user');
      }
      if (
        structuredLog.message ===
        'Output[testFlow > sub1 > sub2 > generate > testModel, testModel]'
      ) {
        expect(structuredLog.role).toBe('model');
      }
    });
  });

  it('writes feature logs for generate without flow', async () => {
    const testModel = createModel(ai, 'testModel', async () => {
      return {
        message: {
          role: 'model',
          content: [
            {
              text: 'response',
            },
          ],
        },
        finishReason: 'stop',
        usage: {
          inputTokens: 10,
          outputTokens: 14,
          inputCharacters: 8,
          outputCharacters: 16,
          inputImages: 1,
          outputImages: 3,
        },
      };
    });

    await ai.generate({
      model: testModel,
      prompt: `a test prompt`,
      config: {
        temperature: 1.0,
        topK: 3,
        topP: 5,
        maxOutputTokens: 7,
      },
    });

    await getExportedSpans();

    const logs = await getLogs(1, 100, logLines);
    expect(logs.length).toEqual(6);
    const logObjectMessages = getStructuredLogMessages(logs);
    expect(logObjectMessages).toContain(
      'Config[generate > testModel, testModel]'
    );
    expect(logObjectMessages).toContain(
      'Input[generate > testModel, testModel] '
    );
    expect(logObjectMessages).toContain(
      'Output[generate > testModel, testModel] '
    );
    expect(logObjectMessages).toContain('Input[generate, generate]');
    expect(logObjectMessages).toContain('Output[generate, generate]');
  });

  it('writes user feedback log', async () => {
    await appendSpan(
      'trace1',
      'parent1',
      {
        name: 'user-feedback',
        path: '/{flowName}',
        metadata: {
          subtype: 'userFeedback',
          feedbackValue: 'negative',
          textFeedback: 'terrible',
        },
      },
      { [SPAN_TYPE_ATTR]: 'userEngagement' }
    );

    await getExportedSpans();
    const logs = await getLogs(1, 100, logLines);
    const logObjectMessages = getStructuredLogMessages(logs);
    expect(logObjectMessages).toContain('UserFeedback[flowName]');
  });

  it('writes user acceptance log', async () => {
    await appendSpan(
      'trace1',
      'parent1',
      {
        name: 'user-acceptance',
        path: '/{flowName}',
        metadata: { subtype: 'userAcceptance', acceptanceValue: 'rejected' },
      },
      { [SPAN_TYPE_ATTR]: 'userEngagement' }
    );

    await getExportedSpans();
    const logs = await getLogs(1, 100, logLines);
    const logObjectMessages = getStructuredLogMessages(logs);
    expect(logObjectMessages).toContain('UserAcceptance[flowName]');
  });

  it('writes tool input and output logs', async () => {
    const echoTool = ai.defineTool(
      { name: 'echoTool', description: 'echo' },
      async (input) => input
    );
    await echoTool('Helllooooo!');
    await getExportedSpans();
    const logs = await getLogs(1, 100, logLines);
    const logObjectMessages = getStructuredLogMessages(logs);
    expect(logObjectMessages).toContain('Input[echoTool, echoTool]');
    expect(logObjectMessages).toContain('Output[echoTool, echoTool]');
  });
});

/** Helper to create a flow with no inputs or outputs */
function createFlow(
  ai: Genkit,
  name: string,
  fn: () => Promise<any> = async () => {}
) {
  return ai.defineFlow(
    {
      name,
      inputSchema: z.void(),
      outputSchema: z.void(),
    },
    fn
  );
}

function createFlowWithInput(
  ai: Genkit,
  name: string,
  fn: (input: string) => Promise<any>
) {
  return ai.defineFlow(
    {
      name,
      inputSchema: z.string(),
      outputSchema: z.any(),
    },
    fn
  );
}

/**
 * Helper to create a model that returns the value produced by the given
 * response function.
 */
function createModel(
  genkit: Genkit,
  name: string,
  respFn: () => Promise<GenerateResponseData>
) {
  return genkit.defineModel({ name }, (req) => respFn());
}

async function waitForLogsInit(genkit: Genkit, logLines: any) {
  await import('winston');
  const testFlow = createFlow(genkit, 'testLogsInitFlow');
  await testFlow();
  await getLogs(1, 100, logLines);
}

async function getLogs(
  logCount: number,
  maxAttempts: number,
  logLines: string
): Promise<string[]> {
  var attempts = 0;
  while (attempts++ < maxAttempts) {
    await new Promise((resolve) => setTimeout(resolve, 100));
    const found = logLines
      .trim()
      .split('\n')
      .map((l) => l.trim());
    if (found.length >= logCount) {
      return found;
    }
  }
  assert.fail(`Waiting for logs, but none have been written.`);
}

/** Polls the in memory metric exporter until the genkit scope is found. */
async function getExportedSpans(maxAttempts = 200): Promise<ReadableSpan[]> {
  __forceFlushSpansForTesting();
  var attempts = 0;
  while (attempts++ < maxAttempts) {
    await new Promise((resolve) => setTimeout(resolve, 50));
    const found = __getSpanExporterForTesting().getFinishedSpans();
    if (found.length > 0) {
      return found;
    }
  }
  assert.fail(`Timed out while waiting for spans to be exported.`);
}

function getStructuredLogMessages(logs: string[]): string[] {
  const logObjects = logs.map((l) => JSON.parse(l as string));
  return logObjects.map((log) => log.message);
}

function explode() {
  const nothing: { missing?: any } = { missing: 1 };
  delete nothing.missing;
  return nothing.missing.explode;
}
