Skip to content

Graph 工作流引擎

Spring AI Alibaba Graph 是受 LangGraph 启发的有状态图执行引擎,为复杂多 Agent 工作流提供底层运行时支持,是 Agent Framework 的核心基础设施。

为什么需要 Graph?

Agent Framework 中的内置 Agent 类型(Sequential/Parallel/Routing)能覆盖大多数场景,但面对以下需求时,需要直接使用 Graph API:

  • 复杂条件路由:基于运行时状态动态决定下一步
  • 循环与回退:执行失败时回退到某个节点重试
  • 嵌套子图:将复杂流程模块化为可复用的子图
  • 持久化检查点:长时间运行的工作流支持断点续传
  • 精细状态控制:完全掌控每个节点的输入输出状态

核心概念

Graph(图)
├── State(状态)    ← 贯穿整个工作流的数据容器
├── Node(节点)     ← 执行具体逻辑的处理单元
├── Edge(边)       ← 节点间的连接关系
│   ├── 普通边       ← 固定跳转
│   └── 条件边       ← 基于状态动态路由
└── Checkpoint(检查点) ← 状态持久化快照

快速入门

xml
<dependency>
    <groupId>com.alibaba.cloud.ai</groupId>
    <artifactId>spring-ai-alibaba-graph</artifactId>
    <version>1.1.2.0</version>
</dependency>

定义状态

java
// 状态是贯穿整个 Graph 的数据容器
public class ResearchState extends AgentState {

    public ResearchState(Map<String, Object> initData) {
        super(initData);
    }

    // 便捷访问方法
    public String getQuery() {
        return (String) data().get("query");
    }

    public List<String> getSources() {
        return (List<String>) data().getOrDefault("sources", new ArrayList<>());
    }

    public String getDraft() {
        return (String) data().get("draft");
    }

    public int getRevisionCount() {
        return (int) data().getOrDefault("revision_count", 0);
    }

    public boolean isApproved() {
        return (boolean) data().getOrDefault("approved", false);
    }
}

定义节点

java
// 节点:接收状态,处理逻辑,返回更新后的状态
@Component
public class ResearchNodes {

    @Autowired
    private ChatClient chatClient;

    @Autowired
    private SearchTools searchTools;

    // 搜索节点
    public NodeAction<ResearchState> searchNode() {
        return state -> {
            String query = state.getQuery();
            List<String> sources = searchTools.search(query, 5);

            return Map.of("sources", sources);  // 返回状态更新
        };
    }

    // 写作节点
    public NodeAction<ResearchState> writeNode() {
        return state -> {
            String draft = chatClient.prompt()
                .system("你是专业写作助手,基于资料撰写文章")
                .user("主题:" + state.getQuery() +
                      "\n\n参考资料:\n" + String.join("\n", state.getSources()))
                .call()
                .content();

            return Map.of(
                "draft", draft,
                "revision_count", state.getRevisionCount() + 1
            );
        };
    }

    // 审核节点
    public NodeAction<ResearchState> reviewNode() {
        return state -> {
            String review = chatClient.prompt()
                .system("你是严格的内容审核专家")
                .user("审核以下文章,判断是否通过(输出 APPROVED 或 REJECTED 及原因):\n\n"
                      + state.getDraft())
                .call()
                .content();

            boolean approved = review.startsWith("APPROVED");
            return Map.of("approved", approved, "review_comment", review);
        };
    }
}

构建 Graph

java
@Configuration
public class ResearchGraphConfig {

    @Autowired
    private ResearchNodes nodes;

    @Bean
    public StateGraph<ResearchState> researchGraph() throws GraphStateException {

        return new StateGraph<>(ResearchState::new)
            // 注册节点
            .addNode("search", nodes.searchNode())
            .addNode("write", nodes.writeNode())
            .addNode("review", nodes.reviewNode())

            // 设置入口节点
            .addEdge(StateGraph.START, "search")

            // 普通边:search → write
            .addEdge("search", "write")

            // 条件边:review 后根据结果决定下一步
            .addConditionalEdges(
                "review",
                state -> {
                    if (state.isApproved()) {
                        return StateGraph.END;  // 通过 → 结束
                    } else if (state.getRevisionCount() < 3) {
                        return "write";  // 未通过且未超限 → 重写
                    } else {
                        return StateGraph.END;  // 超过修改次数 → 强制结束
                    }
                },
                Map.of(
                    StateGraph.END, StateGraph.END,
                    "write", "write"
                )
            )

            // write → review
            .addEdge("write", "review");
    }
}

