做会所在哪个网站推广怎么做网站文字图片
2026/3/29 20:59:41 网站建设 项目流程
# -*- coding: utf-8 -*-
"""Douban Movie Top250 crawler built on a modern async Python stack.

Reconstructed from a garbled article dump. Per the original article, the
stack is:
  * aiohttp + asyncio   — high-concurrency async HTTP requests
  * parsel              — fast HTML parsing (CSS/XPath selectors)
  * SQLAlchemy + SQLite — ORM persistence for small projects
  * fake-useragent      — random User-Agent per request
  * tenacity            — declarative retry with exponential backoff
  * pandas              — export to Excel / analysis
  * rich                — progress bar, tables, colored console output

NOTE(review): the reconstruction preserves the article's runtime strings
(Chinese log messages) byte-for-byte where they were recoverable.
"""

import asyncio
import json
import sqlite3  # noqa: F401  (imported by the original article; unused directly)
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import aiohttp
import pandas as pd
from aiohttp import TCPConnector
from fake_useragent import UserAgent
from parsel import Selector
from rich import print as rprint  # noqa: F401  (imported by the original article)
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table
from sqlalchemy import Column, Float, Integer, String, Text, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from tenacity import retry, stop_after_attempt, wait_exponential


@dataclass
class Movie:
    """Plain data record for a single movie scraped from a detail page."""

    ranking: int          # rank within the Top250 list
    title: str            # Chinese display title
    original_title: str   # original-language title
    year: str             # release year (string, as scraped)
    directors: str        # comma-joined director names
    actors: str           # comma-joined lead actors (first three)
    regions: str          # production countries / regions
    categories: str       # genres
    rating: float         # Douban score
    rating_count: int     # number of raters
    quote: str            # famous one-line quote
    cover_url: str        # poster image URL
    detail_url: str       # canonical detail-page URL (unique key)

    def to_dict(self) -> Dict:
        """Return the record as a plain dict (for JSON / ORM / pandas)."""
        return asdict(self)


# Declarative base for the ORM model below.
Base = declarative_base()


class MovieModel(Base):
    """SQLAlchemy ORM model mirroring :class:`Movie`, plus bookkeeping columns."""

    __tablename__ = "movies"

    id = Column(Integer, primary_key=True)
    ranking = Column(Integer, nullable=False, index=True)
    title = Column(String(200), nullable=False)
    original_title = Column(String(200))
    year = Column(String(20))
    directors = Column(String(500))
    actors = Column(String(1000))
    regions = Column(String(100))
    categories = Column(String(100))
    rating = Column(Float)
    rating_count = Column(Integer)
    quote = Column(Text)
    cover_url = Column(String(500))
    # detail_url is unique so re-runs do not insert duplicates.
    detail_url = Column(String(500), unique=True)
    created_at = Column(
        String(50),
        default=lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    )


