好的，没问题。我们来整理一份关于 `@flowlab/data` 库到目前为止已完成功能的中文文档，并包含带有中文注释的代码示例，以便于理解。

---

# @flowlab/data 功能文档 (当前版本)

## 1. 引言与目标

`@flowlab/data` 是一个用于 Node.js 环境下的数据流处理与管理库，基于 TypeScript 构建，提供强类型支持。其核心目标是：

* **简化 ETL 流程**：提供清晰、可组合的 API 来定义和执行数据抽取 (Extract)、转换 (Transform)、加载 (Load) 的完整流程。
* **数据处理能力**：内置数据清洗、格式转换、字段映射、数据校验等常用数据处理功能。
* **灵活性与可扩展性**：支持多种数据源和目标，并允许用户轻松扩展自定义的处理逻辑或组件。
* **配置化管理**：支持通过 JSON 或 YAML 文件定义复杂的数据处理流程，便于管理和部署。
* **健壮性**：包含错误处理、重试机制、死信队列（DLQ）支持，提高流程稳定性。
* **高性能**：支持批量处理和并发执行，以提高处理效率。
* **FlowLab 生态集成**：设计为可以作为节点集成到 `@flowlab/core` 工作流引擎中。

## 2. 核心概念

* **`DataPipeline` (数据管道)**: 这是编排和执行整个数据处理流程的核心类。你可以通过链式调用其方法来定义数据如何从源头流向目标。
* **`Extractor` (抽取器)**: 负责从特定数据源（如数据库、文件、API）读取数据的组件。
* **`Transformer` (转换器)**: 负责对流经管道的数据进行结构或内容修改的组件。
* **`Cleaner` (清洗器)**: 一种特殊的转换器，专注于数据清洗任务，如处理无效值、填充缺失值、数据校验等。
* **`Loader` (加载器)**: 负责将处理完成的数据写入到目标存储（如数据库、文件、API）的组件。
* **`Registry` (组件注册表)**: 一个内部机制，允许通过类型名称（字符串）查找并实例化对应的 Extractor, Transformer, Cleaner, Loader。这使得通过配置文件定义流程成为可能。
* **`Configuration` (配置)**: 可以使用 JSON 或 YAML 文件来描述一个完整的 `DataPipeline`，包括使用哪个 Extractor、执行哪些转换/清洗步骤、使用哪个 Loader，以及相关的配置参数。
* **`Context` (上下文)**: 在管道执行过程中传递的对象，包含日志记录器 (Logger)、运行 ID、FlowLab 上下文（如果集成）等信息，可供各个组件访问。

## 3. 已实现的主要功能

以下是 `@flowlab/data` 库当前版本已实现的关键功能：

### 3.1. 核心管道 (`DataPipeline`)

* **链式 API 构建**: 提供 `.extract().clean().transform().load()` 等方法，以流式接口定义数据处理步骤。
* **流程执行**: `run()` 方法启动整个管道，处理数据从抽取到加载的全过程。`execute()` 方法（实现自 `@flowlab/core` 的 `IFlowNode` 接口，*基于假设*）允许管道作为 FlowLab 节点被执行。
* **错误处理集成**: 内置了对 Extractor 和 Loader 操作的重试机制，并根据配置处理数据处理步骤中的单项错误。
* **并发处理**: 支持通过 `concurrency` 配置项，利用 `p-limit` 库并行执行数据清洗和转换步骤，提高效率。
* **上下文管理**: 创建并传递 `PipelineContext` 给每个处理步骤。

```typescript
// 示例：使用链式 API 构建管道 (带中文注释)
import { DataPipeline, /*...*/ } from '@flowlab/data';

const pipeline = new DataPipeline('my-simple-pipeline');

pipeline
  .configure({ batchSize: 50, concurrency: 4 }) // 配置管道选项：批处理大小50，并发度4
  .extract(new SomeExtractor(/* extractor config */)) // 第一步：使用某个抽取器抽取数据
  .clean(new SomeCleaner(/* cleaner config */))     // 第二步：进行数据清洗
  .transform(new SomeTransformer(/* transformer config */)) // 第三步：进行数据转换
  .load(new SomeLoader(/* loader config */));      // 第四步：加载数据到目标

// 异步执行管道
// pipeline.run().catch(error => console.error("管道执行失败:", error));
```

