# Building a High-Concurrency Express Logistics Tracking Crawler in Python
## Introduction: The Modern Challenge of Scraping Logistics Data

With e-commerce booming, parcel tracking has become an everyday need. Whether for enterprise supply-chain management or personal package lookups, getting logistics information in real time matters. Manual queries are slow; automating them with a Python crawler not only improves efficiency but also enables data analysis, monitoring, and alerting. This article walks through building an efficient, stable, and reasonably intelligent express-tracking crawler system.

## Architecture Choices: Why Asynchronous Concurrency

A modern tracking crawler faces three main challenges:

- High concurrency: many tracking numbers must be queried at the same time
- Anti-bot countermeasures: mainstream logistics sites deploy strict anti-crawling defenses
- Parsing complexity: every carrier returns data in a different format

To meet these challenges we pick the following stack:

- Async framework: aiohttp + asyncio for high-concurrency requests
- Smart rendering: Playwright to emulate real browser behavior
- Data management: SQLAlchemy + PostgreSQL for structured storage
- Proxy handling: a smart rotating proxy pool

## Complete Crawler Implementation

### 1. Project Structure

```text
express_tracking/
├── crawler/
│   ├── __init__.py
│   ├── async_crawler.py    # async crawler core
│   ├── proxy_manager.py    # proxy management
│   ├── parsers/            # parser package
│   │   ├── __init__.py
│   │   ├── sf_parser.py    # SF Express parser
│   │   ├── sto_parser.py   # STO parser
│   │   └── ...
├── models/
│   ├── __init__.py
│   ├── database.py         # database models
│   └── schemas.py          # data schemas
├── utils/
│   ├── __init__.py
│   ├── logger.py           # logging setup
│   └── exceptions.py       # custom exceptions
├── config.py               # configuration
├── requirements.txt        # dependencies
└── main.py                 # entry point
```

### 2. Configuration (config.py)

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class CrawlerConfig:
    """Crawler configuration."""

    # Concurrency settings
    MAX_CONCURRENT_REQUESTS: int = 50
    REQUEST_TIMEOUT: int = 30
    RATE_LIMIT_DELAY: float = 0.1

    # Proxy settings
    PROXY_ENABLED: bool = True
    PROXY_SOURCES: List[str] = None

    # Retry settings
    MAX_RETRIES: int = 3
    RETRY_DELAY: int = 2

    # Request headers
    HEADERS: Dict = None

    # Database settings
    DATABASE_URL: str = "postgresql://user:password@localhost/express_tracking"

    def __post_init__(self):
        if self.PROXY_SOURCES is None:
            self.PROXY_SOURCES = [
                "https://www.proxyscrape.com/api/free-proxy-list",
                "https://api.proxyscrape.com/v2/?request=getproxies",
            ]
        if self.HEADERS is None:
            self.HEADERS = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                              "AppleWebKit/537.36 (KHTML, like Gecko) "
                              "Chrome/120.0.0.0 Safari/537.36",
                "Accept": "application/json, text/plain, */*",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Accept-Encoding": "gzip, deflate, br",
                "Connection": "keep-alive",
                "Sec-Fetch-Dest": "empty",
                "Sec-Fetch-Mode": "cors",
                "Sec-Fetch-Site": "same-site",
            }


config = CrawlerConfig()
```
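The project layout above references a `requirements.txt` that the article never shows. A plausible dependency list for this stack — the package names come from the imports used throughout the article, but the version pins are assumptions you should adjust to your environment:

```text
# requirements.txt — illustrative version pins, not tested constraints
aiohttp>=3.9
playwright>=1.40
SQLAlchemy>=2.0
psycopg2-binary>=2.9   # PostgreSQL driver for SQLAlchemy
celery>=5.3
redis>=5.0
psutil>=5.9
```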
### 3. Async Crawler Core (async_crawler.py)

```python
import asyncio
import hashlib
import json
import random
import re
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import aiohttp

from .proxy_manager import ProxyManager
from utils.logger import get_logger
from utils.exceptions import CrawlerError, RetryExhaustedError

logger = get_logger(__name__)


@dataclass
class TrackingResult:
    """Result of tracking a single shipment."""
    tracking_number: str
    carrier: str
    status: str
    events: List[Dict[str, Any]]
    raw_data: Dict[str, Any]
    timestamp: float
    success: bool
    error_msg: Optional[str] = None


class AsyncExpressCrawler:
    """Core asynchronous express-tracking crawler."""

    def __init__(self, config):
        self.config = config
        self.proxy_manager = ProxyManager(config) if config.PROXY_ENABLED else None
        self.session = None
        self.parser_registry = self._init_parser_registry()

    def _init_parser_registry(self):
        """Map carrier codes to parsers. Only the SF and STO parsers are
        shown in this article; the rest fall back to the generic parser."""
        return {
            "SF": self._parse_sf_data,        # SF Express
            "STO": self._parse_sto_data,      # STO Express
            "YTO": self._parse_generic_data,  # YTO Express
            "ZTO": self._parse_generic_data,  # ZTO Express
            "YD": self._parse_generic_data,   # Yunda
            "EMS": self._parse_generic_data,  # EMS
            "JD": self._parse_generic_data,   # JD Logistics
        }

    async def __aenter__(self):
        """Async context manager entry: open the shared HTTP session."""
        self.session = aiohttp.ClientSession(
            headers=self.config.HEADERS,
            timeout=aiohttp.ClientTimeout(total=self.config.REQUEST_TIMEOUT),
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the session."""
        if self.session:
            await self.session.close()

    async def fetch_with_retry(self, url: str, params: Dict = None,
                               method: str = "GET", **kwargs) -> Dict:
        """Issue a request with retries and exponential backoff."""
        for attempt in range(self.config.MAX_RETRIES):
            proxy = None
            try:
                proxy = self.proxy_manager.get_proxy() if self.proxy_manager else None
                request_kwargs = {"url": url, "proxy": proxy, "ssl": False}
                if params and method == "GET":
                    request_kwargs["params"] = params
                elif params and method == "POST":
                    request_kwargs["json"] = params
                request_kwargs.update(kwargs)

                async with self.session.request(method, **request_kwargs) as response:
                    if response.status == 200:
                        content_type = response.headers.get("Content-Type", "")
                        if "application/json" in content_type:
                            data = await response.json()
                        else:
                            text = await response.text()
                            # Try JSON first; fall back to raw text
                            try:
                                data = json.loads(text)
                            except json.JSONDecodeError:
                                data = {"raw_text": text}
                        if self.proxy_manager and proxy:
                            self.proxy_manager.report_success(proxy)
                        return data
                    error_msg = f"HTTP {response.status}: {await response.text()}"
                    logger.warning(
                        f"Request failed (attempt {attempt + 1}/"
                        f"{self.config.MAX_RETRIES}): {error_msg}"
                    )
                    if self.proxy_manager and proxy:
                        self.proxy_manager.report_failure(proxy)
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.warning(
                    f"Network error (attempt {attempt + 1}/"
                    f"{self.config.MAX_RETRIES}): {str(e)}"
                )
                if self.proxy_manager and proxy:
                    self.proxy_manager.report_failure(proxy)

            # Exponential backoff with jitter before the next attempt
            if attempt < self.config.MAX_RETRIES - 1:
                delay = self.config.RETRY_DELAY * (2 ** attempt)
                await asyncio.sleep(delay + random.uniform(0, 1))

        raise RetryExhaustedError(
            f"Still failing after {self.config.MAX_RETRIES} attempts"
        )

    async def track_single(self, tracking_number: str,
                           carrier: str = None) -> TrackingResult:
        """Track a single tracking number."""
        try:
            # Auto-detect the carrier when it is not specified
            if not carrier:
                carrier = await self.identify_carrier(tracking_number)

            # Fetch the raw tracking payload
            raw_data = await self._fetch_tracking_data(tracking_number, carrier)

            # Normalize it into an event list
            events = await self._parse_tracking_data(raw_data, carrier)

            # Derive the current status
            status = self._determine_status(events)

            return TrackingResult(
                tracking_number=tracking_number,
                carrier=carrier,
                status=status,
                events=events,
                raw_data=raw_data,
                timestamp=time.time(),
                success=True,
            )
        except Exception as e:
            logger.error(f"Tracking failed for {tracking_number}: {str(e)}")
            return TrackingResult(
                tracking_number=tracking_number,
                carrier=carrier or "未知",  # unknown carrier
                status="查询失败",  # query failed
                events=[],
                raw_data={},
                timestamp=time.time(),
                success=False,
                error_msg=str(e),
            )
    async def track_batch(self, tracking_list: List[Dict]) -> List[TrackingResult]:
        """Track a batch of tracking numbers with bounded concurrency."""
        semaphore = asyncio.Semaphore(self.config.MAX_CONCURRENT_REQUESTS)

        async def track_with_semaphore(tracking_info):
            async with semaphore:
                # Random delay so requests are not fired in a single burst
                await asyncio.sleep(random.uniform(0, self.config.RATE_LIMIT_DELAY))
                return await self.track_single(
                    tracking_info["number"],
                    tracking_info.get("carrier"),
                )

        tasks = [track_with_semaphore(info) for info in tracking_list]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Drop exceptions that escaped individual tasks
        processed_results = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Batch tracking exception: {str(result)}")
            else:
                processed_results.append(result)
        return processed_results

    async def identify_carrier(self, tracking_number: str) -> str:
        """Guess the carrier from the tracking-number format.

        The patterns are illustrative and overlap in practice; real
        deployments should rely on an identification API as the fallback.
        """
        rules = {
            "SF": [r"^SF\d{12}$", r"^\d{12}$"],       # SF Express
            "STO": [r"^773\d{10}$", r"^STO\d{10}$"],  # STO Express
            "YTO": [r"^YT\d{13}$", r"^\d{12}$"],      # YTO Express
            "ZTO": [r"^ZT\d{12}$", r"^75\d{10}$"],    # ZTO Express
            "EMS": [r"^\d{13}$", r"^E\d{13}$"],       # EMS
            "JD": [r"^JD\d{12}$", r"^VA\d{12}$"],     # JD Logistics
        }
        for carrier, patterns in rules.items():
            for pattern in patterns:
                if re.match(pattern, tracking_number):
                    return carrier
        # Fall back to API-based identification when no rule matches
        return await self._identify_by_api(tracking_number)

    async def _identify_by_api(self, tracking_number: str) -> str:
        """API-based carrier identification. Left unimplemented in this
        article; this stub fails loudly instead of guessing."""
        raise CrawlerError(f"Unable to identify carrier for {tracking_number}")

    async def _fetch_tracking_data(self, tracking_number: str, carrier: str) -> Dict:
        """Fetch raw tracking data for the given carrier."""
        # Simulated per-carrier endpoints; replace them with real APIs or
        # page-crawling logic in production
        carrier_apis = {
            "SF": "https://www.sf-express.com/sf-service-owf-web/service/waybill/{}/routes",
            "STO": "https://www.sto.cn/api/v1/track/{}",
            "YTO": "https://www.yto.net.cn/api/track/{}",
            "ZTO": "https://www.zto.com/api/trace/{}",
        }
        if carrier in carrier_apis:
            url = carrier_apis[carrier].format(tracking_number)
            return await self.fetch_with_retry(url)
        # Generic fallback: query through a third-party platform
        return await self._query_third_party(tracking_number, carrier)

    async def _query_third_party(self, tracking_number: str, carrier: str) -> Dict:
        """Fallback query via a third-party platform such as Kuaidi100."""
        url = "https://www.kuaidi100.com/query"
        params = {
            "type": carrier.lower(),
            "postid": tracking_number,
            "id": "1",
            "valicode": "",
            "temp": str(random.random()),
        }
        # Attach a signature (the salt here is a placeholder)
        sign = hashlib.md5(
            f"{tracking_number}{carrier}1234567890".encode()
        ).hexdigest()
        params["sign"] = sign

        headers = {
            **self.config.HEADERS,
            "Referer": "https://www.kuaidi100.com/",
            "Origin": "https://www.kuaidi100.com",
        }
        return await self.fetch_with_retry(url, params=params, headers=headers)

    async def _parse_tracking_data(self, raw_data: Dict, carrier: str) -> List[Dict]:
        """Dispatch to the carrier-specific parser, or the generic one."""
        parser = self.parser_registry.get(carrier, self._parse_generic_data)
        return parser(raw_data)

    def _parse_sf_data(self, data: Dict) -> List[Dict]:
        """Parse SF Express payloads."""
        events = []
        if data.get("success") and "route" in data:
            for item in data["route"]:
                events.append({
                    "time": f"{item.get('accept_date', '')} {item.get('accept_time', '')}",
                    "description": item.get("remark", ""),
                    "location": item.get("accept_address", ""),
                    "status_code": item.get("opcode", ""),
                    "operator": item.get("accept_person", ""),
                })
        return sorted(events, key=lambda x: x["time"])

    def _parse_sto_data(self, data: Dict) -> List[Dict]:
        """Parse STO payloads."""
        events = []
        if data.get("status") == 200 and "data" in data:
            for item in data["data"]:
                events.append({
                    "time": item.get("scanTime", ""),
                    "description": item.get("desc", ""),
                    "location": item.get("cityName", ""),
                    "status_code": item.get("scanType", ""),
                    "operator": item.get("scanPerson", ""),
                })
        return sorted(events, key=lambda x: x["time"])

    def _parse_generic_data(self, data: Dict) -> List[Dict]:
        """Generic parser that probes several common payload shapes."""
        events = []
        possible_paths = [
            data.get("data", []),
            data.get("result", {}).get("list", []),
            data.get("routes", []),
            data.get("traces", []),
            data.get("details", []),
        ]
        for path in possible_paths:
            if isinstance(path, list) and path:
                for item in path:
                    if isinstance(item, dict):
                        event = {
                            "time": item.get("time") or item.get("ftime")
                                    or item.get("scanTime", ""),
                            "description": item.get("context") or item.get("desc")
                                           or item.get("status", ""),
                            "location": item.get("location")
                                        or item.get("cityName") or "",
                            "status_code": item.get("status_code")
                                           or item.get("scanType", ""),
                            "operator": item.get("operator")
                                        or item.get("scanPerson", ""),
                        }
                        if event["time"]:
                            events.append(event)
                break  # stop at the first non-empty candidate list
        return sorted(events, key=lambda x: x["time"])

    def _determine_status(self, events: List[Dict]) -> str:
        """Derive the current status from the latest event description."""
        if not events:
            return "无跟踪信息"  # no tracking info

        latest_event = events[-1]["description"].lower()
        # Status labels and keywords stay in Chinese: they match the
        # Chinese event text that the carriers actually return
        status_keywords = {
            "已签收": ["签收", "已签收", "已收货"],        # delivered
            "运输中": ["运输中", "发往", "到达", "中转"],  # in transit
            "派送中": ["派送", "投递", "送货"],            # out for delivery
            "已揽收": ["揽收", "已收件"],                  # picked up
            "问题件": ["退回", "异常", "问题", "失败"],    # problem shipment
            "待揽收": ["待揽收", "已下单"],                # awaiting pickup
        }
        for status, keywords in status_keywords.items():
            for keyword in keywords:
                if keyword in latest_event:
                    return status
        return "运输中"  # default: in transit
```
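Before building the proxy pool, here is a minimal sketch of how the class above is meant to be driven. The tracking number is made up, and a real run needs reachable carrier endpoints, so treat this as a shape check rather than a working query:

```python
import asyncio

from config import config
from crawler.async_crawler import AsyncExpressCrawler


async def demo():
    # The async context manager opens and closes the shared aiohttp session
    async with AsyncExpressCrawler(config) as crawler:
        result = await crawler.track_single("SF123456789012")
        print(result.carrier, result.status, result.success)
        for event in result.events:
            print(event["time"], event["location"], event["description"])


asyncio.run(demo())
```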
### 4. Proxy Manager (proxy_manager.py)

```python
import asyncio
import random
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Set

import aiohttp


@dataclass
class Proxy:
    """A single proxy with usage statistics."""
    host: str
    port: int
    protocol: str = "http"
    username: str = None
    password: str = None
    last_used: datetime = field(default_factory=datetime.now)
    success_count: int = 0
    failure_count: int = 0
    response_time: float = float("inf")

    @property
    def url(self):
        if self.username and self.password:
            return (f"{self.protocol}://{self.username}:{self.password}"
                    f"@{self.host}:{self.port}")
        return f"{self.protocol}://{self.host}:{self.port}"

    @property
    def score(self):
        """Score a proxy: 70% success rate, 30% speed."""
        if self.success_count + self.failure_count == 0:
            return 0.5  # neutral score for unused proxies
        success_rate = self.success_count / (self.success_count + self.failure_count)
        time_factor = 1.0 / max(1.0, self.response_time)
        return success_rate * 0.7 + time_factor * 0.3


class ProxyManager:
    """Smart proxy pool with scoring, rotation, and blacklisting."""

    def __init__(self, config):
        self.config = config
        self.proxies: List[Proxy] = []
        self.bad_proxies: Set[str] = set()
        self.update_interval = 300  # refresh every 5 minutes
        self.last_update = datetime.min

    async def update_proxies(self):
        """Refresh the pool from the configured proxy sources."""
        if (datetime.now() - self.last_update).total_seconds() < self.update_interval:
            return

        tasks = [self._fetch_proxies_from_source(source)
                 for source in self.config.PROXY_SOURCES]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        new_proxies = []
        for result in results:
            if isinstance(result, list):
                new_proxies.extend(result)

        # Validate candidates before adding them to the pool
        valid_proxies = await self._validate_proxies(new_proxies)

        # Merge, deduplicate, and skip blacklisted proxies
        existing_urls = {p.url for p in self.proxies}
        for proxy in valid_proxies:
            if proxy.url not in existing_urls and proxy.url not in self.bad_proxies:
                self.proxies.append(proxy)

        # Drop stale and low-scoring entries
        self._cleanup_proxies()
        self.last_update = datetime.now()
        print(f"Proxy pool refreshed; {len(self.proxies)} valid proxies available")

    async def _fetch_proxies_from_source(self, url: str) -> List[Proxy]:
        """Fetch a proxy list from a single source."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=10) as response:
                    if response.status == 200:
                        text = await response.text()
                        return self._parse_proxy_text(text)
        except Exception as e:
            print(f"Failed to fetch proxy source {url}: {str(e)}")
        return []

    def _parse_proxy_text(self, text: str) -> List[Proxy]:
        """Parse plain-text proxy lists in host:port[:protocol] form."""
        proxies = []
        for line in text.strip().split("\n"):
            line = line.strip()
            if ":" in line:
                parts = line.split(":")
                if len(parts) >= 2 and parts[1].isdigit():
                    host = parts[0]
                    port = int(parts[1])
                    protocol = parts[2].lower() if len(parts) >= 3 else "http"
                    proxies.append(Proxy(host=host, port=port, protocol=protocol))
        return proxies
    async def _validate_proxy(self, proxy: Proxy) -> bool:
        """Validate one proxy against a test endpoint and time it."""
        test_url = "http://httpbin.org/ip"
        try:
            start_time = asyncio.get_event_loop().time()
            connector = aiohttp.TCPConnector(ssl=False)
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(
                connector=connector, timeout=timeout
            ) as session:
                async with session.get(test_url, proxy=proxy.url) as response:
                    if response.status == 200:
                        proxy.response_time = (
                            asyncio.get_event_loop().time() - start_time
                        )
                        return True
        except Exception:
            pass
        return False

    async def _validate_proxies(self, proxies: List[Proxy]) -> List[Proxy]:
        """Validate proxies concurrently, capped at 20 at a time."""
        semaphore = asyncio.Semaphore(20)

        async def validate_with_semaphore(proxy):
            async with semaphore:
                return proxy if await self._validate_proxy(proxy) else None

        tasks = [validate_with_semaphore(p) for p in proxies]
        results = await asyncio.gather(*tasks)
        return [r for r in results if r]

    def get_proxy(self) -> str:
        """Pick a proxy: random choice among the ten best by score."""
        if not self.proxies:
            return None

        sorted_proxies = sorted(self.proxies, key=lambda p: p.score, reverse=True)
        best_proxies = sorted_proxies[:10]
        selected = random.choice(best_proxies)
        selected.last_used = datetime.now()

        # Rotate the chosen proxy to the back of the list
        self.proxies.remove(selected)
        self.proxies.append(selected)
        return selected.url

    def report_success(self, proxy_url: str):
        """Record a successful use of a proxy."""
        for proxy in self.proxies:
            if proxy.url == proxy_url:
                proxy.success_count += 1
                break

    def report_failure(self, proxy_url: str):
        """Record a failed use; blacklist after five failures."""
        for proxy in self.proxies:
            if proxy.url == proxy_url:
                proxy.failure_count += 1
                if proxy.failure_count >= 5:
                    self.bad_proxies.add(proxy_url)
                    self.proxies.remove(proxy)
                break

    def _cleanup_proxies(self):
        """Drop proxies that are stale, failing, or low-scoring."""
        now = datetime.now()
        self.proxies = [
            p for p in self.proxies
            if (now - p.last_used).total_seconds() < 3600  # used within an hour
            and p.failure_count < 5                        # fewer than 5 failures
            and p.score > 0.1                              # score above 0.1
        ]
```
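A short sketch of how the pool is meant to be warmed up and consumed — `update_proxies()` fetches and validates candidates, `get_proxy()` rotates among the highest-scoring ones, and the report methods feed the scorer. The wrapper coroutine is purely illustrative:

```python
import asyncio

from config import config
from crawler.proxy_manager import ProxyManager


async def demo_pool():
    pm = ProxyManager(config)
    await pm.update_proxies()         # fetch, validate, and score candidates
    proxy_url = pm.get_proxy()        # random pick among the ten best by score
    print("Using proxy:", proxy_url)
    if proxy_url:
        pm.report_success(proxy_url)  # call report_failure() on errors instead


asyncio.run(demo_pool())
```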
### 5. Data Models and Storage (models/database.py)

```python
from datetime import datetime

from sqlalchemy import (Boolean, Column, DateTime, Float, Integer, JSON,
                        String, Text, create_engine)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class TrackingRecord(Base):
    """One stored tracking query result."""
    __tablename__ = "tracking_records"

    id = Column(Integer, primary_key=True)
    tracking_number = Column(String(50), index=True, nullable=False)
    carrier = Column(String(20), nullable=False)
    status = Column(String(50))
    raw_data = Column(JSON)   # raw payload as returned by the carrier
    events = Column(JSON)     # normalized event list
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
    success = Column(Boolean, default=True)
    error_message = Column(Text)
    source_ip = Column(String(50))
    response_time = Column(Float)  # response time in seconds


class DatabaseManager:
    """Thin wrapper around SQLAlchemy sessions."""

    def __init__(self, database_url):
        self.engine = create_engine(database_url)
        self.SessionLocal = sessionmaker(
            autocommit=False, autoflush=False, bind=self.engine
        )
        # Create tables on startup
        Base.metadata.create_all(bind=self.engine)

    def save_tracking_result(self, result):
        """Persist one TrackingResult."""
        session = self.SessionLocal()
        try:
            record = TrackingRecord(
                tracking_number=result.tracking_number,
                carrier=result.carrier,
                status=result.status,
                raw_data=result.raw_data,
                events=result.events,
                success=result.success,
                error_message=result.error_msg,
                response_time=result.timestamp,
            )
            session.add(record)
            session.commit()
            return record.id
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    def get_recent_tracking(self, tracking_number, limit=10):
        """Fetch the most recent records for a tracking number."""
        session = self.SessionLocal()
        try:
            return (session.query(TrackingRecord)
                    .filter(TrackingRecord.tracking_number == tracking_number)
                    .order_by(TrackingRecord.created_at.desc())
                    .limit(limit)
                    .all())
        finally:
            session.close()
```
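Reading the stored records back is symmetric. A quick sketch, assuming the placeholder connection string from config.py points at a real PostgreSQL instance and the tracking number is the same made-up example as before:

```python
from config import config
from models.database import DatabaseManager

db = DatabaseManager(config.DATABASE_URL)
records = db.get_recent_tracking("SF123456789012", limit=5)
for rec in records:
    # events is a JSON column holding the normalized event list
    print(rec.created_at, rec.status, len(rec.events or []))
```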
### 6. Main Entry Point (main.py)

```python
import argparse
import asyncio
import json
from datetime import datetime
from typing import Dict, List

from config import config
from crawler.async_crawler import AsyncExpressCrawler
from models.database import DatabaseManager
from utils.logger import setup_logging


def load_tracking_numbers(file_path: str) -> List[Dict]:
    """Load tracking numbers from a JSON or TXT file."""
    if file_path.endswith(".json"):
        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)
    elif file_path.endswith(".txt"):
        with open(file_path, "r", encoding="utf-8") as f:
            numbers = [line.strip() for line in f if line.strip()]
        return [{"number": num} for num in numbers]
    raise ValueError("Unsupported file format")


async def main():
    parser = argparse.ArgumentParser(description="Express tracking crawler")
    parser.add_argument("--input", "-i", required=True,
                        help="input file path (JSON or TXT)")
    parser.add_argument("--output", "-o", help="output file path (JSON)")
    parser.add_argument("--carrier", "-c", help="carrier code (optional)")
    parser.add_argument("--concurrent", type=int, default=50,
                        help="concurrency level (default 50)")
    args = parser.parse_args()

    # Configure logging
    setup_logging()

    # Load tracking numbers and apply the CLI carrier override, if any
    tracking_list = load_tracking_numbers(args.input)
    if args.carrier:
        for item in tracking_list:
            item.setdefault("carrier", args.carrier)

    # Apply the CLI concurrency override
    config.MAX_CONCURRENT_REQUESTS = args.concurrent

    # Initialize the database
    db_manager = DatabaseManager(config.DATABASE_URL)

    # Run the crawler
    async with AsyncExpressCrawler(config) as crawler:
        print(f"Tracking {len(tracking_list)} shipments...")
        start_time = datetime.now()
        results = await crawler.track_batch(tracking_list)
        duration = (datetime.now() - start_time).total_seconds()

        # Summarize
        successful = [r for r in results if r.success]
        failed = [r for r in results if not r.success]

        print("Tracking complete.")
        print(f"Elapsed: {duration:.2f} s")
        print(f"Succeeded: {len(successful)}")
        print(f"Failed: {len(failed)}")
        print(f"Average rate: {len(tracking_list) / duration:.2f} shipments/s")

        # Persist results to the database
        for result in results:
            try:
                db_manager.save_tracking_result(result)
            except Exception as e:
                print(f"Failed to save result to database: {str(e)}")

        # Optionally dump results to a JSON file
        if args.output:
            output_data = [{
                "tracking_number": r.tracking_number,
                "carrier": r.carrier,
                "status": r.status,
                "success": r.success,
                "events": r.events,
                "error": r.error_msg,
                "timestamp": r.timestamp,
            } for r in results]
            with open(args.output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, ensure_ascii=False, indent=2)
            print(f"Results written to: {args.output}")

        # List the failed tracking numbers
        if failed:
            print("\nFailed tracking numbers:")
            for result in failed:
                print(f"  {result.tracking_number}: {result.error_msg}")


if __name__ == "__main__":
    asyncio.run(main())
```

## Advanced Extensions

### 1. Handling JavaScript Rendering with Playwright

```python
from playwright.async_api import async_playwright


class JavaScriptCrawler:
    """Crawler for pages that render tracking info with JavaScript."""

    async def crawl_with_playwright(self, url):
        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=True,
                args=["--disable-blink-features=AutomationControlled"],
            )
            # Context with a realistic browser fingerprint
            context = await browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                           "AppleWebKit/537.36...",
                viewport={"width": 1920, "height": 1080},
            )
            # Inject anti-detection scripts before any page script runs
            await context.add_init_script("""
                Object.defineProperty(navigator, 'webdriver', {
                    get: () => undefined
                });
                window.chrome = { runtime: {} };
            """)
            page = await context.new_page()
            try:
                await page.goto(url, wait_until="networkidle")
                # Wait for the tracking info to load
                await page.wait_for_selector(".tracking-info", timeout=10000)
                # Extract the rendered data
                data = await page.evaluate("""
                    () => {
                        const items = document.querySelectorAll('.tracking-item');
                        return Array.from(items).map(item => ({
                            time: item.querySelector('.time').innerText,
                            description: item.querySelector('.desc').innerText,
                            location: item.querySelector('.location')?.innerText || ''
                        }));
                    }
                """)
                return data
            finally:
                await browser.close()
```

### 2. Distributed Crawling

```python
from celery import Celery
from kombu import Queue

# Configure Celery with Redis as both broker and result backend
app = Celery(
    "express_crawler",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

# Separate queues for tracking and proxy-maintenance tasks
app.conf.task_queues = (
    Queue("tracking_tasks", routing_key="tracking.#"),
    Queue("proxy_tasks", routing_key="proxy.#"),
)


@app.task(queue="tracking_tasks")
def track_single_express(tracking_number, carrier=None):
    """Track one shipment; reuse the crawler logic implemented above."""
    pass


@app.task(queue="tracking_tasks")
def track_batch_express(tracking_list):
    """Track a batch of shipments."""
    pass
```
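To see the distributed variant in motion, assume the Celery app above lives in a module named tasks.py (an assumption, not something the article specifies) and a local Redis is running. A worker is started from the shell, and jobs are enqueued with `.delay()`:

```python
# Start a worker first (shell):
#   celery -A tasks worker -Q tracking_tasks --concurrency=8
from tasks import track_single_express

# .delay() enqueues the job and returns an AsyncResult handle
async_result = track_single_express.delay("SF123456789012", carrier="SF")
print("queued task:", async_result.id)
```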
### 3. Monitoring and Alerting

```python
import smtplib
from datetime import datetime
from email.mime.text import MIMEText

import psutil


class Monitor:
    """Simple crawler monitoring with e-mail alerts."""

    def __init__(self, threshold_cpu=80, threshold_memory=80):
        self.threshold_cpu = threshold_cpu
        self.threshold_memory = threshold_memory
        self.error_count = 0
        self.error_threshold = 10

    def check_system_status(self):
        """Check CPU and memory usage against the thresholds."""
        cpu_percent = psutil.cpu_percent(interval=1)
        memory_percent = psutil.virtual_memory().percent

        warnings = []
        if cpu_percent > self.threshold_cpu:
            warnings.append(f"CPU usage too high: {cpu_percent}%")
        if memory_percent > self.threshold_memory:
            warnings.append(f"Memory usage too high: {memory_percent}%")
        return warnings

    def send_alert(self, message):
        """Send an alert e-mail (SMTP host and credentials are placeholders)."""
        msg = MIMEText(message, "plain", "utf-8")
        msg["Subject"] = f"Crawler alert - {datetime.now()}"
        msg["From"] = "monitor@example.com"
        msg["To"] = "admin@example.com"
        try:
            server = smtplib.SMTP("smtp.example.com", 587)
            server.starttls()
            server.login("username", "password")
            server.send_message(msg)
            server.quit()
        except Exception as e:
            print(f"Failed to send alert: {str(e)}")
```
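One possible way to wire the monitor into the batch loop is to check resources between batches and alert when thresholds are exceeded. The `crawl_forever` wrapper and the thresholds below are illustrative assumptions, not part of the system above:

```python
import asyncio

# Monitor is the class defined above; thresholds are arbitrary examples
monitor = Monitor(threshold_cpu=85, threshold_memory=85)


async def crawl_forever(crawler, batches):
    """Hypothetical driver: run batches and alert on resource pressure."""
    for batch in batches:
        warnings = monitor.check_system_status()
        if warnings:
            monitor.send_alert("\n".join(warnings))
        await crawler.track_batch(batch)
        await asyncio.sleep(5)  # breathe between batches
```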