class DoubanMovieSpider:
    """Async crawler for https://movie.douban.com/top250.

    Workflow: walk the paginated list pages to collect detail-page URLs
    (with rankings), then fetch the detail pages concurrently through a
    worker pool fed by an :class:`asyncio.Queue`.
    """

    def __init__(self, concurrency: int = 10):
        """Initialize the spider.

        Args:
            concurrency: number of concurrent detail-page workers / the
                TCP connection limit.
        """
        self.concurrency = concurrency
        self.base_url = "https://movie.douban.com/top250"
        self.console = Console()
        self.ua = UserAgent()

        # Database setup: create the schema once, keep one ORM session.
        self.engine = create_engine("sqlite:///douban_movies.db", echo=False)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

        # Static request headers; User-Agent is rotated per request.
        self.headers = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
        }

    async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> Optional[str]:
        """Fetch one page asynchronously, with retry.

        Args:
            session: shared aiohttp session.
            url: target URL.

        Returns:
            Page HTML on success, ``None`` on failure (after retries).
        """

        @retry(
            stop=stop_after_attempt(3),
            wait=wait_exponential(multiplier=1, min=2, max=10),
        )
        async def _fetch():
            # Fresh random User-Agent on every attempt.
            headers = self.headers.copy()
            headers["User-Agent"] = self.ua.random
            async with session.get(
                url,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=10),
                ssl=False,
            ) as response:
                if response.status == 200:
                    return await response.text(encoding="utf-8")
                self.console.log(f"[red]请求失败: {url}, 状态码: {response.status}[/red]")
                return None

        try:
            return await _fetch()
        except Exception as e:
            # tenacity re-raises after the final attempt; log and degrade to None.
            self.console.log(f"[red]请求异常: {url}, 错误: {e}[/red]")
            return None

    def parse_movie_list(self, html: str) -> Tuple[List[str], Optional[str]]:
        """Parse a list page into detail-page URLs plus the next-page link.

        Args:
            html: list-page HTML.

        Returns:
            ``(detail_urls, next_page)`` where ``next_page`` is the relative
            query string of the next page, or ``None`` on the last page.
        """
        selector = Selector(text=html)
        detail_urls = []

        # Each movie card is a ".item" inside the grid view.
        items = selector.css(".grid_view .item")
        for item in items:
            detail_path = item.css(".hd a::attr(href)").get()
            if detail_path:
                detail_urls.append(detail_path)

        # "后页" link; None when on the final page.
        next_page = selector.css(".next a::attr(href)").get()
        return detail_urls, next_page

    def parse_movie_detail(self, html: str, url: str) -> Optional[Movie]:
        """Parse one movie detail page.

        Args:
            html: detail-page HTML.
            url: detail-page URL (stored on the record).

        Returns:
            A :class:`Movie`, or ``None`` if parsing fails.
        """
        try:
            selector = Selector(text=html)

            # Title / original title / year.
            title = selector.css("h1 span:nth-child(1)::text").get().strip()

            original_title = selector.css(".original-title::text").get()
            if original_title:
                original_title = original_title.replace("原名: ", "").strip()

            year_text = selector.css(".year::text").get()
            year = year_text.strip("()") if year_text else ""

            # Credits come from the "#info" block; re-wrap it in a Selector
            # so the :contains() pseudo-selectors work on just that region.
            directors: List[str] = []
            actors: List[str] = []
            info = selector.css("#info").get()
            info_selector = Selector(text=info)

            for director in info_selector.css('span:contains("导演") span a::text').getall():
                directors.append(director.strip())

            # Lead actors: first three only.
            for actor in info_selector.css('span:contains("主演") span a::text').getall()[:3]:
                actors.append(actor.strip())

            regions: List[str] = []
            categories: List[str] = []

            regions_text = info_selector.css('span:contains("制片国家/地区")::text').get()
            if regions_text and ":" in regions_text:
                regions = [r.strip() for r in regions_text.split(":")[1].split("/")]

            for category in info_selector.css('span[property="v:genre"]::text').getall():
                categories.append(category.strip())

            # Rating block.
            rating_text = selector.css(".rating_num::text").get()
            rating = float(rating_text) if rating_text else 0.0

            rating_count_text = selector.css(".rating_people span::text").get()
            rating_count = int(rating_count_text.replace(",", "")) if rating_count_text else 0

            # Famous quote, with a fallback selector.
            quote = selector.css(".related-info .indent span::text").get()
            if not quote:
                quote = selector.css(".review-content p::text").get()
            quote = quote.strip() if quote else ""

            cover_url = selector.css("#mainpic img::attr(src)").get()

            # Ranking is unknown here; it is assigned by the caller from the
            # list-page order (see process_movie).
            ranking = 0

            return Movie(
                ranking=ranking,
                title=title,
                original_title=original_title,
                year=year,
                directors=", ".join(directors),
                actors=", ".join(actors),
                regions=", ".join(regions),
                categories=", ".join(categories),
                rating=rating,
                rating_count=rating_count,
                quote=quote,
                cover_url=cover_url,
                detail_url=url,
            )
        except Exception as e:
            self.console.log(f"[red]解析失败: {url}, 错误: {e}[/red]")
            return None

    async def process_movie(
        self, session: aiohttp.ClientSession, url: str, ranking: int
    ) -> Optional[Movie]:
        """Fetch and parse a single movie; stamp its ranking.

        Args:
            session: shared aiohttp session.
            url: detail-page URL.
            ranking: rank derived from list-page order.

        Returns:
            The parsed :class:`Movie`, or ``None`` on any failure.
        """
        html = await self.fetch_page(session, url)
        if html:
            movie = self.parse_movie_detail(html, url)
            if movie:
                movie.ranking = ranking
                return movie
        return None

    async def worker(
        self,
        session: aiohttp.ClientSession,
        queue: "asyncio.Queue",
        results: List[Movie],
        progress: Progress,
        task_id: int,
    ):
        """Queue consumer: process (url, ranking) items until cancelled.

        Args:
            session: shared aiohttp session.
            queue: task queue of ``(url, ranking)`` tuples.
            results: shared list that collected movies are appended to.
            progress: rich progress instance.
            task_id: progress task to advance.
        """
        while True:
            try:
                url, ranking = await queue.get()
                movie = await self.process_movie(session, url, ranking)
                if movie:
                    results.append(movie)
                    progress.update(
                        task_id, advance=1, description=f"[green]已获取: {movie.title}"
                    )
                queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.console.log(f"[red]工作线程异常: {e}[/red]")
                queue.task_done()

    async def crawl(self) -> List[Movie]:
        """Main crawl routine: list pages first, then concurrent detail fetches.

        Returns:
            All successfully parsed movies (order not guaranteed).
        """
        self.console.log("[bold blue]开始爬取豆瓣电影Top250...[/bold blue]")
        all_movies: List[Movie] = []
        current_page = 0

        connector = TCPConnector(limit=self.concurrency, ssl=False)
        async with aiohttp.ClientSession(connector=connector) as session:
            # Phase 1: walk the paginated list, collecting (url, ranking).
            self.console.log("[yellow]获取电影列表...[/yellow]")
            detail_urls_with_ranking: List[Tuple[str, int]] = []
            ranking_counter = 1
            start_url = self.base_url

            while start_url:
                # "next page" links are relative query strings ("?start=25...").
                full_url = (
                    f"https://movie.douban.com/top250{start_url}"
                    if start_url.startswith("?")
                    else start_url
                )
                html = await self.fetch_page(session, full_url)
                if not html:
                    break

                detail_urls, next_page = self.parse_movie_list(html)

                # Ranking follows list-page order across pages.
                for detail_url in detail_urls:
                    detail_urls_with_ranking.append((detail_url, ranking_counter))
                    ranking_counter += 1

                current_page += 1
                self.console.log(
                    f"[cyan]已获取第{current_page}页，累计发现{len(detail_urls_with_ranking)}部电影[/cyan]"
                )

                if next_page:
                    start_url = next_page
                    await asyncio.sleep(2)  # politeness delay between list pages
                else:
                    break

            if not detail_urls_with_ranking:
                self.console.log("[red]未获取到电影列表[/red]")
                return []

            # Phase 2: fetch detail pages through a bounded worker pool.
            self.console.log(
                f"[yellow]开始获取{len(detail_urls_with_ranking)}部电影的详细信息...[/yellow]"
            )

            queue: asyncio.Queue = asyncio.Queue()
            for url_ranking in detail_urls_with_ranking:
                await queue.put(url_ranking)

            results: List[Movie] = []
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                console=self.console,
            ) as progress:
                task_id = progress.add_task(
                    "[cyan]爬取进度...", total=len(detail_urls_with_ranking)
                )

                workers = [
                    asyncio.create_task(
                        self.worker(session, queue, results, progress, task_id)
                    )
                    for _ in range(min(self.concurrency, len(detail_urls_with_ranking)))
                ]

                # Wait until every queued item has been marked done, then
                # cancel the (now idle) workers and reap them.
                await queue.join()
                for worker_task in workers:
                    worker_task.cancel()
                await asyncio.gather(*workers, return_exceptions=True)

            all_movies = results

        return all_movies

    def save_to_database(self, movies: List[Movie]):
        """Insert movies into SQLite, skipping rows whose detail_url exists.

        Args:
            movies: movies to persist.
        """
        try:
            count = 0
            for movie in movies:
                # detail_url is the natural key; skip duplicates on re-runs.
                existing = (
                    self.session.query(MovieModel)
                    .filter_by(detail_url=movie.detail_url)
                    .first()
                )
                if not existing:
                    movie_model = MovieModel(**movie.to_dict())
                    self.session.add(movie_model)
                    count += 1

            self.session.commit()
            self.console.log(f"[green]成功保存{count}条数据到数据库[/green]")
        except Exception as e:
            self.console.log(f"[red]保存到数据库失败: {e}[/red]")
            self.session.rollback()

    def save_to_json(self, movies: List[Movie], filename: str = "douban_top250.json"):
        """Dump movies to a UTF-8 JSON file.

        Args:
            movies: movies to serialize.
            filename: output path.
        """
        try:
            movies_dict = [movie.to_dict() for movie in movies]
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(movies_dict, f, ensure_ascii=False, indent=2)
            self.console.log(f"[green]数据已保存到 {filename}[/green]")
        except Exception as e:
            self.console.log(f"[red]保存到JSON失败: {e}[/red]")

    def save_to_excel(self, movies: List[Movie], filename: str = "douban_top250.xlsx"):
        """Export movies to an Excel workbook sorted by ranking.

        Args:
            movies: movies to export.
            filename: output path.
        """
        try:
            movies_dict = [movie.to_dict() for movie in movies]
            df = pd.DataFrame(movies_dict)
            df = df.sort_values("ranking")
            df.to_excel(filename, index=False, engine="openpyxl")
            self.console.log(f"[green]数据已保存到 {filename}[/green]")
        except Exception as e:
            self.console.log(f"[red]保存到Excel失败: {e}[/red]")

    def display_results(self, movies: List[Movie], limit: int = 10):
        """Render a result table and summary statistics to the console.

        Args:
            movies: crawled movies.
            limit: number of rows to show in the table.
        """
        if not movies:
            self.console.log("[red]没有获取到数据[/red]")
            return

        sorted_movies = sorted(movies, key=lambda x: x.ranking)

        table = Table(
            title="豆瓣电影Top250 (前10部)",
            show_header=True,
            header_style="bold magenta",
        )
        table.add_column("排名", style="dim", width=5)
        table.add_column("标题", style="bold", width=30)
        table.add_column("导演", width=20)
        table.add_column("评分", justify="right", width=8)
        table.add_column("年份", width=6)

        for movie in sorted_movies[:limit]:
            table.add_row(
                str(movie.ranking),
                movie.title,
                movie.directors[:20] + "..."
                if len(movie.directors) > 20
                else movie.directors,
                str(movie.rating),
                movie.year,
            )

        self.console.print(table)

        # Aggregate statistics.
        total_movies = len(movies)
        avg_rating = (
            sum(movie.rating for movie in movies) / total_movies
            if total_movies > 0
            else 0
        )
        total_ratings = sum(movie.rating_count for movie in movies)

        self.console.print("\n[bold cyan]统计信息:[/bold cyan]")
        self.console.print(f"  获取电影总数: {total_movies}")
        self.console.print(f"  平均评分: {avg_rating:.2f}")
        self.console.print(f"  总评分人数: {total_ratings:,}")

        # Genre frequency (top 5).
        categories_count: Dict[str, int] = {}
        for movie in movies:
            for category in movie.categories.split(", "):
                if category:
                    categories_count[category] = categories_count.get(category, 0) + 1

        if categories_count:
            self.console.print("\n[bold cyan]电影类型分布:[/bold cyan]")
            for category, count in sorted(
                categories_count.items(), key=lambda x: x[1], reverse=True
            )[:5]:
                self.console.print(f"  {category}: {count}部")

    async def run(self):
        """Top-level entry point: crawl, display, persist, report timing."""
        start_time = datetime.now()
        try:
            movies = await self.crawl()

            if movies:
                self.display_results(movies)
                self.save_to_database(movies)
                self.save_to_json(movies)
                self.save_to_excel(movies)

                end_time = datetime.now()
                duration = (end_time - start_time).total_seconds()
                self.console.print("\n[bold green]爬取完成![/bold green]")
                self.console.print(f"  耗时: {duration:.2f}秒")
                self.console.print(f"  平均每部电影: {duration / len(movies):.2f}秒")
            else:
                self.console.print("[bold red]爬取失败，未获取到数据[/bold red]")
        except KeyboardInterrupt:
            self.console.print("\n[yellow]爬虫被用户中断[/yellow]")
        except Exception as e:
            self.console.print(f"[bold red]爬虫运行异常: {e}[/bold red]")
            import traceback

            traceback.print_exc()


def main():
    """CLI entry point: print a banner, then run the spider."""
    console = Console()
    console.print("[bold magenta]豆瓣电影Top250爬虫[/bold magenta]")
    console.print("=" * 50)
    console.print("特点:")
    console.print("  • 使用异步技术提高爬取效率")
    console.print("  • 支持自动重试和错误处理")
    console.print("  • 多种数据存储方式")
    console.print("  • 友好的进度显示")
    console.print("  • 数据分析和统计")
    console.print("=" * 50)

    # Keep concurrency modest to avoid hammering the server.
    concurrency = 5

    spider = DoubanMovieSpider(concurrency=concurrency)
    asyncio.run(spider.run())


if __name__ == "__main__":
    main()

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询