Graph 工作流引擎
Spring AI Alibaba Graph 是受 LangGraph 启发的有状态图执行引擎,为复杂多 Agent 工作流提供底层运行时支持,是 Agent Framework 的核心基础设施。
为什么需要 Graph?
Agent Framework 中的内置 Agent 类型(Sequential/Parallel/Routing)能覆盖大多数场景,但面对以下需求时,需要直接使用 Graph API:
- 复杂条件路由:基于运行时状态动态决定下一步
- 循环与回退:执行失败时回退到某个节点重试
- 嵌套子图:将复杂流程模块化为可复用的子图
- 持久化检查点:长时间运行的工作流支持断点续传
- 精细状态控制:完全掌控每个节点的输入输出状态
核心概念
Graph(图)
├── State(状态) ← 贯穿整个工作流的数据容器
├── Node(节点) ← 执行具体逻辑的处理单元
├── Edge(边) ← 节点间的连接关系
│ ├── 普通边 ← 固定跳转
│ └── 条件边 ← 基于状态动态路由
└── Checkpoint(检查点) ← 状态持久化快照快速入门
xml
<dependency>
<groupId>com.alibaba.cloud.ai</groupId>
<artifactId>spring-ai-alibaba-graph</artifactId>
<version>1.1.2.0</version>
</dependency>定义状态
java
// 状态是贯穿整个 Graph 的数据容器
public class ResearchState extends AgentState {
public ResearchState(Map<String, Object> initData) {
super(initData);
}
// 便捷访问方法
public String getQuery() {
return (String) data().get("query");
}
public List<String> getSources() {
return (List<String>) data().getOrDefault("sources", new ArrayList<>());
}
public String getDraft() {
return (String) data().get("draft");
}
public int getRevisionCount() {
return (int) data().getOrDefault("revision_count", 0);
}
public boolean isApproved() {
return (boolean) data().getOrDefault("approved", false);
}
}定义节点
java
// 节点:接收状态,处理逻辑,返回更新后的状态
@Component
public class ResearchNodes {
@Autowired
private ChatClient chatClient;
@Autowired
private SearchTools searchTools;
// 搜索节点
public NodeAction<ResearchState> searchNode() {
return state -> {
String query = state.getQuery();
List<String> sources = searchTools.search(query, 5);
return Map.of("sources", sources); // 返回状态更新
};
}
// 写作节点
public NodeAction<ResearchState> writeNode() {
return state -> {
String draft = chatClient.prompt()
.system("你是专业写作助手,基于资料撰写文章")
.user("主题:" + state.getQuery() +
"\n\n参考资料:\n" + String.join("\n", state.getSources()))
.call()
.content();
return Map.of(
"draft", draft,
"revision_count", state.getRevisionCount() + 1
);
};
}
// 审核节点
public NodeAction<ResearchState> reviewNode() {
return state -> {
String review = chatClient.prompt()
.system("你是严格的内容审核专家")
.user("审核以下文章,判断是否通过(输出 APPROVED 或 REJECTED 及原因):\n\n"
+ state.getDraft())
.call()
.content();
boolean approved = review.startsWith("APPROVED");
return Map.of("approved", approved, "review_comment", review);
};
}
}构建 Graph
java
@Configuration
public class ResearchGraphConfig {
@Autowired
private ResearchNodes nodes;
@Bean
public StateGraph<ResearchState> researchGraph() throws GraphStateException {
return new StateGraph<>(ResearchState::new)
// 注册节点
.addNode("search", nodes.searchNode())
.addNode("write", nodes.writeNode())
.addNode("review", nodes.reviewNode())
// 设置入口节点
.addEdge(StateGraph.START, "search")
// 普通边:search → write
.addEdge("search", "write")
// 条件边:review 后根据结果决定下一步
.addConditionalEdges(
"review",
state -> {
if (state.isApproved()) {
return StateGraph.END; // 通过 → 结束
} else if (state.getRevisionCount() < 3) {
return "write"; // 未通过且未超限 → 重写
} else {
return StateGraph.END; // 超过修改次数 → 强制结束
}
},
Map.of(
StateGraph.END, StateGraph.END,
"write", "write"
)
)
// write → review
.addEdge("write", "review");
}
}执行 Graph
java
@Service
public class ResearchService {
@Autowired
private StateGraph<ResearchState> researchGraph;
public String research(String topic) throws Exception {
// 编译 Graph
CompiledGraph<ResearchState> compiled = researchGraph.compile();
// 执行
Map<String, Object> result = compiled.invoke(
Map.of("query", topic)
);
return (String) result.get("draft");
}
// 流式执行(实时获取每个节点的输出)
public Flux<String> researchStream(String topic) throws Exception {
CompiledGraph<ResearchState> compiled = researchGraph.compile();
return compiled.stream(Map.of("query", topic))
.map(output -> {
String nodeName = output.node();
Map<String, Object> state = output.state().data();
return "[" + nodeName + "] " + state.toString();
});
}
}并行节点
java
StateGraph<AnalysisState> graph = new StateGraph<>(AnalysisState::new)
.addNode("start", state -> Map.of("initialized", true))
// 并行执行三个分析节点
.addNode("tech_analysis", nodes.techAnalysisNode())
.addNode("market_analysis", nodes.marketAnalysisNode())
.addNode("risk_analysis", nodes.riskAnalysisNode())
// 汇总节点
.addNode("aggregate", nodes.aggregateNode())
// start → 三个并行节点
.addEdge("start", "tech_analysis")
.addEdge("start", "market_analysis")
.addEdge("start", "risk_analysis")
// 三个并行节点 → 汇总(等待所有完成)
.addEdge("tech_analysis", "aggregate")
.addEdge("market_analysis", "aggregate")
.addEdge("risk_analysis", "aggregate")
.addEdge("aggregate", StateGraph.END);嵌套子图
java
// 定义子图:代码审查流程
StateGraph<CodeReviewState> codeReviewSubGraph = new StateGraph<>(CodeReviewState::new)
.addNode("static_analysis", nodes.staticAnalysisNode())
.addNode("security_check", nodes.securityCheckNode())
.addNode("generate_report", nodes.generateReportNode())
.addEdge(StateGraph.START, "static_analysis")
.addEdge("static_analysis", "security_check")
.addEdge("security_check", "generate_report")
.addEdge("generate_report", StateGraph.END);
// 主图中嵌入子图
StateGraph<CiPipelineState> mainGraph = new StateGraph<>(CiPipelineState::new)
.addNode("build", nodes.buildNode())
.addNode("test", nodes.testNode())
// 嵌入代码审查子图
.addNode("code_review", codeReviewSubGraph.compile())
.addNode("deploy", nodes.deployNode())
.addEdge(StateGraph.START, "build")
.addEdge("build", "test")
.addEdge("test", "code_review")
.addConditionalEdges("code_review",
state -> state.isReviewPassed() ? "deploy" : StateGraph.END,
Map.of("deploy", "deploy", StateGraph.END, StateGraph.END)
)
.addEdge("deploy", StateGraph.END);持久化检查点
对于长时间运行的工作流,检查点支持断点续传:
java
// 配置检查点存储
@Bean
public CheckpointSaver checkpointSaver(JdbcTemplate jdbcTemplate) {
return new JdbcCheckpointSaver(jdbcTemplate);
// 也支持:InMemoryCheckpointSaver(测试用)
// RedisCheckpointSaver(生产推荐)
}
// 编译时启用检查点
CompiledGraph<ResearchState> compiled = researchGraph.compile(
CompileConfig.builder()
.checkpointSaver(checkpointSaver)
.build()
);
// 执行时指定线程 ID(用于恢复)
String threadId = UUID.randomUUID().toString();
Map<String, Object> result = compiled.invoke(
Map.of("query", topic),
RunnableConfig.builder()
.threadId(threadId)
.build()
);
// 如果中途失败,用同一 threadId 恢复执行
Map<String, Object> resumed = compiled.invoke(
Map.of(), // 空输入,从检查点恢复
RunnableConfig.builder()
.threadId(threadId)
.build()
);Human-in-the-Loop(图级别)
java
// 在特定节点暂停,等待人工输入
CompiledGraph<ApprovalState> compiled = approvalGraph.compile(
CompileConfig.builder()
.checkpointSaver(checkpointSaver)
.interruptBefore("approve_node") // 在 approve_node 前暂停
.build()
);
// 第一次执行,在 approve_node 前暂停
String threadId = "approval-" + UUID.randomUUID();
compiled.invoke(Map.of("request", request), RunnableConfig.builder()
.threadId(threadId).build());
// 人工审批后,更新状态并继续
compiled.updateState(
RunnableConfig.builder().threadId(threadId).build(),
Map.of("approved", true, "approver", "张三")
);
// 继续执行
compiled.invoke(null, RunnableConfig.builder()
.threadId(threadId).build());可视化导出
java
// 导出为 Mermaid 格式(可在 GitHub/Notion 渲染)
String mermaid = compiled.getGraph(GraphRepresentation.Type.MERMAID);
System.out.println(mermaid);
/*
graph TD
__START__ --> search
search --> write
write --> review
review -->|approved| __END__
review -->|rejected| write
*/
// 导出为 PlantUML
String plantuml = compiled.getGraph(GraphRepresentation.Type.PLANTUML);实战:DeepResearch 工作流
java
@Configuration
public class DeepResearchGraph {
@Bean
public StateGraph<DeepResearchState> deepResearchGraph(
ChatClient.Builder builder,
VectorStore vectorStore,
SearchTools searchTools
) throws GraphStateException {
ChatClient llm = builder.build();
return new StateGraph<>(DeepResearchState::new)
// 1. 问题分解
.addNode("decompose", state -> {
String subQuestions = llm.prompt()
.system("将复杂问题分解为 3-5 个子问题,每行一个")
.user(state.getMainQuestion())
.call().content();
return Map.of("sub_questions",
Arrays.asList(subQuestions.split("\n")));
})
// 2. 并行搜索(每个子问题独立搜索)
.addNode("search", state -> {
List<String> allResults = state.getSubQuestions()
.parallelStream()
.flatMap(q -> searchTools.search(q, 3).stream())
.distinct()
.toList();
return Map.of("search_results", allResults);
})
// 3. 知识提炼
.addNode("synthesize", state -> {
String synthesis = llm.prompt()
.system("基于搜索结果,提炼关键信息和洞察")
.user("问题:" + state.getMainQuestion() +
"\n\n搜索结果:\n" +
String.join("\n---\n", state.getSearchResults()))
.call().content();
return Map.of("synthesis", synthesis);
})
// 4. 生成报告
.addNode("report", state -> {
String report = llm.prompt()
.system("生成结构化的深度研究报告,包含:摘要、详细分析、结论、参考来源")
.user("基于以下分析生成报告:\n" + state.getSynthesis())
.call().content();
return Map.of("final_report", report);
})
// 5. 质量检查
.addNode("quality_check", state -> {
String check = llm.prompt()
.system("评估报告质量,输出 PASS 或 FAIL 及原因")
.user(state.getFinalReport())
.call().content();
return Map.of("quality_passed", check.startsWith("PASS"));
})
.addEdge(StateGraph.START, "decompose")
.addEdge("decompose", "search")
.addEdge("search", "synthesize")
.addEdge("synthesize", "report")
.addEdge("report", "quality_check")
.addConditionalEdges("quality_check",
state -> state.isQualityPassed() ? StateGraph.END : "synthesize",
Map.of(StateGraph.END, StateGraph.END, "synthesize", "synthesize")
);
}
}Graph vs Agent Framework 选择指南
| 场景 | 推荐方案 |
|---|---|
| 简单问答、工具调用 | ReactAgent |
| 固定顺序的多步骤任务 | SequentialAgent |
| 多个独立任务并行 | ParallelAgent |
| 基于输入类型分发 | RoutingAgent |
| 复杂条件路由 | Graph API |
| 需要循环/回退逻辑 | Graph API |
| 长时间运行需断点续传 | Graph API + Checkpoint |
| 可视化工作流设计 | Graph API + Admin 平台 |
建议
优先使用 Agent Framework 的内置 Agent 类型,它们封装了最佳实践。只有当内置类型无法满足需求时,才直接使用 Graph API。