{"version":3,"file":"postgres.cjs","names":[],"sources":["../../src/indexes/postgres.ts"],"sourcesContent":["import pg, { PoolConfig, Pool } from \"pg\";\nimport {\n  ListKeyOptions,\n  RecordManagerInterface,\n  UpdateOptions,\n} from \"@langchain/core/indexing\";\n\nexport type PostgresRecordManagerOptions = {\n  postgresConnectionOptions?: PoolConfig;\n  pool?: Pool;\n  tableName?: string;\n  schema?: string;\n};\n\nexport class PostgresRecordManager implements RecordManagerInterface {\n  lc_namespace = [\"langchain\", \"recordmanagers\", \"postgres\"];\n\n  pool: Pool;\n\n  tableName: string;\n\n  namespace: string;\n\n  finalTableName: string;\n\n  constructor(namespace: string, config: PostgresRecordManagerOptions) {\n    const { postgresConnectionOptions, tableName, pool } = config;\n    this.namespace = namespace;\n    if (!postgresConnectionOptions && !pool) {\n      throw new Error(\n        \"You must provide either a `postgresConnectionOptions` object or a `pool` instance.\"\n      );\n    }\n    this.pool = pool ?? new pg.Pool(postgresConnectionOptions);\n    this.tableName = tableName || \"upsertion_records\";\n    this.finalTableName = config.schema\n      ? `\"${config.schema}\".\"${this.tableName}\"`\n      : `\"${this.tableName}\"`;\n  }\n\n  async createSchema(): Promise<void> {\n    try {\n      await this.pool.query(`\n        CREATE TABLE IF NOT EXISTS ${this.finalTableName} (\n          uuid UUID PRIMARY KEY DEFAULT gen_random_uuid(),\n          key TEXT NOT NULL,\n          namespace TEXT NOT NULL,\n          updated_at Double PRECISION NOT NULL,\n          group_id TEXT,\n          UNIQUE (key, namespace)\n        );\n        CREATE INDEX IF NOT EXISTS updated_at_index ON ${this.finalTableName} (updated_at);\n        CREATE INDEX IF NOT EXISTS key_index ON ${this.finalTableName} (key);\n        CREATE INDEX IF NOT EXISTS namespace_index ON ${this.finalTableName} (namespace);\n        CREATE INDEX IF NOT EXISTS group_id_index ON ${this.finalTableName} (group_id);`);\n\n      // oxlint-disable-next-line typescript/no-explicit-any\n    } catch (e: any) {\n      // This error indicates that the table already exists\n      // Due to asynchronous nature of the code, it is possible that\n      // the table is created between the time we check if it exists\n      // and the time we try to create it. It can be safely ignored.\n      if (\"code\" in e && e.code === \"23505\") {\n        return;\n      }\n      throw e;\n    }\n  }\n\n  async getTime(): Promise<number> {\n    const res = await this.pool.query(\n      \"SELECT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) AS extract\"\n    );\n    return Number.parseFloat(res.rows[0].extract);\n  }\n\n  /**\n   * Generates the SQL placeholders for a specific row at the provided index.\n   *\n   * @param index - The index of the row for which placeholders need to be generated.\n   * @param numOfColumns - The number of columns we are inserting data into.\n   * @returns The SQL placeholders for the row values.\n   */\n  private generatePlaceholderForRowAt(\n    index: number,\n    numOfColumns: number\n  ): string {\n    const placeholders = [];\n    for (let i = 0; i < numOfColumns; i += 1) {\n      placeholders.push(`$${index * numOfColumns + i + 1}`);\n    }\n    return `(${placeholders.join(\", \")})`;\n  }\n\n  async update(keys: string[], updateOptions?: UpdateOptions): Promise<void> {\n    if (keys.length === 0) {\n      return;\n    }\n\n    const updatedAt = await this.getTime();\n    const { timeAtLeast, groupIds: _groupIds } = updateOptions ?? {};\n\n    if (timeAtLeast && updatedAt < timeAtLeast) {\n      throw new Error(\n        `Time sync issue with database ${updatedAt} < ${timeAtLeast}`\n      );\n    }\n\n    const groupIds = _groupIds ?? keys.map(() => null);\n\n    if (groupIds.length !== keys.length) {\n      throw new Error(\n        `Number of keys (${keys.length}) does not match number of group_ids ${groupIds.length})`\n      );\n    }\n\n    const recordsToUpsert = keys.map((key, i) => [\n      key,\n      this.namespace,\n      updatedAt,\n      groupIds[i],\n    ]);\n\n    const valuesPlaceholders = recordsToUpsert\n      .map((_, j) =>\n        this.generatePlaceholderForRowAt(j, recordsToUpsert[0].length)\n      )\n      .join(\", \");\n\n    const query = `INSERT INTO ${this.finalTableName} (key, namespace, updated_at, group_id) VALUES ${valuesPlaceholders} ON CONFLICT (key, namespace) DO UPDATE SET updated_at = EXCLUDED.updated_at;`;\n    await this.pool.query(query, recordsToUpsert.flat());\n  }\n\n  async exists(keys: string[]): Promise<boolean[]> {\n    if (keys.length === 0) {\n      return [];\n    }\n\n    const startIndex = 2;\n    const arrayPlaceholders = keys\n      .map((_, i) => `$${i + startIndex}`)\n      .join(\", \");\n\n    const query = `\n      WITH ordered_keys AS (\n        SELECT * FROM unnest(ARRAY[${arrayPlaceholders}]) WITH ORDINALITY as t(key, o)\n      )\n      SELECT ok.key, (r.key IS NOT NULL) ex\n      FROM ordered_keys ok \n      LEFT JOIN ${this.finalTableName} r \n      ON r.key = ok.key \n      AND namespace = $1\n      ORDER BY ok.o;\n      `;\n    const res = await this.pool.query(query, [this.namespace, ...keys.flat()]);\n    return res.rows.map((row: { ex: boolean }) => row.ex);\n  }\n\n  async listKeys(options?: ListKeyOptions): Promise<string[]> {\n    const { before, after, limit, groupIds } = options ?? {};\n\n    let query = `SELECT key FROM ${this.finalTableName} WHERE namespace = $1`;\n    const values: (string | number | (string | null)[])[] = [this.namespace];\n\n    let index = 2;\n    if (before) {\n      values.push(before);\n      query += ` AND updated_at < $${index}`;\n      index += 1;\n    }\n\n    if (after) {\n      values.push(after);\n      query += ` AND updated_at > $${index}`;\n      index += 1;\n    }\n\n    if (limit) {\n      values.push(limit);\n      query += ` LIMIT $${index}`;\n      index += 1;\n    }\n\n    if (groupIds) {\n      values.push(groupIds);\n      query += ` AND group_id = ANY($${index})`;\n      index += 1;\n    }\n\n    query += \";\";\n    const res = await this.pool.query(query, values);\n    return res.rows.map((row: { key: string }) => row.key);\n  }\n\n  async deleteKeys(keys: string[]): Promise<void> {\n    if (keys.length === 0) {\n      return;\n    }\n\n    const query = `DELETE FROM ${this.finalTableName} WHERE namespace = $1 AND key = ANY($2);`;\n    await this.pool.query(query, [this.namespace, keys]);\n  }\n\n  /**\n   * Terminates the connection pool.\n   * @returns {Promise<void>}\n   */\n  async end(): Promise<void> {\n    await this.pool.end();\n  }\n}\n"],"mappings":";;;;;;AAcA,IAAa,wBAAb,MAAqE;CACnE,eAAe;EAAC;EAAa;EAAkB;EAAW;CAE1D;CAEA;CAEA;CAEA;CAEA,YAAY,WAAmB,QAAsC;EACnE,MAAM,EAAE,2BAA2B,WAAW,SAAS;AACvD,OAAK,YAAY;AACjB,MAAI,CAAC,6BAA6B,CAAC,KACjC,OAAM,IAAI,MACR,qFACD;AAEH,OAAK,OAAO,QAAQ,IAAI,GAAA,QAAG,KAAK,0BAA0B;AAC1D,OAAK,YAAY,aAAa;AAC9B,OAAK,iBAAiB,OAAO,SACzB,IAAI,OAAO,OAAO,KAAK,KAAK,UAAU,KACtC,IAAI,KAAK,UAAU;;CAGzB,MAAM,eAA8B;AAClC,MAAI;AACF,SAAM,KAAK,KAAK,MAAM;qCACS,KAAK,eAAe;;;;;;;;yDAQA,KAAK,eAAe;kDAC3B,KAAK,eAAe;wDACd,KAAK,eAAe;uDACrB,KAAK,eAAe,cAAc;WAG5E,GAAQ;AAKf,OAAI,UAAU,KAAK,EAAE,SAAS,QAC5B;AAEF,SAAM;;;CAIV,MAAM,UAA2B;EAC/B,MAAM,MAAM,MAAM,KAAK,KAAK,MAC1B,0DACD;AACD,SAAO,OAAO,WAAW,IAAI,KAAK,GAAG,QAAQ;;;;;;;;;CAU/C,4BACE,OACA,cACQ;EACR,MAAM,eAAe,EAAE;AACvB,OAAK,IAAI,IAAI,GAAG,IAAI,cAAc,KAAK,EACrC,cAAa,KAAK,IAAI,QAAQ,eAAe,IAAI,IAAI;AAEvD,SAAO,IAAI,aAAa,KAAK,KAAK,CAAC;;CAGrC,MAAM,OAAO,MAAgB,eAA8C;AACzE,MAAI,KAAK,WAAW,EAClB;EAGF,MAAM,YAAY,MAAM,KAAK,SAAS;EACtC,MAAM,EAAE,aAAa,UAAU,cAAc,iBAAiB,EAAE;AAEhE,MAAI,eAAe,YAAY,YAC7B,OAAM,IAAI,MACR,iCAAiC,UAAU,KAAK,cACjD;EAGH,MAAM,WAAW,aAAa,KAAK,UAAU,KAAK;AAElD,MAAI,SAAS,WAAW,KAAK,OAC3B,OAAM,IAAI,MACR,mBAAmB,KAAK,OAAO,uCAAuC,SAAS,OAAO,GACvF;EAGH,MAAM,kBAAkB,KAAK,KAAK,KAAK,MAAM;GAC3C;GACA,KAAK;GACL;GACA,SAAS;GACV,CAAC;EAEF,MAAM,qBAAqB,gBACxB,KAAK,GAAG,MACP,KAAK,4BAA4B,GAAG,gBAAgB,GAAG,OAAO,CAC/D,CACA,KAAK,KAAK;EAEb,MAAM,QAAQ,eAAe,KAAK,eAAe,iDAAiD,mBAAmB;AACrH,QAAM,KAAK,KAAK,MAAM,OAAO,gBAAgB,MAAM,CAAC;;CAGtD,MAAM,OAAO,MAAoC;AAC/C,MAAI,KAAK,WAAW,EAClB,QAAO,EAAE;EAGX,MAAM,aAAa;EAKnB,MAAM,QAAQ;;qCAJY,KACvB,KAAK,GAAG,MAAM,IAAI,IAAI,aAAa,CACnC,KAAK,KAAK,CAIsC;;;;kBAIrC,KAAK,eAAe;;;;;AAMlC,UADY,MAAM,KAAK,KAAK,MAAM,OAAO,CAAC,KAAK,WAAW,GAAG,KAAK,MAAM,CAAC,CAAC,EAC/D,KAAK,KAAK,QAAyB,IAAI,GAAG;;CAGvD,MAAM,SAAS,SAA6C;EAC1D,MAAM,EAAE,QAAQ,OAAO,OAAO,aAAa,WAAW,EAAE;EAExD,IAAI,QAAQ,mBAAmB,KAAK,eAAe;EACnD,MAAM,SAAkD,CAAC,KAAK,UAAU;EAExE,IAAI,QAAQ;AACZ,MAAI,QAAQ;AACV,UAAO,KAAK,OAAO;AACnB,YAAS,sBAAsB;AAC/B,YAAS;;AAGX,MAAI,OAAO;AACT,UAAO,KAAK,MAAM;AAClB,YAAS,sBAAsB;AAC/B,YAAS;;AAGX,MAAI,OAAO;AACT,UAAO,KAAK,MAAM;AAClB,YAAS,WAAW;AACpB,YAAS;;AAGX,MAAI,UAAU;AACZ,UAAO,KAAK,SAAS;AACrB,YAAS,wBAAwB,MAAM;AACvC,YAAS;;AAGX,WAAS;AAET,UADY,MAAM,KAAK,KAAK,MAAM,OAAO,OAAO,EACrC,KAAK,KAAK,QAAyB,IAAI,IAAI;;CAGxD,MAAM,WAAW,MAA+B;AAC9C,MAAI,KAAK,WAAW,EAClB;EAGF,MAAM,QAAQ,eAAe,KAAK,eAAe;AACjD,QAAM,KAAK,KAAK,MAAM,OAAO,CAAC,KAAK,WAAW,KAAK,CAAC;;;;;;CAOtD,MAAM,MAAqB;AACzB,QAAM,KAAK,KAAK,KAAK"}