基于 LangGraph4j 的旅游计划助手 —— 支持人机交互和时光倒流
|字数总计:3.2k|阅读时长:12分钟|阅读量:|
TravelAssistant
1. 概述
TravelAssistant 是一个基于 LangGraph4J 的多步骤 AI 工作流系统,展示了如何在复杂的对话流程中实现:
- 人机交互中断机制
- 对话历史消息记录
- 检查点状态保存
- 时光倒流(状态回溯)功能
完整项目可以在Github获取。
2. 系统架构
2.1 工作流程图
TravelAssistant 实现了以下工作流:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| START ↓ parseCountry(解析国家) ↓ recommendCities(推荐城市) ↓ waitForCitySelection(等待用户选择,中断点) ↓ interruptChooseCityEdge(条件判断:城市是否有效) ├─ continue → planTrip └─ rechoose → recommendCities ↓ planTrip(制定旅行计划) ↓ END
|
2.2 核心组件
2.2.1 TravelState(状态定义)
TravelState 定义了整个工作流的数据模型:
1 2 3 4 5 6
| public static final String MESSAGES_STATE = "messages"; public static final String COUNTRY = "country"; public static final String CITY = "city"; public static final String CITY_OPTIONS = "cityOptions"; public static final String RECOMMENDED_CITIES = "recommendedCities"; public static final String TRIP_PLAN = "tripPlan";
|
状态通道设计:
messages:使用 Appender 通道,自动累积所有消息
recommendedCities:使用 Base 通道与自定义合并器,实现城市集合的累积
- 其他字段:使用 Base 通道,支持替换或合并
2.2.2 TravelAssistant(主控制器)
TravelAssistant 负责构建和执行有状态的 Graph:
初始化特点:
- 使用 MemorySaver 或 MysqlSaver 作为检查点保存器
- 在
recommendCities 节点后设置中断点 (interruptAfter)
- 为每个会话生成唯一的 threadId
关键方法:
startConversation(String userInput):开启对话
provideFeedback(String input):用户提供反馈
rollbackToStep(String node, Map<String, Object> newInput):时光倒流
集成了四个外部工具供 AI 调用:
getWeather(String city):查询城市天气
getTraffic(String city):查询城市交通
recommendHotel(String city):推荐酒店
searchAttractions(String city, List<String> keywords):搜索景点
3. 中断功能:人机交互
3.1 设计原理
TravelAssistant 中的中断功能允许系统在特定步骤暂停执行,等待用户反馈,然后根据反馈决定下一步流向。
3.2 实现机制
3.2.1 中断点配置
在 Graph 编译时指定中断点:
1 2 3 4 5
| var compileConfig = CompileConfig.builder() .checkpointSaver(saver) .interruptAfter("recommendCities") .releaseThread(false) .build();
|
中断后,Graph 会将控制权交给用户,等待用户的下一步指令。
3.2.2 条件路由边
使用条件边实现基于用户输入的路由决策:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| AsyncEdgeAction<TravelState> interruptChooseCityEdge = edge_async(s -> { String city = s.getCity(); List<String> cityOptions = s.getCityOptions(); if (cityOptions.contains(city)) { return "continue"; } else { return "rechoose"; } });
.addConditionalEdges("waitForCitySelection", interruptChooseCityEdge, Map.of( "continue", "planTrip", "rechoose", "recommendCities" ))
|
当用户提供反馈时,系统根据反馈内容决定是继续还是重新推荐。
3.2.3 用户反馈处理
1 2 3 4 5 6 7
| public NodeOutput<TravelState> provideFeedback(String input) throws Exception { config = graph.updateState(config, Map.of(TravelState.CITY, input)); return executeGraphUpdateConfigAndFetchLastState(null); }
|
updateState 方法更新 Graph 的状态,然后 stream 继续执行,触发条件边的判断逻辑。
3.3 使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13
| NodeOutput<TravelState> output1 = assistant.startConversation("我想去中国");
NodeOutput<TravelState> output2 = assistant.provideFeedback("北京");
NodeOutput<TravelState> output3 = assistant.provideFeedback("其他");
|
4. 历史消息、检查点与时光倒流
4.1 历史消息管理
4.1.1 消息存储机制
TravelState 中的 messages 字段使用 Appender 通道自动累积:
1
| "messages", Channels.appender(() -> new ArrayList<ChatMessage>())
|
每个节点的输出可以向消息列表追加新消息:
1 2 3 4 5 6 7 8
| AsyncNodeAction<TravelState> parseCountryNode = node_async(s -> { UserMessage userMessage = (UserMessage) s.getLastMessage(); return Map.of( TravelState.COUNTRY, country, TravelState.MESSAGES_STATE, new AiMessage("提取到的国家:" + country) ); });
|
4.1.2 消息访问
通过 TravelState 提供的便利方法访问消息:
1 2 3 4 5 6 7 8 9
| public List<ChatMessage> getMessages() { return this.<List<ChatMessage>>value(MESSAGES_STATE) .orElse(List.of()); }
public ChatMessage getLastMessage() { return this.getMessages().isEmpty() ? null : this.getMessages().getLast(); }
|
4.2 检查点机制
4.2.1 检查点保存器
TravelAssistant 支持两种检查点保存策略:
MemorySaver(内存保存):
- 用于开发和测试
- 状态保存在内存中,进程结束后丢失
- 性能较高,适合短期会话
1
| var saver = new MemorySaver();
|
MysqlSaver(数据库保存):
- 生产环境推荐
- 状态持久化到 MySQL,支持长期存储
- 支持跨进程恢复
1 2 3 4
| var saver = MysqlSaver.builder() .createOption(CreateOption.CREATE_IF_NOT_EXISTS) .dataSource(dataSource) .build();
|
4.2.2 检查点内容
每个检查点记录:
- 当前节点名称
- 完整的状态数据(所有通道值)
- RunnableConfig(包含 threadId)
- 执行时间戳
4.2.3 检查点存储流程
1 2 3 4 5 6 7
| var compileConfig = CompileConfig.builder() .checkpointSaver(saver) .interruptAfter("recommendCities") .releaseThread(false) .build();
this.graph = stateGraph.compile(compileConfig);
|
Graph 编译时注册检查点保存器,每次节点执行后自动保存快照。
4.3 时光倒流(状态回溯)
4.3.1 原理
时光倒流允许用户在工作流的任意检查点重新开始,并可以修改该点的状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public NodeOutput<TravelState> rollbackToStep(String node, Map<String, Object> newInput) throws Exception { List<StateSnapshot<TravelState>> snapshots = graph.getStateHistory(config).stream().toList(); StateSnapshot<TravelState> snapshot = snapshots.stream() .filter(s -> s.node().equals(node)) .findFirst() .orElse(null); config = graph.updateState(snapshot.config(), newInput); return executeGraphUpdateConfigAndFetchLastState(null); }
|
4.3.2 实现细节
StateSnapshot 结构:
1 2 3 4 5 6
| StateSnapshot<TravelState> { String node; RunnableConfig config; TravelState state; }
|
RunnableConfig 作用:
- 包含 threadId,用于在数据库中定位会话
- 包含检查点坐标,用于访问特定的快照
4.3.3 时光倒流的两种场景
场景 1:重新选择城市
用户在制定旅行计划后,想回到城市选择环节重新选择:
1 2 3 4
| NodeOutput<TravelState> output = assistant.rollbackToStep("recommendCities", Map.of(TravelState.CITY, "上海"));
|
此时系统会:
- 恢复到
recommendCities 节点完成后的状态
- 更新城市选择为”上海”
- 继续执行条件边和后续节点
- 生成基于”上海”的新旅行计划
场景 2:重新选择国家
用户想完全改变目的地,回到工作流的起点:
1 2 3 4 5
| NodeOutput<TravelState> output = assistant.rollbackToStep(START, Map.of(TravelState.MESSAGES_STATE, List.of(UserMessage.from("我想去日本"))));
|
此时系统会:
- 恢复到初始状态
- 替换用户消息为新的国家输入
- 重新执行整个工作流
- 生成基于新国家的推荐和计划
4.3.4 状态一致性保证
时光倒流过程中,系统保证:
- 恢复的状态与快照时刻的状态完全相同
- 新的输入与其他状态字段保持一致
- Appender 通道的历史消息不丢失
- 后续执行会产生新的快照
5. 完整使用流程
5.1 工作流示意
以下展示完整的用户交互流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| [用户] → startConversation("我想去中国") ↓ [系统执行] parseCountry → 提取国家:中国 recommendCities → 推荐城市:北京、上海、西安、杭州、南京 中断 ↓ [系统返回] 状态:国家=中国, 推荐城市列表 ↓ [用户] → provideFeedback("北京") ↓ [系统执行] 条件边判断:北京在列表中 → continue planTrip → 生成北京旅行计划 END ↓ [系统返回] 状态:国家=中国, 城市=北京, 旅行计划
[用户不满意] → rollbackToStep("recommendCities", Map.of(CITY, "上海")) ↓ [系统执行] 恢复到 recommendCities 之后的状态 更新城市为"上海" 条件边判断:上海在列表中 → continue planTrip → 生成上海旅行计划 END ↓ [系统返回] 状态:国家=中国, 城市=上海, 旅行计划
[用户想改国家] → rollbackToStep(START, Map.of(MESSAGES_STATE, List.of(UserMessage.from("我想去日本")))) ↓ [系统执行] 恢复到初始状态 替换用户消息 parseCountry → 提取国家:日本 recommendCities → 推荐城市:东京、京都、大阪、北海道、箱根 中断 ↓ [系统返回] 状态:国家=日本, 推荐城市列表
|
5.2 测试代码片段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| @Test void testTravel() throws Exception { TravelAssistant assistant = new TravelAssistant(dataSource);
System.out.println("===== 用户输入国家 ====="); NodeOutput<TravelState> output1 = assistant.startConversation("我想去中国");
TravelState state1 = output1.state(); System.out.println("国家: " + state1.getCountry()); System.out.println("AI 推荐城市: " + state1.getCityOptions());
System.out.println("\n===== 用户不满意,重新选择城市 ====="); NodeOutput<TravelState> output2_1 = assistant.provideFeedback("其他");
TravelState state2_1 = output2_1.state(); System.out.println("AI 推荐新城市: " + state2_1.getCityOptions());
List<String> cityOptions = state2_1.getCityOptions(); String city = cityOptions.getFirst(); NodeOutput<TravelState> output2_2 = assistant.provideFeedback(city); TravelState state2_2 = output2_2.state(); System.out.println("旅行计划:\n" + state2_2.getTripPlan());
System.out.println("\n===== 用户不满意,时光倒流重新选择 ====="); NodeOutput<TravelState> output3 = assistant.rollbackToStep("recommendCities", Map.of(TravelState.CITY, cityOptions.getLast()));
TravelState state3 = output3.state(); System.out.println("新的旅行计划:\n" + state3.getTripPlan());
System.out.println("\n===== 用户想重新选择国家 ====="); NodeOutput<TravelState> output4 = assistant.rollbackToStep(START, Map.of(TravelState.MESSAGES_STATE, List.of(UserMessage.from("我想去日本"))));
TravelState state4 = output4.state(); System.out.println("新国家: " + state4.getCountry()); System.out.println("新的推荐城市: " + state4.getCityOptions());
String japanCity = state4.getCityOptions().getFirst(); NodeOutput<TravelState> output5 = assistant.provideFeedback(japanCity);
TravelState state5 = output5.state(); System.out.println("选择城市: " + japanCity); System.out.println("日本旅行计划:\n" + state5.getTripPlan()); }
|
6. 关键技术细节
6.1 状态序列化
为了支持检查点的持久化和恢复,TravelAssistant 实现了自定义序列化器:
1 2 3 4 5 6 7 8
| public static StateSerializer<TravelState> serializer() { var serializer = new LC4jStateSerializer<>(TravelState::new); serializer.mapper().register(UserMessage.class, new UserMessageSerializer()); serializer.mapper().register(AiMessage.class, new AiMessageSerializer()); return serializer; }
|
这确保了 ChatMessage 对象可以被正确序列化和反序列化。
6.2 异步执行模型
所有节点都实现为异步操作,确保不阻塞执行线程:
1 2 3 4 5 6
| AsyncNodeAction<TravelState> parseCountryNode = node_async(s -> { UserMessage userMessage = (UserMessage) s.getLastMessage(); String country = chatModel.chat(userMessage).aiMessage().text(); return Map.of(TravelState.COUNTRY, country); });
|
所有 I/O 操作(LLM 调用、数据库访问等)都包装在异步操作中。
6.3 RunnableConfig 与 ThreadId
每个会话都关联一个唯一的 threadId,用于在数据库中追踪会话状态:
1 2 3 4 5 6
| this.threadId = UUID.randomUUID().toString();
config = RunnableConfig.builder() .threadId(threadId) .streamMode(CompiledGraph.StreamMode.SNAPSHOTS) .build();
|
ThreadId 是检查点数据库中的主要查询键,支持多会话并发。
6.4 条件边的动态路由
条件边在运行时基于当前状态做出决策:
1 2 3 4 5 6 7 8 9 10
| AsyncEdgeAction<TravelState> interruptChooseCityEdge = edge_async(s -> { String city = s.getCity(); List<String> cityOptions = s.getCityOptions(); if (cityOptions.contains(city)) { return "continue"; } else { return "rechoose"; } });
|
这种设计允许复杂的业务逻辑驱动工作流路由,而不是硬编码的流程。
7. 性能与可扩展性考虑
7.1 检查点存储性能
- MemorySaver:适合单个进程内的短期会话,内存占用受限于活跃会话数量
- MysqlSaver:适合分布式系统,支持跨进程恢复,数据库查询可能成为瓶颈
7.2 消息历史增长
Appender 通道持续累积消息,长期对话可能导致状态膨胀。建议:
7.3 条件边的计算复杂度
复杂的条件判断逻辑可能阻塞边路由。建议:
8. 总结
TravelAssistant 展示了 LangGraph4J 框架在以下方面的能力:
- 中断机制:通过条件边和状态更新实现人机交互,允许用户在工作流中选择分支
- 历史管理:使用 Appender 通道自动累积所有消息和操作历史
- 状态持久化:支持多种检查点保存策略,从内存到数据库
- 时光倒流:允许用户回到任意检查点,修改状态,重新执行工作流
这些功能结合异步执行模型和自定义序列化器,为构建复杂的、交互式的 AI 工作流应用提供了坚实的基础。