## 数据处理与审批工作流

主要组件包括：

1. **核心部分**:
   - FlowLab: 入口类，提供创建工作流和注册节点的能力
   - Workflow: 工作流实例，支持添加步骤、条件分支和执行

2. **引擎部分**:
   - NodeRegistry: 节点注册中心
   - StatusTracker: 状态跟踪器
   - Scheduler: 调度器
   - ErrorHandler: 错误处理器
   - Runner: 运行器

3. **节点部分**:
   - BaseNode: 基础节点
   - ConditionNode: 条件节点
   - SubWorkflowNode: 子工作流节点

4. **持久化部分**:
   - Persistence: 持久化接口
   - MemoryPersistence: 内存持久化实现
   - RedisPersistence: Redis持久化实现

我将构建一个包含所有这些组件的API文档，然后创建一个全面的使用示例。</think># FlowLab API 文档

根据您提供的代码文件，我对 FlowLab 工作流系统进行了详细分析。下面是完整的 API 文档（中文版）及使用示例。

<!-- MARK: Core
 -->
## 1. 核心模块 (Core)


<!-- MARK: FlowLab
 -->
### 1.1 FlowLab 类

FlowLab 是整个工作流系统的入口类，提供创建工作流和注册节点的核心能力。

```typescript
class FlowLab {
  /**
   * 创建一个新的工作流实例
   * @param name 工作流名称
   * @returns 返回一个新的 Workflow 实例
   */
  static create(name: string): Workflow;

  /**
   * 注册一个节点（基础方式：名称 + 函数 + 可选描述）
   * @param name 节点名称
   * @param fn 节点执行函数
   * @param description 可选的节点描述
   * @returns 返回注册的节点条目
   */
  static registerNode<Input = any, Output = any, Context = any>(
    name: string,
    fn: NodeFunction<Input, Output, Context>,
    description?: string
  ): NodeRegistryEntry<Input, Output, Context>;

  /**
   * 注册一个节点（高级方式：带 metadata 对象）
   * @param metadata 节点元数据对象
   * @param fn 节点执行函数
   * @returns 返回注册的节点条目
   */
  static registerNodeWithMeta<Input = any, Output = any, Context = any>(
    metadata: NodeMetadata,
    fn: NodeFunction<Input, Output, Context>
  ): NodeRegistryEntry<Input, Output, Context>;

  /**
   * 获取所有已注册的节点
   * @returns 返回所有注册节点的数组
   */
  static getAllRegisteredNodes(): NodeRegistryEntry<any, any, any>[];
}
```
<!-- MARK: Workflow
 -->
### 1.2 Workflow 类

Workflow 是工作流的核心类，支持链式调用添加各种节点和执行工作流。

```typescript
class Workflow {
  /**
   * 构造函数
   * @param name 工作流名称
   */
  constructor(name: string);

  /**
   * 添加一个顺序执行的节点
   * @param name 节点名称
   * @param fn 节点执行函数
   * @param options 可选节点配置（重试次数、超时等）
   * @returns 返回当前 Workflow 实例，支持链式调用
   */
  addStep(
    name: string,
    fn: NodeFunction,
    options?: NodeOptions
  ): this;

  /**
   * 添加并行执行的多个节点
   * @param steps 节点配置数组
   * @returns 返回当前 Workflow 实例，支持链式调用
   */
  addParallelStep(
    steps: {
      name: string;
      fn: NodeFunction;
      options?: NodeOptions;
    }[]
  ): this;

  /**
   * 添加条件分支节点
   * @param condition 条件函数，返回分支名称或布尔值
   * @param branches 分支路径映射表
   * @returns 返回当前 Workflow 实例，支持链式调用
   */
  addCondition(
    condition: (input: any, context: WorkflowContext) => string | boolean,
    branches: Record<string, Workflow>
  ): this;

  /**
   * 添加子工作流节点
   * @param name 子工作流节点名称
   * @param workflow 子工作流实例
   * @returns 返回当前 Workflow 实例，支持链式调用
   */
  addSubWorkflow(name: string, workflow: Workflow): this;

  /**
   * 执行工作流
   * @param input 工作流输入数据
   * @param context 可选的执行上下文（用户ID、租户ID等）
   * @returns 返回工作流执行结果 Promise
   */
  run(input: any, context?: WorkflowContext): Promise<any>;
}
```

