TravelAssistant

1. 概述

TravelAssistant 是一个基于 LangGraph4J 的多步骤 AI 工作流系统,展示了如何在复杂的对话流程中实现:

  • 人机交互中断机制
  • 对话历史消息记录
  • 检查点状态保存
  • 时光倒流(状态回溯)功能

完整项目可以在Github获取。


2. 系统架构

2.1 工作流程图

TravelAssistant 实现了以下工作流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
START

parseCountry(解析国家)

recommendCities(推荐城市)

waitForCitySelection(等待用户选择,中断点)

interruptChooseCityEdge(条件判断:城市是否有效)
├─ continue → planTrip
└─ rechoose → recommendCities

planTrip(制定旅行计划)

END

2.2 核心组件

2.2.1 TravelState(状态定义)

TravelState 定义了整个工作流的数据模型:

1
2
3
4
5
6
public static final String MESSAGES_STATE = "messages";      // 消息历史
public static final String COUNTRY = "country"; // 用户选择的国家
public static final String CITY = "city"; // 用户选择的城市
public static final String CITY_OPTIONS = "cityOptions"; // AI推荐的城市列表
public static final String RECOMMENDED_CITIES = "recommendedCities"; // 已推荐城市集合
public static final String TRIP_PLAN = "tripPlan"; // 生成的旅行计划

状态通道设计

  • messages:使用 Appender 通道,自动累积所有消息
  • recommendedCities:使用 Base 通道与自定义合并器,实现城市集合的累积
  • 其他字段:使用 Base 通道,支持替换或合并

2.2.2 TravelAssistant(主控制器)

TravelAssistant 负责构建和执行有状态的 Graph:

初始化特点

  • 使用 MemorySaver 或 MysqlSaver 作为检查点保存器
  • recommendCities 节点后设置中断点 (interruptAfter)
  • 为每个会话生成唯一的 threadId

关键方法

  • startConversation(String userInput):开启对话
  • provideFeedback(String input):用户提供反馈
  • rollbackToStep(String node, Map<String, Object> newInput):时光倒流

2.2.3 TravelTools(工具集)

集成了四个外部工具供 AI 调用:

  • getWeather(String city):查询城市天气
  • getTraffic(String city):查询城市交通
  • recommendHotel(String city):推荐酒店
  • searchAttractions(String city, List<String> keywords):搜索景点

3. 中断功能:人机交互

3.1 设计原理

TravelAssistant 中的中断功能允许系统在特定步骤暂停执行,等待用户反馈,然后根据反馈决定下一步流向。

3.2 实现机制

3.2.1 中断点配置

在 Graph 编译时指定中断点:

1
2
3
4
5
var compileConfig = CompileConfig.builder()
.checkpointSaver(saver)
.interruptAfter("recommendCities") // 在推荐城市后中断
.releaseThread(false)
.build();

中断后,Graph 会将控制权交给用户,等待用户的下一步指令。

3.2.2 条件路由边

使用条件边实现基于用户输入的路由决策:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
AsyncEdgeAction<TravelState> interruptChooseCityEdge = edge_async(s -> {
String city = s.getCity();
List<String> cityOptions = s.getCityOptions();

if (cityOptions.contains(city)) {
return "continue"; // 城市有效,继续执行 planTrip
} else {
return "rechoose"; // 城市无效,重新推荐
}
});

.addConditionalEdges("waitForCitySelection", interruptChooseCityEdge, Map.of(
"continue", "planTrip",
"rechoose", "recommendCities"
))

当用户提供反馈时,系统根据反馈内容决定是继续还是重新推荐。

3.2.3 用户反馈处理

1
2
3
4
5
6
7
public NodeOutput<TravelState> provideFeedback(String input) throws Exception {
// 更新状态中的城市字段
config = graph.updateState(config, Map.of(TravelState.CITY, input));

// 继续执行 Graph
return executeGraphUpdateConfigAndFetchLastState(null);
}

updateState 方法更新 Graph 的状态,然后 stream 继续执行,触发条件边的判断逻辑。