### 3.2. 配置加载 (`loadPipelineFromConfig`)

* **文件支持**: 可以从 `.json`, `.yaml`, 或 `.yml` 文件加载管道定义。
* **动态实例化**: 利用组件注册表 (`registry`)，根据配置文件中指定的 `type` 字符串和相关配置，自动创建 Extractor, Transformer, Cleaner, Loader 实例。
* **易于管理**: 将复杂的流程定义移出代码，便于非开发人员理解和修改，也方便版本控制和部署。

```yaml
# 示例：pipeline-config.yaml 文件
id: user-sync-pipeline
source:
  type: mongodb # 指定抽取器类型
  connection: # 连接信息（需要代码中提供实际的 Db 实例）
    # db: mongoDbInstance # 假设在代码中注入
    collection: source_users
  query: # MongoDB 查询条件
    status: active
options: # 管道运行选项
  batchSize: 100
  concurrency: 8
  errorHandling:
    retries: 2 # Extractor/Loader 重试次数
    delay: 1500 # 重试间隔 (ms)
    itemProcessingErrorStrategy: dlq # 单项处理错误策略：发送到死信队列
    dlqTarget: # 死信队列目标配置
      type: file-dlq # 使用文件作为 DLQ
      path: ./logs/user-sync-failed.jsonl
steps:
  - id: clean-email # 清洗步骤 ID (可选)
    type: clean # 步骤类型：清洗
    cleaner: required-fields # 使用已注册的 'required-fields' 清洗器
    # 清洗器的特定配置 (假设 RequiredFieldCleaner 接受此配置)
    config:
        fields: ['email']
  - id: transform-structure
    type: transform # 步骤类型：转换
    transformer: mapping # 使用已注册的 'mapping' 转换器
    rules: # 映射规则
      - sourcePath: _id
        targetPath: legacyId
      - sourcePath: email
        targetPath: contact.emailAddress
      - sourcePath: name
        targetPath: fullName
        defaultValue: 'N/A'
  - id: validate-output
    type: validate # 步骤类型：校验
    transformer: validator # 使用已注册的 'validator' 转换器
    schema: './schemas/targetUserSchema.js' # 指向导出 Zod Schema 的文件路径
    onFailure: error # 校验失败则抛出错误 (并由 errorHandling.itemProcessingErrorStrategy 处理)
target:
  type: postgresql # 指定加载器类型
  connection: # 连接信息 (需要代码中提供实际的 Prisma Client 实例)
    # client: prismaClientInstance # 假设在代码中注入
    model: TargetUser
  operation: upsert # 加载操作：更新或插入
  upsertWhereField: emailAddress # Upsert 时用于查找记录的字段
```

```typescript
// 示例：从配置文件加载并运行管道 (带中文注释)
import { loadPipelineFromConfig } from '@flowlab/data';
import { Db } from 'mongodb'; // 假设已有 MongoDB 连接实例
import { PrismaClient } from '@prisma/client'; // 假设已有 Prisma 客户端实例

async function runFromConfig(mongoDbInstance: Db, prismaClientInstance: PrismaClient) {
  try {
    // 注意：实际使用中，数据库连接实例需要通过某种方式注入到配置中
    // 这通常在加载配置后，手动设置到 extractor/loader 实例上，或通过更高级的依赖注入实现
    // 这里仅作演示，假设 registry 或配置加载能处理连接注入
    const pipeline = await loadPipelineFromConfig('./path/to/pipeline-config.yaml');

    // --- 依赖注入示例 (如果配置中不能直接包含实例) ---
    // const extractor = pipeline.getExtractor(); // 假设有方法获取组件实例
    // if (extractor instanceof MongoExtractor) {
    //     extractor.setDbConnection(mongoDbInstance); // 假设有设置连接的方法
    // }
    // const loader = pipeline.getLoader();
    // if (loader instanceof PrismaLoader) {
    //     loader.setPrismaConnection(prismaClientInstance);
    // }
    // --- 依赖注入示例结束 ---


    console.log(`管道 ${pipeline.id} 从配置加载成功，开始执行...`);
    await pipeline.run(); // 执行从文件配置加载的管道
    console.log(`管道 ${pipeline.id} 执行完毕。`);

  } catch (error) {
    console.error("从配置运行管道失败:", error);
  }
}
```