<!-- MARK: Engine
 -->
## 2. 引擎模块 (Engine)


<!-- MARK: NodeRegistry
 -->
### 2.1 NodeRegistry 类

NodeRegistry 是节点注册中心，负责管理系统中所有可用的节点。

```typescript
class NodeRegistry {
  /**
   * 注册一个新的节点函数（基础方式）
   * @param name 节点名称
   * @param fn 节点执行函数
   * @param description 可选描述
   * @returns 返回注册的节点条目
   */
  static register<Input = any, Output = any, Context = any>(
    name: string,
    fn: NodeFunction<Input, Output, Context>,
    description?: string
  ): NodeRegistryEntry<Input, Output, Context>;

  /**
   * 注册一个新的节点函数（高级方式）
   * @param metadata 节点元数据对象
   * @param fn 节点执行函数
   * @returns 返回注册的节点条目
   */
  static register<Input = any, Output = any, Context = any>(
    metadata: NodeMetadata,
    fn: NodeFunction<Input, Output, Context>
  ): NodeRegistryEntry<Input, Output, Context>;

  /**
   * 获取指定名称的节点函数
   * @param name 节点名称
   * @returns 返回节点函数或 undefined
   */
  static getNodeFunction(name: string): NodeFunction | undefined;

  /**
   * 获取指定名称的节点元数据
   * @param name 节点名称
   * @returns 返回节点元数据或 undefined
   */
  static getNodeMetadata(name: string): NodeMetadata | undefined;

  /**
   * 获取所有已注册的节点
   * @returns 返回所有注册节点的数组
   */
  static getAllNodes(): NodeRegistryEntry<any, any, any>[];

  /**
   * 检查节点是否已存在
   * @param name 节点名称
   * @returns 返回是否存在的布尔值
   */
  static exists(name: string): boolean;
}
```
<!-- MARK: StatusTracker
 -->
### 2.2 StatusTracker 类

StatusTracker 负责跟踪工作流中各节点的执行状态。

```typescript
class StatusTracker {
  /**
   * 构造函数
   * @param workflowName 工作流名称
   */
  constructor(workflowName: string);

  /**
   * 更新节点状态
   * @param nodeName 节点名称
   * @param status 状态值（PENDING/RUNNING/COMPLETED/FAILED/SKIPPED）
   */
  update(nodeName: string, status: TaskStatus): void;

  /**
   * 获取节点状态
   * @param nodeName 节点名称
   * @returns 返回节点状态或 undefined
   */
  getStatus(nodeName: string): TaskStatus | undefined;

  /**
   * 获取所有节点的状态
   * @returns 返回节点状态映射表
   */
  getAll(): TaskStatusMap;

  /**
   * 重置所有状态
   */
  reset(): void;
}
```

<!-- MARK: 调度器
 -->
### 2.3 Scheduler 类

Scheduler 提供工作流延迟执行和立即执行的能力。

```typescript
class Scheduler {
  /**
   * 延迟执行工作流
   * @param delay 延迟时间（毫秒）
   * @param workflow 要执行的工作流
   * @param input 初始输入数据
   * @param context 执行上下文
   */
  static scheduleWithDelay(
    delay: number,
    workflow: Workflow,
    input: any,
    context?: Record<string, any>
  ): void;

  /**
   * 立即执行工作流
   * @param workflow 要执行的工作流
   * @param input 初始输入数据
   * @param context 执行上下文
   * @returns 执行结果 Promise
   */
  static runNow(
    workflow: Workflow,
    input: any,
    context?: Record<string, any>
  ): Promise<any>;
}
```

<!-- MARK: 错误处理
 -->
### 2.4 ErrorHandler 类

ErrorHandler 负责统一处理工作流执行过程中的错误。

```typescript
class ErrorHandler {
  /**
   * 注册全局错误处理函数
   * @param handler 错误处理函数
   */
  static register(handler: TaskErrorHandler): void;

  /**
   * 处理节点执行错误
   * @param taskName 节点名称
   * @param error 错误对象
   */
  static handleNodeError(taskName: string, error: unknown): void;
}
```