执行 Graph

java
@Service
public class ResearchService {

    @Autowired
    private StateGraph<ResearchState> researchGraph;

    public String research(String topic) throws Exception {
        // 编译 Graph
        CompiledGraph<ResearchState> compiled = researchGraph.compile();

        // 执行
        Map<String, Object> result = compiled.invoke(
            Map.of("query", topic)
        );

        return (String) result.get("draft");
    }

    // 流式执行(实时获取每个节点的输出)
    public Flux<String> researchStream(String topic) throws Exception {
        CompiledGraph<ResearchState> compiled = researchGraph.compile();

        return compiled.stream(Map.of("query", topic))
            .map(output -> {
                String nodeName = output.node();
                Map<String, Object> state = output.state().data();
                return "[" + nodeName + "] " + state.toString();
            });
    }
}

并行节点

java
StateGraph<AnalysisState> graph = new StateGraph<>(AnalysisState::new)
    .addNode("start", state -> Map.of("initialized", true))

    // 并行执行三个分析节点
    .addNode("tech_analysis", nodes.techAnalysisNode())
    .addNode("market_analysis", nodes.marketAnalysisNode())
    .addNode("risk_analysis", nodes.riskAnalysisNode())

    // 汇总节点
    .addNode("aggregate", nodes.aggregateNode())

    // start → 三个并行节点
    .addEdge("start", "tech_analysis")
    .addEdge("start", "market_analysis")
    .addEdge("start", "risk_analysis")

    // 三个并行节点 → 汇总(等待所有完成)
    .addEdge("tech_analysis", "aggregate")
    .addEdge("market_analysis", "aggregate")
    .addEdge("risk_analysis", "aggregate")

    .addEdge("aggregate", StateGraph.END);

嵌套子图

java
// 定义子图:代码审查流程
StateGraph<CodeReviewState> codeReviewSubGraph = new StateGraph<>(CodeReviewState::new)
    .addNode("static_analysis", nodes.staticAnalysisNode())
    .addNode("security_check", nodes.securityCheckNode())
    .addNode("generate_report", nodes.generateReportNode())
    .addEdge(StateGraph.START, "static_analysis")
    .addEdge("static_analysis", "security_check")
    .addEdge("security_check", "generate_report")
    .addEdge("generate_report", StateGraph.END);

// 主图中嵌入子图
StateGraph<CiPipelineState> mainGraph = new StateGraph<>(CiPipelineState::new)
    .addNode("build", nodes.buildNode())
    .addNode("test", nodes.testNode())
    // 嵌入代码审查子图
    .addNode("code_review", codeReviewSubGraph.compile())
    .addNode("deploy", nodes.deployNode())
    .addEdge(StateGraph.START, "build")
    .addEdge("build", "test")
    .addEdge("test", "code_review")
    .addConditionalEdges("code_review",
        state -> state.isReviewPassed() ? "deploy" : StateGraph.END,
        Map.of("deploy", "deploy", StateGraph.END, StateGraph.END)
    )
    .addEdge("deploy", StateGraph.END);

持久化检查点

对于长时间运行的工作流,检查点支持断点续传:

java
// 配置检查点存储
@Bean
public CheckpointSaver checkpointSaver(JdbcTemplate jdbcTemplate) {
    return new JdbcCheckpointSaver(jdbcTemplate);
    // 也支持:InMemoryCheckpointSaver(测试用)
    //         RedisCheckpointSaver(生产推荐)
}

// 编译时启用检查点
CompiledGraph<ResearchState> compiled = researchGraph.compile(
    CompileConfig.builder()
        .checkpointSaver(checkpointSaver)
        .build()
);

// 执行时指定线程 ID(用于恢复)
String threadId = UUID.randomUUID().toString();
Map<String, Object> result = compiled.invoke(
    Map.of("query", topic),
    RunnableConfig.builder()
        .threadId(threadId)
        .build()
);

// 如果中途失败,用同一 threadId 恢复执行
Map<String, Object> resumed = compiled.invoke(
    Map.of(),  // 空输入,从检查点恢复
    RunnableConfig.builder()
        .threadId(threadId)
        .build()
);

Human-in-the-Loop(图级别)

