{"version":3,"file":"cassandra.cjs","names":["BaseStore","CassandraTable"],"sources":["../../src/storage/cassandra.ts"],"sourcesContent":["import { BaseStore } from \"@langchain/core/stores\";\n\nimport {\n  CassandraClientArgs,\n  Column,\n  Filter,\n  CassandraTable,\n} from \"../utils/cassandra.js\";\n\n/**\n * Configuration options for initializing a CassandraKVStore.\n * These options extend generic Cassandra client arguments with specific settings\n * for key-value store operations.\n *\n * @interface CassandraKVOptions\n * @extends CassandraClientArgs Custom arguments for the Cassandra client, such as connection settings.\n *\n * @property {string} keyspace The name of the Cassandra keyspace to be used by the key-value store.\n *                The keyspace must exist.\n *\n * @property {string} table The name of the table within the specified keyspace dedicated to storing\n *                key-value pairs. The table will be created if it does not exist.\n *\n * @property {string} [keyDelimiter=\"/\"] An optional delimiter used to structure complex keys. Defaults to '/'.\n *                This delimiter is used for parsing complex keys (e.g., hierarchical keys) when performing\n *                operations that involve key prefixes or segmentation.\n */\nexport interface CassandraKVOptions extends CassandraClientArgs {\n  keyspace: string;\n  table: string;\n  keyDelimiter?: string;\n}\n\n/**\n * A concrete implementation of BaseStore for interacting with a Cassandra database.\n * It provides methods to get, set, delete, and yield keys based on specified criteria.\n */\nexport class CassandraKVStore extends BaseStore<string, Uint8Array> {\n  lc_namespace = [\"langchain\", \"storage\"];\n\n  private cassandraTable: CassandraTable;\n\n  private options: CassandraKVOptions;\n\n  private colKey: Column;\n\n  private colKeyMap: Column;\n\n  private colVal: Column;\n\n  private keyDelimiter: string;\n\n  protected inClauseSize = 1000;\n\n  protected yieldKeysFetchSize = 5000;\n\n  constructor(options: CassandraKVOptions) {\n    super(options);\n    this.options = options;\n    this.colKey = { name: \"key\", type: \"text\", partition: true };\n    this.colKeyMap = { name: \"key_map\", type: \"map<tinyint,text>\" };\n    this.colVal = { name: \"val\", type: \"blob\" };\n    this.keyDelimiter = options.keyDelimiter || \"/\";\n  }\n\n  /**\n   * Retrieves the values associated with an array of keys from the Cassandra database.\n   * It chunks requests for large numbers of keys to manage performance and Cassandra limitations.\n   * @param keys An array of keys for which to retrieve values.\n   * @returns A promise that resolves with an array of Uint8Array or undefined, corresponding to each key.\n   */\n  async mget(keys: string[]): Promise<(Uint8Array | undefined)[]> {\n    await this.ensureTable();\n\n    const processFunction = async (\n      chunkKeys: string[]\n    ): Promise<(Uint8Array | undefined)[]> => {\n      const chunkResults = await this.cassandraTable.select(\n        [this.colKey, this.colVal],\n        [{ name: this.colKey.name, operator: \"IN\", value: chunkKeys }]\n      );\n\n      const useMap = chunkKeys.length > 25;\n      const rowsMap = useMap\n        ? new Map(chunkResults.rows.map((row) => [row[this.colKey.name], row]))\n        : null;\n\n      return chunkKeys.map((key) => {\n        const row =\n          useMap && rowsMap\n            ? rowsMap.get(key)\n            : chunkResults.rows.find((row) => row[this.colKey.name] === key);\n        if (row && row[this.colVal.name]) {\n          const buffer = row[this.colVal.name];\n          return new Uint8Array(\n            buffer.buffer,\n            buffer.byteOffset,\n            buffer.byteLength\n          );\n        }\n        return undefined;\n      });\n    };\n\n    const result = await this.processInChunks<Uint8Array | undefined>(\n      keys,\n      processFunction\n    );\n    return result || [];\n  }\n\n  /**\n   * Sets multiple key-value pairs in the Cassandra database.\n   * Each key-value pair is processed to ensure compatibility with Cassandra's storage requirements.\n   * @param keyValuePairs An array of key-value pairs to set in the database.\n   * @returns A promise that resolves when all key-value pairs have been set.\n   */\n  async mset(keyValuePairs: [string, Uint8Array][]): Promise<void> {\n    await this.ensureTable();\n\n    const values = keyValuePairs.map(([key, value]) => {\n      const keySegments = key.split(this.keyDelimiter);\n      const keyMap = keySegments.reduce<Record<number, string>>(\n        (acc, segment, index) => {\n          acc[index] = segment;\n          return acc;\n        },\n        {}\n      );\n\n      const bufferValue = Buffer.from(\n        value.buffer,\n        value.byteOffset,\n        value.byteLength\n      );\n\n      return [key, keyMap, bufferValue];\n    });\n\n    await this.cassandraTable.upsert(values, [\n      this.colKey,\n      this.colKeyMap,\n      this.colVal,\n    ]);\n  }\n\n  /**\n   * Deletes multiple keys and their associated values from the Cassandra database.\n   * @param keys An array of keys to delete from the database.\n   * @returns A promise that resolves when all specified keys have been deleted.\n   */\n  async mdelete(keys: string[]): Promise<void> {\n    if (keys.length > 0) {\n      await this.ensureTable();\n\n      const processFunction = async (chunkKeys: string[]): Promise<void> => {\n        const filter: Filter = {\n          name: this.colKey.name,\n          operator: \"IN\",\n          value: chunkKeys,\n        };\n        await this.cassandraTable.delete(filter);\n      };\n\n      await this.processInChunks(keys, processFunction);\n    }\n  }\n\n  /**\n   * Yields keys from the Cassandra database optionally based on a prefix, based\n   * on the store's keyDelimiter. This method pages through results efficiently\n   * for large datasets.\n   * @param prefix An optional prefix to filter the keys to be yielded.\n   * @returns An async generator that yields keys from the database.\n   */\n  async *yieldKeys(prefix?: string): AsyncGenerator<string> {\n    await this.ensureTable();\n\n    const filter: Filter[] = [];\n\n    if (prefix) {\n      let segments = prefix.split(this.keyDelimiter);\n\n      // Remove the last segment only if it is empty (due to a trailing delimiter)\n      if (segments[segments.length - 1] === \"\") {\n        segments = segments.slice(0, -1);\n      }\n\n      segments.forEach((segment, index) => {\n        filter.push({\n          name: `${this.colKeyMap.name}[${index}]`,\n          operator: \"=\",\n          value: segment,\n        });\n      });\n    }\n\n    let currentPageState;\n    do {\n      const results = await this.cassandraTable.select(\n        [this.colKey],\n        filter,\n        undefined, // orderBy\n        undefined, // limit\n        false, // allowFiltering\n        this.yieldKeysFetchSize,\n        currentPageState\n      );\n\n      for (const row of results.rows) {\n        yield row[this.colKey.name];\n      }\n\n      currentPageState = results.pageState;\n    } while (currentPageState);\n  }\n\n  /**\n   * Ensures the Cassandra table is initialized and ready for operations.\n   * This method is called internally before database operations.\n   * @returns A promise that resolves when the table is ensured to exist and be accessible.\n   */\n  private async ensureTable(): Promise<void> {\n    if (this.cassandraTable) {\n      return;\n    }\n\n    const tableConfig = {\n      ...this.options,\n      primaryKey: [this.colKey],\n      nonKeyColumns: [this.colKeyMap, this.colVal],\n      indices: [\n        {\n          name: this.colKeyMap.name,\n          value: `( ENTRIES (${this.colKeyMap.name}))`,\n        },\n      ],\n    };\n\n    this.cassandraTable = await new CassandraTable(tableConfig);\n  }\n\n  /**\n   * Processes an array of keys in chunks, applying a given processing function to each chunk.\n   * This method is designed to handle large sets of keys by breaking them down into smaller\n   * manageable chunks, applying the processing function to each chunk sequentially. This approach\n   * helps in managing resource utilization and adhering to database query limitations.\n   *\n   * The method is generic, allowing for flexible processing functions that can either perform actions\n   * without returning a result (e.g., deletion operations) or return a result (e.g., data retrieval).\n   * This design enables the method to be used across a variety of batch processing scenarios.\n   *\n   * @template T The type of elements in the result array when the processFunction returns data. This\n   *             is used to type the resolution of the promise returned by processFunction. For void\n   *             operations, T can be omitted or set to any empty interface or null type.\n   * @param keys The complete array of keys to be processed. The method chunks this array\n   *             based on the specified CHUNK_SIZE.\n   * @param processFunction A function that will be applied to each chunk of keys. This function\n   *                        should accept an array of strings (chunkKeys) and return a Promise\n   *                        that resolves to either void (for operations that don't produce a result,\n   *                        like deletion) or an array of type T (for operations that fetch data,\n   *                        like retrieval). The array of type T should match the template parameter.\n   * @param CHUNK_SIZE (optional) The maximum size of each chunk. If not specified, the class's\n   *                   `inClauseSize` property is used as the default chunk size. This value determines\n   *                   how many keys are included in each chunk and should be set based on the\n   *                   operation's performance characteristics and any limitations of the underlying\n   *                   storage system.\n   *\n   * @returns A Promise that resolves to void if the processing function returns void, or an array\n   *          of type T if the processing function returns data. If the processing function returns\n   *          data for each chunk, the results from all chunks are concatenated and returned as a\n   *          single array. If the processing function does not return data, the method resolves to undefined,\n   *          aligning with the void return expectation for non-data-returning operations.\n   */\n  private async processInChunks<T>(\n    keys: string[],\n    processFunction: (chunkKeys: string[]) => Promise<T[] | void>,\n    CHUNK_SIZE: number = this.inClauseSize\n  ): Promise<T[] | void> {\n    let results: T[] = [];\n    for (let i = 0; i < keys.length; i += CHUNK_SIZE) {\n      const chunkKeys = keys.slice(i, i + CHUNK_SIZE);\n      const chunkResult: T[] | void = await processFunction(chunkKeys);\n      if (Array.isArray(chunkResult)) {\n        results = results.concat(chunkResult);\n      }\n    }\n\n    return results.length > 0 ? results : undefined;\n  }\n}\n"],"mappings":";;;;;;;;;;AAqCA,IAAa,mBAAb,cAAsCA,uBAAAA,UAA8B;CAClE,eAAe,CAAC,aAAa,UAAU;CAEvC;CAEA;CAEA;CAEA;CAEA;CAEA;CAEA,eAAyB;CAEzB,qBAA+B;CAE/B,YAAY,SAA6B;AACvC,QAAM,QAAQ;AACd,OAAK,UAAU;AACf,OAAK,SAAS;GAAE,MAAM;GAAO,MAAM;GAAQ,WAAW;GAAM;AAC5D,OAAK,YAAY;GAAE,MAAM;GAAW,MAAM;GAAqB;AAC/D,OAAK,SAAS;GAAE,MAAM;GAAO,MAAM;GAAQ;AAC3C,OAAK,eAAe,QAAQ,gBAAgB;;;;;;;;CAS9C,MAAM,KAAK,MAAqD;AAC9D,QAAM,KAAK,aAAa;EAExB,MAAM,kBAAkB,OACtB,cACwC;GACxC,MAAM,eAAe,MAAM,KAAK,eAAe,OAC7C,CAAC,KAAK,QAAQ,KAAK,OAAO,EAC1B,CAAC;IAAE,MAAM,KAAK,OAAO;IAAM,UAAU;IAAM,OAAO;IAAW,CAAC,CAC/D;GAED,MAAM,SAAS,UAAU,SAAS;GAClC,MAAM,UAAU,SACZ,IAAI,IAAI,aAAa,KAAK,KAAK,QAAQ,CAAC,IAAI,KAAK,OAAO,OAAO,IAAI,CAAC,CAAC,GACrE;AAEJ,UAAO,UAAU,KAAK,QAAQ;IAC5B,MAAM,MACJ,UAAU,UACN,QAAQ,IAAI,IAAI,GAChB,aAAa,KAAK,MAAM,QAAQ,IAAI,KAAK,OAAO,UAAU,IAAI;AACpE,QAAI,OAAO,IAAI,KAAK,OAAO,OAAO;KAChC,MAAM,SAAS,IAAI,KAAK,OAAO;AAC/B,YAAO,IAAI,WACT,OAAO,QACP,OAAO,YACP,OAAO,WACR;;KAGH;;AAOJ,SAJe,MAAM,KAAK,gBACxB,MACA,gBACD,IACgB,EAAE;;;;;;;;CASrB,MAAM,KAAK,eAAsD;AAC/D,QAAM,KAAK,aAAa;EAExB,MAAM,SAAS,cAAc,KAAK,CAAC,KAAK,WAAW;AAgBjD,UAAO;IAAC;IAfY,IAAI,MAAM,KAAK,aAAa,CACrB,QACxB,KAAK,SAAS,UAAU;AACvB,SAAI,SAAS;AACb,YAAO;OAET,EAAE,CACH;IAEmB,OAAO,KACzB,MAAM,QACN,MAAM,YACN,MAAM,WACP;IAEgC;IACjC;AAEF,QAAM,KAAK,eAAe,OAAO,QAAQ;GACvC,KAAK;GACL,KAAK;GACL,KAAK;GACN,CAAC;;;;;;;CAQJ,MAAM,QAAQ,MAA+B;AAC3C,MAAI,KAAK,SAAS,GAAG;AACnB,SAAM,KAAK,aAAa;GAExB,MAAM,kBAAkB,OAAO,cAAuC;IACpE,MAAM,SAAiB;KACrB,MAAM,KAAK,OAAO;KAClB,UAAU;KACV,OAAO;KACR;AACD,UAAM,KAAK,eAAe,OAAO,OAAO;;AAG1C,SAAM,KAAK,gBAAgB,MAAM,gBAAgB;;;;;;;;;;CAWrD,OAAO,UAAU,QAAyC;AACxD,QAAM,KAAK,aAAa;EAExB,MAAM,SAAmB,EAAE;AAE3B,MAAI,QAAQ;GACV,IAAI,WAAW,OAAO,MAAM,KAAK,aAAa;AAG9C,OAAI,SAAS,SAAS,SAAS,OAAO,GACpC,YAAW,SAAS,MAAM,GAAG,GAAG;AAGlC,YAAS,SAAS,SAAS,UAAU;AACnC,WAAO,KAAK;KACV,MAAM,GAAG,KAAK,UAAU,KAAK,GAAG,MAAM;KACtC,UAAU;KACV,OAAO;KACR,CAAC;KACF;;EAGJ,IAAI;AACJ,KAAG;GACD,MAAM,UAAU,MAAM,KAAK,eAAe,OACxC,CAAC,KAAK,OAAO,EACb,QACA,KAAA,GACA,KAAA,GACA,OACA,KAAK,oBACL,iBACD;AAED,QAAK,MAAM,OAAO,QAAQ,KACxB,OAAM,IAAI,KAAK,OAAO;AAGxB,sBAAmB,QAAQ;WACpB;;;;;;;CAQX,MAAc,cAA6B;AACzC,MAAI,KAAK,eACP;AAeF,OAAK,iBAAiB,MAAM,IAAIC,wBAAAA,eAZZ;GAClB,GAAG,KAAK;GACR,YAAY,CAAC,KAAK,OAAO;GACzB,eAAe,CAAC,KAAK,WAAW,KAAK,OAAO;GAC5C,SAAS,CACP;IACE,MAAM,KAAK,UAAU;IACrB,OAAO,cAAc,KAAK,UAAU,KAAK;IAC1C,CACF;GACF,CAE0D;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;CAmC7D,MAAc,gBACZ,MACA,iBACA,aAAqB,KAAK,cACL;EACrB,IAAI,UAAe,EAAE;AACrB,OAAK,IAAI,IAAI,GAAG,IAAI,KAAK,QAAQ,KAAK,YAAY;GAEhD,MAAM,cAA0B,MAAM,gBADpB,KAAK,MAAM,GAAG,IAAI,WAAW,CACiB;AAChE,OAAI,MAAM,QAAQ,YAAY,CAC5B,WAAU,QAAQ,OAAO,YAAY;;AAIzC,SAAO,QAAQ,SAAS,IAAI,UAAU,KAAA"}