<!-- MARK: Runner
 -->
### 2.5 Runner 类

Runner 负责执行一组工作流步骤（顺序、并行、条件、子流程）。

```typescript
class Runner {
  /**
   * 构造函数
   * @param steps 工作流步骤数组
   * @param workflowName 工作流名称
   * @param context 执行上下文
   */
  constructor(steps: Step[], workflowName: string, context: Record<string, any>);

  /**
   * 执行完整工作流
   * @param input 初始输入数据
   * @returns 执行结果 Promise
   */
  execute(input: any): Promise<any>;
}
```
<!-- MARK: - - Nodes -。
 -->
## 3. 节点模块 (Nodes)


<!-- MARK: BaseNode
 -->
### 3.1 BaseNode 类

BaseNode 是最基本的执行节点，包含重试、超时和条件控制功能。

```typescript
class BaseNode {
  /**
   * 构造函数
   * @param name 节点名称
   * @param fn 节点执行函数
   * @param options 节点配置选项
   */
  constructor(name: string, fn: NodeFunction, options?: NodeOptions);

  /**
   * 执行节点函数
   * @param input 输入数据
   * @param context 执行上下文
   * @returns 执行结果 Promise
   */
  execute(input: any, context: any): Promise<any>;
}
```

<!-- MARK: 条件节点
 -->
### 3.2 ConditionNode 类

ConditionNode 用于根据条件选择执行分支。

```typescript
class ConditionNode {
  /**
   * 构造函数
   * @param name 节点名称
   * @param conditionFn 条件函数
   * @param branches 分支映射表
   */
  constructor(
    name: string,
    conditionFn: (input: any, context: any) => string | boolean,
    branches: Record<string, Workflow>
  );

  /**
   * 解析应该执行的分支
   * @param input 输入数据
   * @param context 执行上下文
   * @returns 分支名称或布尔值
   */
  resolveBranch(input: any, context: any): Promise<string | boolean>;

  /**
   * 获取指定的分支工作流
   * @param branch 分支名称或布尔值
   * @returns 返回对应的工作流或 undefined
   */
  getBranch(branch: string | boolean): Workflow | undefined;
}
```
<!-- MARK: 子工作流
 -->
### 3.3 SubWorkflowNode 类

SubWorkflowNode 用于嵌套执行另一个工作流。

```typescript
class SubWorkflowNode {
  /**
   * 构造函数
   * @param name 节点名称
   * @param workflow 子工作流实例
   */
  constructor(name: string, workflow: Workflow);

  /**
   * 执行子工作流
   * @param input 输入数据
   * @param context 执行上下文
   * @returns 执行结果 Promise
   */
  run(input: any, context: any): Promise<void>;
}
```

<!-- MARK: - -持久化 
-->
## 4. 持久化模块 (Persistence)


<!-- MARK: Persistence
 -->
### 4.1 Persistence 模块

Persistence 是持久化入口模块，允许注入不同的持久化引擎。

```typescript
const Persistence = {
  /**
   * 设置持久化引擎
   * @param p 持久化引擎实例
   */
  use(p: PersistenceEngine): void;

  /**
   * 获取当前使用的持久化引擎
   * @returns 当前持久化引擎
   */
  current(): PersistenceEngine;
};
```

<!-- MARK: Memory
 -->
### 4.2 MemoryPersistence 类

MemoryPersistence 是基于内存的持久化实现，适用于本地开发和测试。

```typescript
class MemoryPersistence implements PersistenceEngine {
  /**
   * 保存工作流快照
   * @param workflowId 工作流ID
   * @param snapshot 工作流快照
   */
  async save(workflowId: string, snapshot: WorkflowSnapshot): Promise<void>;

  /**
   * 加载工作流快照
   * @param workflowId 工作流ID
   * @returns 返回工作流快照或 null
   */
  async load(workflowId: string): Promise<WorkflowSnapshot | null>;

  /**
   * 更新节点状态
   * @param workflowId 工作流ID
   * @param nodeId 节点ID
   * @param status 节点状态
   */
  async updateStatus(workflowId: string, nodeId: string, status: TaskState): Promise<void>;

  /**
   * 清除工作流快照
   * @param workflowId 工作流ID
   */
  async clear(workflowId: string): Promise<void>;
}
```

