概述 - LangChain 框架 (10) --知识铺
函数式 API 概念¶
概述¶
函数式 API 允许您将 LangGraph 的关键功能——持久化、内存、人在环和流式传输——以最少的改动集成到您现有的代码中。
它旨在将这些功能集成到可能使用标准语言原语(例如 if 语句、for 循环和函数调用)进行分支和控制流的现有代码中。与许多需要将代码重构为显式管道或 DAG 的数据编排框架不同,函数式 API 允许您在不强制执行严格执行模型的情况下整合这些功能。
函数式 API 使用两个关键构建块
@entrypoint– 将函数标记为工作流的起始点,封装逻辑并管理执行流,包括处理长时间运行的任务和中断。@task– 表示一个离散的工作单元,例如 API 调用或数据处理步骤,可以在入口点内异步执行。任务返回一个类似 future 的对象,该对象可以被等待或同步解析。
这为构建具有状态管理和流式传输功能的工作流提供了最小的抽象。
提示
对于喜欢声明式方法的开发人员,LangGraph 的图 API 允许您使用图范式定义工作流。两种 API 共享相同的底层运行时,因此您可以在同一个应用程序中同时使用它们。请参阅函数式 API 与图 API 对比部分,了解这两种范式的比较。
示例¶
下面我们演示一个简单的应用程序,它会撰写一篇论文并中断以请求人工审查。
API 参考:MemorySaver | entrypoint | task | interrupt
<code tabindex="0"><span id="__span-0-1">from langgraph.checkpoint.memory import MemorySaver
<span id="__span-0-2">from langgraph.func import entrypoint, task
<span id="__span-0-3">from langgraph.types import interrupt
<span id="__span-0-4">
<span id="__span-0-5">
<span id="__span-0-6">@task
<span id="__span-0-7">def write_essay(topic: str) -> str:
<span id="__span-0-8"> """Write an essay about the given topic."""
<span id="__span-0-9"> time.sleep(1) # A placeholder for a long-running task.
<span id="__span-0-10"> return f"An essay about topic: {topic}"
<span id="__span-0-11">
<span id="__span-0-12">@entrypoint(checkpointer=MemorySaver())
<span id="__span-0-13">def workflow(topic: str) -> dict:
<span id="__span-0-14"> """A simple workflow that writes an essay and asks for a review."""
<span id="__span-0-15"> essay = write_essay("cat").result()
<span id="__span-0-16"> is_approved = interrupt({
<span id="__span-0-17"> # Any json-serializable payload provided to interrupt as argument.
<span id="__span-0-18"> # It will be surfaced on the client side as an Interrupt when streaming data
<span id="__span-0-19"> # from the workflow.
<span id="__span-0-20"> "essay": essay, # The essay we want reviewed.
<span id="__span-0-21"> # We can add any additional information that we need.
<span id="__span-0-22"> # For example, introduce a key called "action" with some instructions.
<span id="__span-0-23"> "action": "Please approve/reject the essay",
<span id="__span-0-24"> })
<span id="__span-0-25">
<span id="__span-0-26"> return {
<span id="__span-0-27"> "essay": essay, # The essay that was generated
<span id="__span-0-28"> "is_approved": is_approved, # Response from HIL
<span id="__span-0-29"> }
详细解释
此工作流将撰写一篇关于“猫”主题的论文,然后暂停以获取人工审查。工作流可以无限期中断,直到提供审查。
当工作流恢复时,它会从头开始执行,但由于 write_essay 任务的结果已保存,因此任务结果将从检查点加载,而不是重新计算。
<span id="__span-1-1">import time
<span id="__span-1-2">import uuid
<span id="__span-1-3">
<span id="__span-1-4">from langgraph.func import entrypoint, task
<span id="__span-1-5">from langgraph.types import interrupt
<span id="__span-1-6">from langgraph.checkpoint.memory import MemorySaver
<span id="__span-1-7">
<span id="__span-1-8">@task
<span id="__span-1-9">def write_essay(topic: str) -> str:
<span id="__span-1-10"> """Write an essay about the given topic."""
<span id="__span-1-11"> time.sleep(1) # This is a placeholder for a long-running task.
<span id="__span-1-12"> return f"An essay about topic: {topic}"
<span id="__span-1-13">
<span id="__span-1-14">@entrypoint(checkpointer=MemorySaver())
<span id="__span-1-15">def workflow(topic: str) -> dict:
<span id="__span-1-16"> """A simple workflow that writes an essay and asks for a review."""
<span id="__span-1-17"> essay = write_essay("cat").result()
<span id="__span-1-18"> is_approved = interrupt({
<span id="__span-1-19"> # Any json-serializable payload provided to interrupt as argument.
<span id="__span-1-20"> # It will be surfaced on the client side as an Interrupt when streaming data
<span id="__span-1-21"> # from the workflow.
<span id="__span-1-22"> "essay": essay, # The essay we want reviewed.
<span id="__span-1-23"> # We can add any additional information that we need.
<span id="__span-1-24"> # For example, introduce a key called "action" with some instructions.
<span id="__span-1-25"> "action": "Please approve/reject the essay",
<span id="__span-1-26"> })
<span id="__span-1-27">
<span id="__span-1-28"> return {
<span id="__span-1-29"> "essay": essay, # The essay that was generated
<span id="__span-1-30"> "is_approved": is_approved, # Response from HIL
<span id="__span-1-31"> }
<span id="__span-1-32">
<span id="__span-1-33">thread_id = str(uuid.uuid4())
<span id="__span-1-34">
<span id="__span-1-35">config = {
<span id="__span-1-36"> "configurable": {
<span id="__span-1-37"> "thread_id": thread_id
<span id="__span-1-38"> }
<span id="__span-1-39">}
<span id="__span-1-40">
<span id="__span-1-41">for item in workflow.stream("cat", config):
<span id="__span-1-42"> print(item)
<code tabindex="0"><span id="__span-2-1">{'write_essay': 'An essay about topic: cat'}
<span id="__span-2-2">{'__interrupt__': (Interrupt(value={'essay': 'An essay about topic: cat', 'action': 'Please approve/reject the essay'}, resumable=True, ns=['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'], when='during'),)}
一篇论文已撰写完成并可供审查。提供审查后,我们可以恢复工作流
<span id="__span-3-1">from langgraph.types import Command
<span id="__span-3-2">
<span id="__span-3-3"># Get review from a user (e.g., via a UI)
<span id="__span-3-4"># In this case, we're using a bool, but this can be any json-serializable value.
<span id="__span-3-5">human_review = True
<span id="__span-3-6">
<span id="__span-3-7">for item in workflow.stream(Command(resume=human_review), config):
<span id="__span-3-8"> print(item)
<span id="__span-4-1">{'workflow': {'essay': 'An essay about topic: cat', 'is_approved': False}}
工作流已完成,审查已添加到论文中。
入口点¶
[@entrypoint](https://langgraph.com.cn/reference/func/index.html#langgraph.func.entrypoint “ entrypoint”) 装饰器可用于从函数创建工作流。它封装了工作流逻辑并管理执行流,包括处理_长时间运行的任务_和中断。
定义¶
入口点通过使用 @entrypoint 装饰器装饰函数来定义。
该函数必须接受一个位置参数,该参数用作工作流输入。如果您需要传递多个数据片段,请使用字典作为第一个参数的输入类型。
使用 entrypoint 装饰函数会生成一个 [Pregel](https://langgraph.com.cn/reference/pregel/index.html#langgraph.pregel.Pregel.stream “ stream”) 实例,有助于管理工作流的执行(例如,处理流式传输、恢复和检查点)。
通常,您会希望将一个检查点器传递给 @entrypoint 装饰器,以启用持久化并使用人在环等功能。
<span id="__span-5-1">from langgraph.func import entrypoint
<span id="__span-5-2">
<span id="__span-5-3">@entrypoint(checkpointer=checkpointer)
<span id="__span-5-4">def my_workflow(some_input: dict) -> int:
<span id="__span-5-5"> # some logic that may involve long-running tasks like API calls,
<span id="__span-5-6"> # and may be interrupted for human-in-the-loop.
<span id="__span-5-7"> ...
<span id="__span-5-8"> return result
序列化
入口点的输入和输出必须是 JSON 可序列化的,以支持检查点。有关更多详细信息,请参阅序列化部分。
可注入参数¶
声明 entrypoint 时,您可以请求访问将在运行时自动注入的额外参数。这些参数包括
请求可注入参数
<span id="__span-7-1">from langchain_core.runnables import RunnableConfig
<span id="__span-7-2">from langgraph.func import entrypoint
<span id="__span-7-3">from langgraph.store.base import BaseStore
<span id="__span-7-4">from langgraph.store.memory import InMemoryStore
<span id="__span-7-5">
<span id="__span-7-6">in_memory_store = InMemoryStore(...) # An instance of InMemoryStore for long-term memory
<span id="__span-7-7">
<span id="__span-7-8">@entrypoint(
<span id="__span-7-9"> checkpointer=checkpointer, # Specify the checkpointer
<span id="__span-7-10"> store=in_memory_store # Specify the store
<span id="__span-7-11">)
<span id="__span-7-12">def my_workflow(
<span id="__span-7-13"> some_input: dict, # The input (e.g., passed via `invoke`)
<span id="__span-7-14"> *,
<span id="__span-7-15"> previous: Any = None, # For short-term memory
<span id="__span-7-16"> store: BaseStore, # For long-term memory
<span id="__span-7-17"> writer: StreamWriter, # For streaming custom data
<span id="__span-7-18"> config: RunnableConfig # For accessing the configuration passed to the entrypoint
<span id="__span-7-19">) -> ...:
执行¶
使用@entrypoint 会生成一个 [Pregel](https://langgraph.com.cn/reference/pregel/index.html#langgraph.pregel.Pregel.stream “ stream”) 对象,该对象可以使用 invoke、ainvoke、stream 和 astream 方法执行。
<span id="__span-8-1">config = {
<span id="__span-8-2"> "configurable": {
<span id="__span-8-3"> "thread_id": "some_thread_id"
<span id="__span-8-4"> }
<span id="__span-8-5">}
<span id="__span-8-6">my_workflow.invoke(some_input, config) # Wait for the result synchronously
恢复¶
在[中断](https://langgraph.com.cn/reference/types/index.html#langgraph.types.interrupt “ interrupt")后恢复执行可以通过将恢复值传递给[Command](https://langgraph.com.cn/reference/types/index.html#langgraph.types.Command “ Command dataclass ”) 原语来完成。
<span id="__span-12-1">from langgraph.types import Command
<span id="__span-12-2">
<span id="__span-12-3">config = {
<span id="__span-12-4"> "configurable": {
<span id="__span-12-5"> "thread_id": "some_thread_id"
<span id="__span-12-6"> }
<span id="__span-12-7">}
<span id="__span-12-8">
<span id="__span-12-9">my_workflow.invoke(Command(resume=some_resume_value), config)
错误后恢复
要在大纲后恢复,请使用 None 和相同的线程 ID(配置)运行 entrypoint。
这假设底层错误已解决并且执行可以成功进行。
<span id="__span-16-1">config = {
<span id="__span-16-2"> "configurable": {
<span id="__span-16-3"> "thread_id": "some_thread_id"
<span id="__span-16-4"> }
<span id="__span-16-5">}
<span id="__span-16-6">
<span id="__span-16-7">my_workflow.invoke(None, config)
短期记忆¶
当 entrypoint 与 checkpointer 一起定义时,它会在同一线程 ID 的连续调用之间将信息存储在检查点中。
这允许使用 previous 参数访问前一次调用的状态。
默认情况下,previous 参数是前一次调用的返回值。
<span id="__span-20-1">@entrypoint(checkpointer=checkpointer)
<span id="__span-20-2">def my_workflow(number: int, *, previous: Any = None) -> int:
<span id="__span-20-3"> previous = previous or 0
<span id="__span-20-4"> return number + previous
<span id="__span-20-5">
<span id="__span-20-6">config = {
<span id="__span-20-7"> "configurable": {
<span id="__span-20-8"> "thread_id": "some_thread_id"
<span id="__span-20-9"> }
<span id="__span-20-10">}
<span id="__span-20-11">
<span id="__span-20-12">my_workflow.invoke(1, config) # 1 (previous was None)
<span id="__span-20-13">my_workflow.invoke(2, config) # 3 (previous was 1 from the previous invocation)
entrypoint.final¶
[entrypoint.final](https://langgraph.com.cn/reference/func/index.html#langgraph.func.entrypoint.final “ final dataclass ”) 是一个特殊的原语,可以从入口点返回,并允许将保存到检查点中的值与入口点的返回值进行解耦。
第一个值是入口点的返回值,第二个值是保存到检查点中的值。类型注释为 entrypoint.final[return_type, save_type]。
<code tabindex="0"><span id="__span-21-1">@entrypoint(checkpointer=checkpointer)
<span id="__span-21-2">def my_workflow(number: int, *, previous: Any = None) -> entrypoint.final[int, int]:
<span id="__span-21-3"> previous = previous or 0
<span id="__span-21-4"> # This will return the previous value to the caller, saving
<span id="__span-21-5"> # 2 * number to the checkpoint, which will be used in the next invocation
<span id="__span-21-6"> # for the `previous` parameter.
<span id="__span-21-7"> return entrypoint.final(value=previous, save=2 * number)
<span id="__span-21-8">
<span id="__span-21-9">config = {
<span id="__span-21-10"> "configurable": {
<span id="__span-21-11"> "thread_id": "1"
<span id="__span-21-12"> }
<span id="__span-21-13">}
<span id="__span-21-14">
<span id="__span-21-15">my_workflow.invoke(3, config) # 0 (previous was None)
<span id="__span-21-16">my_workflow.invoke(1, config) # 6 (previous was 3 * 2 from the previous invocation)
任务¶
任务表示一个离散的工作单元,例如 API 调用或数据处理步骤。它具有两个关键特性:
- 异步执行:任务旨在异步执行,允许多个操作并发运行而不会阻塞。
- 检查点:任务结果会保存到检查点,从而使工作流能够从上次保存的状态恢复。(有关更多详细信息,请参阅持久化)。
定义¶
任务是使用 @task 装饰器定义的,该装饰器包装了一个普通的 Python 函数。
API 参考:task
<span id="__span-22-1">from langgraph.func import task
<span id="__span-22-2">
<span id="__span-22-3">@task()
<span id="__span-22-4">def slow_computation(input_value):
<span id="__span-22-5"> # Simulate a long-running operation
<span id="__span-22-6"> ...
<span id="__span-22-7"> return result
序列化
任务的输出必须是 JSON 可序列化的,以支持检查点。
执行¶
任务只能从入口点、另一个任务或状态图节点内部调用。
任务_不能_直接从主应用程序代码中调用。
当您调用一个任务时,它会_立即_返回一个 future 对象。future 是一个占位符,用于表示稍后可用的结果。
要获取任务的结果,您可以同步等待它(使用 result())或异步等待它(使用 await)。
<span id="__span-23-1">@entrypoint(checkpointer=checkpointer)
<span id="__span-23-2">def my_workflow(some_input: int) -> int:
<span id="__span-23-3"> future = slow_computation(some_input)
<span id="__span-23-4"> return future.result() # Wait for the result synchronously
何时使用任务¶
任务在以下场景中很有用:
- 检查点:当您需要将长时间运行操作的结果保存到检查点时,这样在恢复工作流时就不需要重新计算它。
- 人在环:如果您正在构建需要人工干预的工作流,则必须使用任务来封装任何随机性(例如 API 调用),以确保工作流可以正确恢复。有关更多详细信息,请参阅确定性部分。
- 并行执行:对于 I/O 密集型任务,任务支持并行执行,允许多个操作并发运行而不会阻塞(例如,调用多个 API)。
- 可观测性:将操作封装在任务中,提供了一种使用LangSmith跟踪工作流进度和监控单个操作执行的方法。
- 可重试工作:当工作需要重试以处理故障或不一致时,任务提供了一种封装和管理重试逻辑的方法。
序列化¶
LangGraph 中的序列化有两个关键方面:
@entrypoint的输入和输出必须是 JSON 可序列化的。@task的输出必须是 JSON 可序列化的。
这些要求对于启用检查点和工作流恢复是必要的。使用 Python 原语,如字典、列表、字符串、数字和布尔值,以确保您的输入和输出是可序列化的。
序列化确保工作流状态(例如任务结果和中间值)可以可靠地保存和恢复。这对于启用人在环交互、容错和并行执行至关重要。
提供不可序列化的输入或输出将在工作流配置了检查点器时导致运行时错误。
确定性¶
为了利用人在环等功能,任何随机性都应封装在任务内部。这保证了当执行停止(例如,为了人在环)并随后恢复时,它将遵循相同的_步骤序列_,即使任务结果是非确定性的。
LangGraph 通过在执行时持久化任务和子图的结果来实现此行为。精心设计的工作流可确保恢复执行遵循_相同的步骤序列_,从而允许正确检索先前计算的结果,而无需重新执行它们。这对于长时间运行的任务或具有非确定性结果的任务特别有用,因为它避免了重复以前完成的工作,并允许从本质上相同的状态恢复。
尽管工作流的不同运行可以产生不同的结果,但恢复特定运行应始终遵循相同的记录步骤序列。这使得 LangGraph 能够有效地查找在图中断之前执行的任务和子图结果,并避免重新计算它们。
幂等性¶
幂等性确保多次运行相同的操作会产生相同的结果。这有助于防止因故障而重新运行某个步骤时出现重复的 API 调用和冗余处理。始终将 API 调用放在任务函数内部进行检查点,并将其设计为幂等的,以防重新执行。如果任务启动但未成功完成,则可能会发生重新执行。然后,如果工作流恢复,任务将再次运行。使用幂等性键或验证现有结果以避免重复。
函数式 API 与图 API 对比¶
函数式 API 和图 API (StateGraph) 提供了两种不同的范式来使用 LangGraph 创建应用程序。以下是一些主要区别:
- 控制流:函数式 API 不需要考虑图结构。您可以使用标准 Python 构造来定义工作流。这通常会减少您需要编写的代码量。
- 短期记忆:图 API 需要声明一个State,并且可能需要定义reducer来管理图状态的更新。
@entrypoint和@tasks不需要显式状态管理,因为它们的状态作用域限定在函数内部,并且不跨函数共享。 - 检查点:两种 API 都生成和使用检查点。在图 API 中,每个超步后都会生成一个新的检查点。在函数式 API 中,当任务执行时,它们的結果會保存到与给定入口点关联的现有检查点中,而不是创建新的检查点。
- 可视化:图 API 可以轻松地将工作流可视化为图,这对于调试、理解工作流和与他人共享非常有用。函数式 API 不支持可视化,因为图是在运行时动态生成的。
常见陷阱¶
处理副作用¶
将副作用(例如,写入文件、发送电子邮件)封装在任务中,以确保在恢复工作流时它们不会被多次执行。
在此示例中,副作用(写入文件)直接包含在工作流中,因此在恢复工作流时会再次执行。
<span id="__span-25-1">@entrypoint(checkpointer=checkpointer)
<span id="__span-25-2">def my_workflow(inputs: dict) -> int:
<span id="__span-25-3"> # This code will be executed a second time when resuming the workflow.
<span id="__span-25-4"> # Which is likely not what you want.
<span id="__span-25-5"> with open("output.txt", "w") as f:
<span id="__span-25-6"> f.write("Side effect executed")
<span id="__span-25-7"> value = interrupt("question")
<span id="__span-25-8"> return value
非确定性控制流¶
每次可能产生不同结果的操作(例如获取当前时间或随机数)应封装在任务中,以确保在恢复时返回相同的结果。
- 在任务中:获取随机数 (5) → 中断 → 恢复 → (再次返回 5) → …
- 不在任务中:获取随机数 (5) → 中断 → 恢复 → 获取新的随机数 (7) → …
当使用具有多个中断调用的人在环工作流时,这一点尤为重要。LangGraph 为每个任务/入口点保留一个恢复值列表。当遇到中断时,它会与相应的恢复值匹配。这种匹配严格按索引进行,因此恢复值的顺序应与中断的顺序匹配。
如果在恢复时未保持执行顺序,一个 interrupt 调用可能会与错误的 resume 值匹配,从而导致结果不正确。
请阅读确定性部分了解更多详细信息。
在此示例中,工作流使用当前时间来确定要执行的任务。这是非确定性的,因为工作流的结果取决于其执行时间。
<span id="__span-27-1">from langgraph.func import entrypoint
<span id="__span-27-2">
<span id="__span-27-3">@entrypoint(checkpointer=checkpointer)
<span id="__span-27-4">def my_workflow(inputs: dict) -> int:
<span id="__span-27-5"> t0 = inputs["t0"]
<span id="__span-27-6"> t1 = time.time()
<span id="__span-27-7">
<span id="__span-27-8"> delta_t = t1 - t0
<span id="__span-27-9">
<span id="__span-27-10"> if delta_t > 1:
<span id="__span-27-11"> result = slow_task(1).result()
<span id="__span-27-12"> value = interrupt("question")
<span id="__span-27-13"> else:
<span id="__span-27-14"> result = slow_task(2).result()
<span id="__span-27-15"> value = interrupt("question")
<span id="__span-27-16">
<span id="__span-27-17"> return {
<span id="__span-27-18"> "result": result,
<span id="__span-27-19"> "value": value
<span id="__span-27-20"> }
- 原文作者:知识铺
- 原文链接:https://index.zshipu.com/ai002/post/20251125/%E6%A6%82%E8%BF%B0-LangChain-%E6%A1%86%E6%9E%B6-10/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。
- 免责声明:本页面内容均来源于站内编辑发布,部分信息来源互联网,并不意味着本站赞同其观点或者证实其内容的真实性,如涉及版权等问题,请立即联系客服进行更改或删除,保证您的合法权益。转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。也可以邮件至 sblig@126.com