java
// 在特定节点暂停,等待人工输入
CompiledGraph<ApprovalState> compiled = approvalGraph.compile(
    CompileConfig.builder()
        .checkpointSaver(checkpointSaver)
        .interruptBefore("approve_node")  // 在 approve_node 前暂停
        .build()
);

// 第一次执行,在 approve_node 前暂停
String threadId = "approval-" + UUID.randomUUID();
compiled.invoke(Map.of("request", request), RunnableConfig.builder()
    .threadId(threadId).build());

// 人工审批后,更新状态并继续
compiled.updateState(
    RunnableConfig.builder().threadId(threadId).build(),
    Map.of("approved", true, "approver", "张三")
);

// 继续执行
compiled.invoke(null, RunnableConfig.builder()
    .threadId(threadId).build());

可视化导出

java
// 导出为 Mermaid 格式(可在 GitHub/Notion 渲染)
String mermaid = compiled.getGraph(GraphRepresentation.Type.MERMAID);
System.out.println(mermaid);
/*
graph TD
    __START__ --> search
    search --> write
    write --> review
    review -->|approved| __END__
    review -->|rejected| write
*/

// 导出为 PlantUML
String plantuml = compiled.getGraph(GraphRepresentation.Type.PLANTUML);

实战:DeepResearch 工作流

java
@Configuration
public class DeepResearchGraph {

    @Bean
    public StateGraph<DeepResearchState> deepResearchGraph(
        ChatClient.Builder builder,
        VectorStore vectorStore,
        SearchTools searchTools
    ) throws GraphStateException {

        ChatClient llm = builder.build();

        return new StateGraph<>(DeepResearchState::new)

            // 1. 问题分解
            .addNode("decompose", state -> {
                String subQuestions = llm.prompt()
                    .system("将复杂问题分解为 3-5 个子问题,每行一个")
                    .user(state.getMainQuestion())
                    .call().content();
                return Map.of("sub_questions",
                    Arrays.asList(subQuestions.split("\n")));
            })

            // 2. 并行搜索(每个子问题独立搜索)
            .addNode("search", state -> {
                List<String> allResults = state.getSubQuestions()
                    .parallelStream()
                    .flatMap(q -> searchTools.search(q, 3).stream())
                    .distinct()
                    .toList();
                return Map.of("search_results", allResults);
            })

            // 3. 知识提炼
            .addNode("synthesize", state -> {
                String synthesis = llm.prompt()
                    .system("基于搜索结果,提炼关键信息和洞察")
                    .user("问题:" + state.getMainQuestion() +
                          "\n\n搜索结果:\n" +
                          String.join("\n---\n", state.getSearchResults()))
                    .call().content();
                return Map.of("synthesis", synthesis);
            })

            // 4. 生成报告
            .addNode("report", state -> {
                String report = llm.prompt()
                    .system("生成结构化的深度研究报告,包含:摘要、详细分析、结论、参考来源")
                    .user("基于以下分析生成报告:\n" + state.getSynthesis())
                    .call().content();
                return Map.of("final_report", report);
            })

            // 5. 质量检查
            .addNode("quality_check", state -> {
                String check = llm.prompt()
                    .system("评估报告质量,输出 PASS 或 FAIL 及原因")
                    .user(state.getFinalReport())
                    .call().content();
                return Map.of("quality_passed", check.startsWith("PASS"));
            })

            .addEdge(StateGraph.START, "decompose")
            .addEdge("decompose", "search")
            .addEdge("search", "synthesize")
            .addEdge("synthesize", "report")
            .addEdge("report", "quality_check")
            .addConditionalEdges("quality_check",
                state -> state.isQualityPassed() ? StateGraph.END : "synthesize",
                Map.of(StateGraph.END, StateGraph.END, "synthesize", "synthesize")
            );
    }
}

Graph vs Agent Framework 选择指南

场景推荐方案
简单问答、工具调用ReactAgent
固定顺序的多步骤任务SequentialAgent
多个独立任务并行ParallelAgent
基于输入类型分发RoutingAgent
复杂条件路由Graph API
需要循环/回退逻辑Graph API
长时间运行需断点续传Graph API + Checkpoint
可视化工作流设计Graph API + Admin 平台

建议

优先使用 Agent Framework 的内置 Agent 类型,它们封装了最佳实践。只有当内置类型无法满足需求时,才直接使用 Graph API。

相关组件

本站内容由 褚成志 整理编写,仅供学习参考