<!-- MARK: Redis 
-->
### 4.3 RedisPersistence 类

RedisPersistence 是基于 Redis 的持久化实现，适用于分布式环境。

```typescript
class RedisPersistence implements PersistenceEngine {
  /**
   * 构造函数
   * @param redisUrl Redis 连接 URL
   */
  constructor(redisUrl?: string);

  /**
   * 保存工作流快照
   * @param workflowId 工作流ID
   * @param snapshot 工作流快照
   */
  async save(workflowId: string, snapshot: WorkflowSnapshot): Promise<void>;

  /**
   * 加载工作流快照
   * @param workflowId 工作流ID
   * @returns 返回工作流快照或 null
   */
  async load(workflowId: string): Promise<WorkflowSnapshot | null>;

  /**
   * 更新节点状态
   * @param workflowId 工作流ID
   * @param nodeId 节点ID
   * @param status 节点状态
   */
  async updateStatus(workflowId: string, nodeId: string, status: TaskState): Promise<void>;

  /**
   * 清除工作流快照
   * @param workflowId 工作流ID
   */
  async clear(workflowId: string): Promise<void>;
}
```
<!-- MARK: - -Types
 -->
## 5. 类型定义


<!-- MARK: Node
 -->
### 5.1 节点类型

```typescript
// 节点执行函数
type NodeFunction<Input = any, Output = any, Context = any> = (
  input: Input,
  context: Context
) => Promise<Output>;

// 节点元数据
interface NodeMetadata {
  name: string;
  description?: string;
}

// 节点注册条目
interface NodeRegistryEntry<Input = any, Output = any, Context = any> {
  metadata: NodeMetadata;
  nodeFunction: NodeFunction<Input, Output, Context>;
}

// 节点选项
interface NodeOptions {
  retries?: number;
  timeout?: number;
  condition?: (input: any, context?: any) => boolean;
}
```

<!-- MARK: 工作流类型
 -->
### 5.2 工作流类型

```typescript
// 步骤类型
type Step =
  | { type: 'step'; node: any }
  | { type: 'parallel'; nodes: any[] }
  | { type: 'condition'; node: any }
  | { type: 'subworkflow'; node: any };

// 工作流上下文
interface WorkflowContext {
  tenantId?: string;
  userId?: string;
  userRole?: string;
  traceId?: string;
  locale?: string;
  [key: string]: any;
}
```

<!-- MARK: 错误处理类型
 -->
### 5.3 状态与错误处理类型

```typescript
// 任务状态
type TaskStatus = 'PENDING' | 'RUNNING' | 'COMPLETED' | 'FAILED' | 'SKIPPED';

// 任务状态映射
interface TaskStatusMap {
  [nodeName: string]: TaskStatus;
}

// 错误信息
interface TaskErrorInfo {
  taskName: string;
  message: string;
  raw: unknown;
  timestamp: string;
}

// 错误处理函数
type TaskErrorHandler = (errorInfo: TaskErrorInfo) => void;
```

<!-- MARK: 持久化类型
 -->
### 5.4 持久化类型

```typescript
// 任务状态
type TaskState = 'PENDING' | 'RUNNING' | 'COMPLETED' | 'FAILED';

// 工作流快照
interface WorkflowSnapshot {
  workflowId: string;
  name: string;
  data: any;
  states: Record<string, TaskState>;
  updatedAt: string;
}

// 持久化引擎接口
interface PersistenceEngine {
  save(workflowId: string, snapshot: WorkflowSnapshot): Promise<void>;
  load(workflowId: string): Promise<WorkflowSnapshot | null>;
  updateStatus(workflowId: string, nodeId: string, status: TaskState): Promise<void>;
  clear(workflowId: string): Promise<void>;
}
```

## 使用示例：数据处理与审批工作流

下面是一个完整的示例，演示了如何使用 FlowLab 构建一个包含数据处理、条件判断和审批流程的工作流系统：

