Add human-in-the-loop

interrupt

The [interrupt function](https://langgraph.com.cn/reference/types/index.html#langgraph.types.interrupt) in LangGraph enables human-in-the-loop workflows by pausing the graph at a specific node, presenting information to a human, and resuming the graph with their input. This is useful for tasks such as approvals, edits, or collecting additional context.

The graph is resumed with a [Command](https://langgraph.com.cn/reference/types/index.html#langgraph.types.Command) object that carries the human's response.

API reference: interrupt | Command

from langgraph.types import interrupt, Command

def human_node(state: State):
    value = interrupt(
        {
            "text_to_revise": state["some_text"]
        }
    )
    return {
        "some_text": value
    }


graph = graph_builder.compile(checkpointer=checkpointer)

# Run the graph until the interrupt is hit.
config = {"configurable": {"thread_id": "some_id"}}
result = graph.invoke({"some_text": "original text"}, config=config)
print(result['__interrupt__'])
# > [
# >    Interrupt(
# >       value={'text_to_revise': 'original text'},
# >       resumable=True,
# >       ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960']
# >    )
# > ]

print(graph.invoke(Command(resume="Edited text"), config=config))
# > {'some_text': 'Edited text'}

Extended example: using interrupt

from typing import TypedDict
import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command

class State(TypedDict):
    some_text: str

def human_node(state: State):
    value = interrupt(
        {
            "text_to_revise": state["some_text"]
        }
    )
    return {
        "some_text": value
    }


# Build the graph
graph_builder = StateGraph(State)
graph_builder.add_node("human_node", human_node)
graph_builder.add_edge(START, "human_node")

checkpointer = InMemorySaver()

graph = graph_builder.compile(checkpointer=checkpointer)

# Pass a thread ID to the graph to run it.
config = {"configurable": {"thread_id": uuid.uuid4()}}

# Run the graph until the interrupt is hit.
result = graph.invoke({"some_text": "original text"}, config=config)

print(result['__interrupt__'])
# > [
# >    Interrupt(
# >       value={'text_to_revise': 'original text'},
# >       resumable=True,
# >       ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960']
# >    )
# > ]

print(graph.invoke(Command(resume="Edited text"), config=config))
# > {'some_text': 'Edited text'}

Added in version 0.4.0

__interrupt__ is a special key returned when you run a graph and it is interrupted. Support for __interrupt__ in invoke and ainvoke was added in version 0.4.0. If you are on an older version, you will only see __interrupt__ in the result when using stream or astream. You can also retrieve the interrupt values with graph.get_state(config).

Warning

Interrupts are powerful and ergonomic. However, although their developer experience may resemble Python's input() function, note that they do not automatically resume execution from the point of interruption. Instead, they re-run the entire node in which the interrupt was used. For this reason, interrupts are usually best placed at the start of a node or in a dedicated node. Read the resuming-from-an-interrupt section for more details.

Requirements

To use interrupt in your graph, you need to:

  1. Specify a checkpointer to save the graph state after each step.
  2. Call interrupt() in the appropriate place. See the design patterns section for examples.
  3. Run the graph with a thread ID until the interrupt is hit.
  4. Resume execution using invoke/ainvoke/stream/astream (see the Command primitive).

Design patterns

Human-in-the-loop workflows typically involve three distinct actions:

  1. Approve or reject: pause the graph before a critical step, such as an API call, to review and approve the action. If the action is rejected, you can prevent the graph from executing the step and potentially take an alternative action. This pattern often involves routing the graph based on the human's input.
  2. Edit graph state: pause the graph to review and edit the graph state. This is useful for correcting mistakes or updating the state with additional information. This pattern often involves updating the state with the human's input.
  3. Get input: explicitly request human input at a particular step in the graph. This is useful for collecting additional information or context to inform the agent's decision-making process.

Below we show different design patterns that can be implemented with these actions.

Approve or reject


Depending on the human's approval or rejection, the graph can proceed with the action or take an alternative path.

Pause the graph before a critical step, such as an API call, to review and approve the action. If the action is rejected, you can prevent the graph from executing the step and potentially take an alternative action.

API reference: interrupt | Command

from typing import Literal
from langgraph.types import interrupt, Command

def human_approval(state: State) -> Command[Literal["some_node", "another_node"]]:
    is_approved = interrupt(
        {
            "question": "Is this correct?",
            # Surface the output that should be
            # reviewed and approved by the human.
            "llm_output": state["llm_output"]
        }
    )

    if is_approved:
        return Command(goto="some_node")
    else:
        return Command(goto="another_node")

# Add the node to the graph in an appropriate location
# and connect it to the relevant nodes.
graph_builder.add_node("human_approval", human_approval)
graph = graph_builder.compile(checkpointer=checkpointer)

# After running the graph and hitting the interrupt, the graph will pause.
# Resume it with either an approval or rejection.
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(Command(resume=True), config=thread_config)

Extended example: approve or reject with interrupt

from typing import Literal, TypedDict
import uuid

from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver

# Define the shared graph state
class State(TypedDict):
    llm_output: str
    decision: str

# Simulate an LLM output node
def generate_llm_output(state: State) -> State:
    return {"llm_output": "This is the generated output."}

# Human approval node
def human_approval(state: State) -> Command[Literal["approved_path", "rejected_path"]]:
    decision = interrupt({
        "question": "Do you approve the following output?",
        "llm_output": state["llm_output"]
    })

    if decision == "approve":
        return Command(goto="approved_path", update={"decision": "approved"})
    else:
        return Command(goto="rejected_path", update={"decision": "rejected"})

# Next steps after approval
def approved_node(state: State) -> State:
    print("✅ Approved path taken.")
    return state

# Alternative path after rejection
def rejected_node(state: State) -> State:
    print("❌ Rejected path taken.")
    return state

# Build the graph
builder = StateGraph(State)
builder.add_node("generate_llm_output", generate_llm_output)
builder.add_node("human_approval", human_approval)
builder.add_node("approved_path", approved_node)
builder.add_node("rejected_path", rejected_node)

builder.set_entry_point("generate_llm_output")
builder.add_edge("generate_llm_output", "human_approval")
builder.add_edge("approved_path", END)
builder.add_edge("rejected_path", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Run until interrupt
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
print(result["__interrupt__"])
# Output:
# Interrupt(value={'question': 'Do you approve the following output?', 'llm_output': 'This is the generated output.'}, ...)

# Simulate resuming with human input
# To test rejection, replace resume="approve" with resume="reject"
final_result = graph.invoke(Command(resume="approve"), config=config)
print(final_result)

For a more detailed example, see how to review tool calls.

Review and edit state


A human can review and edit the graph's state. This is useful for correcting mistakes or updating the state with additional information.

API reference: interrupt

from langgraph.types import interrupt

def human_editing(state: State):
    ...
    result = interrupt(
        # Interrupt information to surface to the client.
        # Can be any JSON serializable value.
        {
            "task": "Review the output from the LLM and make any necessary edits.",
            "llm_generated_summary": state["llm_generated_summary"]
        }
    )

    # Update the state with the edited text
    return {
        "llm_generated_summary": result["edited_text"]
    }

# Add the node to the graph in an appropriate location
# and connect it to the relevant nodes.
graph_builder.add_node("human_editing", human_editing)
graph = graph_builder.compile(checkpointer=checkpointer)

...

# After running the graph and hitting the interrupt, the graph will pause.
# Resume it with the edited text.
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(
    Command(resume={"edited_text": "The edited text"}),
    config=thread_config
)

Extended example: editing state with interrupt

from typing import TypedDict
import uuid

from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver

# Define the graph state
class State(TypedDict):
    summary: str

# Simulate an LLM summary generation
def generate_summary(state: State) -> State:
    return {
        "summary": "The cat sat on the mat and looked at the stars."
    }

# Human editing node
def human_review_edit(state: State) -> State:
    result = interrupt({
        "task": "Please review and edit the generated summary if necessary.",
        "generated_summary": state["summary"]
    })
    return {
        "summary": result["edited_summary"]
    }

# Simulate downstream use of the edited summary
def downstream_use(state: State) -> State:
    print(f"✅ Using edited summary: {state['summary']}")
    return state

# Build the graph
builder = StateGraph(State)
builder.add_node("generate_summary", generate_summary)
builder.add_node("human_review_edit", human_review_edit)
builder.add_node("downstream_use", downstream_use)

builder.set_entry_point("generate_summary")
builder.add_edge("generate_summary", "human_review_edit")
builder.add_edge("human_review_edit", "downstream_use")
builder.add_edge("downstream_use", END)

# Set up in-memory checkpointing for interrupt support
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Invoke the graph until it hits the interrupt
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)

# Output interrupt payload
print(result["__interrupt__"])
# Example output:
# Interrupt(
#   value={
#     'task': 'Please review and edit the generated summary if necessary.',
#     'generated_summary': 'The cat sat on the mat and looked at the stars.'
#   },
#   resumable=True,
#   ...
# )

# Resume the graph with human-edited input
edited_summary = "The cat lay on the rug, gazing peacefully at the night sky."
resumed_result = graph.invoke(
    Command(resume={"edited_summary": edited_summary}),
    config=config
)
print(resumed_result)

Review tool calls


A human can review and edit the LLM's output before proceeding. This is especially critical in applications where the tool calls requested by the LLM may be sensitive or require human oversight.

def human_review_node(state) -> Command[Literal["call_llm", "run_tool"]]:
    # This is the value we'll be providing via Command(resume=<human_review>)
    human_review = interrupt(
        {
            "question": "Is this correct?",
            # Surface tool calls for review
            "tool_call": tool_call
        }
    )

    review_action, review_data = human_review

    # Approve the tool call and continue
    if review_action == "continue":
        return Command(goto="run_tool")

    # Modify the tool call manually and then continue
    elif review_action == "update":
        ...
        updated_msg = get_updated_msg(review_data)
        # Remember that to modify an existing message you will need
        # to pass the message with a matching ID.
        return Command(goto="run_tool", update={"messages": [updated_msg]})

    # Give natural language feedback, and then pass that back to the agent
    elif review_action == "feedback":
        ...
        feedback_msg = get_feedback_msg(review_data)
        return Command(goto="call_llm", update={"messages": [feedback_msg]})

For a more detailed example, see how to review tool calls.

Validate human input

If you need to validate human-provided input within the graph itself (rather than on the client side), you can do so with multiple interrupt calls inside a single node.

API reference: interrupt

from langgraph.types import interrupt

def human_node(state: State):
    """Human node with validation."""
    question = "What is your age?"

    while True:
        answer = interrupt(question)

        # Validate answer, if the answer isn't valid ask for input again.
        if not isinstance(answer, int) or answer < 0:
            question = f"'{answer}' is not a valid age. What is your age?"
            answer = None
            continue
        else:
            # If the answer is valid, we can proceed.
            break

    print(f"The human in the loop is {answer} years old.")
    return {
        "age": answer
    }

Extended example: validating user input

from typing import TypedDict
import uuid

from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver

# Define graph state
class State(TypedDict):
    age: int

# Node that asks for human input and validates it
def get_valid_age(state: State) -> State:
    prompt = "Please enter your age (must be a non-negative integer)."

    while True:
        user_input = interrupt(prompt)

        # Validate the input
        try:
            age = int(user_input)
            if age < 0:
                raise ValueError("Age must be non-negative.")
            break  # Valid input received
        except (ValueError, TypeError):
            prompt = f"'{user_input}' is not valid. Please enter a non-negative integer for age."

    return {"age": age}

# Node that uses the valid input
def report_age(state: State) -> State:
    print(f"✅ Human is {state['age']} years old.")
    return state

# Build the graph
builder = StateGraph(State)
builder.add_node("get_valid_age", get_valid_age)
builder.add_node("report_age", report_age)

builder.set_entry_point("get_valid_age")
builder.add_edge("get_valid_age", "report_age")
builder.add_edge("report_age", END)

# Create the graph with a memory checkpointer
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Run the graph until the first interrupt
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
print(result["__interrupt__"])  # First prompt: "Please enter your age..."

# Simulate an invalid input (e.g., string instead of integer)
result = graph.invoke(Command(resume="not a number"), config=config)
print(result["__interrupt__"])  # Follow-up prompt with validation message

# Simulate a second invalid input (e.g., negative number)
result = graph.invoke(Command(resume="-10"), config=config)
print(result["__interrupt__"])  # Another retry

# Provide valid input
final_result = graph.invoke(Command(resume="25"), config=config)
print(final_result)  # Should include the valid age

Resuming with the Command primitive

When the interrupt function is used in a graph, execution pauses at that point and waits for user input.

To resume execution, use the [Command](https://langgraph.com.cn/reference/types/index.html#langgraph.types.Command) primitive, which can be supplied through the invoke, ainvoke, stream, or astream methods.

interrupt 提供响应: 要继续执行,请使用 Command(resume=value) 传递用户输入。图表从最初调用 interrupt(...) 的节点开头恢复执行。此时,interrupt 函数将返回 Command(resume=value) 中提供的值,而不会再次暂停。

# Resume graph execution by providing the user's input.
graph.invoke(Command(resume={"age": "25"}), thread_config)

How does resuming from an interrupt work?

Warning

Resuming from an interrupt is different from Python's input() function, which resumes execution from the exact point where input() was called.

A critical aspect of using interrupt is understanding how resuming works. When you resume execution after an interrupt, graph execution starts from the beginning of the graph node where the last interrupt was triggered.

All code from the beginning of the node up to the interrupt will be re-executed.

counter = 0
def node(state: State):
    # All the code from the beginning of the node to the interrupt will be re-executed
    # when the graph resumes.
    global counter
    counter += 1
    print(f"> Entered the node: {counter} # of times")
    # Pause the graph and wait for user input.
    answer = interrupt("What is your input?")
    print("The value of counter is:", counter)
    ...

When the graph is resumed, the counter is incremented a second time, producing the following output:

<span id="__span-11-1">&gt; Entered the node: 2 # of times
<span id="__span-11-2">The value of counter is: 2

Resuming multiple interrupts with a single call

If there are multiple interrupts in the task queue, you can resume them all with a single invoke / stream call by passing Command.resume a dictionary that maps interrupt IDs to resume values.

For example, once your graph has been interrupted (in theory, multiple times) and is stalled:

resume_map = {
    i.interrupt_id: f"human input for prompt {i.value}"
    for i in parent_graph.get_state(thread_config).interrupts
}

parent_graph.invoke(Command(resume=resume_map), config=thread_config)

Common pitfalls

Side effects

Place code with side effects, such as API calls, after the interrupt to avoid duplication, since these side effects are re-triggered every time the node is resumed.

When the node resumes from the interrupt, the code below re-executes the API call.

This can be a problem if the API call is not idempotent, or is simply expensive.

from langgraph.types import interrupt

def human_node(state: State):
    """Human node with validation."""
    api_call(...) # This code will be re-executed when the node is resumed.
    answer = interrupt(question)

Subgraphs called as functions

When a subgraph is invoked as a function, the parent graph resumes execution from the beginning of the node that called the subgraph (and where the interrupt was triggered). Likewise, the subgraph resumes from the beginning of the node where the interrupt() function was called.

For example:

def node_in_parent_graph(state: State):
    some_code()  # <-- This will re-execute when the subgraph is resumed.
    # Invoke a subgraph as a function.
    # The subgraph contains an `interrupt` call.
    subgraph_result = subgraph.invoke(some_input)
    ...

Extended example: parent and subgraph execution flow

Say we have a parent graph with 3 nodes:

Parent graph: node_1 → node_2 (subgraph call) → node_3

And the subgraph has 3 nodes, where the second node contains an interrupt:

Subgraph: sub_node_1 → sub_node_2 (interrupt) → sub_node_3

When resuming the graph, execution proceeds as follows:

  1. Skip node_1 in the parent graph (already executed; the graph state was saved in a snapshot).
  2. Re-execute node_2 in the parent graph from the start.
  3. Skip sub_node_1 in the subgraph (already executed; the graph state was saved in a snapshot).
  4. Re-execute sub_node_2 in the subgraph from the start.
  5. Continue with sub_node_3 and subsequent nodes.

Here is abbreviated example code that you can use to understand how subgraphs work with interrupts. It counts the number of times each node is entered and prints the count.

import uuid
from typing import TypedDict

from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver


class State(TypedDict):
    """The graph state."""
    state_counter: int


counter_node_in_subgraph = 0

def node_in_subgraph(state: State):
    """A node in the sub-graph."""
    global counter_node_in_subgraph
    counter_node_in_subgraph += 1  # This code will **NOT** run again!
    print(f"Entered `node_in_subgraph` a total of {counter_node_in_subgraph} times")

counter_human_node = 0

def human_node(state: State):
    global counter_human_node
    counter_human_node += 1 # This code will run again!
    print(f"Entered human_node in sub-graph a total of {counter_human_node} times")
    answer = interrupt("what is your name?")
    print(f"Got an answer of {answer}")


checkpointer = MemorySaver()

subgraph_builder = StateGraph(State)
subgraph_builder.add_node("some_node", node_in_subgraph)
subgraph_builder.add_node("human_node", human_node)
subgraph_builder.add_edge(START, "some_node")
subgraph_builder.add_edge("some_node", "human_node")
subgraph = subgraph_builder.compile(checkpointer=checkpointer)


counter_parent_node = 0

def parent_node(state: State):
    """This parent node will invoke the subgraph."""
    global counter_parent_node

    counter_parent_node += 1 # This code will run again on resuming!
    print(f"Entered `parent_node` a total of {counter_parent_node} times")

    # Invoke the subgraph as a function. On resume, this call runs again,
    # but the subgraph itself resumes from its own saved state.
    subgraph_state = subgraph.invoke(state)
    return subgraph_state


builder = StateGraph(State)
builder.add_node("parent_node", parent_node)
builder.add_edge(START, "parent_node")

# A checkpointer must be enabled for interrupts to work!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {
    "configurable": {
      "thread_id": uuid.uuid4(),
    }
}

for chunk in graph.stream({"state_counter": 1}, config):
    print(chunk)

print('--- Resuming ---')

for chunk in graph.stream(Command(resume="35"), config):
    print(chunk)

This prints:

Entered `parent_node` a total of 1 times
Entered `node_in_subgraph` a total of 1 times
Entered human_node in sub-graph a total of 1 times
{'__interrupt__': (Interrupt(value='what is your name?', resumable=True, ns=['parent_node:4c3a0248-21f0-1287-eacf-3002bc304db4', 'human_node:2fe86d52-6f70-2a3f-6b2f-b1eededd6348'], when='during'),)}
--- Resuming ---
Entered `parent_node` a total of 2 times
Entered human_node in sub-graph a total of 2 times
Got an answer of 35
{'parent_node': {'state_counter': 1}}

Using multiple interrupts

Using multiple interrupts within a single node can be helpful for patterns such as validating human input. However, multiple interrupts in the same node can lead to unexpected behavior if not handled carefully.

When a node contains multiple interrupt calls, LangGraph maintains a list of resume values associated with the task executing that node. Whenever execution resumes, it starts at the beginning of the node. For each interrupt encountered, LangGraph checks whether a matching value exists in the task's resume list. Matching is strictly index-based, so the order of interrupt calls within the node is critical.

To avoid issues, refrain from dynamically changing the node's structure between executions. This includes adding, removing, or reordering interrupt calls, since such changes can cause index mismatches. These problems typically stem from unconventional patterns, such as mutating state via Command(resume=..., update=SOME_STATE_MUTATION) or relying on global variables to modify a node's structure dynamically.

Extended example: buggy code that introduces non-determinism

import uuid
from typing import TypedDict, Optional

from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver


class State(TypedDict):
    """The graph state."""

    age: Optional[str]
    name: Optional[str]


def human_node(state: State):
    if not state.get('name'):
        name = interrupt("what is your name?")
    else:
        name = "N/A"

    if not state.get('age'):
        age = interrupt("what is your age?")
    else:
        age = "N/A"

    print(f"Name: {name}. Age: {age}")

    return {
        "age": age,
        "name": name,
    }


builder = StateGraph(State)
builder.add_node("human_node", human_node)
builder.add_edge(START, "human_node")

# A checkpointer must be enabled for interrupts to work!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {
    "configurable": {
        "thread_id": uuid.uuid4(),
    }
}

for chunk in graph.stream({"age": None, "name": None}, config):
    print(chunk)

for chunk in graph.stream(Command(resume="John", update={"name": "foo"}), config):
    print(chunk)

{'__interrupt__': (Interrupt(value='what is your name?', resumable=True, ns=['human_node:3a007ef9-c30d-c357-1ec1-86a1a70d8fba'], when='during'),)}
Name: N/A. Age: John
{'human_node': {'age': 'John', 'name': 'N/A'}}