3.3 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
// 1. 用户开始对话
NodeOutput<TravelState> output1 = assistant.startConversation("我想去中国");
// 系统推荐城市后中断,等待用户选择

// 2. 用户选择城市
NodeOutput<TravelState> output2 = assistant.provideFeedback("北京");
// 系统检查"北京"是否在推荐列表中
// 如果有效,继续制定旅行计划
// 如果无效,重新推荐城市

// 3. 用户不满意,选择"其他"
NodeOutput<TravelState> output3 = assistant.provideFeedback("其他");
// 系统识别"其他"不在列表中,重新推荐城市

4. 历史消息、检查点与时光倒流

4.1 历史消息管理

4.1.1 消息存储机制

TravelState 中的 messages 字段使用 Appender 通道自动累积:

1
"messages", Channels.appender(() -> new ArrayList<ChatMessage>())

每个节点的输出可以向消息列表追加新消息:

1
2
3
4
5
6
7
8
AsyncNodeAction<TravelState> parseCountryNode = node_async(s -> {
UserMessage userMessage = (UserMessage) s.getLastMessage();
// 处理后可返回新的 ChatMessage
return Map.of(
TravelState.COUNTRY, country,
TravelState.MESSAGES_STATE, new AiMessage("提取到的国家:" + country)
);
});

4.1.2 消息访问

通过 TravelState 提供的便利方法访问消息:

1
2
3
4
5
6
7
8
9
public List<ChatMessage> getMessages() {
return this.<List<ChatMessage>>value(MESSAGES_STATE)
.orElse(List.of());
}

public ChatMessage getLastMessage() {
return this.getMessages().isEmpty() ? null
: this.getMessages().getLast();
}

4.2 检查点机制

4.2.1 检查点保存器

TravelAssistant 支持两种检查点保存策略:

MemorySaver(内存保存)

  • 用于开发和测试
  • 状态保存在内存中,进程结束后丢失
  • 性能较高,适合短期会话
1
var saver = new MemorySaver();

MysqlSaver(数据库保存)

  • 生产环境推荐
  • 状态持久化到 MySQL,支持长期存储
  • 支持跨进程恢复
1
2
3
4
var saver = MysqlSaver.builder()
.createOption(CreateOption.CREATE_IF_NOT_EXISTS)
.dataSource(dataSource)
.build();

4.2.2 检查点内容

每个检查点记录:

  • 当前节点名称
  • 完整的状态数据(所有通道值)
  • RunnableConfig(包含 threadId)
  • 执行时间戳

4.2.3 检查点存储流程

1
2
3
4
5
6
7
var compileConfig = CompileConfig.builder()
.checkpointSaver(saver)
.interruptAfter("recommendCities")
.releaseThread(false)
.build();

this.graph = stateGraph.compile(compileConfig);

Graph 编译时注册检查点保存器,每次节点执行后自动保存快照。

4.3 时光倒流(状态回溯)

4.3.1 原理

时光倒流允许用户在工作流的任意检查点重新开始,并可以修改该点的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public NodeOutput<TravelState> rollbackToStep(String node, 
Map<String, Object> newInput) throws Exception {
// 1. 获取所有历史快照
List<StateSnapshot<TravelState>> snapshots =
graph.getStateHistory(config).stream().toList();

// 2. 找到指定节点的快照
StateSnapshot<TravelState> snapshot = snapshots.stream()
.filter(s -> s.node().equals(node))
.findFirst()
.orElse(null);

// 3. 更新状态为新的输入
config = graph.updateState(snapshot.config(), newInput);

// 4. 继续执行 Graph
return executeGraphUpdateConfigAndFetchLastState(null);
}

4.3.2 实现细节

StateSnapshot 结构

1
2
3
4
5
6
StateSnapshot<TravelState> {
String node; // 节点名称
RunnableConfig config; // 运行配置
TravelState state; // 完整的状态数据
// 其他元数据
}

RunnableConfig 作用

  • 包含 threadId,用于在数据库中定位会话
  • 包含检查点坐标,用于访问特定的快照