### 3.3. 组件注册表 (`registry`)

* **解耦配置与实现**: 允许配置文件通过简单的字符串（如 `"mongodb"`, `"mapping"`, `"file"`）来引用具体的组件实现类。
* **可扩展性**: 用户可以通过 `registerExtractor`, `registerTransformer` 等函数注册自己的定制组件，使其能在配置文件中使用。
* **内置组件注册**: 库本身在加载时会自动注册所有内置的组件。

### 3.4. 数据抽取器 (Extractors) - 已实现

* `PrismaExtractor`: 连接 PostgreSQL/MySQL (需提供 `PrismaClient` 实例)，支持模型名称和查询参数，使用游标分页读取。
* `MongoExtractor`: 连接 MongoDB (需提供 `Db` 实例和集合名称)，支持查询条件 (`filter`)，使用游标读取。
* `RedisExtractor`: 连接 Redis (需提供 `ioredis` 实例)，使用 `SCAN` 命令迭代符合 `redisScanMatch` 模式的 Key，支持读取 `string`, `hash`, `list` 类型的值。
* `FileExtractor`: 从本地文件读取，支持 JSON Lines (`.jsonl`)、纯文本 (`.txt`)、基础逗号分隔值 (`.csv`) 格式，采用流式读取处理大文件。
* `ApiExtractor`: 从 HTTP(S) API 获取数据，支持配置 URL、方法 (GET/POST)、请求头、请求体，主要处理 JSON 响应。

### 3.5. 数据转换与清洗 (Transformers/Cleaners) - 已实现

* `MappingTransformer` (`mapping`): 核心转换器，根据 `rules` 配置灵活地映射源字段到目标字段，支持 `lodash` 风格的嵌套路径、默认值，并允许为特定字段指定 `transform` 函数进行自定义处理。
* `CustomFunctionTransformer` (`custom-function`): 加载一个外部 `.js` 或 `.ts` 文件（需配置 `customFunctionPath`），并使用该文件导出的函数来处理每个数据项。
* `ValidationTransformer` (`validator`): 使用 `zod` 库进行数据校验。需要配置 `schema`（通常是导出 Zod Schema 的文件路径），可配置校验失败时的行为 (`onFailure`)。
* `RequiredFieldCleaner` (`required-fields`): 过滤掉缺少指定的一个或多个必填字段的数据项。
* `DefaultValueCleaner` (`default-values`): 为指定的字段设置默认值，仅当该字段在数据项中为 `null` 或 `undefined` 时生效。

### 3.6. 数据加载器 (Loaders) - 已实现

* `PrismaLoader`: 写入 PostgreSQL/MySQL (需提供 `PrismaClient` 实例和模型名称)，支持 `createMany`（批量创建）和 `upsert`（更新或插入）操作。
* `MongoLoader`: 写入 MongoDB (需提供 `Db` 实例和集合名称)，支持 `insertMany`（批量插入）和 `bulkWrite`（执行多种写操作，需提供 `bulkWriteOperations` 函数生成操作指令）。
* `RedisLoader`: 写入 Redis (需提供 `ioredis` 实例)，根据配置的 `redisOperation`（如 `"set"`, `"hmset"`, `"lpush"`）执行命令，使用 Redis Pipeline 提高批量写入性能。需要配置 `redisKeyField` 或 `table` 来确定 Key。
* `FileLoader`: 写入本地文件，支持 JSON Lines, Text, 基础 CSV 格式，支持 `overwrite`（覆盖）和 `append`（追加）模式。
* `ApiLoader`: 将数据批量发送到 HTTP(S) API 端点，通常使用 `POST` 或 `PUT` 方法，将整个批次的数据作为 JSON 请求体发送。

