2026/2/25 2:51:32
网站建设
项目流程
网站ico如何修改,网站建设公司销售技巧,wordpress占用,郑州家居网站建设服务公司一、类的定位与特点
CompiledStateGraph 是一个 可运行的智能体工作流#xff0c;支持#xff1a;
功能描述有状态(State)所有节点共享一个 state schema自动调度框架决定下一个要执行哪个节点工具调用支持自动判断何时调用 ToolNode条件分支根据 state 或 judge 函数决定路…一、类的定位与特点CompiledStateGraph是一个可运行的智能体工作流支持功能描述有状态(State)所有节点共享一个 state schema自动调度框架决定下一个要执行哪个节点工具调用支持自动判断何时调用 ToolNode条件分支根据 state 或 judge 函数决定路线Streaming 输出支持部分生成如大模型 token 流异步执行适合 Web API、服务端执行Checkpoint错误恢复、回放、持久化人类参与支持 Human-in-the-loop二、完整方法系统结构化总结我按照使用频率和逻辑把所有方法分成五类。方法分类总览① 执行类核心运行方法说明invoke同步执行输入 → 输出ainvoke异步执行Web API 场景推荐stream同步流式执行astream异步流式执行最常用② 回放 / 检查点 / 状态类方法说明get_state获取执行中的状态含 memory / checkpointget_subgraphs获取子图如工具子图③ Streaming 事件类方法说明astream_events异步事件流比 astream 更底层astream_log记录执行日志包含 token / 工具调用astream_final仅监听最终输出④ 调度、边控制类方法说明_execute_graph内部方法执行图你一般不用手动调用_get_iterator内部迭代器为流式输出服务⑤ 工具相关方法方法说明get_tools获取工具节点信息list_tools返回可调用工具三、核心方法详解下面我会依次解释最关键的几个方法并附带对应 demo。1. invoke(input) → 同步一次性执行适用于简单流程不需要流式输出不需要异步示例resultapp.invoke({query:你好})print(result)特点等所有节点执行完才返回返回最新的完整 state2. ainvoke(input) → 异步执行适合Web APIFlask / FastAPILangServe高并发场景示例resultawaitapp.ainvoke({query:你好})3. stream(input) → 同步流式执行逐步输出适用于终端应用模型回答长文本时想边生成边显示示例forstepinapp.stream({query:写一首诗}):print(step)4. astream(input) → 异步流式执行适用于WebSocket / SSE前端实时输出前后端分离部署示例asyncforeventinapp.astream({query:写一个优美的句子}):print(event)输出结构每个 event 是{node:某个节点名,state:{...当前 state...},event:node_end / tool_start / tool_end / token等等}高级方法astream_events比 astream 更底层会输出更详细的事件包括token 级别输出工具调用开始/结束节点切换状态更新示例asyncforeventinapp.astream_events({query:天气如何}):print(event[event],event.get(data))四、完整 demo这是一个综合案例包含state schemastep1 → 条件判断 → OK/FAIL工具调用节点 ToolNodeStreaming 输出① 构建图结构fromlanggraph.graphimportStateGraph,MessagesStatefromlanggraph.prebuiltimportToolNode,tools_condition# 定义 State classMyState(MessagesState):x:int0result:str# 定义节点逻辑 defstep1(state:MyState):new_xstate[x]3return{x:new_x}defok(state:MyState):msg{role:assistant,content:检测通过执行工具}return{result:通过检查,messages:state[messages][msg]}deffail(state:MyState):msg{role:assistant,content:检测失败执行工具}return{result:未通过检查,messages:state[messages][msg]}# 工具模拟defadd_tool(value:int): 加 10 的工具函数。 参数 value (int): 输入数值。 返回 int: 输入值加 10。 returnvalue10tools[add_tool]tool_nodeToolNode(tools)# judge 函数defjudge(state):returnOKifstate[x]5elseFAIL② 构建流图并编译graphStateGraph(MyState)graph.add_node(step1,step1)graph.add_node(OK,ok)graph.add_node(FAIL,fail)graph.add_node(tool,tool_node)graph.set_entry_point(step1)# 条件分支graph.add_conditional_edges(step1,judge,{OK:OK,FAIL:FAIL})# 最后统一走工具节点graph.add_edge(OK,tool)graph.add_edge(FAIL,tool)# 编译图appgraph.compile()③ astream 流式运行asyncforchunkinapp.astream({messages:[],x:2}):print(chunk)输出示例结构化{step1: {x: 5}} {FAIL: {result: 未通过检查, messages: [{role: assistant, content: 检测失败执行工具}]}} {tool: {messages: []}}五、场景级示例① Web API 示例Flask astreamfromflaskimportFlask,request,Responseimportjsonimportasyncio app_serverFlask(__name__)app_server.route(/run,methods[POST])defrun_graph():datarequest.jsonasyncdefgenerate():asyncforeventinapp.astream(data):yieldjson.dumps(event,ensure_asciiFalse)\nreturnResponse(generate(),mimetypetext/event-stream)② Token 级事件监听LLM streamingasyncforeinapp.astream_events({query:给我讲一个故事}):ife[event]token:print(e[data],end)③ 获取执行状态stateapp.get_state()print(state.values)六、总结类别方法用途执行invoke / ainvoke完整执行一次流式stream / astream边执行边输出事件astream_events更细粒度事件工具list_tools查看工具状态get_state获取当前状态