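Add human-in-the-loop¶
interrupt¶
The [interrupt function](https://langgraph.com.cn/reference/types/index.html#langgraph.types.interrupt "interrupt") in LangGraph enables human-in-the-loop workflows by pausing the graph at a specific node, presenting information to a human, and resuming the graph with their input. This is useful for tasks such as approvals, edits, or gathering additional context.
The graph is resumed with a [Command](https://langgraph.com.cn/reference/types/index.html#langgraph.types.Command "Command dataclass") object that supplies the human's response.
API reference: interrupt | Command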
from langgraph.types import interrupt, Command

def human_node(state: State):
    value = interrupt(
        {
            "text_to_revise": state["some_text"]
        }
    )
    return {
        "some_text": value
    }


graph = graph_builder.compile(checkpointer=checkpointer)

# Run the graph until the interrupt is hit.
config = {"configurable": {"thread_id": "some_id"}}
result = graph.invoke({"some_text": "original text"}, config=config)
print(result['__interrupt__'])
# > [
# >    Interrupt(
# >       value={'text_to_revise': 'original text'},
# >       resumable=True,
# >       ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960']
# >    )
# > ]

print(graph.invoke(Command(resume="Edited text"), config=config))
# > {'some_text': 'Edited text'}
Extended example: using interrupt
from typing import TypedDict
import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command

class State(TypedDict):
    some_text: str

def human_node(state: State):
    value = interrupt(
        {
            "text_to_revise": state["some_text"]
        }
    )
    return {
        "some_text": value
    }


# Build the graph
graph_builder = StateGraph(State)
graph_builder.add_node("human_node", human_node)
graph_builder.add_edge(START, "human_node")

checkpointer = InMemorySaver()

graph = graph_builder.compile(checkpointer=checkpointer)

# Pass a thread ID to the graph to run it.
config = {"configurable": {"thread_id": uuid.uuid4()}}

# Run the graph until the interrupt is hit.
result = graph.invoke({"some_text": "original text"}, config=config)

print(result['__interrupt__'])
# > [
# >    Interrupt(
# >       value={'text_to_revise': 'original text'},
# >       resumable=True,
# >       ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960']
# >    )
# > ]

print(graph.invoke(Command(resume="Edited text"), config=config))
# > {'some_text': 'Edited text'}
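New in version 0.4.0
__interrupt__ is a special key that is returned when running the graph if the graph is interrupted. Support for __interrupt__ in invoke and ainvoke was added in version 0.4.0. On older versions, __interrupt__ appears in the result only when you use stream or astream. You can also call graph.get_state(config), with the thread ID set in config, to retrieve the interrupt value(s).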
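A client often needs to tell a paused run from a finished one. The sketch below is a hypothetical helper (`pending_prompts` is not part of LangGraph) that reads the `__interrupt__` key described above and collects each interrupt's `value`. It uses `SimpleNamespace` objects as stand-ins for `Interrupt`, so it runs without a graph; the only attribute it assumes is `value`, which appears in the printed `Interrupt(...)` output in this document.

```python
from types import SimpleNamespace
from typing import Any, List, Mapping


def pending_prompts(result: Mapping[str, Any]) -> List[Any]:
    """Return the payload of every interrupt in a graph result, or [] if none."""
    # The outputs shown in this document use a list (invoke) and a tuple (stream)
    # under this key, so iterate without assuming the container type.
    return [item.value for item in result.get("__interrupt__", ())]


# Stand-ins for real results; only the `value` attribute is assumed.
paused = {"__interrupt__": [SimpleNamespace(value={"text_to_revise": "original text"})]}
finished = {"some_text": "Edited text"}

print(pending_prompts(paused))    # [{'text_to_revise': 'original text'}]
print(pending_prompts(finished))  # []
```

An empty list means the run completed and the result is the final state; a non-empty list means the client should collect input and resume.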
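Warning
Interrupts are both powerful and ergonomic. However, although they may resemble Python's input() function in terms of developer experience, they do not automatically resume execution from the interruption point. Instead, they rerun the entire node in which the interrupt was used. For this reason, interrupts are usually best placed at the start of a node or in a dedicated node. See the section on how resuming from an interrupt works for more details.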
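Requirements¶
To use interrupt in your graph, you need to:
- Specify a checkpointer to save the graph state after each step.
- Call interrupt() in the appropriate place. See the Design patterns section for examples.
- Run the graph with a thread ID until the interrupt is hit.
- Resume execution using invoke/ainvoke/stream/astream (see the Command primitive).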
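Design patterns¶
A human-in-the-loop workflow typically involves one of three actions:
- Approve or reject: pause the graph before a critical step, such as an API call, to review and approve the action. If the action is rejected, you can prevent the graph from executing the step and potentially take an alternative action. This pattern often involves routing the graph based on the human's input.
- Edit graph state: pause the graph to review and edit the graph state. This is useful for correcting mistakes or updating the state with additional information. This pattern often involves updating the state with the human's input.
- Get input: explicitly request human input at a particular step in the graph. This is useful for collecting additional information or context to inform the agent's decision-making process.
Below we show different design patterns that can be implemented using these actions.
Approve or reject¶
Depending on the human's approval or rejection, the graph can proceed with the action or take an alternative path.
Pause the graph before a critical step, such as an API call, to review and approve the action. If the action is rejected, you can prevent the graph from executing the step and potentially take an alternative action.
API reference: interrupt | Command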
from typing import Literal
from langgraph.types import interrupt, Command

def human_approval(state: State) -> Command[Literal["some_node", "another_node"]]:
    is_approved = interrupt(
        {
            "question": "Is this correct?",
            # Surface the output that should be
            # reviewed and approved by the human.
            "llm_output": state["llm_output"]
        }
    )

    if is_approved:
        return Command(goto="some_node")
    else:
        return Command(goto="another_node")

# Add the node to the graph in an appropriate location
# and connect it to the relevant nodes.
graph_builder.add_node("human_approval", human_approval)
graph = graph_builder.compile(checkpointer=checkpointer)

# After running the graph and hitting the interrupt, the graph will pause.
# Resume it with either an approval or rejection.
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(Command(resume=True), config=thread_config)
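One thing to watch in the snippet above: `if is_approved:` relies on truthiness, so a resume value such as the string "no" or "reject" would count as an approval. Below is a minimal sketch of a normalizer, assuming the client may send either a boolean or a short string. The name `is_approval` and the set of accepted words are hypothetical choices, not part of LangGraph.

```python
# Words treated as approval; an assumption for this sketch, adjust to your client.
APPROVE_WORDS = {"approve", "approved", "yes", "y", "true"}


def is_approval(raw: object) -> bool:
    """Interpret a resume value as a decision; anything unrecognized is a rejection."""
    if isinstance(raw, bool):
        return raw
    if isinstance(raw, str):
        return raw.strip().lower() in APPROVE_WORDS
    return False


print(is_approval(True))      # True
print(is_approval("reject"))  # False
```

Inside the node, the check would then read `if is_approval(is_approved):`. Defaulting to rejection keeps an unexpected value from triggering the critical step.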
Extended example: approve or reject with an interrupt
from typing import Literal, TypedDict
import uuid

from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver

# Define the shared graph state
class State(TypedDict):
    llm_output: str
    decision: str

# Simulate an LLM output node
def generate_llm_output(state: State) -> State:
    return {"llm_output": "This is the generated output."}

# Human approval node
def human_approval(state: State) -> Command[Literal["approved_path", "rejected_path"]]:
    decision = interrupt({
        "question": "Do you approve the following output?",
        "llm_output": state["llm_output"]
    })

    if decision == "approve":
        return Command(goto="approved_path", update={"decision": "approved"})
    else:
        return Command(goto="rejected_path", update={"decision": "rejected"})

# Next steps after approval
def approved_node(state: State) -> State:
    print("✅ Approved path taken.")
    return state

# Alternative path after rejection
def rejected_node(state: State) -> State:
    print("❌ Rejected path taken.")
    return state

# Build the graph
builder = StateGraph(State)
builder.add_node("generate_llm_output", generate_llm_output)
builder.add_node("human_approval", human_approval)
builder.add_node("approved_path", approved_node)
builder.add_node("rejected_path", rejected_node)

builder.set_entry_point("generate_llm_output")
builder.add_edge("generate_llm_output", "human_approval")
builder.add_edge("approved_path", END)
builder.add_edge("rejected_path", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Run until interrupt
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
print(result["__interrupt__"])
# Output:
# Interrupt(value={'question': 'Do you approve the following output?', 'llm_output': 'This is the generated output.'}, ...)

# Simulate resuming with human input
# To test rejection, replace resume="approve" with resume="reject"
final_result = graph.invoke(Command(resume="approve"), config=config)
print(final_result)
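For a more detailed example, see the guide on how to review tool calls.
Review and edit state¶
A human can review and edit the state of the graph. This is useful for correcting mistakes or updating the state with additional information.
API reference: interrupt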
from langgraph.types import interrupt, Command

def human_editing(state: State):
    ...
    result = interrupt(
        # Interrupt information to surface to the client.
        # Can be any JSON serializable value.
        {
            "task": "Review the output from the LLM and make any necessary edits.",
            "llm_generated_summary": state["llm_generated_summary"]
        }
    )

    # Update the state with the edited text
    return {
        "llm_generated_summary": result["edited_text"]
    }

# Add the node to the graph in an appropriate location
# and connect it to the relevant nodes.
graph_builder.add_node("human_editing", human_editing)
graph = graph_builder.compile(checkpointer=checkpointer)

...

# After running the graph and hitting the interrupt, the graph will pause.
# Resume it with the edited text.
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(
    Command(resume={"edited_text": "The edited text"}),
    config=thread_config
)
Extended example: editing state with an interrupt
from typing import TypedDict
import uuid

from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver

# Define the graph state
class State(TypedDict):
    summary: str

# Simulate an LLM summary generation
def generate_summary(state: State) -> State:
    return {
        "summary": "The cat sat on the mat and looked at the stars."
    }

# Human editing node
def human_review_edit(state: State) -> State:
    result = interrupt({
        "task": "Please review and edit the generated summary if necessary.",
        "generated_summary": state["summary"]
    })
    return {
        "summary": result["edited_summary"]
    }

# Simulate downstream use of the edited summary
def downstream_use(state: State) -> State:
    print(f"✅ Using edited summary: {state['summary']}")
    return state

# Build the graph
builder = StateGraph(State)
builder.add_node("generate_summary", generate_summary)
builder.add_node("human_review_edit", human_review_edit)
builder.add_node("downstream_use", downstream_use)

builder.set_entry_point("generate_summary")
builder.add_edge("generate_summary", "human_review_edit")
builder.add_edge("human_review_edit", "downstream_use")
builder.add_edge("downstream_use", END)

# Set up in-memory checkpointing for interrupt support
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Invoke the graph until it hits the interrupt
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)

# Output interrupt payload
print(result["__interrupt__"])
# Example output:
# Interrupt(
#   value={
#     'task': 'Please review and edit the generated summary if necessary.',
#     'generated_summary': 'The cat sat on the mat and looked at the stars.'
#   },
#   resumable=True,
#   ...
# )

# Resume the graph with human-edited input
edited_summary = "The cat lay on the rug, gazing peacefully at the night sky."
resumed_result = graph.invoke(
    Command(resume={"edited_summary": edited_summary}),
    config=config
)
print(resumed_result)
Review tool calls¶
A human can review and edit the output of the LLM before proceeding. This is particularly critical in applications where the tool calls requested by the LLM may be sensitive or require human oversight.
from typing import Literal
from langgraph.types import interrupt, Command

# NOTE: `tool_call`, `get_updated_msg` and `get_feedback_msg` are placeholders
# that your application is expected to define.
def human_review_node(state) -> Command[Literal["call_llm", "run_tool"]]:
    # This is the value we'll be providing via Command(resume=<human_review>)
    human_review = interrupt(
        {
            "question": "Is this correct?",
            # Surface tool calls for review
            "tool_call": tool_call
        }
    )

    review_action, review_data = human_review

    # Approve the tool call and continue
    if review_action == "continue":
        return Command(goto="run_tool")

    # Modify the tool call manually and then continue
    elif review_action == "update":
        ...
        updated_msg = get_updated_msg(review_data)
        # Remember that to modify an existing message you will need
        # to pass the message with a matching ID.
        return Command(goto="run_tool", update={"messages": [updated_msg]})

    # Give natural language feedback, and then pass that back to the agent
    elif review_action == "feedback":
        ...
        feedback_msg = get_feedback_msg(review_data)
        return Command(goto="call_llm", update={"messages": [feedback_msg]})
For a more detailed example, see the guide on how to review tool calls.
Validate human input¶
If you need to validate the input provided by the human within the graph itself (rather than on the client side), you can do so by using multiple interrupt calls within a single node.
API reference: interrupt
from langgraph.types import interrupt

def human_node(state: State):
    """Human node with validation."""
    question = "What is your age?"

    while True:
        answer = interrupt(question)

        # Validate answer, if the answer isn't valid ask for input again.
        if not isinstance(answer, int) or answer < 0:
            question = f"'{answer}' is not a valid age. What is your age?"
            answer = None
            continue
        else:
            # If the answer is valid, we can proceed.
            break

    print(f"The human in the loop is {answer} years old.")
    return {
        "age": answer
    }
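The validation rule in the loop above can also live in a small pure function, which makes it testable without running a graph. This is a sketch under stated assumptions: `parse_age` is a hypothetical helper, it accepts numeric strings as well as integers, and it rejects booleans explicitly because `bool` is a subclass of `int` in Python.

```python
from typing import Optional, Tuple


def parse_age(raw: object) -> Tuple[Optional[int], Optional[str]]:
    """Return (age, None) for valid input, or (None, error message) otherwise."""
    if isinstance(raw, bool):
        # True/False would otherwise be accepted as 1/0.
        return None, f"'{raw}' is not a valid age."
    try:
        age = int(raw)
    except (TypeError, ValueError):
        return None, f"'{raw}' is not a valid age."
    if age < 0:
        return None, "Age must be non-negative."
    return age, None


print(parse_age("25"))   # (25, None)
print(parse_age("-10"))  # (None, 'Age must be non-negative.')
```

Inside the node, the loop would call `parse_age(answer)` after each `interrupt(question)` and use the returned message as the next question when the age is `None`.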
Extended example: validating user input
from typing import TypedDict
import uuid

from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver

# Define graph state
class State(TypedDict):
    age: int

# Node that asks for human input and validates it
def get_valid_age(state: State) -> State:
    prompt = "Please enter your age (must be a non-negative integer)."

    while True:
        user_input = interrupt(prompt)

        # Validate the input
        try:
            age = int(user_input)
            if age < 0:
                raise ValueError("Age must be non-negative.")
            break  # Valid input received
        except (ValueError, TypeError):
            prompt = f"'{user_input}' is not valid. Please enter a non-negative integer for age."

    return {"age": age}

# Node that uses the valid input
def report_age(state: State) -> State:
    print(f"✅ Human is {state['age']} years old.")
    return state

# Build the graph
builder = StateGraph(State)
builder.add_node("get_valid_age", get_valid_age)
builder.add_node("report_age", report_age)

builder.set_entry_point("get_valid_age")
builder.add_edge("get_valid_age", "report_age")
builder.add_edge("report_age", END)

# Create the graph with a memory checkpointer
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Run the graph until the first interrupt
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
print(result["__interrupt__"])  # First prompt: "Please enter your age..."

# Simulate an invalid input (e.g., string instead of integer)
result = graph.invoke(Command(resume="not a number"), config=config)
print(result["__interrupt__"])  # Follow-up prompt with validation message

# Simulate a second invalid input (e.g., negative number)
result = graph.invoke(Command(resume="-10"), config=config)
print(result["__interrupt__"])  # Another retry

# Provide valid input
final_result = graph.invoke(Command(resume="25"), config=config)
print(final_result)  # Should include the valid age
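Resume with the Command primitive¶
When the interrupt function is used within a graph, execution pauses at that point and waits for user input.
To resume execution, use the [Command](https://langgraph.com.cn/reference/types/index.html#langgraph.types.Command "Command dataclass") primitive, which can be supplied through the invoke, ainvoke, stream, or astream methods.
Providing a response to the interrupt: to continue execution, pass the user's input using Command(resume=value). The graph resumes execution from the beginning of the node where interrupt(...) was originally called. This time, the interrupt function returns the value provided in Command(resume=value) rather than pausing again.
# Resume graph execution by providing the user's input.
graph.invoke(Command(resume={"age": "25"}), thread_config)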
How does resuming from an interrupt work?¶
Warning
Resuming from an interrupt is different from Python's input() function, where execution resumes from the exact point where input() was called.
A key aspect of using interrupt is understanding how resuming works. When you resume execution after an interrupt, graph execution starts from the beginning of the graph node where the last interrupt was triggered.
All code from the beginning of the node up to the interrupt is re-executed.
counter = 0
def node(state: State):
    # All the code from the beginning of the node to the interrupt will be re-executed
    # when the graph resumes.
    global counter
    counter += 1
    print(f"> Entered the node: {counter} # of times")
    # Pause the graph and wait for user input.
    # interrupt() requires a value to surface to the client.
    answer = interrupt("Provide an answer to continue.")
    print("The value of counter is:", counter)
    ...
After the graph is resumed, the counter is incremented a second time, producing the following output:
> Entered the node: 2 # of times
The value of counter is: 2
Resume multiple interrupts with one invocation¶
If there are multiple interrupts in the task queue, you can resume all of them with a single invoke / stream call by passing Command.resume a dictionary that maps interrupt IDs to resume values.
For example, once your graph has been interrupted (in theory, multiple times) and is stalled:
resume_map = {
    i.interrupt_id: f"human input for prompt {i.value}"
    for i in parent_graph.get_state(thread_config).interrupts
}

parent_graph.invoke(Command(resume=resume_map), config=thread_config)
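In practice the resume values usually come from a UI rather than an f-string. Below is a minimal sketch of building the map from collected answers and failing early if one is missing. `PendingInterrupt` is a hypothetical stand-in that exposes only the `interrupt_id` and `value` attributes read in the snippet above, and `build_resume_map` is not a LangGraph function. Prompts are assumed to be hashable (plain strings here) so they can be used as dictionary keys.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable


@dataclass(frozen=True)
class PendingInterrupt:
    """Hypothetical stand-in with the two attributes read in the snippet above."""
    interrupt_id: str
    value: Any


def build_resume_map(
    pending: Iterable[PendingInterrupt], answers: Dict[Any, Any]
) -> Dict[str, Any]:
    """Map each interrupt ID to the answer collected for its prompt.

    Raises KeyError if a pending prompt has no answer yet.
    """
    resume_map = {}
    for item in pending:
        if item.value not in answers:
            raise KeyError(f"no answer collected for prompt: {item.value!r}")
        resume_map[item.interrupt_id] = answers[item.value]
    return resume_map


pending = [
    PendingInterrupt("id-1", "what is your name?"),
    PendingInterrupt("id-2", "what is your age?"),
]
resume_map = build_resume_map(
    pending,
    {"what is your name?": "Ada", "what is your age?": 36},
)
print(resume_map)  # {'id-1': 'Ada', 'id-2': 36}
```

The resulting dictionary is what would be passed as `Command(resume=resume_map)`.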
Common pitfalls¶
Side effects¶
Place code with side effects, such as API calls, after the interrupt to avoid duplication, because side effects placed before the interrupt are re-triggered every time the node is resumed.
In the following code, the API call is executed again when the node resumes from the interrupt.
This can be a problem if the API call is not idempotent or is simply expensive.
from langgraph.types import interrupt

def human_node(state: State):
    """Human node with validation."""
    api_call(...)  # This code will be re-executed when the node is resumed.
    answer = interrupt(question)
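The difference between placing a side effect before or after the interrupt can be seen in a toy model. This is plain Python, not LangGraph's implementation: `run_node` and `Paused` are hypothetical names, and the exception merely imitates the pause-and-rerun behavior described in this document.

```python
class Paused(Exception):
    """Raised by the toy interrupt() when there is no resume value yet."""


def run_node(node, resume=None):
    """Run `node` from its first line; pause at interrupt() unless `resume` is given."""
    def interrupt(prompt):
        if resume is None:
            raise Paused(prompt)
        return resume

    try:
        return node(interrupt)
    except Paused:
        return None


calls = {"before": 0, "after": 0}


def side_effect_before(interrupt):
    calls["before"] += 1  # stands in for api_call(...)
    return interrupt("Approve?")


def side_effect_after(interrupt):
    answer = interrupt("Approve?")
    calls["after"] += 1  # only reached once a resume value exists
    return answer


for node in (side_effect_before, side_effect_after):
    run_node(node)                # first run pauses at interrupt()
    run_node(node, resume="yes")  # resuming re-runs the node from the top

print(calls)  # {'before': 2, 'after': 1}
```

The side effect placed before the interrupt runs twice, while the one placed after it runs once. Moving the side effect into a separate node that follows the human node has the same effect.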
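Subgraphs called as functions¶
When a subgraph is invoked as a function, the parent graph resumes execution from the beginning of the node where the subgraph was invoked (and where the interrupt was triggered). Similarly, the subgraph resumes from the beginning of the node where the interrupt() function was called.
For example:
def node_in_parent_graph(state: State):
    some_code()  # <-- This will re-execute when the subgraph is resumed.
    # Invoke a subgraph as a function.
    # The subgraph contains an `interrupt` call.
    subgraph_result = subgraph.invoke(some_input)
    ...
Extended example: parent and subgraph execution flow
Suppose we have a parent graph with 3 nodes:
Parent graph: node_1 → node_2 (subgraph call) → node_3
The subgraph has 3 nodes, and the second node contains an interrupt:
Subgraph: sub_node_1 → sub_node_2 (interrupt) → sub_node_3
When the graph is resumed, execution proceeds as follows:
- Skip node_1 in the parent graph (already executed; the graph state was saved in a snapshot).
- Re-execute node_2 in the parent graph from the start.
- Skip sub_node_1 in the subgraph (already executed; the graph state was saved in a snapshot).
- Re-execute sub_node_2 in the subgraph from the start.
- Continue with sub_node_3 and subsequent nodes.
Here is an abbreviated code example that you can use to see how subgraphs work with interrupts. It counts the number of times each node is entered and prints the count.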
import uuid
from typing import TypedDict

from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver


class State(TypedDict):
    """The graph state."""
    state_counter: int


counter_node_in_subgraph = 0

def node_in_subgraph(state: State):
    """A node in the sub-graph."""
    global counter_node_in_subgraph
    counter_node_in_subgraph += 1  # This code will **NOT** run again!
    print(f"Entered `node_in_subgraph` a total of {counter_node_in_subgraph} times")

counter_human_node = 0

def human_node(state: State):
    global counter_human_node
    counter_human_node += 1  # This code will run again!
    print(f"Entered human_node in sub-graph a total of {counter_human_node} times")
    answer = interrupt("what is your name?")
    print(f"Got an answer of {answer}")


checkpointer = MemorySaver()

subgraph_builder = StateGraph(State)
subgraph_builder.add_node("some_node", node_in_subgraph)
subgraph_builder.add_node("human_node", human_node)
subgraph_builder.add_edge(START, "some_node")
subgraph_builder.add_edge("some_node", "human_node")
subgraph = subgraph_builder.compile(checkpointer=checkpointer)


counter_parent_node = 0

def parent_node(state: State):
    """This parent node will invoke the subgraph."""
    global counter_parent_node

    counter_parent_node += 1  # This code will run again on resuming!
    print(f"Entered `parent_node` a total of {counter_parent_node} times")

    # Please note that we're intentionally incrementing the state counter
    # in the graph state as well to demonstrate that the subgraph update
    # of the same key will not conflict with the parent graph (until
    subgraph_state = subgraph.invoke(state)
    return subgraph_state


builder = StateGraph(State)
builder.add_node("parent_node", parent_node)
builder.add_edge(START, "parent_node")

# A checkpointer must be enabled for interrupts to work!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {
    "configurable": {
        "thread_id": uuid.uuid4(),
    }
}

for chunk in graph.stream({"state_counter": 1}, config):
    print(chunk)

print('--- Resuming ---')

for chunk in graph.stream(Command(resume="35"), config):
    print(chunk)
This will print out:
Entered `parent_node` a total of 1 times
Entered `node_in_subgraph` a total of 1 times
Entered human_node in sub-graph a total of 1 times
{'__interrupt__': (Interrupt(value='what is your name?', resumable=True, ns=['parent_node:4c3a0248-21f0-1287-eacf-3002bc304db4', 'human_node:2fe86d52-6f70-2a3f-6b2f-b1eededd6348'], when='during'),)}
--- Resuming ---
Entered `parent_node` a total of 2 times
Entered human_node in sub-graph a total of 2 times
Got an answer of 35
{'parent_node': {'state_counter': 1}}
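Using multiple interrupts¶
Using multiple interrupts in a single node can be helpful for patterns such as validating human input. However, multiple interrupts in the same node can lead to unexpected behavior if not handled carefully.
When a node contains multiple interrupt calls, LangGraph keeps a list of resume values specific to the task executing the node. Whenever execution resumes, it starts at the beginning of the node. For each interrupt encountered, LangGraph checks whether a matching value exists in the task's resume list. Matching is strictly index-based, so the order of interrupt calls within the node is critical.
To avoid issues, refrain from dynamically changing the node's structure between executions. This includes adding, removing, or reordering interrupt calls, since such changes can cause mismatched indices. These problems usually arise from unconventional patterns, such as mutating state via Command(resume=..., update=SOME_STATE_MUTATION) or relying on global variables to modify the node's structure dynamically.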
Extended example: incorrect code that introduces non-determinism
import uuid
from typing import TypedDict, Optional

from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver


class State(TypedDict):
    """The graph state."""

    age: Optional[str]
    name: Optional[str]


def human_node(state: State):
    if not state.get('name'):
        name = interrupt("what is your name?")
    else:
        name = "N/A"

    if not state.get('age'):
        age = interrupt("what is your age?")
    else:
        age = "N/A"

    print(f"Name: {name}. Age: {age}")

    return {
        "age": age,
        "name": name,
    }


builder = StateGraph(State)
builder.add_node("human_node", human_node)
builder.add_edge(START, "human_node")

# A checkpointer must be enabled for interrupts to work!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {
    "configurable": {
        "thread_id": uuid.uuid4(),
    }
}

for chunk in graph.stream({"age": None, "name": None}, config):
    print(chunk)

for chunk in graph.stream(Command(resume="John", update={"name": "foo"}), config):
    print(chunk)
This prints:
{'__interrupt__': (Interrupt(value='what is your name?', resumable=True, ns=['human_node:3a007ef9-c30d-c357-1ec1-86a1a70d8fba'], when='during'),)}
Name: N/A. Age: John
{'human_node': {'age': 'John', 'name': 'N/A'}}
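The surprising "Age: John" result above follows directly from index-based matching. The toy model below reproduces it in plain Python. It is an illustration of the matching rule as described in this section, not LangGraph's actual implementation; `ToyTask` and `Paused` are hypothetical names.

```python
class Paused(Exception):
    """Raised by the toy interrupt() when no resume value is stored for this index."""

    def __init__(self, prompt):
        super().__init__(prompt)
        self.prompt = prompt


class ToyTask:
    """Replays a node from the top and matches resume values to interrupts by index."""

    def __init__(self, node):
        self.node = node
        self.resume_values = []  # one entry per answer supplied so far
        self._index = 0

    def interrupt(self, prompt):
        i = self._index
        self._index += 1
        if i < len(self.resume_values):
            return self.resume_values[i]  # answered earlier: return without pausing
        raise Paused(prompt)

    def run(self, state, resume=None):
        if resume is not None:
            self.resume_values.append(resume)
        self._index = 0  # every run restarts the node from its first line
        try:
            return self.node(state, self.interrupt)
        except Paused as paused:
            return {"__interrupt__": paused.prompt}


def human_node(state, interrupt):
    name = interrupt("what is your name?") if not state.get("name") else "N/A"
    age = interrupt("what is your age?") if not state.get("age") else "N/A"
    return {"name": name, "age": age}


task = ToyTask(human_node)
first = task.run({"name": None, "age": None})
# Resuming with "John" while the state update sets `name` skips the first
# interrupt, so the stored answer at index 0 is consumed by the age prompt.
second = task.run({"name": "foo", "age": None}, resume="John")

print(first)   # {'__interrupt__': 'what is your name?'}
print(second)  # {'name': 'N/A', 'age': 'John'}
```

Keeping the sequence of interrupt calls identical across runs, for example by not branching on state that a resume can change, avoids the mismatch.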
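- Original author: 知识铺
- Original link: https://index.zshipu.com/ai002/post/20251125/%E6%B7%BB%E5%8A%A0%E4%BA%BA%E5%B7%A5%E5%B9%B2%E9%A2%84-LangChain-%E6%A1%86%E6%9E%B6/
- Copyright notice: This work is licensed under the Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License. For non-commercial reposts, credit the source (author and original link); for commercial reposts, contact the author for permission.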