# Building a Movie Resource Aggregation Crawler with Python Async Coroutines

## Introduction: The Crawling Challenges of Movie Resource Aggregation

In today's digital entertainment era, aggregating movie and TV resource information has become an important way for users to discover content. Traditional synchronous crawlers struggle against large numbers of movie sites: throughput is low and anti-crawling measures are hard to work around. This article explores how to combine Python's modern async coroutine stack, smarter parsing, and a distributed architecture into an efficient and stable movie resource aggregation crawler.

## Architecture Overview

### Core Technology Stack

- **Async framework**: aiohttp + asyncio for high-concurrency requests
- **Parsing engine**: Playwright + BeautifulSoup4 for dynamically rendered pages
- **Smart proxying**: rotating proxy pools with automatic retry
- **Data storage**: PostgreSQL + Redis async caching
- **Anti-anti-crawling**: randomized request fingerprints + browser feature emulation

## Complete Crawler Implementation

### 1. The Async Crawler Core Engine

```python
import asyncio
import aiohttp
import logging
import random
import hashlib
import json
from typing import List, Dict, Optional
from dataclasses import dataclass
from urllib.parse import urljoin, urlparse
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


@dataclass
class MovieResource:
    """Movie resource data model."""
    title: str
    alternative_titles: List[str]
    year: int
    directors: List[str]
    actors: List[str]
    genres: List[str]
    rating: Optional[float]
    description: str
    resources: List[Dict]  # list of playback resources
    cover_url: str
    imdb_id: Optional[str]
    douban_id: Optional[str]
    update_time: datetime


class AsyncMovieSpider:
    """Asynchronous movie spider base class."""

    def __init__(self, max_concurrent: int = 50,
                 request_timeout: int = 30,
                 use_proxy: bool = True):
        """
        Args:
            max_concurrent: maximum number of concurrent connections
            request_timeout: request timeout in seconds
            use_proxy: whether to route requests through proxies
        """
        self.max_concurrent = max_concurrent
        self.request_timeout = aiohttp.ClientTimeout(total=request_timeout)
        self.use_proxy = use_proxy
        self.session = None
        self.proxy_pool = self._init_proxy_pool()
        self.headers_pool = self._init_headers_pool()

    def _init_proxy_pool(self) -> List[str]:
        """Initialize the proxy pool (in production, fetch this from a proxy API)."""
        return [
            "http://proxy1.example.com:8080",
            "http://proxy2.example.com:8080",
            # more proxies ...
        ]

    def _init_headers_pool(self) -> List[Dict]:
        """Initialize the pool of request headers."""
        user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
        ]
        return [
            {
                "User-Agent": ua,
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3",
                "Accept-Encoding": "gzip, deflate",
                "DNT": "1",
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
                "Cache-Control": "max-age=0",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "none",
                "Sec-Fetch-User": "?1",
            }
            for ua in user_agents
        ]

    def _get_random_headers(self) -> Dict:
        """Pick a random header set."""
        return random.choice(self.headers_pool)

    def _get_random_proxy(self) -> Optional[str]:
        """Pick a random proxy, or None when proxies are disabled."""
        if self.use_proxy and self.proxy_pool:
            return random.choice(self.proxy_pool)
        return None

    async def __aenter__(self):
        """Async context-manager entry: create the shared session."""
        connector = aiohttp.TCPConnector(limit=self.max_concurrent, ssl=False)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.request_timeout
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context-manager exit: close the session."""
        if self.session:
            await self.session.close()

    async def fetch(self, url: str, **kwargs) -> Optional[str]:
        """Fetch one page asynchronously and return its HTML, or None on failure."""
        headers = self._get_random_headers()
        proxy = self._get_random_proxy()
        try:
            async with self.session.get(url, headers=headers, proxy=proxy, **kwargs) as response:
                if response.status == 200:
                    content = await response.text()
                    logger.info(f"Fetched {url}")
                    return content
                logger.warning(f"Request failed {url}: status {response.status}")
                return None
        except Exception as e:
            logger.error(f"Request error {url}: {str(e)}")
            return None

    async def fetch_multiple(self, urls: List[str]) -> Dict[str, Optional[str]]:
        """Fetch several pages concurrently and return a URL -> content mapping."""
        tasks = []
        for url in urls:
            task = asyncio.create_task(self.fetch(url))
            tasks.append((url, task))

        results = {}
        for url, task in tasks:
            try:
                results[url] = await task
            except Exception as e:
                logger.error(f"Task error {url}: {str(e)}")
                results[url] = None
        return results
```
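Before moving on to parsing, it may help to see how this engine is meant to be driven. Below is a minimal, hypothetical usage sketch (the URLs are placeholders and proxies are disabled so it can run without a real proxy pool); entering the spider as an async context manager is what creates the shared aiohttp session:

```python
import asyncio


async def demo():
    # Placeholder detail-page URLs, purely for illustration
    urls = [
        "https://movies.example.com/detail/1",
        "https://movies.example.com/detail/2",
    ]
    # use_proxy=False so the sketch works without a configured proxy pool
    async with AsyncMovieSpider(max_concurrent=10, use_proxy=False) as spider:
        pages = await spider.fetch_multiple(urls)
        for url, html in pages.items():
            print(url, "->", len(html) if html else "fetch failed")


if __name__ == "__main__":
    asyncio.run(demo())
```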
### 2. Smart Parsing and Dynamic Rendering

```python
import re
from typing import Set, Tuple

from bs4 import BeautifulSoup
from playwright.async_api import async_playwright


class SmartParser:
    """Smart page parser: CSS/regex extraction plus optional Playwright rendering."""

    def __init__(self):
        # Regex fallbacks for titles (the CSS selectors below are tried first)
        self.title_patterns = [
            r"<title[^>]*>(.*?)</title>",
            r"<h1[^>]*>(.*?)</h1>",
            r'class="[^"]*title[^"]*"[^>]*>(.*?)<',
            r'id="[^"]*title[^"]*"[^>]*>(.*?)<',
        ]

    async def parse_with_playwright(self, url: str) -> Optional[str]:
        """Render a JavaScript-heavy page with Playwright and return the final HTML."""
        async with async_playwright() as p:
            # Launch the browser (headless mode is configurable)
            browser = await p.chromium.launch(
                headless=True,
                args=["--disable-blink-features=AutomationControlled"]
            )
            # Create a context with a desktop fingerprint
            context = await browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                viewport={"width": 1920, "height": 1080}
            )
            page = await context.new_page()
            try:
                # Navigate and wait for the network to go idle
                await page.goto(url, wait_until="networkidle")
                # Give late-loading content a moment
                await page.wait_for_timeout(2000)
                return await page.content()
            except Exception as e:
                logger.error(f"Playwright rendering failed {url}: {str(e)}")
                return None
            finally:
                await browser.close()

    def extract_movie_info(self, html: str, base_url: str) -> Optional[MovieResource]:
        """Extract movie information from HTML and return a MovieResource, or None."""
        if not html:
            return None

        soup = BeautifulSoup(html, "lxml")

        # Try several strategies for the title
        title = self._extract_title(soup)

        # Extract the remaining fields (the other _extract_* helpers follow the same
        # selector-based pattern as _extract_title and are not reproduced in the article)
        year = self._extract_year(soup)
        directors = self._extract_directors(soup)
        actors = self._extract_actors(soup)
        genres = self._extract_genres(soup)
        rating = self._extract_rating(soup)
        description = self._extract_description(soup)
        cover_url = self._extract_cover_url(soup, base_url)

        # Extract playback resources
        resources = self._extract_resources(soup, base_url)

        return MovieResource(
            title=title,
            alternative_titles=[],
            year=year,
            directors=directors,
            actors=actors,
            genres=genres,
            rating=rating,
            description=description,
            resources=resources,
            cover_url=cover_url,
            imdb_id=None,
            douban_id=None,
            update_time=datetime.now()
        )

    def _extract_title(self, soup: BeautifulSoup) -> str:
        """Extract the title, trying several selectors before falling back to <title>."""
        selectors = [
            "h1.title",
            ".movie-title",
            "#title",
            'meta[property="og:title"]',
            'meta[name="title"]',
        ]
        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                title = element.get("content") if element.name == "meta" else element.text
                if title and len(title.strip()) > 0:
                    return title.strip()

        # Fall back to the page title
        if soup.title:
            return soup.title.string.strip()
        return "Unknown title"

    def _extract_resources(self, soup: BeautifulSoup, base_url: str) -> List[Dict]:
        """Extract playback resources (direct links and player links)."""
        resources = []

        # Patterns for direct download/stream links
        resource_patterns = [
            ("magnet", r"magnet:\?xt=urn:btih:[a-zA-Z0-9]{32,40}"),
            ("ed2k", r"ed2k://\|file\|.*"),
            ("thunder", r"thunder://.*"),
            ("ftp", r"ftp://.*\.(?:mp4|avi|mkv|rmvb)"),
            ("http", r"https?://.*\.(?:mp4|avi|mkv|rmvb|flv)"),
        ]

        # Scan all visible text for direct links
        all_text = soup.get_text()
        for resource_type, pattern in resource_patterns:
            matches = re.findall(pattern, all_text, re.IGNORECASE)
            for match in matches:
                resources.append({
                    "type": resource_type,
                    "url": match,
                    "quality": self._detect_quality(match),
                    "source": "direct"
                })

        # Look for play buttons and player links
        play_selectors = [
            'a[href*="play"]',
            'a[href*="video"]',
            'button[onclick*="play"]',
            ".play-btn",
        ]
        for selector in play_selectors:
            for element in soup.select(selector):
                href = element.get("href") or element.get("onclick", "")
                if href and ("http" in href or "//" in href):
                    full_url = urljoin(base_url, href)
                    resources.append({
                        "type": "play_link",
                        "url": full_url,
                        "quality": "unknown",
                        "source": "player"
                    })
        return resources

    def _detect_quality(self, url: str) -> str:
        """Guess the resource quality from its URL."""
        quality_patterns = [
            (r"4k|2160p|uhd", "4K"),
            (r"1080p|fhd", "1080P"),
            (r"720p|hd", "720P"),
            (r"480p|sd", "480P"),
            (r"bdrip|bluray", "BluRay"),
            (r"web-dl|webdl", "WEB-DL"),
            (r"dvdrip|dvd", "DVD"),
            (r"ts|tc|cam", "CAM"),
        ]
        url_lower = url.lower()
        for pattern, quality in quality_patterns:
            if re.search(pattern, url_lower):
                return quality
        return "unknown"
```
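The parser exposes two entry points, plain HTML parsing and Playwright rendering, but the article does not spell out how to choose between them. One reasonable policy (an assumption on my part, not something prescribed above) is to try the cheap aiohttp fetch first and fall back to Playwright only when the static HTML yields nothing useful:

```python
async def fetch_and_parse(spider: AsyncMovieSpider, parser: SmartParser, url: str):
    """Try a static fetch first; fall back to Playwright for JS-heavy pages."""
    html = await spider.fetch(url)
    movie = parser.extract_movie_info(html, url) if html else None

    # Heuristic: no extracted resources usually means the page is rendered client-side
    if movie is None or not movie.resources:
        rendered = await parser.parse_with_playwright(url)
        if rendered:
            movie = parser.extract_movie_info(rendered, url)
    return movie
```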
### 3. Distributed Task Scheduling and Deduplication

```python
import redis
import pickle
import zlib
from abc import ABC, abstractmethod
from typing import Any, List

import asyncio_redis


class DistributedScheduler:
    """Distributed task scheduler backed by a Redis sorted set."""

    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self.redis_url = redis_url
        self.connection_pool = None
        self.bloom_filter = None

    async def connect(self):
        """Open the Redis connection pool."""
        self.connection_pool = await asyncio_redis.Pool.create(
            host="localhost", port=6379, poolsize=10
        )

    async def add_url(self, queue_name: str, url: str, priority: int = 0):
        """Enqueue a URL; higher priority values are scheduled first."""
        if not await self.is_duplicate(url):
            score = priority * 1000000 + int(datetime.now().timestamp())
            await self.connection_pool.zadd(queue_name, {url: score})

    async def get_url(self, queue_name: str) -> Optional[str]:
        """Pop the highest-scoring URL from the queue, or return None when empty."""
        results = await self.connection_pool.zpopmax(queue_name, count=1)
        if results and len(results) > 0:
            url, _ = results[0]
            return url.decode() if isinstance(url, bytes) else url
        return None

    async def is_duplicate(self, url: str) -> bool:
        """Approximate, bitmap-based duplicate check for URLs."""
        url_hash = hashlib.md5(url.encode()).hexdigest()
        key = f"url:bloom:{url_hash[:2]}"

        # Check whether the bit for this URL is already set
        exists = await self.connection_pool.getbit(key, int(url_hash[2:4], 16))
        if not exists:
            # Mark the URL as seen
            await self.connection_pool.setbit(key, int(url_hash[2:4], 16), 1)
            return False
        return True


class MovieAggregationCrawler:
    """Main movie aggregation crawler."""

    def __init__(self, start_urls: List[str], max_depth: int = 3, worker_count: int = 10):
        """
        Args:
            start_urls: seed URLs
            max_depth: maximum crawl depth
            worker_count: number of worker coroutines
        """
        self.start_urls = start_urls
        self.max_depth = max_depth
        self.worker_count = worker_count
        self.spider = AsyncMovieSpider(max_concurrent=worker_count)
        self.parser = SmartParser()
        self.scheduler = DistributedScheduler()
        self.results = []
        self.visited_urls = set()

    async def crawl(self):
        """Run the crawl and return the collected MovieResource objects."""
        logger.info("Starting movie resource aggregation crawl")

        # Connect to Redis
        await self.scheduler.connect()

        # Open the spider's HTTP session for the duration of the crawl
        async with self.spider:
            # Seed the queue
            for url in self.start_urls:
                await self.scheduler.add_url("pending_urls", url, priority=10)

            # Spawn the worker coroutines
            tasks = []
            for i in range(self.worker_count):
                tasks.append(asyncio.create_task(self.worker(f"worker-{i}")))

            # Wait for all workers (they loop until cancelled or externally stopped)
            await asyncio.gather(*tasks)

        logger.info(f"Crawl finished, collected {len(self.results)} movie resources")
        return self.results

    async def worker(self, worker_id: str):
        """Worker coroutine: pull a URL, fetch, parse, and enqueue new links."""
        logger.info(f"Worker {worker_id} started")

        while True:
            try:
                # Get the next pending URL
                url = await self.scheduler.get_url("pending_urls")
                if not url:
                    # Back off briefly and retry
                    await asyncio.sleep(2)
                    continue

                # Skip already-visited URLs
                if url in self.visited_urls:
                    continue
                self.visited_urls.add(url)

                logger.info(f"{worker_id} processing: {url}")

                # Fetch the page
                html = await self.spider.fetch(url)
                if not html:
                    continue

                # Parse the page
                movie_info = self.parser.extract_movie_info(html, url)
                if movie_info:
                    self.results.append(movie_info)
                    logger.info(f"{worker_id} extracted: {movie_info.title}")

                # Extract new links and enqueue them
                new_urls = await self.extract_links(html, url)
                for new_url in new_urls:
                    if new_url not in self.visited_urls:
                        await self.scheduler.add_url("pending_urls", new_url, priority=1)

                # Politeness delay
                await asyncio.sleep(random.uniform(0.5, 1.5))

            except Exception as e:
                logger.error(f"{worker_id} error: {str(e)}")
                await asyncio.sleep(5)  # back off after an error

    async def extract_links(self, html: str, base_url: str) -> Set[str]:
        """Extract absolute HTTP(S) links from a page."""
        soup = BeautifulSoup(html, "lxml")
        links = set()

        # Walk all <a> tags that carry an href
        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"]

            # Skip non-navigational links
            if not href or href.startswith(("javascript:", "mailto:", "tel:")):
                continue

            # Resolve to an absolute URL
            full_url = urljoin(base_url, href)

            # Keep only HTTP(S) links
            parsed = urlparse(full_url)
            if parsed.scheme not in ("http", "https"):
                continue

            links.add(full_url)
        return links
```
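A caveat on `is_duplicate` above: keying a bitmap on two hex characters of an MD5 digest is only a very rough approximation of a Bloom filter, so unrelated URLs will regularly collide and be silently dropped. When exact deduplication matters more than memory, a plain Redis set is simpler; the sketch below assumes the redis-py asyncio client (`pip install redis`) rather than asyncio_redis:

```python
import hashlib

from redis import asyncio as aioredis  # redis-py >= 4.2


class SetDeduplicator:
    """Exact URL deduplication backed by a Redis set."""

    def __init__(self, redis_url: str = "redis://localhost:6379/0", key: str = "seen_urls"):
        self.redis = aioredis.from_url(redis_url)
        self.key = key

    async def is_duplicate(self, url: str) -> bool:
        digest = hashlib.md5(url.encode()).hexdigest()
        # SADD returns 1 when the member is new, 0 when it already existed
        added = await self.redis.sadd(self.key, digest)
        return added == 0
```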
### 4. Data Storage and Export

```python
import csv
import json as json_lib
from typing import List

import asyncpg
import pandas as pd
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String, Float, Text, JSON, DateTime

Base = declarative_base()


class MovieResourceDB(Base):
    """ORM model for movie resources."""
    __tablename__ = "movie_resources"

    id = Column(Integer, primary_key=True)
    title = Column(String(500), nullable=False, index=True)
    year = Column(Integer, index=True)
    directors = Column(JSON)
    actors = Column(JSON)
    genres = Column(JSON)
    rating = Column(Float)
    description = Column(Text)
    resources = Column(JSON)
    cover_url = Column(String(1000))
    imdb_id = Column(String(100))
    douban_id = Column(String(100))
    source_url = Column(String(1000))
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)


class DataStorage:
    """Data storage manager: PostgreSQL via async SQLAlchemy, plus file exports."""

    def __init__(self, db_url: str = "postgresql+asyncpg://user:password@localhost/movies"):
        self.db_url = db_url
        self.engine = None
        self.async_session = None

    async def initialize(self):
        """Create the engine, the session factory and the tables."""
        self.engine = create_async_engine(
            self.db_url,
            echo=False,
            pool_size=20,
            max_overflow=30
        )
        self.async_session = sessionmaker(
            self.engine, class_=AsyncSession, expire_on_commit=False
        )

        # Create tables
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    async def save_movie(self, movie: MovieResource, source_url: str) -> int:
        """Persist a single movie resource and return the inserted row id."""
        async with self.async_session() as session:
            db_movie = MovieResourceDB(
                title=movie.title,
                year=movie.year,
                directors=movie.directors,
                actors=movie.actors,
                genres=movie.genres,
                rating=movie.rating,
                description=movie.description,
                resources=movie.resources,
                cover_url=movie.cover_url,
                imdb_id=movie.imdb_id,
                douban_id=movie.douban_id,
                source_url=source_url
            )
            session.add(db_movie)
            await session.commit()
            await session.refresh(db_movie)
            return db_movie.id

    async def batch_save(self, movies: List[MovieResource], source_urls: List[str]):
        """Persist movie resources in bulk."""
        async with self.async_session() as session:
            for movie, source_url in zip(movies, source_urls):
                session.add(MovieResourceDB(
                    title=movie.title,
                    year=movie.year,
                    directors=movie.directors,
                    actors=movie.actors,
                    genres=movie.genres,
                    rating=movie.rating,
                    description=movie.description,
                    resources=movie.resources,
                    cover_url=movie.cover_url,
                    imdb_id=movie.imdb_id,
                    douban_id=movie.douban_id,
                    source_url=source_url
                ))
            await session.commit()

    def export_to_csv(self, movies: List[MovieResource], filename: str):
        """Export the movie list to a CSV file."""
        data = []
        for movie in movies:
            data.append({
                "title": movie.title,
                "year": movie.year,
                "directors": ", ".join(movie.directors),
                "actors": ", ".join(movie.actors[:5]),  # first five only
                "genres": ", ".join(movie.genres),
                "rating": movie.rating or "N/A",
                "description": movie.description[:100] + "..." if len(movie.description) > 100 else movie.description,
                "resource_count": len(movie.resources),
                "cover_url": movie.cover_url
            })
        df = pd.DataFrame(data)
        df.to_csv(filename, index=False, encoding="utf-8-sig")

    def export_to_json(self, movies: List[MovieResource], filename: str):
        """Export the movie list to a JSON file."""
        data = []
        for movie in movies:
            data.append({
                "title": movie.title,
                "year": movie.year,
                "directors": movie.directors,
                "actors": movie.actors,
                "genres": movie.genres,
                "rating": movie.rating,
                "description": movie.description,
                "resources": movie.resources,
                "cover_url": movie.cover_url,
                "imdb_id": movie.imdb_id,
                "douban_id": movie.douban_id,
                "update_time": movie.update_time.isoformat()
            })

        with open(filename, "w", encoding="utf-8") as f:
            json_lib.dump(data, f, ensure_ascii=False, indent=2)
```
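To show how this storage layer is meant to be driven end to end, here is a small hypothetical sketch (the connection string, source URL and file names are placeholders): it initializes the storage, persists each parsed MovieResource, then writes both export formats.

```python
async def store_and_export(movies: List[MovieResource]):
    # Placeholder DSN; point this at a real PostgreSQL instance
    storage = DataStorage("postgresql+asyncpg://user:password@localhost/movies")
    await storage.initialize()

    for movie in movies:
        # Placeholder source URL for the sake of the example
        await storage.save_movie(movie, source_url="https://movies.example.com/detail/1")

    # File exports are synchronous and independent of the database
    storage.export_to_csv(movies, "movies.csv")
    storage.export_to_json(movies, "movies.json")
```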
### 5. Main Program and Configuration

```python
import signal
import sys
from contextlib import asynccontextmanager

import yaml


class Config:
    """Configuration manager."""

    def __init__(self, config_file: str = "config.yaml"):
        self.config_file = config_file
        self.config = self.load_config()

    def load_config(self) -> Dict:
        """Load the YAML configuration file, falling back to built-in defaults."""
        default_config = {
            "crawler": {
                "max_concurrent": 50,
                "request_timeout": 30,
                "max_depth": 3,
                "worker_count": 10,
                "use_proxy": True,
                "delay_range": [0.5, 2.0]
            },
            "database": {
                "url": "postgresql+asyncpg://user:password@localhost/movies",
                "pool_size": 20,
                "max_overflow": 30
            },
            "redis": {
                "url": "redis://localhost:6379/0",
                "poolsize": 10
            },
            "websites": [
                "https://www.example-movie-site1.com",
                "https://www.example-movie-site2.com",
                "https://www.example-movie-site3.com"
            ],
            "output": {
                "csv_path": "movies.csv",
                "json_path": "movies.json"
            }
        }

        try:
            with open(self.config_file, "r", encoding="utf-8") as f:
                user_config = yaml.safe_load(f)
            # Merge user settings over the defaults
            self.merge_config(default_config, user_config)
        except FileNotFoundError:
            logger.warning(f"Config file {self.config_file} not found, using defaults")
        return default_config

    def merge_config(self, default: Dict, user: Dict, parent_key: str = ""):
        """Recursively merge the user configuration into the defaults."""
        for key, value in user.items():
            if key in default and isinstance(value, dict) and isinstance(default[key], dict):
                self.merge_config(default[key], value, f"{parent_key}.{key}")
            else:
                default[key] = value


@asynccontextmanager
async def crawler_lifetime(config: Config):
    """Crawler lifetime context: set up storage and the crawler, clean up afterwards."""
    # Initialize components
    storage = DataStorage(config.config["database"]["url"])
    await storage.initialize()

    crawler = MovieAggregationCrawler(
        start_urls=config.config["websites"],
        max_depth=config.config["crawler"]["max_depth"],
        worker_count=config.config["crawler"]["worker_count"]
    )
    try:
        yield crawler, storage
    finally:
        # Clean up resources
        logger.info("Cleaning up resources ...")


async def main():
    """Program entry point."""
    # Load configuration
    config = Config("movie_crawler_config.yaml")

    # Install signal handlers
    def signal_handler(signum, frame):
        logger.info("Stop signal received, exiting gracefully ...")
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    async with crawler_lifetime(config) as (crawler, storage):
        try:
            # Run the crawl
            movies = await crawler.crawl()

            # Persist to the database
            if movies:
                source_urls = [list(crawler.visited_urls)[i % len(crawler.visited_urls)]
                               for i in range(len(movies))]
                await storage.batch_save(movies, source_urls)
                logger.info(f"Saved {len(movies)} records to the database")

            # Export to files
            if movies:
                storage.export_to_csv(movies, config.config["output"]["csv_path"])
                storage.export_to_json(movies, config.config["output"]["json_path"])
                logger.info("Exported data to files")

            # Produce a statistics report
            generate_report(movies)

        except Exception as e:
            logger.error(f"Crawler run failed: {str(e)}", exc_info=True)


def generate_report(movies: List[MovieResource]):
    """Generate a crawl statistics report."""
    if not movies:
        logger.info("No movie data collected")
        return

    total_movies = len(movies)
    total_resources = sum(len(movie.resources) for movie in movies)

    # Year distribution
    year_dist = {}
    for movie in movies:
        if movie.year:
            year_dist[movie.year] = year_dist.get(movie.year, 0) + 1

    # Genre distribution
    genre_dist = {}
    for movie in movies:
        for genre in movie.genres:
            genre_dist[genre] = genre_dist.get(genre, 0) + 1

    # Resource type distribution
    resource_type_dist = {}
    for movie in movies:
        for resource in movie.resources:
            rtype = resource.get("type", "unknown")
            resource_type_dist[rtype] = resource_type_dist.get(rtype, 0) + 1

    logger.info("=" * 50)
    logger.info("Crawl statistics report")
    logger.info("=" * 50)
    logger.info(f"Total movies: {total_movies}")
    logger.info(f"Total resources: {total_resources}")
    logger.info(f"Average resources per movie: {total_resources / total_movies:.2f}")
    logger.info(f"Year distribution (top 10): {dict(sorted(year_dist.items(), key=lambda x: x[1], reverse=True)[:10])}")
    logger.info(f"Genre distribution (top 10): {dict(sorted(genre_dist.items(), key=lambda x: x[1], reverse=True)[:10])}")
    logger.info(f"Resource type distribution: {resource_type_dist}")
    logger.info("=" * 50)


if __name__ == "__main__":
    # Run the program
    asyncio.run(main())
```
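One caveat about the entry point above: calling sys.exit() from a plain signal handler interrupts the process without giving running coroutines a chance to finish or close their sessions. A more asyncio-friendly variant (sketched here as an alternative, not as part of the original design) registers the handler on the event loop and races the crawl against a stop event:

```python
import asyncio
import signal


async def run_with_graceful_shutdown():
    stop_event = asyncio.Event()
    loop = asyncio.get_running_loop()

    # add_signal_handler is available on Unix event loops
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop_event.set)

    crawl_task = asyncio.create_task(main())
    stop_task = asyncio.create_task(stop_event.wait())

    # Finish when the crawl completes or a stop signal arrives, whichever comes first
    done, pending = await asyncio.wait({crawl_task, stop_task},
                                       return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
```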
## Example Configuration File (config.yaml)

```yaml
crawler:
  max_concurrent: 100
  request_timeout: 60
  max_depth: 5
  worker_count: 20
  use_proxy: true
  delay_range: [0.3, 1.5]

database:
  url: postgresql+asyncpg://movie_user:password123@localhost:5432/movie_db
  pool_size: 50
  max_overflow: 100

redis:
  url: redis://localhost:6379/0
  poolsize: 20

websites:
  - https://www.imdb.com
  - https://www.douban.com
  - https://www.netflix.com
  - https://www.hulu.com
  - https://www.amazon.com/primevideo

output:
  csv_path: data/movies_export.csv
  json_path: data/movies_export.json
  database_backup: data/movies_backup.sql

proxy:
  api_url: https://proxy-provider.com/api/get
  api_key: your_api_key_here
  check_interval: 300
```

## Deployment and Optimization

### 1. Containerized Deployment (Docker)

```dockerfile
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies (Chrome plus CJK fonts for rendering)
RUN apt-get update && apt-get install -y \
        wget \
        gnupg \
    && wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" > /etc/apt/sources.list.d/google.list \
    && apt-get update && apt-get install -y \
        google-chrome-stable \
        fonts-ipafont-gothic \
        fonts-wqy-zenhei \
        fonts-thai-tlwg \
        fonts-kacst \
        fonts-freefont-ttf \
        libxss1 \
        --no-install-recommends \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the source code
COPY . .

# Run the crawler
CMD ["python", "main.py"]
```

### 2. Performance Tuning Tips

- **Connection pool tuning**: size the database and HTTP connection pools for your workload
- **Caching**: cache already-fetched pages in Redis (a minimal sketch follows this list)
- **Adaptive delays**: adjust request intervals dynamically based on each site's response times
- **Incremental crawling**: only re-crawl content that has been updated
- **Failover**: keep multiple backup data sources
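The caching tip is the easiest one to bolt onto the existing fetch path. Below is a minimal sketch that wraps AsyncMovieSpider.fetch with a Redis page cache; it again assumes the redis-py asyncio client, and the one-hour TTL is an arbitrary illustrative value:

```python
import hashlib

from redis import asyncio as aioredis


class CachedFetcher:
    """Wraps AsyncMovieSpider.fetch with a simple Redis page cache."""

    def __init__(self, spider: AsyncMovieSpider,
                 redis_url: str = "redis://localhost:6379/0", ttl: int = 3600):
        self.spider = spider
        self.redis = aioredis.from_url(redis_url)
        self.ttl = ttl  # cache lifetime in seconds (assumed value)

    async def fetch(self, url: str):
        key = "page:" + hashlib.md5(url.encode()).hexdigest()
        cached = await self.redis.get(key)
        if cached is not None:
            return cached.decode("utf-8")

        html = await self.spider.fetch(url)
        if html:
            await self.redis.set(key, html, ex=self.ttl)
        return html
```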
### 3. Monitoring and Logging

```python
import time
from functools import wraps


# Performance monitoring decorator
def monitor_performance(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        result = await func(*args, **kwargs)
        end_time = time.time()
        logger.info(f"{func.__name__} took {end_time - start_time:.2f}s")
        return result
    return wrapper


# Applying the decorator
@monitor_performance
async def crawl_with_monitoring():
    # crawl logic goes here
    pass
```

## Legal and Ethical Notice

**Important**: the crawler code in this article is intended for technical learning and research only. Before running it against real sites, make sure you:

- comply with each target site's robots.txt;
- respect the site's terms of service;
- throttle your request rate so you do not overload the target site;
- collect only publicly accessible information.

Do not use this code to obtain copyright-infringing content, to collect pirated resources commercially, or for any other illegal activity.

## Summary

This article walked through an implementation of a movie resource aggregation crawler built on Python's modern async stack. The system combines several techniques:

- **Asynchronous concurrency**: asyncio and aiohttp drive high-concurrency requests
- **Smart parsing**: BeautifulSoup for static pages and Playwright for dynamically rendered ones
- **Distributed scheduling**: a Redis-backed task queue with URL deduplication
- **Data persistence**: multiple storage backends and export formats
- **Fault tolerance**: error handling and retry logic throughout
