2026/1/17 16:07:03
网站建设
项目流程
贵阳建站公司,wordpress 调用php,wordpress给分类做模板,wordpress php调优分布式爬虫全链路追踪系统设计与实现
一、背景与问题
在分布式爬虫系统中#xff0c;一个初始请求可能会派生出数十子请求#xff08;列表页→详情页→评论页→下一页…#xff09;#xff0c;这些请求分布在不同的消费者进程中执行。如何追踪整个任务链的执行状态、性能…分布式爬虫全链路追踪系统设计与实现一、背景与问题在分布式爬虫系统中一个初始请求可能会派生出数十子请求列表页→详情页→评论页→下一页…这些请求分布在不同的消费者进程中执行。如何追踪整个任务链的执行状态、性能瓶颈以及调用关系成为系统可观测性的核心挑战。1.1 核心诉求全链路追踪从初始请求到所有子孙请求形成完整的调用链任务状态监控实时掌握每个请求的执行状态pending/success/failed性能分析记录每个请求的耗时、错误信息等任务完成回调当整棵任务树完成时自动触发通知日志关联在海量日志中快速定位某个任务链的所有相关日志二、技术方案OpenTelemetry 风格的 Trace/Span 模型2.1 核心概念参考 OpenTelemetry 的分布式追踪标准定义了三个关键ID# Request 类的追踪字段trace_id:str# 整个爬取任务的唯一标识所有子请求继承span_id:str# 当前请求节点的唯一标识parent_span_id:str# 父请求节点的 Span ID调用链示例列表页 trace_idabc12345 span_idspan001 parent_span_idNone 商品A详情 trace_idabc12345 # 继承父请求 span_idspan002 # 新生成 parent_span_idspan001 # 指向列表页 商品A评论第1页 trace_idabc12345 span_idspan003 parent_span_idspan002 # 指向详情页 商品A评论第2页 trace_idabc12345 span_idspan004 parent_span_idspan002 # 同一个父请求2.2 与 funboost task_id 的整合funboost 的task_id是消费任务的唯一标识我们将其与 span 信息结合# 格式parent_span_id:span_idtask_idf{result.parent_span_idor}:{result.span_idor}# 示例task_idspan001:span002# 表示父节点是 span001当前节点是 span002这样设计的好处task_id 即包含调用关系无需额外字段就能知道父子关系兼容 funboost仍然是唯一字符串不影响消息去重日志可读性parent_span_id - span_id一目了然三、系统架构分层设计3.1 架构图┌─────────────────────────────────────────────────────┐ │ 日志层 (Logger) │ │ - TaskIdLogger 自动注入 task_id │ │ - 日志格式: %(task_id)s parent_span_id:span_id │ └─────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────┐ │ Request 对象 (网络层) │ │ - trace_id / span_id / parent_span_id │ │ - 自动继承父请求的 trace_id │ │ - 自动生成新的 span_id │ └─────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────┐ │ Worker 处理层 (核心) │ │ - 反序列化 Request 对象 │ │ - 构造 task_id parent_span_id:span_id │ │ - 发布子请求时传递 task_id 到 funboost │ └─────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────┐ │ TaskTraceManager (追踪管理器) │ │ - 记录请求链路 (Redis InfluxDB) │ │ - 追踪任务树状态 │ │ - 任务完成时触发回调 │ └─────────────────────────────────────────────────────┘四、核心实现4.1 第一层funboost 日志系统扩展4.1.1 原始 funboost 日志配置funboost 默认的日志格式funboost_config.py:125-128NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHERlogging.Formatter(f%(asctime)s-({nb_log_config_default.computer_ip},{nb_log_config_default.computer_name})f-[p%(process)d_t%(thread)d] - %(name)s - %(filename)s:%(lineno)d - f%(funcName)s - %(levelname)s - %(task_id)s - %(message)s,%Y-%m-%d %H:%M:%S,)关键字段%(task_id)sfunboost 自动注入的任务ID通过TaskIdLogger实现这个task_id来自funboost.core.current_task.get_current_taskid()这个能从上下文自动获取taskid 然后用给日志灰常好用4.1.2 我们的扩展方案目标让task_id显示为parent_span_id:span_id格式实现路径funboost 发布任务时我们主动传入task_id参数funboost 的TaskIdLogger会自动从上下文读取这个task_id日志输出时自动填充到%(task_id)s位置代码位置funspider/utils/fun_logger.pyfromfunboost.core.task_id_loggerimportTaskIdLogger# 使用 funboost 原生的 TaskIdLogger无需自定义log_managerLogManager(funspider,logger_clsTaskIdLogger)logger_config{log_level_int:10,is_add_stream_handler:True,formatter_template:FunboostCommonConfig.NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER,log_path:None,log_filename:None,}loggerlog_manager.get_logger_and_add_handlers(**logger_config)日志输出示例2025-12-14 13:13:08-(192.168.1.5,koohai)-[p28568_t30180] - funspider - worker.py:315 - _handle_parse_results - INFO - span001:span002 - 发布子请求: http://example.com/page14.2 第二层Request 对象的 Trace/Span 初始化4.2.1 Request 初始化时的自动追踪代码位置funspider/network/request.py:143-207classRequest:增强的Request类支持处理器直接修改和中间件选择def__init__(self,url:str,callback:Optional[Callable]None,headers:Optional[Dict[str,str]]None,meta:Optional[Dict[str,Any]]None,method:strGET,params:strNone,middleware_tags:List[str]None,middleware_conditions:List[str]None,pipeline_tags:List[str]None,pipeline_conditions:List[str]None,download:Optional[Callable]None,request_id:Optional[str]None,depth:Optional[int]None,parent_request_id:Optional[str]None,priority_params:Optional[Dict[str,Any]]None,trace_id:Optional[str]None,span_id:Optional[str]None,parent_span_id:Optional[str]None,queue_prefix:Optional[str]None,**kwargs):# 基础属性self.urlurl self.callbackcallback self.methodmethod self.request_idrequest_idoruuid.uuid4().hexself.queue_prefixqueue_prefix# 重要确保 headers 和 meta 是字典类型支持处理器直接修改self.headersheadersor{}self.metametaor{}# 设置深度和父请求ID到meta中self.meta[depth]depthor0self.meta[parent_request_id]parent_request_id# OpenTelemetry 风格的链路追踪 # Trace ID: 整个爬取任务的唯一标识所有子请求继承# Span ID: 当前请求节点的唯一标识# Parent Span ID: 父请求节点的 Span ID# 如果已经设置了 trace_id/span_id则不再重新生成避免覆盖用户自定义的值self.trace_id,self.span_id,self.parent_span_idself._init_trace_span(trace_id,span_id,parent_span_id,parent_request_id)# 保存到 meta 中方便序列化和传递self.meta[trace_id]self.trace_id self.meta[span_id]self.span_id self.meta[parent_span_id]self.parent_span_id# 中间件选择self.middleware_tagsmiddleware_tagsor[]self.middleware_conditionsmiddleware_conditionsor[]# Pipeline选择self.pipeline_tagspipeline_tagsor[]self.pipeline_conditionspipeline_conditionsor[]# 自定义下载器self.downloaddownload# PriorityConsumingControlConfig 参数任务级别控制self.priority_paramspriority_paramsor{}# 设置回调名称ifcallable(self.callback):self.callback_nameself.callback.__name__elifself.callbackisNone:self.callback_nameparse# 默认名称else:self.callback_namestr(self.callback)# HTTP参数处理 - 提取常用HTTP参数作为直接属性self._extract_http_params(kwargs)# 其他参数存储在kwargs中self.kwargskwargsdef_init_trace_span(self,trace_id:Optional[str],span_id:Optional[str],parent_span_id:Optional[str],parent_request_id:Optional[str])-tuple: 初始化 Trace ID 和 Span ID 逻辑 1. trace_id如果传入则继承否则生成新的16位短ID 2. span_id总是生成新的12位短ID 3. parent_span_id如果传入则使用否则为 None根节点 # 1. 生成或继承 Trace IDiftrace_id:final_trace_idtrace_id# 子请求继承elifparent_request_idandtrace_idinself.meta:final_trace_idself.meta.get(trace_id)else:final_trace_iduuid.uuid4().hex[:16]# 初始请求生成# 2. 生成 Span ID每个请求都是新的ifspan_id:final_span_idspan_idelse:final_span_iduuid.uuid4().hex[:12]# 3. 设置 Parent Span IDifparent_span_id:final_parent_span_idparent_span_idelifparent_request_idandspan_idinself.meta:final_parent_span_idself.meta.get(span_id)else:final_parent_span_idNone# 根节点returnfinal_trace_id,final_span_id,final_parent_span_id4.3 第三层Worker 处理子请求时的自动传递4.3.1 处理解析结果时注入追踪信息代码位置funspider/core/worker.py:260-319def_handle_parse_results(self,results,spider_instance,parent_request:Optional[Request]None): 处理解析结果自动为子请求注入追踪信息 # 1. 获取父请求的追踪信息parent_trace_idNoneparent_span_idNoneifparent_request:parent_trace_idparent_request.trace_id# 继承parent_span_idparent_request.span_id# 作为子请求的 parent_span_id# 从 request 里拿到span idforresultinresults:ifisinstance(result,Request):# 2. 为子请求设置 trace_id继承父请求# core 中判断是子请求还是 item 的时候 自动注入父traceid 。 因此 爬虫里只要给request 赋值一个 spanid就好了。 (还是习惯taskid和batchid 但是为了规范 )ifparent_trace_idandnotresult.trace_id:result.meta[trace_id]parent_trace_id result.trace_idparent_trace_id# 3. 为子请求设置 parent_span_id父请求的 span_idifparent_span_idandnotresult.parent_span_id:result.meta[parent_span_id]parent_span_id result.parent_span_idparent_span_id# 4. 如果子请求没有 span_id自动生成ifnotresult.span_id:new_span_iduuid.uuid4().hex[:12]result.span_idnew_span_id result.meta[span_id]new_span_id# 5. 构造 task_id格式parent_span_id:span_idtask_idf{result.parent_span_idor}:{result.span_idor}# 6. 日志输出显示完整追踪信息logger.debug(f[trace{result.trace_id}][span{result.span_id}][parent{result.parent_span_id}] f发布子请求:{result.url})# 7. 发布到 funboost 队列时传入 task_id# ... (见下一节)4.3.2 发布任务时传入 task_id代码位置funspider/core/engine.py发布方法defpublish_request(self,request:Request,task_id:strNone): 发布请求到队列 Args: request: 请求对象 task_id: 任务ID格式parent_span_id:span_id callback_namerequest.callback_name queue_prefixrequest.queue_prefixorself.default_queue_prefix# 获取或创建 Boosterboosterself._get_or_create_booster(queue_prefix,callback_name)# 关键传入 task_id 到 funboostbooster.publish(request.to_dict(),task_idtask_id,# funboost 会将其注入到上下文中**request.priority_params)funboost 内部处理task_id被存储到funboost.core.current_task上下文中TaskIdLogger的makeRecord()方法自动读取这个task_id日志格式化时填充到%(task_id)s位置4.4 第四层TaskTraceManager 统一追踪管理4.4.1 核心功能代码位置funspider/core/trace_stats.pyclassTaskTraceManager: 统一任务追踪管理器 功能 1. 【监控统计】记录请求链路、性能数据 2. 【任务协调】追踪任务树完成状态 3. 【完成回调】任务树完成时触发自定义回调 数据存储 - Redis: 实时状态1小时TTL - InfluxDB: 历史数据长期分析可选 def__init__(self,spider_name:str,redis_url:strNone,influx_config:Optional[Dict]None,on_tree_completed:Optional[Callable]None):self.spider_namespider_name self.redis_clientget_redis_client(redis_url)self.on_tree_completedon_tree_completed# InfluxDB 异步写入可选ifinflux_config:self._write_queueQueue(maxsize10000)self._writer_threadThread(targetself._background_writer,daemonTrue)self._writer_thread.start()4.4.2 记录请求链路defrecord_request(self,trace_id:str,span_id:str,parent_span_id:Optional[str],callback_name:str,url:str,status:strpending,metadata:Optional[Dict]None): 记录请求同时写入 Redis 和 InfluxDB Redis 数据结构 - trace:{trace_id}:total - {callback_name: count} - trace:{trace_id}:status:{status} - {callback_name: count} - trace:{trace_id}:children:{parent_span_id} - {callback_name: count} - trace:{trace_id}:request:{span_id} - {url, callback, parent_span_id, status, timestamp} timestamptime.time()# 1. Redis: 实时状态同步写入pipeself.redis_client.pipeline()pipe.hincrby(ftrace:{trace_id}:total,callback_name,1)pipe.hincrby(ftrace:{trace_id}:status:{status},callback_name,1)ifparent_span_id:pipe.hincrby(ftrace:{trace_id}:children:{parent_span_id},callback_name,1)request_keyftrace:{trace_id}:request:{span_id}pipe.hset(request_key,mapping{url:url,callback:callback_name,parent_span_id:parent_span_idor,status:status,timestamp:timestamp})pipe.expire(request_key,3600)# 1小时过期pipe.execute()# 2. InfluxDB: 历史数据异步写入ifself._write_api:self._write_queue.put({type:request,trace_id:trace_id,span_id:span_id,parent_span_id:parent_span_idorroot,callback_name:callback_name,status:status,url:url,timestamp:timestamp,metadata:metadataor{}})4.4.3 更新请求状态defupdate_status(self,trace_id:str,span_id:str,callback_name:str,old_status:str,new_status:str,duration_ms:Optional[float]None,error_msg:Optional[str]None): 更新请求状态pending - success/failed 同时记录 - 状态计数变更 - 请求耗时 - 错误信息 pipeself.redis_client.pipeline()pipe.hincrby(ftrace:{trace_id}:status:{old_status},callback_name,-1)pipe.hincrby(ftrace:{trace_id}:status:{new_status},callback_name,1)request_keyftrace:{trace_id}:request:{span_id}pipe.hset(request_key,status,new_status)ifduration_msisnotNone:pipe.hset(request_key,duration_ms,duration_ms)iferror_msg:pipe.hset(request_key,error,error_msg)pipe.execute()4.4.4 获取追踪树结构defget_trace_tree(self,trace_id:str)-Dict: 获取完整的请求树结构用于可视化 Returns: { trace_id: abc12345, tree: { span_id: span001, url: http://example.com, status: success, children: [ { span_id: span002, url: http://example.com/page1, status: success, children: [...] } ] } } # 1. 获取所有请求详情request_keysself.redis_client.keys(ftrace:{trace_id}:request:*)requests{}forkeyinrequest_keys:span_idkey.decode().split(:)[-1]dataself.redis_client.hgetall(key)requests[span_id]{k.decode():v.decode()fork,vindata.items()}# 2. 构建树结构root_nodes[]children_map{}forspan_id,req_datainrequests.items():parent_span_idreq_data.get(parent_span_id,)node{span_id:span_id,**req_data,children:[]}ifnotparent_span_id:root_nodes.append(node)else:ifparent_span_idnotinchildren_map:children_map[parent_span_id][]children_map[parent_span_id].append(node)# 3. 递归填充子节点deffill_children(node):span_idnode[span_id]ifspan_idinchildren_map:node[children]children_map[span_id]forchildinnode[children]:fill_children(child)forrootinroot_nodes:fill_children(root)return{trace_id:trace_id,tree:root_nodes[0]ifroot_nodeselseNone}4.4.5 异步批量写入 InfluxDBdef_background_writer(self): 后台线程批量写入 InfluxDB 策略 - 批量大小100 条 - 刷新间隔2 秒 points_buffer[]last_flushtime.time()whileTrue:try:# 非阻塞获取数据dataself._write_queue.get(timeout0.1)pointself._build_point(data)points_buffer.append(point)except:pass# 满足条件则刷新should_flush(len(points_buffer)100ortime.time()-last_flush2.0)ifshould_flushandpoints_buffer:self._write_api.write(bucketself._bucket,recordpoints_buffer)logger.debug(fInfluxDB 批量写入:{len(points_buffer)}条)points_buffer.clear()last_flushtime.time()def_build_point(self,data:Dict):构建 InfluxDB Pointpoint(Point(spider_request).tag(spider_name,self.spider_name).tag(trace_id,data[trace_id]).tag(span_id,data[span_id]).tag(parent_span_id,data[parent_span_id]).tag(callback,data[callback_name]).tag(status,data[status]).field(url,data[url]).field(count,1).time(int(data[timestamp]*1e9)))# 添加元数据如 duration_ms, error_msgifdata.get(metadata):fork,vindata[metadata].items():ifvisnotNone:point.field(k,v)returnpoint五、完整调用流程5.1 初始请求发布# 用户代码spiderMySpider()spider.start_requests()defstart_requests(self):requestRequest(urlhttp://example.com/list,callbackself.parse)# 此时自动生成# trace_idabc12345新生成# span_idspan001新生成# parent_span_idNone根节点yieldrequest5.2 Worker 消费初始请求# funspider/core/worker.pydef_process_request_task(self,spider_instance,payload:Dict):# 1. 反序列化requestRequest.from_dict(payload)# 2. 记录请求开始self.task_manager.record_request(trace_idrequest.trace_id,# abc12345span_idrequest.span_id,# span001parent_span_idrequest.parent_span_id,# Nonecallback_nameparse,urlrequest.url,statuspending)# 3. 下载 解析responsespider_instance.download(request)resultsspider_instance.parse(request,response)# 4. 处理解析结果发布子请求self._handle_parse_results(results,spider_instance,parent_requestrequest)# 5. 更新状态self.task_manager.update_status(trace_idrequest.trace_id,span_idrequest.span_id,callback_nameparse,old_statuspending,new_statussuccess,duration_ms100)5.3 发布子请求# funspider/core/worker.pydef_handle_parse_results(self,results,spider_instance,parent_request):forresultinresults:ifisinstance(result,Request):# 自动注入追踪信息result.trace_idparent_request.trace_id# abc12345继承result.parent_span_idparent_request.span_id# span001父节点result.span_iduuid.uuid4().hex[:12]# span002新生成# 构造 task_idtask_idf{result.parent_span_id}:{result.span_id}# span001:span002# 发布到队列self.engine.publish_request(result,task_idtask_id)# 日志输出# 2025-12-14 13:13:08 - funspider - worker.py:315 - INFO - span001:span002 - 发布子请求: ...5.4 消费子请求# Worker 消费 span002 任务def_process_request_task(self,spider_instance,payload:Dict):requestRequest.from_dict(payload)# 此时# request.trace_id abc12345# request.span_id span002# request.parent_span_id span001# funboost 上下文中的 task_id span001:span002# 日志自动显示# 2025-12-14 13:13:10 - funspider - worker.py:154 - DEBUG - span001:span002 - Processing request: ...# 记录请求self.task_manager.record_request(trace_idabc12345,span_idspan002,parent_span_idspan001,# 建立父子关系callback_nameparse_detail,urlrequest.url,statuspending)# ... 继续处理六、使用示例6.1 爬虫代码fromfunspiderimportBaseSpider,Request,ItemclassMySpider(BaseSpider):namemy_spiderdefstart_requests(self):# 初始请求自动生成 trace_id 和 span_idyieldRequest(urlhttp://example.com/list,callbackself.parse)defparse(self,request,response):解析列表页foriteminresponse.xpath(//div[classitem]):# 发布详情页请求自动继承 trace_id生成新 span_idyieldRequest(urlitem.xpath(./a/href).get(),callbackself.parse_detail)defparse_detail(self,request,response):解析详情页yieldItem(titleresponse.xpath(//h1/text()).get(),contentresponse.xpath(//div[classcontent]/text()).get())6.2 查询追踪树# 获取某个 trace 的完整调用树treespider.task_manager.get_trace_tree(abc12345)# 输出结果{trace_id:abc12345,tree:{span_id:span001,url:http://example.com/list,callback:parse,status:success,duration_ms:100,children:[{span_id:span002,url:http://example.com/item/1,callback:parse_detail,status:success,duration_ms:80,children:[]},{span_id:span003,url:http://example.com/item/2,callback:parse_detail,status:success,duration_ms:75,children:[]}]}}6.3 查询统计信息# 获取某个 trace 的统计信息statsspider.task_manager.get_trace_stats(abc12345)# 输出结果{total:{parse:1,parse_detail:2},pending:{parse:0,parse_detail:0},success:{parse:1,parse_detail:2},failed:{parse:0,parse_detail:0}}6.4 日志输出示例2025-12-14 13:13:08-(192.168.1.5,koohai)-[p28568_t30180] - funspider - worker.py:154 - _process_request_task - INFO - :span001 - Processing request: http://example.com/list 2025-12-14 13:13:08-(192.168.1.5,koohai)-[p28568_t30180] - funspider - worker.py:315 - _handle_parse_results - DEBUG - span001:span002 - 发布子请求: http://example.com/item/1 2025-12-14 13:13:09-(192.168.1.5,koohai)-[p28568_t30181] - funspider - worker.py:154 - _process_request_task - INFO - span001:span002 - Processing request: http://example.com/item/1 2025-12-14 13:13:09-(192.168.1.5,koohai)-[p28568_t30181] - funspider - worker.py:205 - _process_request_task - INFO - span001:span002 - 请求成功耗时: 80ms解读:span001根请求无父节点span001:span002子请求父节点是 span001不同进程t30180vst30181处理不同请求但通过task_id关联当然后面的redis统计以及influxdb 仅供参考。目前还在完善中。更多文章敬请关注gzh零基础爬虫第一天