Building a Real-Time Traffic Information Crawler with a Modern Python Stack

Abstract: This article takes a deep dive into using a modern Python stack to build an efficient, stable crawler for traffic and travel information. By combining asynchronous programming, AI-assisted parsing, and anti-scraping countermeasures, we implement a complete solution for collecting real-time traffic data, bus information, and road-condition monitoring. The article contains more than 300 lines of code and covers practical techniques from basic to advanced.

1. Project Overview and Goals

The traffic information crawler collects real-time traffic information from multiple data sources, including but not limited to:

- Real-time bus arrival information
- Subway operating status
- City road congestion indices
- Traffic accident bulletins
- Highway traffic conditions
- Weather impact on traffic

2. Technology Stack

2.1 Core technologies

- Python 3.10: a recent Python release
- aiohttp: asynchronous HTTP client/server framework
- Playwright: modern browser automation
- BeautifulSoup4 / Parsel: HTML parsing libraries
- Pydantic: data validation and settings management
- asyncio: asynchronous I/O support
- Redis: distributed cache and task queue
- OpenCV + Tesseract: captcha recognition

2.2 Auxiliary tools

- Poetry: dependency management and packaging
- Docker: containerized deployment
- Sentry: error monitoring
- Prometheus: performance monitoring
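A crawler like this runs unattended for long stretches, so it pays to wire up the last two tools early. The following is a minimal sketch and not part of the original system: the Sentry DSN, the port, and the metric name are all placeholders.

```python
# Minimal monitoring bootstrap (sketch). The DSN, port, and metric name
# are illustrative placeholders; substitute your own values.
import sentry_sdk
from prometheus_client import Counter, start_http_server

# Error monitoring: unhandled exceptions are reported to Sentry automatically
sentry_sdk.init(dsn="https://examplePublicKey@o0.ingest.sentry.io/0")

# Performance monitoring: expose metrics at http://localhost:9100/metrics
FETCHES = Counter("traffic_fetches_total", "Pages fetched", ["source"])
start_http_server(9100)

# Inside the crawler, count each successful fetch per source:
FETCHES.labels(source="bus").inc()
```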
3. Complete Crawler System Architecture

```python
"""
Traffic Information Smart Crawler System
Version: 2.0
Author: Smart Crawler Architect
Date: 2024
"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from enum import Enum
import hashlib
import json

# Third-party imports
import aiohttp
import async_timeout
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field, validator
from playwright.async_api import async_playwright
import redis.asyncio as redis
from urllib.parse import urljoin, urlparse
import pandas as pd
import numpy as np

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("traffic_spider.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


# Data model definitions
class TrafficType(Enum):
    BUS = "bus"
    SUBWAY = "subway"
    ROAD = "road"
    HIGHWAY = "highway"
    ACCIDENT = "accident"
    WEATHER = "weather"


class TrafficData(BaseModel):
    """Base model for a single piece of traffic data"""
    id: str = Field(default_factory=lambda: hashlib.md5(str(datetime.now()).encode()).hexdigest())
    type: TrafficType
    city: str
    line_or_road: str
    status: str
    description: str
    delay_minutes: Optional[int] = None
    congestion_level: Optional[int] = Field(None, ge=0, le=10)
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    timestamp: datetime = Field(default_factory=datetime.now)
    source: str
    confidence: float = Field(1.0, ge=0.0, le=1.0)

    @validator("line_or_road")
    def validate_line(cls, v):
        if len(v) < 1:
            raise ValueError("Line or road name must not be empty")
        return v.strip()


class TrafficResponse(BaseModel):
    """API response model"""
    success: bool
    message: str
    data: List[TrafficData]
    timestamp: datetime = Field(default_factory=datetime.now)
    count: int


# Proxy and user-agent pools
class ProxyManager:
    """Smart proxy manager"""

    def __init__(self):
        self.proxies = [
            "http://proxy1.example.com:8080",
            "http://proxy2.example.com:8080",
            # more proxies...
        ]
        self.current_index = 0
        self.failed_proxies = set()

    def get_proxy(self) -> Optional[str]:
        """Return the next usable proxy, skipping known-bad ones"""
        if not self.proxies:
            return None
        for _ in range(len(self.proxies)):
            proxy = self.proxies[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.proxies)
            if proxy not in self.failed_proxies:
                return proxy
        return None

    def mark_failed(self, proxy: str):
        """Mark a proxy as failed so it is skipped on later requests"""
        self.failed_proxies.add(proxy)
        logger.warning(f"Proxy {proxy} marked as failed")


class UserAgentManager:
    """User-agent rotation manager"""

    def __init__(self):
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15",
            # more user agents...
        ]

    def get_random_ua(self) -> str:
        """Return a random user agent"""
        return np.random.choice(self.user_agents)


# Cache manager
class CacheManager:
    """Redis cache manager"""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_url = redis_url
        self.redis_client = None

    async def connect(self):
        """Create the Redis client (redis.from_url is synchronous)"""
        self.redis_client = redis.from_url(self.redis_url, decode_responses=True)

    async def get(self, key: str) -> Optional[Any]:
        """Read a cached value"""
        if not self.redis_client:
            await self.connect()
        data = await self.redis_client.get(key)
        if data:
            return json.loads(data)
        return None

    async def set(self, key: str, value: Any, ttl: int = 300):
        """Write a value with a TTL"""
        if not self.redis_client:
            await self.connect()
        await self.redis_client.setex(key, ttl, json.dumps(value, default=str))

    async def close(self):
        """Close the connection"""
        if self.redis_client:
            await self.redis_client.close()


# Smart requester
class SmartRequest:
    """Smart request handler with caching, identity rotation, and retries"""

    def __init__(self):
        self.session = None
        self.proxy_manager = ProxyManager()
        self.ua_manager = UserAgentManager()
        self.cache_manager = CacheManager()
        self.request_count = 0
        self.max_retries = 3

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30),
            connector=aiohttp.TCPConnector(limit=100)
        )
        await self.cache_manager.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
        await self.cache_manager.close()

    async def fetch(self, url: str, use_cache: bool = True, **kwargs) -> Optional[str]:
        """Fetch a page, consulting the cache and retrying with backoff"""
        # Build the cache key
        cache_key = f"traffic:{hashlib.md5(url.encode()).hexdigest()}"

        # Try the cache first
        if use_cache:
            cached = await self.cache_manager.get(cache_key)
            if cached:
                logger.info(f"Cache hit: {url}")
                return cached

        headers = kwargs.pop("headers", {})
        headers.update({
            "User-Agent": self.ua_manager.get_random_ua(),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
        })

        # Attach a proxy if one is available
        proxy = self.proxy_manager.get_proxy()
        if proxy:
            kwargs["proxy"] = proxy

        for attempt in range(self.max_retries):
            try:
                async with async_timeout.timeout(30):
                    async with self.session.get(url, headers=headers, **kwargs) as response:
                        if response.status == 200:
                            content = await response.text()
                            # Cache the result
                            await self.cache_manager.set(cache_key, content, ttl=60)
                            self.request_count += 1
                            # Random delay to avoid bans
                            await asyncio.sleep(np.random.uniform(1, 3))
                            return content
                        elif response.status == 429:  # Too many requests
                            wait_time = 2 ** attempt  # exponential backoff
                            logger.warning(f"Rate limited, retrying in {wait_time}s")
                            await asyncio.sleep(wait_time)
                        else:
                            logger.error(f"Request failed: {response.status} - {url}")
            except asyncio.TimeoutError:
                logger.error(f"Request timed out: {url}")
            except Exception as e:
                logger.error(f"Request error: {e}")
                if proxy:
                    self.proxy_manager.mark_failed(proxy)
        return None
```
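Before moving on to the parsers, here is a short usage sketch of SmartRequest on its own. It assumes the classes above are in scope and that a Redis instance is reachable at localhost:6379 for the cache layer; the URL is a placeholder.

```python
# Quick usage sketch for SmartRequest (assumes Redis on localhost:6379;
# the URL is a placeholder, not a real data source).
import asyncio

async def demo():
    async with SmartRequest() as req:
        html = await req.fetch("https://example.com/traffic", use_cache=True)
        if html is None:
            print("All retries failed")
        else:
            print(f"Fetched {len(html)} characters; live requests so far: {req.request_count}")

asyncio.run(demo())
```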
With the transport layer in place, the parser, scheduling, and post-processing layers follow:

```python
# Parser base class
class BaseParser:
    """Base class for city parsers"""

    def __init__(self, city: str):
        self.city = city
        self.request = SmartRequest()

    async def parse_bus_info(self, line: str) -> List[TrafficData]:
        """Parse bus information; subclasses must implement"""
        raise NotImplementedError

    async def parse_subway_info(self, line: str) -> List[TrafficData]:
        """Parse subway information; subclasses must implement"""
        raise NotImplementedError

    async def parse_road_info(self, road: str) -> List[TrafficData]:
        """Parse road information; subclasses must implement"""
        raise NotImplementedError


# Concrete city parser
class BeijingTrafficParser(BaseParser):
    """Beijing traffic information parser"""

    BUS_API = "https://bjbus.com/api/bus/line"
    SUBWAY_API = "https://www.bjsubway.com/subway/status"
    ROAD_API = "https://traffic.bjjtgl.gov.cn/api/road"

    async def parse_bus_info(self, line: str) -> List[TrafficData]:
        """Parse Beijing bus information"""
        data_list = []
        url = f"{self.BUS_API}/{line}"

        async with self.request as req:
            content = await req.fetch(url)
            if content:
                try:
                    # Parse the JSON response
                    data = json.loads(content)
                    for bus in data.get("buses", []):
                        traffic_data = TrafficData(
                            type=TrafficType.BUS,
                            city=self.city,
                            line_or_road=line,
                            status=bus.get("status", "normal"),
                            description=f"Bus line {line} {bus.get('station', '')}",
                            delay_minutes=bus.get("delay", 0),
                            latitude=bus.get("lat"),
                            longitude=bus.get("lng"),
                            source=self.BUS_API,
                            confidence=0.9
                        )
                        data_list.append(traffic_data)
                except json.JSONDecodeError:
                    # If JSON parsing fails, fall back to HTML parsing
                    soup = BeautifulSoup(content, "html.parser")
                    # HTML parsing logic goes here
                    pass
        return data_list

    async def parse_subway_info(self, line: str) -> List[TrafficData]:
        """Parse Beijing subway information"""
        data_list = []

        async with async_playwright() as p:
            # Use Playwright for JavaScript-rendered pages
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                viewport={"width": 1920, "height": 1080},
                user_agent=self.request.ua_manager.get_random_ua()
            )
            page = await context.new_page()
            try:
                await page.goto(self.SUBWAY_API, wait_until="networkidle")
                # Wait for the subway status element to load
                await page.wait_for_selector(".subway-status", timeout=10000)
                # Extract per-line status
                status_elements = await page.query_selector_all(".line-status")
                for element in status_elements:
                    line_name = await element.get_attribute("data-line")
                    status = await element.text_content()
                    if line_name and line in line_name:
                        traffic_data = TrafficData(
                            type=TrafficType.SUBWAY,
                            city=self.city,
                            line_or_road=line,
                            status=status.strip(),
                            description=f"Subway line {line} operating status",
                            source=self.SUBWAY_API,
                            confidence=0.95
                        )
                        data_list.append(traffic_data)
            except Exception as e:
                logger.error(f"Failed to fetch subway information: {e}")
            finally:
                await browser.close()
        return data_list


# Task scheduler
class TrafficScheduler:
    """Traffic information task scheduler"""

    def __init__(self):
        self.parsers = {}
        self.tasks = []
        self.results = []

    def register_parser(self, city: str, parser: BaseParser):
        """Register a city parser"""
        self.parsers[city] = parser

    async def schedule_task(self, city: str, task_type: str, **kwargs):
        """Schedule a single task"""
        if city not in self.parsers:
            raise ValueError(f"No parser registered for city {city}")
        parser = self.parsers[city]
        if task_type == "bus":
            return await parser.parse_bus_info(**kwargs)
        elif task_type == "subway":
            return await parser.parse_subway_info(**kwargs)
        elif task_type == "road":
            return await parser.parse_road_info(**kwargs)
        else:
            raise ValueError(f"Unknown task type: {task_type}")

    async def schedule_batch_tasks(self, task_list: List[Dict]) -> List[TrafficData]:
        """Schedule a batch of tasks"""
        tasks = []
        for task in task_list:
            task_coro = self.schedule_task(**task)
            tasks.append(task_coro)

        # Run all tasks concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Collect the results
        all_data = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Task failed: {result}")
            elif isinstance(result, list):
                all_data.extend(result)
        return all_data


# Data processor
class TrafficDataProcessor:
    """Traffic data post-processor"""

    @staticmethod
    def filter_by_city(data_list: List[TrafficData], city: str) -> List[TrafficData]:
        """Filter by city"""
        return [d for d in data_list if d.city == city]

    @staticmethod
    def filter_by_type(data_list: List[TrafficData], traffic_type: TrafficType) -> List[TrafficData]:
        """Filter by type"""
        return [d for d in data_list if d.type == traffic_type]

    @staticmethod
    def filter_by_time(data_list: List[TrafficData], hours: int = 1) -> List[TrafficData]:
        """Keep only records newer than the cutoff"""
        cutoff = datetime.now() - timedelta(hours=hours)
        return [d for d in data_list if d.timestamp > cutoff]

    @staticmethod
    def calculate_congestion_stats(data_list: List[TrafficData]) -> Dict:
        """Compute congestion statistics"""
        if not data_list:
            return {}
        congestion_levels = [d.congestion_level for d in data_list if d.congestion_level is not None]
        if congestion_levels:
            return {
                "mean": np.mean(congestion_levels),
                "median": np.median(congestion_levels),
                "max": np.max(congestion_levels),
                "min": np.min(congestion_levels),
                "std": np.std(congestion_levels)
            }
        return {}

    @staticmethod
    def to_dataframe(data_list: List[TrafficData]) -> pd.DataFrame:
        """Convert to a DataFrame"""
        data_dicts = [d.dict() for d in data_list]
        return pd.DataFrame(data_dicts)

    @staticmethod
    def export_to_json(data_list: List[TrafficData], filename: str):
        """Export to JSON"""
        data_dicts = [d.dict() for d in data_list]
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(data_dicts, f, ensure_ascii=False, indent=2, default=str)
        logger.info(f"Data exported to {filename}")
```
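The point of the BaseParser/scheduler split is that adding a city means adding one subclass. The skeleton below is a hypothetical illustration, not part of the original article: the endpoint is a placeholder, and the real parsing logic would mirror the Beijing implementation.

```python
# Hypothetical second-city parser skeleton (the endpoint is a placeholder).
class ShanghaiTrafficParser(BaseParser):
    """Sketch of a Shanghai parser following the same contract"""

    BUS_API = "https://shbus.example.com/api/line"  # placeholder endpoint

    async def parse_bus_info(self, line: str) -> List[TrafficData]:
        data_list = []
        async with self.request as req:
            content = await req.fetch(f"{self.BUS_API}/{line}")
        if content:
            # Map the city-specific payload onto the shared TrafficData model here
            ...
        return data_list

    async def parse_subway_info(self, line: str) -> List[TrafficData]:
        raise NotImplementedError  # fill in per data source

    async def parse_road_info(self, road: str) -> List[TrafficData]:
        raise NotImplementedError  # fill in per data source

# Registration is then a single extra line in main():
# scheduler.register_parser("Shanghai", ShanghaiTrafficParser("Shanghai"))
```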
Finally, the entry point and a small monitoring helper complete the system:

```python
# Main entry point
async def main():
    """Main function"""
    logger.info("Starting traffic information collection...")

    # Initialise the scheduler
    scheduler = TrafficScheduler()

    # Register the Beijing parser
    beijing_parser = BeijingTrafficParser("Beijing")
    scheduler.register_parser("Beijing", beijing_parser)

    # Define the collection tasks
    tasks = [
        {"city": "Beijing", "task_type": "bus", "line": "300"},
        {"city": "Beijing", "task_type": "bus", "line": "1"},
        {"city": "Beijing", "task_type": "subway", "line": "1"},
        {"city": "Beijing", "task_type": "subway", "line": "10"},
    ]

    # Run the batch collection
    all_data = await scheduler.schedule_batch_tasks(tasks)
    logger.info(f"Collected {len(all_data)} traffic records")

    # Post-process the data
    processor = TrafficDataProcessor()

    # Keep only the last hour of data
    recent_data = processor.filter_by_time(all_data, hours=1)

    # Export the data
    processor.export_to_json(recent_data, "traffic_data.json")

    # Convert to a DataFrame
    df = processor.to_dataframe(recent_data)
    if not df.empty:
        print("\n=== Traffic information summary ===")
        print(f"Time range: {df['timestamp'].min()} to {df['timestamp'].max()}")
        print("Type distribution:")
        print(df["type"].value_counts())

        if "congestion_level" in df.columns:
            congestion_stats = processor.calculate_congestion_stats(recent_data)
            print(f"\nCongestion statistics: {congestion_stats}")

        # Save as CSV
        df.to_csv("traffic_data.csv", index=False, encoding="utf-8-sig")

    logger.info("Traffic information collection finished")


# Monitoring and maintenance
class TrafficMonitor:
    """Crawler monitor"""

    @staticmethod
    def check_health():
        """Health check"""
        import psutil
        import time

        cpu_percent = psutil.cpu_percent()
        memory_info = psutil.virtual_memory()
        logger.info(f"CPU usage: {cpu_percent}%")
        logger.info(f"Memory usage: {memory_info.percent}%")
        return {
            "cpu": cpu_percent,
            "memory": memory_info.percent,
            "timestamp": time.time()
        }

    @staticmethod
    def cleanup_old_data(days: int = 7):
        """Delete exported files older than the given age"""
        import os
        import glob
        import time  # needed for the file-age comparison below

        files = glob.glob("traffic_data_*.json") + glob.glob("traffic_data_*.csv")
        for file in files:
            file_time = os.path.getmtime(file)
            if (time.time() - file_time) > (days * 24 * 3600):
                os.remove(file)
                logger.info(f"Deleted old file: {file}")


# Run the script
if __name__ == "__main__":
    # Run the main program
    asyncio.run(main())
    # Health check
    TrafficMonitor.check_health()
    # Clean up data older than a week
    TrafficMonitor.cleanup_old_data(days=7)
```

4. Advanced Extensions

4.1 Distributed crawler architecture

```python
# Distributed task queue with Celery
from celery import Celery

app = Celery(
    "traffic_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@app.task
def fetch_traffic_data(city: str, data_type: str):
    # Distributed task execution goes here
    pass
```

4.2 AI-enhanced parsing

```python
# Captcha recognition with OCR
import cv2
import pytesseract
from PIL import Image

class CaptchaSolver:
    """AI captcha solver"""

    def solve(self, image_path: str) -> str:
        # Pre-process the image
        img = cv2.imread(image_path)
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Run Tesseract OCR on the grayscale image
        text = pytesseract.image_to_string(gray, config="--psm 8")
        return text.strip()
```

4.3 Data visualization

```python
# Interactive visualisations with Plotly
import plotly.express as px
import plotly.graph_objects as go

def create_traffic_dashboard(df: pd.DataFrame):
    """Build a traffic data dashboard"""
    # Congestion heat map
    fig1 = px.density_mapbox(
        df,
        lat="latitude",
        lon="longitude",
        z="congestion_level",
        radius=10,
        center=dict(lat=39.9042, lon=116.4074),
        zoom=10,
        mapbox_style="stamen-terrain"
    )
    # Time series chart
    fig2 = px.line(
        df.sort_values("timestamp"),
        x="timestamp",
        y="delay_minutes",
        color="line_or_road",
        title="Traffic delay trends"
    )
    return fig1, fig2
```

5. Deployment and Optimization Tips

5.1 Docker deployment

```dockerfile
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]
```

5.2 Performance optimization

- Connection pool reuse: aiohttp's connection pool cuts down on TCP handshakes
- CDN caching: set cache headers sensibly to avoid repeated requests
- Incremental crawling: fetch only data newer than the last run, keyed on timestamps (see the sketch after this list)
- Data compression: transfer data gzip-compressed
- Load balancing: rotate across multiple IPs to avoid bans
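Of these, incremental crawling is the easiest win. A minimal sketch follows, assuming the same Redis instance the cache uses and ISO-8601 timestamps in the records; the watermark key scheme is illustrative, not from the original article.

```python
# Incremental crawling sketch: keep a per-source "watermark" of the newest
# timestamp seen, and drop anything not newer. Assumes Redis on
# localhost:6379 and ISO-8601 timestamp strings; key names are illustrative.
from datetime import datetime
import redis.asyncio as redis

async def filter_new_records(source_key: str, records: list) -> list:
    client = redis.from_url("redis://localhost:6379", decode_responses=True)
    last_seen = await client.get(f"watermark:{source_key}")
    cutoff = datetime.fromisoformat(last_seen) if last_seen else datetime.min
    # Keep only records strictly newer than the stored watermark
    fresh = [r for r in records if datetime.fromisoformat(r["timestamp"]) > cutoff]
    if fresh:
        newest = max(r["timestamp"] for r in fresh)
        await client.set(f"watermark:{source_key}", newest)
    await client.close()
    return fresh
```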
5.3 Countering anti-scraping measures

- Dynamic User-Agent: imitate real browsers
- IP proxy pool: switch proxy IPs automatically
- Request rate control: random delays and spacing between requests
- Browser simulation: handle JavaScript with Playwright
- Captcha recognition: integrate an AI recognition step

6. Legal and Ethical Considerations

- Respect robots.txt: honor each site's crawl rules (a minimal check is sketched at the end of this article)
- Control request rates: avoid putting pressure on target servers
- Restrict data use: personal study and research only
- Protect privacy: do not collect personal information
- Follow the terms: comply with each site's terms of service

7. Summary

This article walked through building a complete traffic information crawler with a modern Python stack. By combining asynchronous programming, smart proxying, caching, and AI-assisted parsing, we arrived at an efficient, stable, and extensible crawling solution.

Key points:

- Use asynchronous programming to raise concurrency
- Rotate proxies and User-Agents intelligently
- Integrate a Redis cache to avoid duplicate requests
- Use Playwright for dynamic pages
- Validate data with Pydantic
- Build in complete error handling and monitoring
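As a closing appendix, here is a minimal sketch that makes the first compliance point in section 6 concrete, using only the standard library. urllib.robotparser fetches synchronously, so in the async crawler you would run it once at startup; the URL and agent name are placeholders.

```python
# Minimal robots.txt compliance check (URL and agent name are placeholders).
from urllib.robotparser import RobotFileParser

def allowed_to_fetch(url: str, user_agent: str = "TrafficSpider") -> bool:
    rp = RobotFileParser()
    rp.set_url("https://example.com/robots.txt")
    rp.read()  # blocking fetch of the robots.txt file
    return rp.can_fetch(user_agent, url)

if __name__ == "__main__":
    print(allowed_to_fetch("https://example.com/api/bus/300"))
```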