4.3.3 时光倒流的两种场景

场景 1:重新选择城市

用户在制定旅行计划后,想回到城市选择环节重新选择:

1
2
3
4
// 回到 recommendCities 节点,选择另一个城市
NodeOutput<TravelState> output =
assistant.rollbackToStep("recommendCities",
Map.of(TravelState.CITY, "上海"));

此时系统会:

  1. 恢复到 recommendCities 节点完成后的状态
  2. 更新城市选择为”上海”
  3. 继续执行条件边和后续节点
  4. 生成基于”上海”的新旅行计划

场景 2:重新选择国家

用户想完全改变目的地,回到工作流的起点:

1
2
3
4
5
// 回到 START 节点,重新输入国家
NodeOutput<TravelState> output =
assistant.rollbackToStep(START,
Map.of(TravelState.MESSAGES_STATE,
List.of(UserMessage.from("我想去日本"))));

此时系统会:

  1. 恢复到初始状态
  2. 替换用户消息为新的国家输入
  3. 重新执行整个工作流
  4. 生成基于新国家的推荐和计划

4.3.4 状态一致性保证

时光倒流过程中,系统保证:

  • 恢复的状态与快照时刻的状态完全相同
  • 新的输入与其他状态字段保持一致
  • Appender 通道的历史消息不丢失
  • 后续执行会产生新的快照

5. 完整使用流程

5.1 工作流示意

以下展示完整的用户交互流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
[用户] → startConversation("我想去中国")

[系统执行]
parseCountry → 提取国家:中国
recommendCities → 推荐城市:北京、上海、西安、杭州、南京
中断

[系统返回] 状态:国家=中国, 推荐城市列表

[用户] → provideFeedback("北京")

[系统执行]
条件边判断:北京在列表中 → continue
planTrip → 生成北京旅行计划
END

[系统返回] 状态:国家=中国, 城市=北京, 旅行计划

[用户不满意] → rollbackToStep("recommendCities",
Map.of(CITY, "上海"))

[系统执行]
恢复到 recommendCities 之后的状态
更新城市为"上海"
条件边判断:上海在列表中 → continue
planTrip → 生成上海旅行计划
END

[系统返回] 状态:国家=中国, 城市=上海, 旅行计划

[用户想改国家] → rollbackToStep(START,
Map.of(MESSAGES_STATE,
List.of(UserMessage.from("我想去日本"))))

[系统执行]
恢复到初始状态
替换用户消息
parseCountry → 提取国家:日本
recommendCities → 推荐城市:东京、京都、大阪、北海道、箱根
中断

[系统返回] 状态:国家=日本, 推荐城市列表

5.2 测试代码片段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Test
void testTravel() throws Exception {
TravelAssistant assistant = new TravelAssistant(dataSource);

// 1. 用户输入国家
System.out.println("===== 用户输入国家 =====");
NodeOutput<TravelState> output1 =
assistant.startConversation("我想去中国");

TravelState state1 = output1.state();
System.out.println("国家: " + state1.getCountry());
System.out.println("AI 推荐城市: " + state1.getCityOptions());

// 2. 用户不满意,选择"其他"来重新推荐
System.out.println("\n===== 用户不满意,重新选择城市 =====");
NodeOutput<TravelState> output2_1 =
assistant.provideFeedback("其他");

TravelState state2_1 = output2_1.state();
System.out.println("AI 推荐新城市: " + state2_1.getCityOptions());

// 3. 用户选择城市
List<String> cityOptions = state2_1.getCityOptions();
String city = cityOptions.getFirst();
NodeOutput<TravelState> output2_2 =
assistant.provideFeedback(city);
TravelState state2_2 = output2_2.state();
System.out.println("旅行计划:\n" + state2_2.getTripPlan());

// 4. 时光倒流:重新选择城市
System.out.println("\n===== 用户不满意,时光倒流重新选择 =====");
NodeOutput<TravelState> output3 =
assistant.rollbackToStep("recommendCities",
Map.of(TravelState.CITY, cityOptions.getLast()));

TravelState state3 = output3.state();
System.out.println("新的旅行计划:\n" + state3.getTripPlan());

// 5. 时光倒流:重新选择国家
System.out.println("\n===== 用户想重新选择国家 =====");
NodeOutput<TravelState> output4 =
assistant.rollbackToStep(START,
Map.of(TravelState.MESSAGES_STATE,
List.of(UserMessage.from("我想去日本"))));

TravelState state4 = output4.state();
System.out.println("新国家: " + state4.getCountry());
System.out.println("新的推荐城市: " + state4.getCityOptions());

// 6. 用户选择日本城市
String japanCity = state4.getCityOptions().getFirst();
NodeOutput<TravelState> output5 =
assistant.provideFeedback(japanCity);

TravelState state5 = output5.state();
System.out.println("选择城市: " + japanCity);
System.out.println("日本旅行计划:\n" + state5.getTripPlan());
}