### 3.7. 错误处理

* **操作重试**: `Extractor` 和 `Loader` 在执行其主要操作（如连接、读写）失败时，会自动根据配置进行重试 (`retries`, `delay`)。
* **单项处理错误策略**: 在 `DataPipeline` 的 `clean` 或 `transform` 步骤中，如果处理单个数据项时发生错误，可以配置不同的处理策略 (`itemProcessingErrorStrategy`):
    * `'fail'` (默认): 立即停止整个管道执行并报错。
    * `'skip'`: 跳过该错误项，继续处理下一个数据项（不记录错误项内容）。
    * `'log'`: 记录错误和数据项信息，然后跳过该项，继续处理。
    * `'dlq'`: 将错误信息和原始数据项打包发送到一个“死信队列”(Dead Letter Queue) 目标，然后继续处理下一个数据项。
* **死信队列 (DLQ)**:
    * 可通过 `dlqTarget` 配置项指定一个目标（如文件、Kafka 主题等）来接收处理失败的数据项和错误详情。
    * 内置了一个 `FileDlqLoader` (`file-dlq`) 作为示例，可将失败信息写入指定的 JSON Lines 文件。

### 3.8. 并行处理

* **配置并发度**: 通过 `concurrency` 选项设置管道在处理数据项（Clean/Transform 步骤）时允许同时运行的任务数量。
* **性能提升**: 对于 CPU 密集型或包含 I/O 等待（如调用外部 API）的转换步骤，设置合理的并发度可以显著提高整体处理速度。

### 3.9. 测试

* **测试框架**: 项目配置了 Jest 作为测试运行器，使用 `ts-jest` 处理 TypeScript。
* **示例测试**: `tests/` 目录下包含了单元测试（如 `MappingTransformer.test.ts`）和集成测试（如 `FileExtractor.test.ts`, `DataPipeline.test.ts` 使用 Mocks）的示例，展示了如何为组件和管道编写测试。

### 3.10. FlowLab 集成 (基于对 `@flowlab/core` 的假设)

* **管道作为节点**: `DataPipeline` 类被设计为实现了（假设的）`@flowlab/core` 的 `IFlowNode` 接口，可以通过 `execute` 方法被 FlowLab 引擎调用。配置可以通过 `getConfigSchema` 定义，并通过 `execute` 的 `payload` 或构造函数传入。
* **组件作为节点**: 提供了适配器类（如 `ExtractorNode`）的示例，允许将单个 Extractor, Loader, Transformer/Cleaner 包装成独立的 FlowLab 节点，增加了流程编排的灵活性。
* **节点注册**: 提供了 `registerFlowLabDataNodes` 函数，用于将 `DataPipeline` 节点类型和各组件节点类型注册到（假设的）`@flowlab/core` 的 `FlowRegistry` 中。

## 4. 待完成/待完善

* **流式处理**: `KafkaAdapter` 和 `RedisStreamAdapter` 的核心消费与处理逻辑需要完全实现。
* **全面测试**: 需要为所有组件、配置选项、错误场景和并发场景编写更广泛的单元测试和集成测试。
* **文档完善**: 需要编写详细的 README、API 文档和更多使用示例。
* **FlowLab 集成细节**: 需要根据 `@flowlab/core` 的实际 API 调整接口实现、配置传递和上下文使用。
* **错误处理细化**: 完善批量加载时的部分失败处理机制。
* **性能优化**: 进行基准测试和潜在的性能调优。

---

希望这份文档能帮助你更好地理解 `@flowlab/data` 当前的功能实现情况！