Using the Functional API - LangChain Framework
Creating a simple workflow
When defining an entrypoint, input is restricted to the first argument of the function. To pass multiple inputs, use a dictionary.
<span id="__span-0-1">@entrypoint(checkpointer=checkpointer)
<span id="__span-0-2">def my_workflow(inputs: dict) -> int:
<span id="__span-0-3"> value = inputs["value"]
<span id="__span-0-4"> another_value = inputs["another_value"]
<span id="__span-0-5"> ...
<span id="__span-0-6">
<span id="__span-0-7">my_workflow.invoke({"value": 1, "another_value": 2})
Extended example: simple workflow
<span id="__span-1-1">import uuid
<span id="__span-1-2">from langgraph.func import entrypoint, task
<span id="__span-1-3">from langgraph.checkpoint.memory import MemorySaver
<span id="__span-1-4">
<span id="__span-1-5"># Task that checks if a number is even
<span id="__span-1-6">@task
<span id="__span-1-7">def is_even(number: int) -> bool:
<span id="__span-1-8"> return number % 2 == 0
<span id="__span-1-9">
<span id="__span-1-10"># Task that formats a message
<span id="__span-1-11">@task
<span id="__span-1-12">def format_message(is_even: bool) -> str:
<span id="__span-1-13"> return "The number is even." if is_even else "The number is odd."
<span id="__span-1-14">
<span id="__span-1-15"># Create a checkpointer for persistence
<span id="__span-1-16">checkpointer = MemorySaver()
<span id="__span-1-17">
<span id="__span-1-18">@entrypoint(checkpointer=checkpointer)
<span id="__span-1-19">def workflow(inputs: dict) -> str:
<span id="__span-1-20"> """Simple workflow to classify a number."""
<span id="__span-1-21"> even = is_even(inputs["number"]).result()
<span id="__span-1-22"> return format_message(even).result()
<span id="__span-1-23">
<span id="__span-1-24"># Run the workflow with a unique thread ID
<span id="__span-1-25">config = {"configurable": {"thread_id": str(uuid.uuid4())}}
<span id="__span-1-26">result = workflow.invoke({"number": 7}, config=config)
<span id="__span-1-27">print(result)
Extended example: composing an essay with an LLM
This example demonstrates the syntactic use of the @task and @entrypoint decorators. Since a checkpointer is provided, the workflow results are persisted in the checkpoint.
<span id="__span-2-1">import uuid
<span id="__span-2-2">from langchain.chat_models import init_chat_model
<span id="__span-2-3">from langgraph.func import entrypoint, task
<span id="__span-2-4">from langgraph.checkpoint.memory import MemorySaver
<span id="__span-2-5">
<span id="__span-2-6">llm = init_chat_model('openai:gpt-3.5-turbo')
<span id="__span-2-7">
<span id="__span-2-8"># Task: generate essay using an LLM
<span id="__span-2-9">@task
<span id="__span-2-10">def compose_essay(topic: str) -> str:
<span id="__span-2-11"> """Generate an essay about the given topic."""
<span id="__span-2-12"> return llm.invoke([
<span id="__span-2-13"> {"role": "system", "content": "You are a helpful assistant that writes essays."},
<span id="__span-2-14"> {"role": "user", "content": f"Write an essay about {topic}."}
<span id="__span-2-15"> ]).content
<span id="__span-2-16">
<span id="__span-2-17"># Create a checkpointer for persistence
<span id="__span-2-18">checkpointer = MemorySaver()
<span id="__span-2-19">
<span id="__span-2-20">@entrypoint(checkpointer=checkpointer)
<span id="__span-2-21">def workflow(topic: str) -> str:
<span id="__span-2-22"> """Simple workflow that generates an essay with an LLM."""
<span id="__span-2-23"> return compose_essay(topic).result()
<span id="__span-2-24">
<span id="__span-2-25"># Execute the workflow
<span id="__span-2-26">config = {"configurable": {"thread_id": str(uuid.uuid4())}}
<span id="__span-2-27">result = workflow.invoke("the history of flight", config=config)
<span id="__span-2-28">print(result)
Parallel execution
Tasks can be executed in parallel by invoking them concurrently and waiting for the results. This is useful for improving performance in I/O-bound tasks (e.g., calling APIs of LLMs).
<span id="__span-3-1">@task
<span id="__span-3-2">def add_one(number: int) -> int:
<span id="__span-3-3"> return number + 1
<span id="__span-3-4">
<span id="__span-3-5">@entrypoint(checkpointer=checkpointer)
<span id="__span-3-6">def graph(numbers: list[int]) -> list[str]:
<span id="__span-3-7"> futures = [add_one(i) for i in numbers]
<span id="__span-3-8"> return [f.result() for f in futures]
Extended example: parallel LLM calls
This example demonstrates how to run multiple LLM calls in parallel using @task. Each call generates a paragraph about a different topic, and the results are joined into a single text output.
<code tabindex="0"><span id="__span-4-1">import uuid
<span id="__span-4-2">from langchain.chat_models import init_chat_model
<span id="__span-4-3">from langgraph.func import entrypoint, task
<span id="__span-4-4">from langgraph.checkpoint.memory import MemorySaver
<span id="__span-4-5">
<span id="__span-4-6"># Initialize the LLM model
<span id="__span-4-7">llm = init_chat_model("openai:gpt-3.5-turbo")
<span id="__span-4-8">
<span id="__span-4-9"># Task that generates a paragraph about a given topic
<span id="__span-4-10">@task
<span id="__span-4-11">def generate_paragraph(topic: str) -> str:
<span id="__span-4-12"> response = llm.invoke([
<span id="__span-4-13"> {"role": "system", "content": "You are a helpful assistant that writes educational paragraphs."},
<span id="__span-4-14"> {"role": "user", "content": f"Write a paragraph about {topic}."}
<span id="__span-4-15"> ])
<span id="__span-4-16"> return response.content
<span id="__span-4-17">
<span id="__span-4-18"># Create a checkpointer for persistence
<span id="__span-4-19">checkpointer = MemorySaver()
<span id="__span-4-20">
<span id="__span-4-21">@entrypoint(checkpointer=checkpointer)
<span id="__span-4-22">def workflow(topics: list[str]) -> str:
<span id="__span-4-23"> """Generates multiple paragraphs in parallel and combines them."""
<span id="__span-4-24"> futures = [generate_paragraph(topic) for topic in topics]
<span id="__span-4-25"> paragraphs = [f.result() for f in futures]
<span id="__span-4-26"> return "\n\n".join(paragraphs)
<span id="__span-4-27">
<span id="__span-4-28"># Run the workflow
<span id="__span-4-29">config = {"configurable": {"thread_id": str(uuid.uuid4())}}
<span id="__span-4-30">result = workflow.invoke(["quantum computing", "climate change", "history of aviation"], config=config)
<span id="__span-4-31">print(result)
This example uses LangGraph's concurrency model to reduce execution time, especially when tasks involve I/O such as LLM completions.
Calling graphs
The functional API and the graph API can be used together in the same application, as they share the same underlying runtime.
API Reference: entrypoint | StateGraph
<span id="__span-5-1">from langgraph.func import entrypoint
<span id="__span-5-2">from langgraph.graph import StateGraph
<span id="__span-5-3">
<span id="__span-5-4">builder = StateGraph()
<span id="__span-5-5">...
<span id="__span-5-6">some_graph = builder.compile()
<span id="__span-5-7">
<span id="__span-5-8">@entrypoint()
<span id="__span-5-9">def some_workflow(some_input: dict) -> int:
<span id="__span-5-10"> # Call a graph defined using the graph API
<span id="__span-5-11"> result_1 = some_graph.invoke(...)
<span id="__span-5-12"> # Call another graph defined using the graph API
<span id="__span-5-13"> result_2 = another_graph.invoke(...)
<span id="__span-5-14"> return {
<span id="__span-5-15"> "result_1": result_1,
<span id="__span-5-16"> "result_2": result_2
<span id="__span-5-17"> }
Extended example: calling a simple graph from the functional API
<span id="__span-6-1">import uuid
<span id="__span-6-2">from typing import TypedDict
<span id="__span-6-3">from langgraph.func import entrypoint
<span id="__span-6-4">from langgraph.checkpoint.memory import MemorySaver
<span id="__span-6-5">from langgraph.graph import StateGraph
<span id="__span-6-6">
<span id="__span-6-7"># Define the shared state type
<span id="__span-6-8">class State(TypedDict):
<span id="__span-6-9"> foo: int
<span id="__span-6-10">
<span id="__span-6-11"># Define a simple transformation node
<span id="__span-6-12">def double(state: State) -> State:
<span id="__span-6-13"> return {"foo": state["foo"] * 2}
<span id="__span-6-14">
<span id="__span-6-15"># Build the graph using the Graph API
<span id="__span-6-16">builder = StateGraph(State)
<span id="__span-6-17">builder.add_node("double", double)
<span id="__span-6-18">builder.set_entry_point("double")
<span id="__span-6-19">graph = builder.compile()
<span id="__span-6-20">
<span id="__span-6-21"># Define the functional API workflow
<span id="__span-6-22">checkpointer = MemorySaver()
<span id="__span-6-23">
<span id="__span-6-24">@entrypoint(checkpointer=checkpointer)
<span id="__span-6-25">def workflow(x: int) -> dict:
<span id="__span-6-26"> result = graph.invoke({"foo": x})
<span id="__span-6-27"> return {"bar": result["foo"]}
<span id="__span-6-28">
<span id="__span-6-29"># Execute the workflow
<span id="__span-6-30">config = {"configurable": {"thread_id": str(uuid.uuid4())}}
<span id="__span-6-31">print(workflow.invoke(5, config=config)) # Output: {'bar': 10}
Calling other entrypoints
You can invoke other entrypoints from within an entrypoint or a task.
<code tabindex="0"><span id="__span-7-1">@entrypoint() # Will automatically use the checkpointer from the parent entrypoint
<span id="__span-7-2">def some_other_workflow(inputs: dict) -> int:
<span id="__span-7-3"> return inputs["value"]
<span id="__span-7-4">
<span id="__span-7-5">@entrypoint(checkpointer=checkpointer)
<span id="__span-7-6">def my_workflow(inputs: dict) -> int:
<span id="__span-7-7"> value = some_other_workflow.invoke({"value": 1})
<span id="__span-7-8"> return value
Extended example: calling another entrypoint
<span id="__span-8-1">import uuid
<span id="__span-8-2">from langgraph.func import entrypoint
<span id="__span-8-3">from langgraph.checkpoint.memory import MemorySaver
<span id="__span-8-4">
<span id="__span-8-5"># Initialize a checkpointer
<span id="__span-8-6">checkpointer = MemorySaver()
<span id="__span-8-7">
<span id="__span-8-8"># A reusable sub-workflow that multiplies a number
<span id="__span-8-9">@entrypoint()
<span id="__span-8-10">def multiply(inputs: dict) -> int:
<span id="__span-8-11"> return inputs["a"] * inputs["b"]
<span id="__span-8-12">
<span id="__span-8-13"># Main workflow that invokes the sub-workflow
<span id="__span-8-14">@entrypoint(checkpointer=checkpointer)
<span id="__span-8-15">def main(inputs: dict) -> dict:
<span id="__span-8-16"> result = multiply.invoke({"a": inputs["x"], "b": inputs["y"]})
<span id="__span-8-17"> return {"product": result}
<span id="__span-8-18">
<span id="__span-8-19"># Execute the main workflow
<span id="__span-8-20">config = {"configurable": {"thread_id": str(uuid.uuid4())}}
<span id="__span-8-21">print(main.invoke({"x": 6, "y": 7}, config=config)) # Output: {'product': 42}
Streaming
The functional API uses the same streaming mechanism as the graph API. See the streaming guide for more details.
An example of using the streaming API to stream both updates and custom data:
API Reference: entrypoint | MemorySaver | get_stream_writer
<span id="__span-9-1">from langgraph.func import entrypoint
<span id="__span-9-2">from langgraph.checkpoint.memory import MemorySaver
<span id="__span-9-3">from langgraph.config import get_stream_writer
<span id="__span-9-4">
<span id="__span-9-5">checkpointer = MemorySaver()
<span id="__span-9-6">
<span id="__span-9-7">@entrypoint(checkpointer=checkpointer)
<span id="__span-9-8">def main(inputs: dict) -> int:
<span id="__span-9-9"> writer = get_stream_writer()
<span id="__span-9-10"> writer("Started processing")
<span id="__span-9-11"> result = inputs["x"] * 2
<span id="__span-9-12"> writer(f"Result is {result}")
<span id="__span-9-13"> return result
<span id="__span-9-14">
<span id="__span-9-15">config = {"configurable": {"thread_id": "abc"}}
<span id="__span-9-16">
<span id="__span-9-17">for mode, chunk in main.stream(
<span id="__span-9-18"> {"x": 5},
<span id="__span-9-19"> stream_mode=["custom", "updates"],
<span id="__span-9-20"> config=config
<span id="__span-9-21">):
<span id="__span-9-22"> print(f"{mode}: {chunk}")
<span id="__span-10-1">('updates', {'add_one': 2})
<span id="__span-10-2">('updates', {'add_two': 3})
<span id="__span-10-3">('custom', 'hello')
<span id="__span-10-4">('custom', 'world')
<span id="__span-10-5">('updates', {'main': 5})
Async with Python < 3.11
If you are writing async code on Python < 3.11, get_stream_writer() will not work. Instead, use the StreamWriter class directly; see Async with Python < 3.11 for more details.
<span id="__span-11-1">from langgraph.types import StreamWriter
<span id="__span-11-2">
<span id="__span-11-3">@entrypoint(checkpointer=checkpointer)
<span id="__span-11-4">async def main(inputs: dict, writer: StreamWriter) -> int:
<span id="__span-11-5"> ...
Retry policy
A task can be given a RetryPolicy (via the retry argument in this example) so that it is automatically retried when the specified exceptions are raised.
API Reference: MemorySaver | entrypoint | task | RetryPolicy
<code tabindex="0"><span id="__span-12-1">from langgraph.checkpoint.memory import MemorySaver
<span id="__span-12-2">from langgraph.func import entrypoint, task
<span id="__span-12-3">from langgraph.types import RetryPolicy
<span id="__span-12-4">
<span id="__span-12-5"># This variable is just used for demonstration purposes to simulate a network failure.
<span id="__span-12-6"># It's not something you will have in your actual code.
<span id="__span-12-7">attempts = 0
<span id="__span-12-8">
<span id="__span-12-9"># Let's configure the RetryPolicy to retry on ValueError.
<span id="__span-12-10"># The default RetryPolicy is optimized for retrying specific network errors.
<span id="__span-12-11">retry_policy = RetryPolicy(retry_on=ValueError)
<span id="__span-12-12">
<span id="__span-12-13">@task(retry=retry_policy)
<span id="__span-12-14">def get_info():
<span id="__span-12-15"> global attempts
<span id="__span-12-16"> attempts += 1
<span id="__span-12-17">
<span id="__span-12-18"> if attempts < 2:
<span id="__span-12-19"> raise ValueError('Failure')
<span id="__span-12-20"> return "OK"
<span id="__span-12-21">
<span id="__span-12-22">checkpointer = MemorySaver()
<span id="__span-12-23">
<span id="__span-12-24">@entrypoint(checkpointer=checkpointer)
<span id="__span-12-25">def main(inputs, writer):
<span id="__span-12-26"> return get_info().result()
<span id="__span-12-27">
<span id="__span-12-28">config = {
<span id="__span-12-29"> "configurable": {
<span id="__span-12-30"> "thread_id": "1"
<span id="__span-12-31"> }
<span id="__span-12-32">}
<span id="__span-12-33">
<span id="__span-12-34">main.invoke({'any_input': 'foobar'}, config=config)
Caching tasks
A task can cache its results with a CachePolicy; within the ttl window (in seconds), repeated calls with the same input are served from the cache.
API Reference: entrypoint | task
<span id="__span-14-1">import time
<span id="__span-14-2">from langgraph.cache.memory import InMemoryCache
<span id="__span-14-3">from langgraph.func import entrypoint, task
<span id="__span-14-4">from langgraph.types import CachePolicy
<span id="__span-14-5">
<span id="__span-14-6">
<span id="__span-14-7">@task(cache_policy=CachePolicy(ttl=120))
<span id="__span-14-8">def slow_add(x: int) -> int:
<span id="__span-14-9"> time.sleep(1)
<span id="__span-14-10"> return x * 2
<span id="__span-14-11">
<span id="__span-14-12">
<span id="__span-14-13">@entrypoint(cache=InMemoryCache())
<span id="__span-14-14">def main(inputs: dict) -> dict[str, int]:
<span id="__span-14-15"> result1 = slow_add(inputs["x"]).result()
<span id="__span-14-16"> result2 = slow_add(inputs["x"]).result()
<span id="__span-14-17"> return {"result1": result1, "result2": result2}
<span id="__span-14-18">
<span id="__span-14-19">
<span id="__span-14-20">for chunk in main.stream({"x": 5}, stream_mode="updates"):
<span id="__span-14-21"> print(chunk)
<span id="__span-14-22">
<span id="__span-14-23">#> {'slow_add': 10}
<span id="__span-14-24">#> {'slow_add': 10, '__metadata__': {'cached': True}}
<span id="__span-14-25">#> {'main': {'result1': 10, 'result2': 10}}
Resuming after an error
API Reference: MemorySaver | entrypoint | task | StreamWriter
<code tabindex="0"><span id="__span-15-1">import time
<span id="__span-15-2">from langgraph.checkpoint.memory import MemorySaver
<span id="__span-15-3">from langgraph.func import entrypoint, task
<span id="__span-15-4">from langgraph.types import StreamWriter
<span id="__span-15-5">
<span id="__span-15-6"># This variable is just used for demonstration purposes to simulate a network failure.
<span id="__span-15-7"># It's not something you will have in your actual code.
<span id="__span-15-8">attempts = 0
<span id="__span-15-9">
<span id="__span-15-10">@task()
<span id="__span-15-11">def get_info():
<span id="__span-15-12"> """
<span id="__span-15-13"> Simulates a task that fails once before succeeding.
<span id="__span-15-14"> Raises an exception on the first attempt, then returns "OK" on subsequent tries.
<span id="__span-15-15"> """
<span id="__span-15-16"> global attempts
<span id="__span-15-17"> attempts += 1
<span id="__span-15-18">
<span id="__span-15-19"> if attempts < 2:
<span id="__span-15-20"> raise ValueError("Failure") # Simulate a failure on the first attempt
<span id="__span-15-21"> return "OK"
<span id="__span-15-22">
<span id="__span-15-23"># Initialize an in-memory checkpointer for persistence
<span id="__span-15-24">checkpointer = MemorySaver()
<span id="__span-15-25">
<span id="__span-15-26">@task
<span id="__span-15-27">def slow_task():
<span id="__span-15-28"> """
<span id="__span-15-29"> Simulates a slow-running task by introducing a 1-second delay.
<span id="__span-15-30"> """
<span id="__span-15-31"> time.sleep(1)
<span id="__span-15-32"> return "Ran slow task."
<span id="__span-15-33">
<span id="__span-15-34">@entrypoint(checkpointer=checkpointer)
<span id="__span-15-35">def main(inputs, writer: StreamWriter):
<span id="__span-15-36"> """
<span id="__span-15-37"> Main workflow function that runs the slow_task and get_info tasks sequentially.
<span id="__span-15-38">
<span id="__span-15-39"> Parameters:
<span id="__span-15-40"> - inputs: Dictionary containing workflow input values.
<span id="__span-15-41"> - writer: StreamWriter for streaming custom data.
<span id="__span-15-42">
<span id="__span-15-43"> The workflow first executes `slow_task` and then attempts to execute `get_info`,
<span id="__span-15-44"> which will fail on the first invocation.
<span id="__span-15-45"> """
<span id="__span-15-46"> slow_task_result = slow_task().result() # Blocking call to slow_task
<span id="__span-15-47"> get_info().result() # Exception will be raised here on the first attempt
<span id="__span-15-48"> return slow_task_result
<span id="__span-15-49">
<span id="__span-15-50"># Workflow execution configuration with a unique thread identifier
<span id="__span-15-51">config = {
<span id="__span-15-52"> "configurable": {
<span id="__span-15-53"> "thread_id": "1" # Unique identifier to track workflow execution
<span id="__span-15-54"> }
<span id="__span-15-55">}
<span id="__span-15-56">
<span id="__span-15-57"># This invocation will take ~1 second due to the slow_task execution
<span id="__span-15-58">try:
<span id="__span-15-59"> # First invocation will raise an exception due to the `get_info` task failing
<span id="__span-15-60"> main.invoke({'any_input': 'foobar'}, config=config)
<span id="__span-15-61">except ValueError:
<span id="__span-15-62"> pass # Handle the failure gracefully
When we resume execution, we won't need to re-run slow_task, since its result is already saved in the checkpoint.

```python
main.invoke(None, config=config)
```
Human-in-the-loop
The functional API supports human-in-the-loop workflows using the interrupt function and the Command primitive; a minimal sketch follows the links below.
See the following examples for more details:
- How to wait for user input (Functional API): shows how to implement a simple human-in-the-loop workflow using the functional API.
- How to review tool calls (Functional API): demonstrates how to implement a human-in-the-loop workflow in a ReAct agent using the LangGraph functional API.
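A minimal sketch, assuming a checkpointer is configured (the question text and resume value below are illustrative): the entrypoint pauses at interrupt, and the caller resumes it with a Command carrying the human's answer.

```python
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command, interrupt

checkpointer = MemorySaver()

@entrypoint(checkpointer=checkpointer)
def review_workflow(topic: str) -> dict:
    # Pause the workflow and surface a payload for human review.
    feedback = interrupt({"question": f"Approve an essay about '{topic}'?"})
    return {"topic": topic, "feedback": feedback}

config = {"configurable": {"thread_id": "review-1"}}

# The first invocation stops at the interrupt; its payload is surfaced
# to the caller under the "__interrupt__" key.
print(review_workflow.invoke("quantum computing", config=config))

# Resume with the human's answer; execution continues past the interrupt.
print(review_workflow.invoke(Command(resume="approved"), config=config))
# -> {'topic': 'quantum computing', 'feedback': 'approved'}
```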
Short-term memory
Short-term memory allows storing information across different invocations on the same thread ID, as in the sketch below. See short-term memory for more details.
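A minimal sketch (the function and thread ID are illustrative): an entrypoint can declare a keyword-only previous parameter, which the runtime fills with the value saved by the last run on the same thread.

```python
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()

@entrypoint(checkpointer=checkpointer)
def remember(n: int, *, previous: list[int] | None = None) -> list[int]:
    # `previous` holds the return value of the last run on this thread (or None).
    return (previous or []) + [n]

config = {"configurable": {"thread_id": "memory-demo"}}
print(remember.invoke(1, config=config))  # [1]
print(remember.invoke(2, config=config))  # [1, 2]
```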
Decoupling return values from saved values
Use entrypoint.final to decouple the value returned to the caller from the value persisted to the checkpoint. This is useful when:
- You want to return a computed result (e.g., a summary or status) but save a different internal value for the next invocation.
- You need to control what gets passed to the previous parameter on the next run.
API Reference: entrypoint | MemorySaver
<code tabindex="0"><span id="__span-18-1">from typing import Optional
<span id="__span-18-2">from langgraph.func import entrypoint
<span id="__span-18-3">from langgraph.checkpoint.memory import MemorySaver
<span id="__span-18-4">
<span id="__span-18-5">checkpointer = MemorySaver()
<span id="__span-18-6">
<span id="__span-18-7">@entrypoint(checkpointer=checkpointer)
<span id="__span-18-8">def accumulate(n: int, *, previous: Optional[int]) -> entrypoint.final[int, int]:
<span id="__span-18-9"> previous = previous or 0
<span id="__span-18-10"> total = previous + n
<span id="__span-18-11"> # Return the *previous* value to the caller but save the *new* total to the checkpoint.
<span id="__span-18-12"> return entrypoint.final(value=previous, save=total)
<span id="__span-18-13">
<span id="__span-18-14">config = {"configurable": {"thread_id": "my-thread"}}
<span id="__span-18-15">
<span id="__span-18-16">print(accumulate.invoke(1, config=config)) # 0
<span id="__span-18-17">print(accumulate.invoke(2, config=config)) # 1
<span id="__span-18-18">print(accumulate.invoke(3, config=config)) # 3
Chatbot example
A simple chatbot example using the functional API and a MemorySaver checkpointer. The bot is able to remember the previous conversation and continue from where it left off.
API Reference: BaseMessage | add_messages | entrypoint | task | MemorySaver | ChatAnthropic
<span id="__span-19-1">from langchain_core.messages import BaseMessage
<span id="__span-19-2">from langgraph.graph import add_messages
<span id="__span-19-3">from langgraph.func import entrypoint, task
<span id="__span-19-4">from langgraph.checkpoint.memory import MemorySaver
<span id="__span-19-5">from langchain_anthropic import ChatAnthropic
<span id="__span-19-6">
<span id="__span-19-7">model = ChatAnthropic(model="claude-3-5-sonnet-latest")
<span id="__span-19-8">
<span id="__span-19-9">@task
<span id="__span-19-10">def call_model(messages: list[BaseMessage]):
<span id="__span-19-11"> response = model.invoke(messages)
<span id="__span-19-12"> return response
<span id="__span-19-13">
<span id="__span-19-14">checkpointer = MemorySaver()
<span id="__span-19-15">
<span id="__span-19-16">@entrypoint(checkpointer=checkpointer)
<span id="__span-19-17">def workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage]):
<span id="__span-19-18"> if previous:
<span id="__span-19-19"> inputs = add_messages(previous, inputs)
<span id="__span-19-20">
<span id="__span-19-21"> response = call_model(inputs).result()
<span id="__span-19-22"> return entrypoint.final(value=response, save=add_messages(inputs, response))
<span id="__span-19-23">
<span id="__span-19-24">config = {"configurable": {"thread_id": "1"}}
<span id="__span-19-25">input_message = {"role": "user", "content": "hi! I'm bob"}
<span id="__span-19-26">for chunk in workflow.stream([input_message], config, stream_mode="values"):
<span id="__span-19-27"> chunk.pretty_print()
<span id="__span-19-28">
<span id="__span-19-29">input_message = {"role": "user", "content": "what's my name?"}
<span id="__span-19-30">for chunk in workflow.stream([input_message], config, stream_mode="values"):
<span id="__span-19-31"> chunk.pretty_print()
Extended example: build a simple chatbot
How to add thread-level persistence (Functional API): shows how to add thread-level persistence to a functional API workflow and implements a simple chatbot.
Long-term memory
Long-term memory allows storing information across different thread IDs. This is useful for learning information about a given user in one conversation and using it in another; a sketch follows the example link below.
Extended example: add long-term memory
How to add cross-thread persistence (Functional API): shows how to add cross-thread persistence to a functional API workflow and implements a simple chatbot.
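A minimal sketch, assuming an InMemoryStore (the namespace and key below are illustrative): passing a store to the entrypoint makes it available across threads via an injected store parameter.

```python
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore

checkpointer = MemorySaver()
store = InMemoryStore()

@entrypoint(checkpointer=checkpointer, store=store)
def workflow(user_name: str, *, store: BaseStore) -> str:
    namespace = ("users",)
    # Read what was learned about this user in any previous thread.
    memory = store.get(namespace, "profile")
    if memory is not None:
        greeting = f"Welcome back, {memory.value['name']}!"
    else:
        greeting = f"Nice to meet you, {user_name}!"
    # Persist the profile so runs on other threads can see it.
    store.put(namespace, "profile", {"name": user_name})
    return greeting

print(workflow.invoke("Bob", config={"configurable": {"thread_id": "1"}}))
# -> Nice to meet you, Bob!
print(workflow.invoke("Bob", config={"configurable": {"thread_id": "2"}}))
# -> Welcome back, Bob!
```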
Workflows
- The workflows and agents guide provides more examples of how to build workflows using the functional API.
Agents
- How to create an agent from scratch (Functional API): shows how to create a simple agent from scratch using the functional API.
- How to build a multi-agent network: shows how to build a multi-agent network using the functional API.
- How to add multi-turn conversation in a multi-agent application (Functional API): lets an end user engage in multi-turn conversations with one or more agents.
Integrating with other libraries
- Add LangGraph's features to other frameworks using the functional API: adds LangGraph features such as persistence, memory, and streaming to other agent frameworks that do not provide them natively.