6. 关键技术细节

6.1 状态序列化

为了支持检查点的持久化和恢复,TravelAssistant 实现了自定义序列化器:

1
2
3
4
5
6
7
8
public static StateSerializer<TravelState> serializer() {
var serializer = new LC4jStateSerializer<>(TravelState::new);
serializer.mapper().register(UserMessage.class,
new UserMessageSerializer());
serializer.mapper().register(AiMessage.class,
new AiMessageSerializer());
return serializer;
}

这确保了 ChatMessage 对象可以被正确序列化和反序列化。

6.2 异步执行模型

所有节点都实现为异步操作,确保不阻塞执行线程:

1
2
3
4
5
6
AsyncNodeAction<TravelState> parseCountryNode = node_async(s -> {
// 异步操作
UserMessage userMessage = (UserMessage) s.getLastMessage();
String country = chatModel.chat(userMessage).aiMessage().text();
return Map.of(TravelState.COUNTRY, country);
});

所有 I/O 操作(LLM 调用、数据库访问等)都包装在异步操作中。

6.3 RunnableConfig 与 ThreadId

每个会话都关联一个唯一的 threadId,用于在数据库中追踪会话状态:

1
2
3
4
5
6
this.threadId = UUID.randomUUID().toString();

config = RunnableConfig.builder()
.threadId(threadId)
.streamMode(CompiledGraph.StreamMode.SNAPSHOTS)
.build();

ThreadId 是检查点数据库中的主要查询键,支持多会话并发。

6.4 条件边的动态路由

条件边在运行时基于当前状态做出决策:

1
2
3
4
5
6
7
8
9
10
AsyncEdgeAction<TravelState> interruptChooseCityEdge = edge_async(s -> {
String city = s.getCity();
List<String> cityOptions = s.getCityOptions();

if (cityOptions.contains(city)) {
return "continue";
} else {
return "rechoose";
}
});

这种设计允许复杂的业务逻辑驱动工作流路由,而不是硬编码的流程。


7. 性能与可扩展性考虑

7.1 检查点存储性能

  • MemorySaver:适合单个进程内的短期会话,内存占用受限于活跃会话数量
  • MysqlSaver:适合分布式系统,支持跨进程恢复,数据库查询可能成为瓶颈

7.2 消息历史增长

Appender 通道持续累积消息,长期对话可能导致状态膨胀。建议:

  • 定期归档旧消息
  • 实现消息压缩或摘要策略

7.3 条件边的计算复杂度

复杂的条件判断逻辑可能阻塞边路由。建议:

  • 保持边函数的轻量级
  • 将复杂计算移到专用节点

8. 总结

TravelAssistant 展示了 LangGraph4J 框架在以下方面的能力:

  1. 中断机制:通过条件边和状态更新实现人机交互,允许用户在工作流中选择分支
  2. 历史管理:使用 Appender 通道自动累积所有消息和操作历史
  3. 状态持久化:支持多种检查点保存策略,从内存到数据库
  4. 时光倒流:允许用户回到任意检查点,修改状态,重新执行工作流

这些功能结合异步执行模型和自定义序列化器,为构建复杂的、交互式的 AI 工作流应用提供了坚实的基础。