2026/3/6 6:36:17
网站建设
项目流程
外贸建站软件,广州网络公司策划,muse做的网站怎么样,WordPress分类信息主题引言#xff1a;电商时代的价格监控革命在当今的电商时代#xff0c;商品价格波动频繁且迅速。对于精明的消费者、电商从业者、数据分析师和市场研究人员来说#xff0c;实时监控亚马逊、淘宝、京东等主要电商平台的商品价格变化已成为一项重要技能。通过构建智能价格追踪爬…引言电商时代的价格监控革命在当今的电商时代商品价格波动频繁且迅速。对于精明的消费者、电商从业者、数据分析师和市场研究人员来说实时监控亚马逊、淘宝、京东等主要电商平台的商品价格变化已成为一项重要技能。通过构建智能价格追踪爬虫系统我们不仅可以捕捉最佳购买时机还能进行市场趋势分析和竞争对手监控。本文将深入探讨如何使用Python最新技术构建一个高效、稳定的跨平台商品价格追踪系统。技术栈概览我们将在本项目中采用以下现代Python技术栈Playwright微软开发的现代化浏览器自动化工具支持无头浏览器操作AsyncioPython原生异步IO框架实现高并发爬取BeautifulSoup4/Selectolax高效的HTML解析库Pydantic数据验证与设置管理SQLAlchemy SQLite轻量级数据存储方案FastAPI可选的可视化API后端代理池支持处理反爬机制机器学习预警基于历史价格的智能预测系统架构设计1. 项目结构规划textprice-tracker/ ├── crawlers/ # 各平台爬虫模块 │ ├── base.py # 基础爬虫类 │ ├── amazon.py # 亚马逊爬虫 │ ├── taobao.py # 淘宝爬虫 │ └── jd.py # 京东爬虫 ├── models/ # 数据模型 │ ├── product.py # 商品模型 │ └── price.py # 价格记录模型 ├── database/ # 数据库操作 ├── config/ # 配置文件 ├── utils/ # 工具函数 ├── alerts/ # 预警系统 └── main.py # 主程序入口2. 核心功能模块多平台适配器统一接口支持不同电商平台智能请求调度自适应请求频率控制反爬虫对抗自动切换代理、用户代理和浏览器指纹数据清洗管道标准化不同平台的数据格式实时监控与预警基于规则和机器学习的价格异常检测完整代码实现1. 环境配置与依赖安装首先创建并激活虚拟环境然后安装所需依赖bash# 创建虚拟环境 python -m venv venv source venv/bin/activate # Linux/Mac # 或 venv\Scripts\activate # Windows # 安装核心依赖 pip install playwright selectolax pydantic sqlalchemy aiohttp pip install asyncio nest-asyncio pandas numpy pip install fastapi uvicorn jinja2 # 可选用于Web界面 # 安装Playwright浏览器 playwright install chromium2. 配置管理模块python# config/settings.py from pydantic import BaseSettings from typing import List, Optional import os class Settings(BaseSettings): # 数据库配置 DATABASE_URL: str sqlite:///./price_tracker.db # 爬虫配置 REQUEST_TIMEOUT: int 30 MAX_CONCURRENT_REQUESTS: int 5 DEFAULT_USER_AGENT: str Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 # 代理配置 PROXY_ENABLED: bool False PROXY_POOL: List[str] [] # 平台特定配置 AMAZON_DOMAINS: dict { US: amazon.com, UK: amazon.co.uk, DE: amazon.de, JP: amazon.co.jp } # 价格监控阈值 PRICE_DROP_THRESHOLD: float 0.15 # 价格下降15%时触发通知 CHECK_INTERVAL_HOURS: int 6 class Config: env_file .env settings Settings()3. 数据模型定义python# models/product.py from pydantic import BaseModel, HttpUrl from typing import Optional, List from datetime import datetime from enum import Enum class Platform(str, Enum): AMAZON amazon TAOBAO taobao JD jd EBAY ebay class Product(BaseModel): id: Optional[int] None platform: Platform product_id: str url: HttpUrl title: str description: Optional[str] None current_price: Optional[float] None original_price: Optional[float] None currency: str USD availability: bool True image_url: Optional[str] None category: Optional[str] None last_updated: datetime datetime.now() class Config: orm_mode True class PriceHistory(BaseModel): id: Optional[int] None product_id: int price: float currency: str timestamp: datetime datetime.now() availability: bool True special_offer: Optional[str] None class Config: orm_mode True4. 数据库管理python# database/database.py from sqlalchemy import create_engine, Column, Integer, String, Float, Boolean, DateTime, Text, ForeignKey from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship from datetime import datetime import os from config.settings import settings Base declarative_base() class ProductORM(Base): __tablename__ products id Column(Integer, primary_keyTrue, indexTrue) platform Column(String(50), nullableFalse) product_id Column(String(100), uniqueTrue, nullableFalse) url Column(String(500), nullableFalse) title Column(String(500), nullableFalse) description Column(Text) current_price Column(Float) original_price Column(Float) currency Column(String(10), defaultUSD) availability Column(Boolean, defaultTrue) image_url Column(String(500)) category Column(String(200)) last_updated Column(DateTime, defaultdatetime.now) price_history relationship(PriceHistoryORM, back_populatesproduct, cascadeall, delete-orphan) class PriceHistoryORM(Base): __tablename__ price_history id Column(Integer, primary_keyTrue, indexTrue) product_id Column(Integer, ForeignKey(products.id)) price Column(Float, nullableFalse) currency Column(String(10), defaultUSD) timestamp Column(DateTime, defaultdatetime.now) availability Column(Boolean, defaultTrue) special_offer Column(String(200)) product relationship(ProductORM, back_populatesprice_history) # 数据库初始化 engine create_engine(settings.DATABASE_URL, connect_args{check_same_thread: False} if sqlite in settings.DATABASE_URL else {}) SessionLocal sessionmaker(autocommitFalse, autoflushFalse, bindengine) def init_db(): Base.metadata.create_all(bindengine) def get_db(): db SessionLocal() try: yield db finally: db.close()5. 基础爬虫类python# crawlers/base.py import asyncio import aiohttp from typing import Optional, Dict, Any from selectolax.parser import HTMLParser import random import time from dataclasses import dataclass from urllib.parse import urlparse import logging logging.basicConfig(levellogging.INFO) logger logging.getLogger(__name__) dataclass class CrawlerConfig: user_agents: list None proxy: Optional[str] None timeout: int 30 max_retries: int 3 delay_range: tuple (1, 3) class BaseCrawler: def __init__(self, config: CrawlerConfig None): self.config config or CrawlerConfig() if not self.config.user_agents: self.config.user_agents [ Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36, Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15, Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ] def get_random_user_agent(self) - str: return random.choice(self.config.user_agents) async def fetch(self, url: str, session: aiohttp.ClientSession) - Optional[str]: headers { User-Agent: self.get_random_user_agent(), Accept: text/html,application/xhtmlxml,application/xml;q0.9,image/webp,*/*;q0.8, Accept-Language: en-US,en;q0.5, Accept-Encoding: gzip, deflate, br, Connection: keep-alive, Upgrade-Insecure-Requests: 1, } for attempt in range(self.config.max_retries): try: async with session.get( url, headersheaders, proxyself.config.proxy, timeoutaiohttp.ClientTimeout(totalself.config.timeout) ) as response: if response.status 200: return await response.text() elif response.status 429: # Too Many Requests wait_time 2 ** attempt # Exponential backoff logger.warning(fRate limited. Waiting {wait_time} seconds...) await asyncio.sleep(wait_time) else: logger.error(fFailed to fetch {url}: Status {response.status}) return None except Exception as e: logger.error(fAttempt {attempt 1} failed for {url}: {str(e)}) if attempt self.config.max_retries - 1: await asyncio.sleep(2 ** attempt) else: return None # 随机延迟避免请求过于频繁 await asyncio.sleep(random.uniform(*self.config.delay_range)) return None def parse_html(self, html: str) - HTMLParser: return HTMLParser(html)6. 亚马逊爬虫实现python# crawlers/amazon.py from crawlers.base import BaseCrawler, CrawlerConfig from models.product import Product from typing import Optional import re from urllib.parse import urlparse, parse_qs import json import logging logger logging.getLogger(__name__) class AmazonCrawler(BaseCrawler): def __init__(self, country: str US, config: CrawlerConfig None): super().__init__(config) self.country country.upper() self.base_domain famazon.{self._get_domain_suffix()} def _get_domain_suffix(self) - str: domains { US: com, UK: co.uk, DE: de, JP: co.jp, FR: fr, IT: it, ES: es, CA: ca, AU: com.au } return domains.get(self.country, com) def extract_product_id(self, url: str) - Optional[str]: 提取亚马逊商品ID patterns [ r/dp/([A-Z0-9]{10}), r/gp/product/([A-Z0-9]{10}), r/product/([A-Z0-9]{10}), r/dp/([A-Z0-9]{10})/ ] for pattern in patterns: match re.search(pattern, url) if match: return match.group(1) # 尝试从查询参数中提取 parsed urlparse(url) query_params parse_qs(parsed.query) if asin in query_params: return query_params[asin][0] return None async def get_product_info(self, url: str, session) - Optional[Product]: html await self.fetch(url, session) if not html: return None tree self.parse_html(html) # 提取商品信息 product_data self._parse_product_page(tree) if product_data: return Product( platformamazon, product_idproduct_data[product_id], urlurl, titleproduct_data[title], current_priceproduct_data[current_price], original_priceproduct_data[original_price], currencyproduct_data[currency], availabilityproduct_data[availability], image_urlproduct_data.get(image_url), descriptionproduct_data.get(description) ) return None def _parse_product_page(self, tree) - Optional[dict]: 解析亚马逊商品页面 try: # 尝试从JSON-LD中提取数据 script_tags tree.css(script[typeapplication/ldjson]) for script in script_tags: try: data json.loads(script.text()) if data.get(type) Product: product_info { product_id: data.get(sku) or data.get(productID, ), title: data.get(name, ), description: data.get(description, ), image_url: data.get(image, ), currency: USD, availability: False, current_price: None, original_price: None } # 提取价格信息 if offers in data: offers data[offers] if isinstance(offers, dict): offers [offers] for offer in offers: if offer.get(availability) https://schema.org/InStock: product_info[availability] True price offer.get(price) if price: product_info[current_price] float(price) product_info[currency] offer.get(priceCurrency, USD) return product_info except json.JSONDecodeError: continue # 备用解析方法直接解析HTML元素 title_elem tree.css_first(#productTitle) title title_elem.text(stripTrue) if title_elem else # 价格解析 price_selectors [ .a-price .a-offscreen, #price_inside_buybox, #priceblock_ourprice, #priceblock_dealprice, .a-color-price ] current_price None for selector in price_selectors: price_elem tree.css_first(selector) if price_elem: price_text price_elem.text(stripTrue) price_match re.search(r[\d,]\.?\d*, price_text.replace(,, )) if price_match: current_price float(price_match.group()) break # 提取商品ID product_id None asin_elem tree.css_first(input[nameasin]) if asin_elem and asin_elem.attributes.get(value): product_id asin_elem.attributes[value] return { product_id: product_id or , title: title, current_price: current_price, original_price: None, currency: USD, availability: current_price is not None, image_url: None, description: } except Exception as e: logger.error(fError parsing Amazon page: {str(e)}) return None7. 淘宝爬虫实现python# crawlers/taobao.py from crawlers.base import BaseCrawler, CrawlerConfig from models.product import Product from typing import Optional import re import json import logging logger logging.getLogger(__name__) class TaobaoCrawler(BaseCrawler): def __init__(self, config: CrawlerConfig None): super().__init__(config) self.base_url https://item.taobao.com def extract_product_id(self, url: str) - Optional[str]: 提取淘宝商品ID # 匹配淘宝商品URL模式 patterns [ ritem\.taobao\.com/item\.htm\?id(\d), rtaobao\.com/item\.htm\?id(\d), r/item/(\d)\.html ] for pattern in patterns: match re.search(pattern, url) if match: return match.group(1) return None async def get_product_info(self, url: str, session) - Optional[Product]: # 淘宝需要特殊处理可能需要使用无头浏览器 from playwright.async_api import async_playwright async with async_playwright() as p: browser await p.chromium.launch(headlessTrue) context await browser.new_context( user_agentself.get_random_user_agent(), viewport{width: 1920, height: 1080} ) page await context.new_page() try: await page.goto(url, timeout30000) # 等待页面加载完成 await page.wait_for_load_state(networkidle, timeout10000) # 提取商品信息 title await self._extract_title(page) price await self._extract_price(page) product_id self.extract_product_id(url) if title and product_id: return Product( platformtaobao, product_idproduct_id, urlurl, titletitle, current_pricefloat(price) if price else None, currencyCNY, availabilityprice is not None ) except Exception as e: logger.error(fError crawling Taobao: {str(e)}) finally: await browser.close() return None async def _extract_title(self, page) - Optional[str]: 提取商品标题 try: # 尝试多种选择器 selectors [ .tb-detail-hd h1, .tb-main-title, [data-title], .title-text ] for selector in selectors: element await page.query_selector(selector) if element: title await element.text_content() if title and len(title.strip()) 0: return title.strip() return None except Exception as e: logger.error(fError extracting title: {str(e)}) return None async def _extract_price(self, page) - Optional[str]: 提取商品价格 try: # 价格选择器 selectors [ .tb-rmb-num, .tm-price, .price, [propertyog:product:price] ] for selector in selectors: element await page.query_selector(selector) if element: price_text await element.text_content() if price_text: # 提取数字 match re.search(r[\d,]\.?\d*, price_text.replace(,, )) if match: return match.group() return None except Exception as e: logger.error(fError extracting price: {str(e)}) return None8. 异步爬虫调度器python# crawlers/scheduler.py import asyncio from typing import List, Dict, Any, Optional import aiohttp from datetime import datetime import logging from dataclasses import dataclass from concurrent.futures import ThreadPoolExecutor from crawlers.amazon import AmazonCrawler from crawlers.taobao import TaobaoCrawler from models.product import Product, Platform from database.database import SessionLocal, ProductORM, PriceHistoryORM logger logging.getLogger(__name__) dataclass class TrackingTask: url: str platform: Platform check_interval: int 6 # 检查间隔小时 last_checked: Optional[datetime] None class PriceTrackerScheduler: def __init__(self, max_concurrent: int 5): self.max_concurrent max_concurrent self.semaphore asyncio.Semaphore(max_concurrent) self.crawlers { Platform.AMAZON: AmazonCrawler(), Platform.TAOBAO: TaobaoCrawler(), } self.tracking_tasks: List[TrackingTask] [] def add_tracking_task(self, url: str, platform: Platform, check_interval: int 6): 添加追踪任务 task TrackingTask(urlurl, platformplatform, check_intervalcheck_interval) self.tracking_tasks.append(task) logger.info(fAdded tracking task: {url}) async def crawl_product(self, task: TrackingTask, session: aiohttp.ClientSession) - Optional[Product]: 爬取单个商品信息 async with self.semaphore: try: crawler self.crawlers.get(task.platform) if not crawler: logger.error(fNo crawler for platform: {task.platform}) return None product await crawler.get_product_info(task.url, session) if product: logger.info(fSuccessfully crawled {product.title}) return product except Exception as e: logger.error(fError crawling {task.url}: {str(e)}) return None async def process_batch(self, tasks: List[TrackingTask]): 批量处理追踪任务 async with aiohttp.ClientSession() as session: tasks_list [self.crawl_product(task, session) for task in tasks] results await asyncio.gather(*tasks_list, return_exceptionsTrue) successful_products [] for result in results: if isinstance(result, Product): successful_products.append(result) elif isinstance(result, Exception): logger.error(fTask failed with exception: {str(result)}) # 保存到数据库 await self.save_to_database(successful_products) return successful_products async def save_to_database(self, products: List[Product]): 保存商品信息到数据库 db SessionLocal() try: for product in products: # 检查商品是否已存在 db_product db.query(ProductORM).filter( ProductORM.platform product.platform, ProductORM.product_id product.product_id ).first() if db_product: # 更新现有商品 price_changed db_product.current_price ! product.current_price db_product.current_price product.current_price db_product.original_price product.original_price or db_product.original_price db_product.availability product.availability db_product.last_updated datetime.now() # 如果价格变化添加历史记录 if price_changed and product.current_price: price_history PriceHistoryORM( product_iddb_product.id, priceproduct.current_price, currencyproduct.currency, availabilityproduct.availability, timestampdatetime.now() ) db.add(price_history) logger.info(fUpdated product: {product.title}) else: # 添加新商品 new_product ProductORM( platformproduct.platform.value, product_idproduct.product_id, urlstr(product.url), titleproduct.title, descriptionproduct.description, current_priceproduct.current_price, original_priceproduct.original_price, currencyproduct.currency, availabilityproduct.availability, image_urlproduct.image_url, categoryproduct.category ) db.add(new_product) db.flush() # 获取新商品的ID # 添加初始价格记录 if product.current_price: price_history PriceHistoryORM( product_idnew_product.id, priceproduct.current_price, currencyproduct.currency, availabilityproduct.availability, timestampdatetime.now() ) db.add(price_history) logger.info(fAdded new product: {product.title}) db.commit() except Exception as e: db.rollback() logger.error(fError saving to database: {str(e)}) finally: db.close() async def run_continuous_monitoring(self, interval_hours: int 6): 持续监控任务 logger.info(fStarting continuous monitoring with {interval_hours} hour interval) while True: try: # 筛选需要检查的任务 now datetime.now() tasks_to_check [ task for task in self.tracking_tasks if task.last_checked is None or (now - task.last_checked).total_seconds() task.check_interval * 3600 ] if tasks_to_check: logger.info(fChecking {len(tasks_to_check)} products...) results await self.process_batch(tasks_to_check) # 更新最后检查时间 for task in tasks_to_check: task.last_checked now logger.info(fCompleted checking {len(results)} products) else: logger.info(No products need checking at this time) # 等待下一个检查周期 await asyncio.sleep(interval_hours * 3600) except KeyboardInterrupt: logger.info(Monitoring stopped by user) break except Exception as e: logger.error(fError in monitoring loop: {str(e)}) await asyncio.sleep(300) # 出错后等待5分钟再试9. 价格分析与预警系统python# alerts/price_analyzer.py import numpy as np from typing import List, Dict, Any, Optional from datetime import datetime, timedelta import logging from dataclasses import dataclass from database.database import SessionLocal, PriceHistoryORM logger logging.getLogger(__name__) dataclass class PriceAlert: product_id: int alert_type: str # price_drop, price_increase, availability_change message: str threshold: float current_value: float previous_value: float timestamp: datetime class PriceAnalyzer: def __init__(self, price_drop_threshold: float 0.15): self.price_drop_threshold price_drop_threshold def analyze_price_history(self, product_id: int, days_back: int 30) - List[PriceAlert]: 分析商品价格历史 alerts [] db SessionLocal() try: # 获取价格历史 cutoff_date datetime.now() - timedelta(daysdays_back) price_history db.query(PriceHistoryORM).filter( PriceHistoryORM.product_id product_id, PriceHistoryORM.timestamp cutoff_date ).order_by(PriceHistoryORM.timestamp.desc()).all() if len(price_history) 2: return alerts # 获取当前价格和之前的价格 current_price price_history[0].price previous_price price_history[1].price if current_price and previous_price: # 计算价格变化百分比 price_change (current_price - previous_price) / previous_price # 检查价格下降 if price_change -self.price_drop_threshold: alerts.append(PriceAlert( product_idproduct_id, alert_typeprice_drop, messagef价格下降 {abs(price_change*100):.1f}%, thresholdself.price_drop_threshold, current_valuecurrent_price, previous_valueprevious_price, timestampdatetime.now() )) # 检查价格上涨可选 if price_change 0.10: # 上涨10% alerts.append(PriceAlert( product_idproduct_id, alert_typeprice_increase, messagef价格上涨 {price_change*100:.1f}%, threshold0.10, current_valuecurrent_price, previous_valueprevious_price, timestampdatetime.now() )) # 检查可用性变化 current_availability price_history[0].availability previous_availability price_history[1].availability if current_availability ! previous_availability: status 有货 if current_availability else 缺货 alerts.append(PriceAlert( product_idproduct_id, alert_typeavailability_change, messagef商品状态变化: 现在{status}, threshold0, current_valuecurrent_availability, previous_valueprevious_availability, timestampdatetime.now() )) except Exception as e: logger.error(fError analyzing price history: {str(e)}) finally: db.close() return alerts def detect_price_patterns(self, prices: List[float]) - Dict[str, Any]: 检测价格模式 if len(prices) 5: return {} prices_array np.array(prices) # 计算统计信息 stats { mean: float(np.mean(prices_array)), std: float(np.std(prices_array)), min: float(np.min(prices_array)), max: float(np.max(prices_array)), current: float(prices_array[-1]), is_lowest: prices_array[-1] np.min(prices_array[:-1]), trend: unknown } # 检测趋势简单线性回归 if len(prices_array) 3: x np.arange(len(prices_array)) slope, intercept np.polyfit(x, prices_array, 1) if slope -0.01: stats[trend] downward elif slope 0.01: stats[trend] upward else: stats[trend] stable return stats10. 主程序与Web界面python# main.py import asyncio import uvicorn from fastapi import FastAPI, Depends, HTTPException, Request from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from sqlalchemy.orm import Session from typing import List import logging from crawlers.scheduler import PriceTrackerScheduler, TrackingTask from models.product import Product, Platform from database.database import get_db, init_db, ProductORM, PriceHistoryORM from config.settings import settings # 初始化FastAPI应用 app FastAPI(titlePrice Tracker API, version1.0.0) templates Jinja2Templates(directorytemplates) # 全局调度器实例 scheduler PriceTrackerScheduler(max_concurrentsettings.MAX_CONCURRENT_REQUESTS) app.on_event(startup) async def startup_event(): 应用启动时初始化数据库和调度器 init_db() logger.info(Application started) app.get(/, response_classHTMLResponse) async def dashboard(request: Request, db: Session Depends(get_db)): 价格追踪仪表板 products db.query(ProductORM).order_by(ProductORM.last_updated.desc()).limit(50).all() return templates.TemplateResponse(dashboard.html, {request: request, products: products}) app.post(/api/track) async def track_product(url: str, platform: Platform): 开始追踪商品 scheduler.add_tracking_task(url, platform) return {message: fStarted tracking {url}, platform: platform} app.get(/api/products, response_modelList[Product]) async def get_products(db: Session Depends(get_db)): 获取所有追踪的商品 products db.query(ProductORM).all() return products app.get(/api/price-history/{product_id}) async def get_price_history(product_id: int, db: Session Depends(get_db)): 获取商品价格历史 history db.query(PriceHistoryORM).filter( PriceHistoryORM.product_id product_id ).order_by(PriceHistoryORM.timestamp.desc()).limit(100).all() return [{ price: h.price, timestamp: h.timestamp, availability: h.availability } for h in history] app.post(/api/check-now) async def check_now(): 立即执行一次检查 tasks scheduler.tracking_tasks.copy() if tasks: results await scheduler.process_batch(tasks) return {checked: len(results), message: Check completed} return {message: No products to check} async def main(): 主函数 # 示例添加一些初始追踪任务 scheduler.add_tracking_task( https://www.amazon.com/dp/B08N5WRWNW, # 示例商品 Platform.AMAZON ) scheduler.add_tracking_task( https://item.taobao.com/item.htm?id123456789, # 示例商品 Platform.TAOBAO ) # 启动监控循环 monitoring_task asyncio.create_task( scheduler.run_continuous_monitoring(interval_hourssettings.CHECK_INTERVAL_HOURS) ) # 启动Web服务器 config uvicorn.Config(app, host0.0.0.0, port8000, log_levelinfo) server uvicorn.Server(config) # 同时运行监控和Web服务器 await asyncio.gather( monitoring_task, server.serve() ) if __name__ __main__: logging.basicConfig( levellogging.INFO, format%(asctime)s - %(name)s - %(levelname)s - %(message)s ) asyncio.run(main())高级功能扩展1. 机器学习价格预测python# alerts/predictor.py from sklearn.linear_model import LinearRegression from sklearn.preprocessing import PolynomialFeatures import numpy as np from typing import List, Optional from datetime import datetime, timedelta class PricePredictor: def __init__(self): self.model LinearRegression() def predict_future_prices(self, historical_prices: List[float], days_ahead: int 7) - List[float]: 预测未来价格 if len(historical_prices) 10: return [] # 准备特征矩阵 X np.arange(len(historical_prices)).reshape(-1, 1) y np.array(historical_prices) # 添加多项式特征 poly PolynomialFeatures(degree2) X_poly poly.fit_transform(X) # 训练模型 self.model.fit(X_poly, y) # 预测未来价格 future_X np.arange(len(historical_prices), len(historical_prices) days_ahead).reshape(-1, 1) future_X_poly poly.transform(future_X) predictions self.model.predict(future_X_poly) return predictions.tolist()2. 代理池集成python# utils/proxy_manager.py import aiohttp import asyncio from typing import List, Optional import random class ProxyManager: def __init__(self, proxy_urls: List[str] None): self.proxies proxy_urls or [] self.current_index 0 def get_random_proxy(self) - Optional[str]: 获取随机代理 if not self.proxies: return None return random.choice(self.proxies) def get_next_proxy(self) - Optional[str]: 轮询获取代理 if not self.proxies: return None proxy self.proxies[self.current_index] self.current_index (self.current_index 1) % len(self.proxies) return proxy async def validate_proxy(self, proxy_url: str) - bool: 验证代理可用性 try: async with aiohttp.ClientSession() as session: async with session.get( https://httpbin.org/ip, proxyproxy_url, timeout10 ) as response: return response.status 200 except: return False async def refresh_proxies(self, api_url: str): 从代理API刷新代理列表 try: async with aiohttp.ClientSession() as session: async with session.get(api_url, timeout30) as response: if response.status 200: data await response.json() self.proxies data.get(proxies, []) except Exception as e: print(fError refreshing proxies: {e})部署与优化建议1. 部署方案本地运行适合个人使用直接运行main.pyDocker容器化便于部署和扩展云服务器部署使用AWS、Google Cloud或阿里云Serverless架构使用AWS Lambda或Google Cloud Functions2. 性能优化使用连接池管理数据库连接实现缓存机制减少重复请求分布式爬虫架构处理大规模监控使用CDN存储静态资源3. 反爬虫策略应对随机化请求间隔使用住宅代理IP模拟人类浏览行为定期更换User-Agent处理验证码使用OCR或第三方服务4. 数据可视化使用Matplotlib或Plotly生成价格趋势图构建实时仪表板显示价格变化发送邮件或短信通知法律与伦理考虑遵守robots.txt尊重网站的爬虫政策限制请求频率避免对目标网站造成负担数据使用限制仅用于个人或研究目的用户隐私保护不收集用户个人信息遵守服务条款遵守各电商平台的使用条款结论本文详细介绍了一个完整的跨平台商品价格追踪系统的设计与实现。通过结合现代Python技术栈包括Playwright、异步编程、SQLAlchemy等工具我们构建了一个强大、可扩展的价格监控解决方案。系统不仅能够实时追踪亚马逊、淘宝等主流电商平台的商品价格还提供了数据分析、价格预测和智能预警功能。