2026/4/9 6:54:12
网站建设
项目流程
写代码做网站,网站flash模板,网络推广公司哪家做得好,建材营销型的网站‘Batch Inference’ 优化#xff1a;利用 RunnableBatch 实现跨模型供应商的并行请求合并随着人工智能技术#xff0c;特别是大型语言模型#xff08;LLM#xff09;的飞速发展#xff0c;越来越多的企业和开发者开始将LLM集成到他们的应用中。然而#xff0c;与这些强大…‘Batch Inference’ 优化利用RunnableBatch实现跨模型供应商的并行请求合并随着人工智能技术特别是大型语言模型LLM的飞速发展越来越多的企业和开发者开始将LLM集成到他们的应用中。然而与这些强大模型交互时效率和成本始终是核心考量。面对高并发、多用户请求的场景以及需要整合来自不同供应商的模型服务时如何有效优化推理性能降低运营成本并提高系统吞吐量成为了一个迫切需要解决的问题。今天我们将深入探讨一种强大的优化策略批量推理Batch Inference并重点介绍 LangChain 框架中一个专门为此设计的组件——RunnableBatch。我们将从基础概念出发逐步深入到其工作原理、实际应用场景特别是如何利用它实现跨模型供应商的并行请求合并最终提升我们应用的整体性能和可扩展性。一、批量推理Batch Inference的基石为什么我们需要它在分布式系统和微服务架构中每一次对外部服务的调用都伴随着一定的固定开销网络握手、协议协商、数据序列化/反序列化、API 鉴权等等。对于LLM调用而言这些开销是不可忽视的。当应用需要处理大量独立但结构相似的请求时如果每个请求都单独调用一次LLM API这些固定开销就会被反复叠加导致高延迟 (High Latency)尽管单个请求的推理时间可能很短但网络和协议开销可能占据主导地位导致用户体验不佳。低吞吐量 (Low Throughput)服务器资源如CPU、内存、网络带宽在处理大量小请求时频繁的上下文切换和资源争用会降低整体的处理能力。高成本 (High Cost)许多LLM供应商的计费模式可能包含请求次数的考量即使主要是按 Token 计费减少请求次数也能摊薄固定成本。资源利用率低下 (Poor Resource Utilization)对于提供模型服务的硬件如GPU小批量请求可能无法充分利用其并行计算能力。批量推理的核心思想是将多个独立的输入数据打包成一个单一的请求然后发送给模型进行一次性处理。模型处理完整个批次后再将结果返回。这种方式带来的优势显而易见分摊固定开销一次网络传输、一次鉴权、一次模型加载处理多个请求的数据。提高吞吐量模型可以更高效地利用底层硬件的并行性特别是GPU。降低成本减少API调用次数可能节省计费费用。更好的资源利用率模型服务可以一次性处理更多数据减少空闲时间。然而批量推理也并非没有挑战延迟增加为了凑齐一个批次可能需要等待一段时间这会增加单个请求的端到端延迟。需要在吞吐量和延迟之间找到平衡。异构输入处理批次中的输入可能长度不同需要填充padding或更复杂的处理。错误处理批次中某个输入失败时如何处理整个批次或只处理失败项供应商API兼容性并非所有LLM供应商都提供原生批处理API或者其接口各异。正是为了应对这些挑战LangChain 引入了RunnableBatch。二、LangChain 与 LCEL构建可组合的AI应用在深入RunnableBatch之前我们有必要简要回顾一下 LangChain 及其核心概念 LangChain Expressive Language (LCEL)。LangChain 是一个用于开发由语言模型驱动的应用程序的框架。它提供了一系列工具、组件和接口使得开发者能够轻松地构建复杂的AI应用如问答系统、聊天机器人、数据分析工具等。LCEL 则是 LangChain 中用于构建可组合链chains的声明式方式。LCEL 的核心是Runnable接口。任何实现Runnable接口的对象都定义了invoke方法用于处理单个输入和batch方法用于处理多个输入。这种设计使得 LCEL 链天然支持并行化和批处理。通过 LCEL我们可以像搭积木一样将不同的组件如 LLM、PromptTemplate、OutputParser、自定义函数等连接起来形成一个完整的处理流程。LCEL 链的优势在于可组合性所有组件都是Runnable可以轻松地进行组合。异步支持原生支持async/await便于构建高性能应用。流式处理支持数据流式传输提升用户体验。并行执行自动识别并优化并行执行的机会。可观测性易于集成日志、追踪和监控。RunnableBatch正是 LCEL 生态系统中的一个重要成员它利用了Runnable接口的batch能力并在此基础上提供了更高级的批处理管理功能。三、RunnableBatch的深度解析工作原理与参数RunnableBatch的核心职责是作为代理将多个针对底层Runnable的invoke调用聚合成一个batch调用。它在内部维护一个队列收集传入的请求并在达到特定条件时例如队列中的请求数量达到阈值或等待时间超过阈值触发底层的批处理操作。3.1RunnableBatch的构造函数与核心参数让我们来看看RunnableBatch的主要构造函数签名及其关键参数from typing import Any, Callable, List, Optional, Sequence, Union from langchain_core.runnables import Runnable, RunnableBatch as _RunnableBatch class RunnableBatch(_RunnableBatch): def __init__( self, bound: Runnable[Sequence[Any], Sequence[Any]], # 强制要求bound runnable支持batch方法 *, max_batch_size: int 64, max_batch_time: float 0.1, # seconds wait_until_full: bool False, default_response: Optional[Any] None, batch_fn: Optional[Callable[[List[Any]], Any]] None, ): # ...bound(Runnable[Sequence[Any], Sequence[Any]]):这是RunnableBatch包装的底层Runnable。关键在于这个bound的Runnable必须能够处理Sequence[Any]类型的输入并返回Sequence[Any]类型的输出也就是说它必须支持其自身的batch方法。如果它只支持invoke那么RunnableBatch就无法将其转换为批处理调用。但是RunnableBatch提供了batch_fn参数来解决这个问题我们后面会详细讨论。max_batch_size(int, default64):一个批次中可以包含的最大请求数量。当收集到的请求数量达到这个值时RunnableBatch会立即触发底层的batch调用。max_batch_time(float, default0.1):一个批次可以等待的最长时间秒。如果在这个时间内没有达到max_batch_size但时间已到RunnableBatch也会触发底层的batch调用即使批次不满。这个参数在平衡延迟和吞吐量之间起着关键作用。wait_until_full(bool, defaultFalse):如果设置为TrueRunnableBatch会一直等待直到批次达到max_batch_size才触发调用即使max_batch_time已经过期。这通常在对延迟不敏感但对批次效率要求极高的场景中使用。default_response(Optional[Any], defaultNone):当底层bound的batch调用中某个子请求发生错误时RunnableBatch会用这个default_response来填充对应位置的结果而不是让整个批次失败。这对于构建容错系统非常有用。batch_fn(Optional[Callable[[List[Any]], Any]], defaultNone):这是一个非常强大的参数。如果你的boundRunnable没有实现batch方法或者你希望对批处理的输入/输出进行自定义的预处理/后处理你可以提供一个batch_fn。这个函数会接收一个List[Any]作为输入即聚合后的批次并期望返回一个Any类型的结果通常也是一个List[Any]对应批次中的每个输入。RunnableBatch会用这个batch_fn来替代bound.batch()调用。3.2RunnableBatch的内部工作机制RunnableBatch在幕后做的工作可以概括为以下几个步骤请求入队当一个针对RunnableBatch实例的invoke或ainvoke调用发生时它不会立即调用底层boundRunnable的invoke方法。相反它会将这个请求及其上下文放入一个内部的等待队列中。定时器与计数器RunnableBatch会启动一个定时器基于max_batch_time并维护一个计数器基于max_batch_size。批次触发条件当队列中的请求数量达到max_batch_size时且wait_until_full为False。当max_batch_time到期时且wait_until_full为False。如果wait_until_full为True则只在达到max_batch_size时触发。执行批处理一旦触发条件满足RunnableBatch会从队列中取出所有等待的请求将它们的输入聚合成一个列表。如果提供了batch_fn则调用batch_fn(aggregated_inputs)。否则调用bound.batch(aggregated_inputs)。结果分发底层批处理操作完成后RunnableBatch会将返回的结果通常是一个列表与原始的请求一一对应然后将每个子请求的结果返回给各自的调用方。如果某个子请求失败并且default_response已设置则返回default_response。这种机制使得RunnableBatch能够透明地将多个零散的invoke调用转换为高效的batch调用极大地简化了批处理逻辑的实现。四、实践单模型供应商的基础批量推理让我们从最简单的场景开始对单个LLM供应商的模型进行批量推理。我们将使用 OpenAI 的ChatOpenAI作为示例。首先确保你已经安装了必要的库并设置了API密钥pip install langchain langchain-openai python-dotenv# .env 文件 # OPENAI_API_KEYyour_openai_api_key import os from dotenv import load_dotenv load_dotenv() # 加载 .env 文件中的环境变量 from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnableParallel, RunnablePassthrough, RunnableLambda import time import asyncio import random # 定义一个基础的LLM链 llm ChatOpenAI(model_namegpt-3.5-turbo, temperature0.7) prompt ChatPromptTemplate.from_messages([ (system, 你是一个专业的文案助手。), (user, {text}) ]) output_parser StrOutputParser() # 基础链支持 invoke 和 batch basic_chain prompt | llm | output_parser print(--- 基础链的单次调用 (invoke) ---) start_time time.perf_counter() result_invoke basic_chain.invoke({text: 请为我生成一个关于智能家居的短广告语。}) end_time time.perf_counter() print(f单个 invoke 结果: {result_invoke[:50]}...) print(f单个 invoke 耗时: {end_time - start_time:.4f} 秒n) print(--- 基础链的批量调用 (batch) ---) texts_for_batch [ 请为我生成一个关于智能家居的短广告语。, 请为我生成一个关于环保出行的新闻标题。, 请为我生成一个关于健康饮食的社交媒体帖子。, 请为我生成一个关于儿童教育的口号。, 请为我生成一个关于未来科技趋势的摘要。 ] inputs_for_batch [{text: t} for t in texts_for_batch] start_time time.perf_counter() results_batch basic_chain.batch(inputs_for_batch) end_time time.perf_counter() print(f批量 batch 结果 (前20字符): {[r[:20] for r in results_batch]}) print(f批量 batch 耗时: {end_time - start_time:.4f} 秒) print(f平均每个请求耗时 (batch): {(end_time - start_time) / len(texts_for_batch):.4f} 秒n)从上面的输出可以看出batch调用显著降低了平均每个请求的耗时体现了批量推理的优势。但是如果我们的应用逻辑是零散地触发这些请求而不是一次性收集好一个批次再调用batch呢这就是RunnableBatch发挥作用的地方。4.1 使用RunnableBatch包装基础链现在我们用RunnableBatch来包装basic_chain。from langchain_core.runnables import RunnableBatch # 使用 RunnableBatch 包装我们的基础链 # 设定 max_batch_size 为 3max_batch_time 为 0.5 秒 batched_chain RunnableBatch( boundbasic_chain, max_batch_size3, max_batch_time0.5, default_response抱歉处理失败。 ) async def simulate_concurrent_requests(chain_to_test, num_requests10): print(fn--- 模拟 {num_requests} 个并发请求到 {RunnableBatch if isinstance(chain_to_test, RunnableBatch) else 原始链} ---) start_time_total time.perf_counter() async def single_request(i): text f请生成一个关于主题 {i} 的简短描述。 input_data {text: text} try: result await chain_to_test.ainvoke(input_data) # print(f请求 {i} 结果: {result[:30]}...) return f请求 {i} 成功 except Exception as e: # print(f请求 {i} 失败: {e}) return f请求 {i} 失败 tasks [single_request(i) for i in range(num_requests)] results await asyncio.gather(*tasks) end_time_total time.perf_counter() total_duration end_time_total - start_time_total print(f总耗时: {total_duration:.4f} 秒) print(f平均每个请求耗时: {total_duration / num_requests:.4f} 秒) # print(所有请求结果状态:, results) # 模拟直接对原始链进行并发调用 (每个都是单独的 invoke) # 注意这可能会导致API限速或性能瓶颈 # await simulate_concurrent_requests(basic_chain, num_requests10) # 模拟通过 RunnableBatch 进行并发调用 # RunnableBatch 会在内部将这些 invoke 聚合为 batch 调用 asyncio.run(simulate_concurrent_requests(batched_chain, num_requests10)) # 尝试模拟一个导致 batch_time 超时的场景 batched_chain_small_batch_time RunnableBatch( boundbasic_chain, max_batch_size10, # 设置一个较大的批次大小确保不会轻易达到 max_batch_time0.1, # 设置一个较小的超时时间 default_response抱歉处理失败。 ) asyncio.run(simulate_concurrent_requests(batched_chain_small_batch_time, num_requests5))通过RunnableBatch包装后即使我们以ainvoke的方式发起多个看似独立的异步请求RunnableBatch也会在幕后智能地将它们聚合成批次然后调用底层basic_chain的batch方法。这使得我们的应用代码可以保持简单的invoke逻辑而底层的性能优化则由RunnableBatch自动完成。在上面的例子中当num_requests10时max_batch_size3和max_batch_time0.5意味着RunnableBatch会尝试创建 3 个批次3, 3, 3, 1。每次批次调用都会分摊固定开销从而降低平均请求延迟。如果max_batch_time很短即使请求不多也会尽快发出小批次。五、跨模型供应商的并行请求合并现在我们进入更复杂的场景如何利用RunnableBatch实现跨不同模型供应商的并行请求合并。这在需要结合不同模型能力、进行模型A/B测试、或作为多模型路由策略的一部分时非常有用。假设我们的应用需要从两个不同的模型供应商例如OpenAI 和 Anthropic获取文本生成结果并且希望这些请求也能被批量处理。首先我们需要 Anthropic 的模型。pip install langchain-anthropicimport os from dotenv import load_dotenv load_dotenv() # 确保加载所有API密钥 from langchain_openai import ChatOpenAI from langchain_anthropic import ChatAnthropic # 导入 Anthropic 模型 from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnableParallel, RunnablePassthrough, RunnableLambda, RunnableBatch import time import asyncio import random # --- 模型和链定义 --- # 1. OpenAI 模型链 llm_openai ChatOpenAI(model_namegpt-3.5-turbo, temperature0.7) prompt_openai ChatPromptTemplate.from_messages([ (system, 你是一个简洁的助手。), (user, {text}) ]) chain_openai prompt_openai | llm_openai | StrOutputParser() # 2. Anthropic 模型链 # 确保 ANTHROPIC_API_KEY 已在 .env 中设置 llm_anthropic ChatAnthropic(model_nameclaude-3-haiku-20240307, temperature0.7) prompt_anthropic ChatPromptTemplate.from_messages([ (system, 你是一个富有创意和详细的助手。), (user, {text}) ]) chain_anthropic prompt_anthropic | llm_anthropic | StrOutputParser() print(--- 准备跨供应商批量处理 ---) # --- 使用 RunnableBatch 包装每个供应商的链 --- # 为 OpenAI 链创建批处理器 batched_openai_chain RunnableBatch( boundchain_openai, max_batch_size5, max_batch_time0.2, default_responseOpenAI 响应失败 ) # 为 Anthropic 链创建批处理器 batched_anthropic_chain RunnableBatch( boundchain_anthropic, max_batch_size5, max_batch_time0.2, default_responseAnthropic 响应失败 ) # --- 组合这些批处理器以实现并行请求合并 --- # 使用 RunnableParallel 将相同的输入发送到两个批处理器 # 这里的 并行 指的是从应用层面看两个模型供应商的批处理是同时启动的 # 内部每个 batched_xxx_chain 会各自收集请求并进行批处理 combined_batched_chain RunnableParallel( openai_resultbatched_openai_chain, anthropic_resultbatched_anthropic_chain ) async def simulate_cross_vendor_requests(num_requests10): print(fn--- 模拟 {num_requests} 个并发请求到跨供应商批处理链 ---) start_time_total time.perf_counter() async def single_request_to_combined(i): text f请为主题 {chr(65 i % 26)} 生成一个简短的创意描述。 input_data {text: text} try: # 调用 combined_batched_chain.ainvoke 会同时触发 batched_openai_chain 和 batched_anthropic_chain 的 ainvoke # 它们各自会把请求放入自己的批处理队列中 result await combined_batched_chain.ainvoke(input_data) # print(f请求 {i} 结果 (OpenAI): {result[openai_result][:30]}...) # print(f请求 {i} 结果 (Anthropic): {result[anthropic_result][:30]}...) return f请求 {i} 成功 except Exception as e: # print(f请求 {i} 失败: {e}) return f请求 {i} 失败 tasks [single_request_to_combined(i) for i in range(num_requests)] results await asyncio.gather(*tasks) end_time_total time.perf_counter() total_duration end_time_total - start_time_total print(f总耗时: {total_duration:.4f} 秒) print(f平均每个请求耗时: {total_duration / num_requests:.4f} 秒) print(所有请求结果状态:, results) asyncio.run(simulate_cross_vendor_requests(num_requests10)) # 进一步的例子如何处理不同的输入路径 print(n--- 模拟路由到特定供应商的批处理链 ---) # 假设我们有一个路由器根据输入决定使用哪个模型 def decide_model(input_data: dict) - str: if creative in input_data[query].lower(): return anthropic return openai # 定义一个路由器将请求导向不同的批处理器 router_chain RunnablePassthrough.assign( model_choiceRunnableLambda(lambda x: decide_model(x)) ) | { openai_response: RunnableLambda(lambda x: batched_openai_chain.ainvoke({text: x[query]})) .when(lambda x: x[model_choice] openai), anthropic_response: RunnableLambda(lambda x: batched_anthropic_chain.ainvoke({text: x[query]})) .when(lambda x: x[model_choice] anthropic), original_query: RunnablePassthrough() } async def simulate_routed_batched_requests(num_requests10): print(fn--- 模拟 {num_requests} 个路由到供应商的并发请求 ---) start_time_total time.perf_counter() async def single_routed_request(i): query f描述一个普通物体比如椅子主题 {chr(65 i % 26)} if i % 3 0: query f给我一个非常creative的关于科幻主题 {chr(65 i % 26)} 的故事开头。 input_data {query: query} try: result await router_chain.ainvoke(input_data) # print(f请求 {i} 路由结果: {result}) return f请求 {i} 成功 except Exception as e: # print(f请求 {i} 失败: {e}) return f请求 {i} 失败 tasks [single_routed_request(i) for i in range(num_requests)] results await asyncio.gather(*tasks) end_time_total time.perf_counter() total_duration end_time_total - start_time_total print(f总耗时: {total_duration:.4f} 秒) print(f平均每个请求耗时: {total_duration / num_requests:.4f} 秒) print(所有请求结果状态:, results) asyncio.run(simulate_routed_batched_requests(num_requests10))在这个示例中我们为 OpenAI 和 Anthropic 各自创建了一个RunnableBatch实例 (batched_openai_chain和batched_anthropic_chain)。每个实例都独立地管理其各自供应商的批处理队列。通过RunnableParallel我们创建了一个combined_batched_chain。当对combined_batched_chain调用ainvoke时它会同时向batched_openai_chain和batched_anthropic_chain发送请求。关键点尽管combined_batched_chain看似并行地调用了两个批处理器但这两个批处理器本身是独立的。它们各自会收集请求并在达到自己的max_batch_size或max_batch_time时向其对应的 LLM 供应商发起一次批处理请求。后续的路由示例展示了如何根据输入动态选择使用哪个批处理器。这使得我们可以在一个统一的接口下根据业务逻辑将请求智能地分发到不同的模型供应商并同时享受到批处理带来的性能优势。这种模式极大地简化了跨供应商模型集成的复杂性。开发者无需手动管理批处理队列、定时器和结果映射RunnableBatch会在 LCEL 链中自动处理这些细节。六、高级用法自定义批处理函数与错误处理RunnableBatch的batch_fn参数和default_response参数提供了强大的定制和容错能力。6.1batch_fn自定义批处理逻辑有时你包装的Runnable可能没有原生的batch方法或者你希望在发送批次请求之前/之后进行一些特殊的处理例如对所有输入进行统一的格式转换或者在返回结果时进行聚合。这时batch_fn就派上用场了。import time import asyncio from typing import List, Any # 模拟一个没有原生 batch 方法的慢速 Runnable class SlowTextProcessor(Runnable): def __init__(self, delay_per_item: float 0.1): self.delay_per_item delay_per_item def invoke(self, input: str, configNone) - str: time.sleep(self.delay_per_item) # 模拟处理时间 return fProcessed: {input.upper()} # 注意这里没有实现 batch 方法 # 定义一个自定义的批处理函数 def custom_batch_processor(inputs: List[str]) - List[str]: print(fn[Custom Batch Processor] 正在处理批次大小: {len(inputs)}) results [] # 模拟批处理的并行或优化处理 # 实际应用中这里可能是调用一个不支持batch但我们希望批量发送的API # 或者对所有输入进行一些预处理再批量发送到某个服务 for i, item in enumerate(inputs): time.sleep(0.05) # 模拟一些处理时间但比单个 invoke 快 results.append(fCustom Batched Processed: {item.lower()} (Index: {i})) print([Custom Batch Processor] 批次处理完成。) return results # 使用 RunnableBatch 包装 SlowTextProcessor并提供 custom_batch_processor 作为 batch_fn # max_batch_size 设置为 3max_batch_time 设置为 0.5 秒 custom_batched_processor RunnableBatch( boundSlowTextProcessor(delay_per_item0.3), # 单个处理很慢 max_batch_size3, max_batch_time0.5, batch_fncustom_batch_processor, default_responseCustom Batch Processor 失败 ) async def simulate_custom_batch_requests(num_requests10): print(fn--- 模拟 {num_requests} 个并发请求到带自定义 batch_fn 的链 ---) start_time_total time.perf_counter() async def single_request(i): text fitem {i} try: result await custom_batched_processor.ainvoke(text) # print(f请求 {i} 结果: {result}) return f请求 {i} 成功 except Exception as e: # print(f请求 {i} 失败: {e}) return f请求 {i} 失败 tasks [single_request(i) for i in range(num_requests)] results await asyncio.gather(*tasks) end_time_total time.perf_counter() total_duration end_time_total - start_time_total print(f总耗时: {total_duration:.4f} 秒) print(f平均每个请求耗时: {total_duration / num_requests:.4f} 秒) print(所有请求结果状态:, results) asyncio.run(simulate_custom_batch_requests(num_requests10))在这个例子中SlowTextProcessor没有实现batch方法。但我们通过batch_fncustom_batch_processor为RunnableBatch提供了一个自定义的批处理逻辑。RunnableBatch会收集请求然后将它们作为一个列表传递给custom_batch_processor。custom_batch_processor可以对这个列表进行任何处理并返回一个结果列表RunnableBatch会将这些结果映射回原始的ainvoke调用。这使得RunnableBatch的适用范围大大扩展。6.2default_response优雅处理批次内错误在批处理中如果批次中的某一个或几个输入导致底层模型失败我们通常不希望整个批次都失败。default_response参数就是为此设计的。import time import asyncio from typing import List, Any from langchain_core.runnables import RunnableBatch, Runnable # 模拟一个会随机失败的 Runnable class FailingProcessor(Runnable): def invoke(self, input: str, configNone) - str: if fail in input.lower(): raise ValueError(f故意失败: {input}) time.sleep(0.1) return fProcessed: {input} def batch(self, inputs: List[str], configNone) - List[str]: results [] for input_item in inputs: try: # 模拟批处理中某个项目失败 if batch_fail in input_item.lower(): raise ValueError(f批次内故意失败: {input_item}) time.sleep(0.05) # 模拟批处理中的单个项目处理时间 results.append(fBatched Processed: {input_item}) except Exception as e: # 在实际的 batch 实现中你可能需要将错误捕获并返回一个特定的标记 # 或者让 RunnableBatch 的 default_response 处理 # 这里我们让它抛出看 RunnableBatch 如何处理 results.append(e) # 返回错误对象让 RunnableBatch 替换 return results # 使用 RunnableBatch 包装 FailingProcessor # 设置 default_response batched_failing_processor RunnableBatch( boundFailingProcessor(), max_batch_size5, max_batch_time0.5, default_response--- 错误已处理 --- # 当子请求失败时返回此值 ) async def simulate_failing_batch_requests(num_requests10): print(fn--- 模拟 {num_requests} 个并发请求到带 default_response 的链 ---) start_time_total time.perf_counter() async def single_request(i): text fitem {i} if i % 3 0: # 模拟部分请求会失败 text fbatch_fail item {i} input_data text try: result await batched_failing_processor.ainvoke(input_data) print(f请求 {i} 结果: {result}) return f请求 {i} 成功 if result ! --- 错误已处理 --- else f请求 {i} 失败 (被 default_response 捕获) except Exception as e: print(f请求 {i} 真的失败了: {e}) # 只有当 default_response 没有捕获到时才会到这里 return f请求 {i} 真的失败了 tasks [single_request(i) for i in range(num_requests)] results await asyncio.gather(*tasks) end_time_total time.perf_counter() total_duration end_time_total - start_time_total print(f总耗时: {total_duration:.4f} 秒) print(f所有请求结果状态:, results) asyncio.run(simulate_failing_batch_requests(num_requests10))在这个例子中FailingProcessor的batch方法在处理包含 batch_fail 的输入时会抛出异常。RunnableBatch捕获到这些异常并用我们定义的default_response(— 错误已处理 —) 替换了这些失败的结果从而保证了批处理的整体流程不会中断同时调用方也能收到一个明确的错误指示。这对于构建高可用的系统至关重要因为它可以防止单个问题导致整个批次甚至整个应用程序崩溃。七、性能考量与权衡使用RunnableBatch进行批量推理确实能带来显著的性能提升但这种提升并非没有代价我们需要仔细权衡几个关键参数。7.1 关键参数的权衡参数描述影响建议与考量max_batch_size一个批次中最大请求数。吞吐量: 越大潜在吞吐量越高API固定开销分摊越充分。延迟: 越大单个请求等待批次填满的时间可能越长导致延迟增加。内存/计算: 越大单次模型推理的资源消耗越大。根据模型提供商的推荐批次大小、模型本身的计算效率以及系统可用内存来设置。如果请求量高且稳定可以设置较大值。如果请求量波动大可能需要设置较小值或配合max_batch_time。max_batch_time一个批次等待的最大时间秒。延迟: 越小单个请求的等待时间越短延迟越低。吞吐量: 越小可能导致批次不满就发出降低批次效率减少吞吐量。API调用频率: 越小API调用频率可能越高。平衡延迟和吞吐量的关键。对于实时性要求高的应用设置较小值。对于后台任务可以设置较大值以最大化批次效率。通常建议从 0.1-0.5 秒开始尝试。wait_until_full是否等待批次完全填满才触发。吞吐量:True时最大化批次效率可能带来最高吞吐量。延迟:True时可能导致无限等待或极高延迟如果请求量不足以填满批次。仅在确定请求流足够稳定且能快速填满批次并且对延迟不敏感的场景下使用True。绝大多数实时应用应设置为False。default_response批次中单个请求失败时的默认响应。容错性: 增强系统容错能力防止单个失败影响整个批次。调试: 可能掩盖底层错误需要配合日志和监控。强烈建议设置以提高系统的健壮性。返回的default_response应该足够清晰表示该请求失败以便上层应用进行处理。batch_fn自定义批处理函数。灵活性: 允许处理不提供原生batch方法的Runnable或进行自定义预/后处理。复杂性: 引入额外的逻辑需要开发者自行管理批处理的输入输出并确保其高效性。当底层Runnable不支持batch或需要特殊处理时使用。确保batch_fn本身是高效的否则会抵消RunnableBatch带来的优化。7.2 性能基准测试与监控要真正理解RunnableBatch在您的特定应用场景下的效果进行实际的基准测试至关重要。定义明确的指标吞吐量 (Throughput)每秒处理的请求数 (RPS)。平均延迟 (Average Latency)每个请求从发出到收到结果的平均时间。P90/P99 延迟 (Percentile Latency)90% 或 99% 的请求所花费的时间。这对于衡量用户体验的稳定性非常重要。成本 (Cost)在不同批处理策略下的API调用成本。模拟真实负载使用工具如 Locust、JMeter、k6 或简单的并发脚本模拟您的应用可能面临的并发请求模式。迭代调整参数从小批次大小和短超时时间开始。逐渐增加max_batch_size观察吞吐量和延迟的变化。调整max_batch_time观察它如何平衡批次大小和请求延迟。在每次调整后重新运行基准测试并记录结果。监控在生产环境中集成监控系统如 Prometheus、Grafana来跟踪RunnableBatch实例的实际批次大小、批处理延迟、成功率和错误率。这有助于识别瓶颈并动态调整配置。7.3RunnableBatch的局限性尽管RunnableBatch非常强大但它并非万能药。它主要适用于以下场景输入相互独立批次中的每个输入请求的计算不依赖于批次中其他请求的结果。输出顺序与输入顺序一致底层batch方法或batch_fn必须保证返回结果的顺序与输入顺序一致RunnableBatch才能正确地将结果映射回原始请求。同质或可统一处理的输入批次中的输入虽然可能内容不同但结构和处理方式应足够相似以便于一次性处理。对于需要复杂依赖关系、动态批次重组或更高级调度策略的场景可能需要结合队列系统如 Kafka、RabbitMQ和更复杂的自定义批处理服务来实现。八、最佳实践与设计模式为了最大限度地发挥RunnableBatch的优势并构建健壮的系统请遵循以下最佳实践按供应商和模型粒度创建RunnableBatch实例为每个不同的 LLM 供应商或模型如果您有多个模型实例创建独立的RunnableBatch实例。这有助于隔离配置、错误处理和性能指标。# 错误示例共享一个 RunnableBatch 实例处理不同的模型 # BAD: all_models_batcher RunnableBatch(some_generic_chain) # GOOD: openai_batcher RunnableBatch(openai_specific_chain, max_batch_size50, max_batch_time0.1) anthropic_batcher RunnableBatch(anthropic_specific_chain, max_batch_size30, max_batch_time0.2)合理设置max_batch_size和max_batch_time根据您的应用场景实时性要求、预期负载、模型提供商的速率限制进行调优。对于低延迟应用max_batch_time应该较小对于高吞吐量应用max_batch_size可以较大。利用default_response进行容错始终为RunnableBatch配置default_response以优雅地处理批次中个别请求的失败避免整个批次或链条中断。结合 LCEL 的路由能力当需要根据输入动态选择模型时将RunnableBatch与RunnableLambda和when方法结合构建智能路由器。from langchain_core.runnables import RunnableLambda, RunnableBranch router RunnableBranch( (lambda x: x[type] creative, creative_model_batcher), (lambda x: x[type] factual, factual_model_batcher), default_model_batcher # 默认处理器 )异步优先在与RunnableBatch交互时尽可能使用ainvoke和abatch方法。RunnableBatch本身是为异步操作设计的这将确保您的应用程序能够充分利用并发性。监控和日志对RunnableBatch实例的运行情况进行监控。记录批次大小、批处理时间、错误率等关键指标。这有助于您理解其性能特点并在生产环境中进行问题排查。预热Warm-up在生产部署后通过发送少量请求预热RunnableBatch实例。这可以确保内部队列和定时器正常启动并避免冷启动带来的初始延迟高峰。九、未来的展望RunnableBatch作为 LangChain LCEL 的一部分为LLM应用的性能优化提供了开箱即用的解决方案。展望未来我们可以期待更智能的自适应批处理根据实时负载和性能指标动态调整max_batch_size和max_batch_time进一步优化资源利用。与更广泛的生态系统集成例如与消息队列系统Kafka、RabbitMQ的更深层集成以处理跨服务边界的异步批处理流。批处理的透明度与可观测性增强提供更丰富的钩子和指标让开发者能够更细致地了解批处理内部的运作。针对特定模型类型的优化例如为长上下文或多模态输入提供更专业的批处理策略。RunnableBatch极大地简化了在 LangChain 应用中实现高效批量推理的复杂性它提供了一个统一、声明式的方式来聚合对底层Runnable的调用无论是针对单个模型供应商还是跨多个。通过合理配置和应用开发者可以显著提升其LLM应用的吞吐量降低延迟并优化运营成本从而构建出更具弹性、高性能和成本效益的AI驱动解决方案。它是 LangChain LCEL 强大可组合性理念的一个绝佳体现也是构建未来AI应用不可或缺的工具之一。