```typescript
import { 
  FlowLab, 
  Workflow, 
  NodeRegistry, 
  StatusTracker, 
  Scheduler, 
  ErrorHandler,
  Persistence
} from '@flowlab/core';
import { RedisPersistence } from '@flowlab/core';

// 1. 注册自定义错误处理器
ErrorHandler.register((errorInfo) => {
  console.error(`[监控报警] 节点 ${errorInfo.taskName} 执行失败: ${errorInfo.message}`);
  // 可以在这里集成第三方监控或通知系统，如 Sentry、钉钉、邮件等
});

// 2. 配置持久化引擎（可选）
// 默认使用内存存储，适合开发和测试
// 生产环境可以切换到 Redis 等分布式存储
// Persistence.use(new RedisPersistence('redis://localhost:6379'));

// 3. 注册各种节点（处理函数）
// 3.1 数据获取节点
FlowLab.registerNode(
  'fetchUserData',
  async (userId, context) => {
    console.log(`[节点日志] 正在获取用户数据: ${userId}`);
    // 模拟数据库查询
    await new Promise(resolve => setTimeout(resolve, 300));
    return {
      id: userId,
      name: '张三',
      age: 30,
      score: 85,
      applyAmount: 10000
    };
  },
  '从数据库获取用户信息'
);

// 3.2 数据转换节点
FlowLab.registerNode(
  'transformData',
  async (userData, context) => {
    console.log(`[节点日志] 正在转换用户数据: ${userData.id}`);
    return {
      ...userData,
      creditScore: userData.score * 1.2, // 信用分计算
      risk: userData.score < 60 ? 'high' : userData.score < 80 ? 'medium' : 'low' // 风险等级
    };
  },
  '转换用户数据并计算信用分'
);

// 3.3 风险评估节点
FlowLab.registerNode(
  'riskAssessment',
  async (userData, context) => {
    console.log(`[节点日志] 风险评估: ${userData.id}, 风险等级: ${userData.risk}`);
    // 模拟外部系统调用
    await new Promise(resolve => setTimeout(resolve, 500));
    const approved = userData.risk !== 'high';
    return {
      ...userData,
      initialApproved: approved,
      comments: approved ? '自动审批通过' : '高风险，需要人工审核'
    };
  },
  '进行风险评估并生成初步审批结果'
);

// 3.4 自动审批节点
FlowLab.registerNode(
  'autoApproval',
  async (userData, context) => {
    console.log(`[节点日志] 自动审批: ${userData.id}`);
    return {
      ...userData,
      approved: true,
      approver: 'system',
      approvalTime: new Date().toISOString()
    };
  },
  '自动审批通过流程'
);

// 3.5 人工审批节点（模拟）
FlowLab.registerNode(
  'manualApproval',
  async (userData, context) => {
    console.log(`[节点日志] 人工审批: ${userData.id}`);
    // 模拟人工审批
    await new Promise(resolve => setTimeout(resolve, 1000));
    const approved = Math.random() > 0.3; // 70% 概率通过
    return {
      ...userData,
      approved,
      approver: '审批人员ID-' + Math.floor(Math.random() * 1000),
      approvalTime: new Date().toISOString(),
      comments: approved ? '人工审批通过' : '人工审批拒绝，额度过高'
    };
  },
  '人工审批流程'
);

// 3.6 通知节点
FlowLab.registerNode(
  'sendNotification',
  async (userData, context) => {
    const status = userData.approved ? '已通过' : '已拒绝';
    console.log(`[节点日志] 发送通知: 用户 ${userData.id} 的申请${status}`);
    // 模拟发送通知
    await new Promise(resolve => setTimeout(resolve, 200));
    return {
      ...userData,
      notificationSent: true,
      notificationTime: new Date().toISOString()
    };
  },
  '发送审批结果通知'
);

// 3.7 数据归档节点
FlowLab.registerNodeWithMeta(
  {
    name: 'archiveData',
    description: '将审批数据归档到数据库' 
  },
  async (userData, context) => {
    console.log(`[节点日志] 归档数据: ${userData.id}, 审批结果: ${userData.approved ? '通过' : '拒绝'}`);
    // 模拟数据存储
    await new Promise(resolve => setTimeout(resolve, 300));
    return {
      ...userData,
      archived: true,
      archiveTime: new Date().toISOString()
    };
  }
);

// 4. 构建工作流
// 4.1 创建主工作流
const mainWorkflow = FlowLab.create('贷款审批流程')
  .addStep('fetchUserData', NodeRegistry.getNodeFunction('fetchUserData'))
  .addStep('transformData', NodeRegistry.getNodeFunction('transformData'))
  .addStep('riskAssessment', NodeRegistry.getNodeFunction('riskAssessment'));

// 4.2 创建自动审批子流程
const autoApprovalFlow = FlowLab.create('自动审批流程')
  .addStep('autoApproval', NodeRegistry.getNodeFunction('autoApproval'))
  .addStep('sendNotification', NodeRegistry.getNodeFunction('sendNotification'))
  .addStep('archiveData', NodeRegistry.getNodeFunction('archiveData'));

// 4.3 创建人工审批子流程
const manualApprovalFlow = FlowLab.create('人工审批流程')
  .addStep('manualApproval', NodeRegistry.getNodeFunction('manualApproval'), {
    // 添加重试和超时配置
    retries: 2,
    timeout: 10000
  })
  .addStep('sendNotification', NodeRegistry.getNodeFunction('sendNotification'))
  .addStep('archiveData', NodeRegistry.getNodeFunction('archiveData'));

// 4.4 添加条件分支到主流程
mainWorkflow.addCondition(
  (data, context) => {
    // 根据风险等级和初步审批结果选择流程分支
    if (data.initialApproved && data.applyAmount <= 5000) {
      return 'auto'; // 低风险小额自动通过
    }
    return 'manual'; // 其他情况走人工审批
  },
  {
    'auto': autoApprovalFlow,
    'manual': manualApprovalFlow
  }
);

// 5. 添加并行执行的数据分析步骤（可选）
const dataAnalysisWorkflow = FlowLab.create('数据分析流程')
  .addParallelStep([
    {
      name: 'userProfileAnalysis',
      fn: async (data, context) => {
        console.log(`[节点日志] 用户画像分析: ${data.id}`);
        return { profileComplete: true };
      }
    },
    {
      name: 'marketSegmentation',
      fn: async (data, context) => {
        console.log(`[节点日志] 市场细分分析: ${data.id}`);
        return { segment: 'premium' };
      }
    }
  ]);

// 将数据分析作为子流程添加到主流程
mainWorkflow.addSubWorkflow('dataAnalysis', dataAnalysisWorkflow);

// 6. 执行工作流
async function runApprovalProcess(userId) {
  try {
    // 6.1 创建执行上下文
    const context = {
      traceId: `trace-${Date.now()}`,
      tenantId: 'company-A',
      userRole: 'admin'
    };

    // 6.2 立即执行
    console.log(`[工作流] 开始执行贷款审批流程，用户ID: ${userId}`);
    const result = await mainWorkflow.run(userId, context);
    console.log(`[工作流] 审批流程执行完成，结果:`, result);

    // 6.3 延迟执行（可用于定时任务）
    /*
    Scheduler.scheduleWithDelay(
      5000, // 5秒后执行
      mainWorkflow,
      userId,
      context
    );
    */

    return result;
  } catch (error) {
    console.error(`[工作流] 审批流程执行失败: ${error.message}`);
    throw error;
  }
}

// 7. 查看已注册的所有节点
console.log('系统已注册节点列表:');
FlowLab.getAllRegisteredNodes().forEach(node => {
  console.log(`- ${node.metadata.name}: ${node.metadata.description || '无描述'}`);
});

// 8. 调用执行入口
runApprovalProcess('user123')
  .then(finalResult => {
    console.log('流程最终执行结果:', finalResult);
  })
  .catch(err => {
    console.error('流程执行异常:', err);
  });
```

此示例展示了 FlowLab 的核心功能，包括：

1. 节点注册与管理
2. 工作流构建（顺序、并行、条件分支、子流程）
3. 错误处理与监控
4. 持久化配置
5. 调度执行（立即/延迟）

您可以根据实际需求调整和扩展这个示例。
