网站开发的两种模式 · 免费网站优化工具
2026/3/26 9:06:12 网站建设 项目流程
# 使用 Python 构建跨平台房源信息聚合系统

> 注：本文由网页提取还原而来，代码中的赋值号、引号、缩进等均依据上下文重建，使用前请自行验证。

## 一、引言：房源信息聚合的重要性与挑战

在当今数字化房地产市场中，房源信息分散在链家、安居客、贝壳等多个平台，用户需要同时浏览多个网站才能获得全面信息。本文介绍如何使用 Python 最新技术构建一个高效、稳定的跨平台房源信息聚合系统，通过智能解析和异步处理技术，实现多平台房源数据的自动化采集与整合。

## 二、技术栈选择

- 异步框架：aiohttp + asyncio —— 实现高并发数据抓取
- 解析引擎：parsel + BeautifulSoup4 —— 支持 CSS 和 XPath 混合选择器
- 浏览器自动化：playwright —— 处理动态渲染页面
- 数据存储：SQLAlchemy + Alembic —— ORM 与数据库迁移
- 反反爬策略：代理池、请求指纹伪装、浏览器特征模拟
- 部署监控：scrapy + scrapy-playwright —— 生产级爬虫架构

## 三、系统架构设计

### 3.1 整体架构

```text
数据源层(链家/安居客/贝壳) → 爬虫调度层 → 解析层 → 数据清洗层 → 存储层 → API服务层
```

### 3.2 模块划分

- 代理管理模块：维护动态代理池
- 请求管理模块：处理请求头、Cookie、会话管理
- 解析器工厂：多平台解析器动态调度
- 数据标准化：统一不同平台的数据格式
- 异常处理：智能重试与降级策略

## 四、核心代码实现

### 4.1 异步请求基础框架

```python
import asyncio
import aiohttp
from aiohttp import TCPConnector
from typing import Dict, Any, Optional
import logging
from dataclasses import dataclass
from urllib.parse import urljoin
import json

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class RequestConfig:
    """请求配置类"""
    timeout: int = 30
    max_retries: int = 3
    delay: float = 1.0
    use_proxy: bool = True
    headers: Optional[Dict] = None


class AsyncRequestClient:
    """异步HTTP客户端"""

    def __init__(self, config: RequestConfig = None):
        self.config = config or RequestConfig()
        self.session = None
        self.proxy_pool = []  # 代理池实例

    async def __aenter__(self):
        connector = TCPConnector(limit=100, ssl=False)
        self.session = aiohttp.ClientSession(
            connector=connector,
            headers=self._get_default_headers()
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    def _get_default_headers(self):
        """生成动态请求头"""
        return {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
        }

    async def fetch(self, url: str, method: str = "GET", **kwargs) -> Optional[str]:
        """执行异步请求"""
        for attempt in range(self.config.max_retries):
            try:
                proxy = self._get_proxy() if self.config.use_proxy else None
                async with self.session.request(
                    method=method,
                    url=url,
                    proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=self.config.timeout),
                    **kwargs
                ) as response:
                    if response.status == 200:
                        content = await response.text()
                        logger.info(f"Successfully fetched {url}")
                        return content
                    elif response.status in [403, 429]:
                        logger.warning(f"Blocked by {url}, retrying...")
                        await self._rotate_proxy()
                        await asyncio.sleep(self.config.delay * 2)
                    else:
                        logger.error(f"HTTP {response.status} for {url}")
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed for {url}: {str(e)}")
                await asyncio.sleep(self.config.delay * (attempt + 1))
        return None

    def _get_proxy(self):
        """从代理池获取代理"""
        if self.proxy_pool:
            import random
            return random.choice(self.proxy_pool)
        return None

    async def _rotate_proxy(self):
        """切换代理"""
        # 实现代理切换逻辑
        pass
```

### 4.2 智能解析器引擎

```python
from parsel import Selector
import re
from enum import Enum
import hashlib


class Platform(Enum):
    LIANJIA = "lianjia"
    ANJUKE = "anjuke"
    BEIKE = "beike"


class ParserFactory:
    """解析器工厂"""

    @staticmethod
    def get_parser(platform: Platform):
        parsers = {
            Platform.LIANJIA: LianJiaParser,
            Platform.ANJUKE: AnJuKeParser,
            Platform.BEIKE: BeiKeParser
        }
        return parsers.get(platform, BaseParser)()


class BaseParser:
    """基础解析器"""

    def extract_price(self, text: str) -> float:
        """提取价格信息"""
        patterns = [
            r"(\d+(?:\.\d+)?)\s*万",
            r"¥\s*(\d+(?:\.\d+)?)",
            r"(\d+(?:\.\d+)?)\s*元/月"
        ]
        for pattern in patterns:
            match = re.search(pattern, text)
            if match:
                return float(match.group(1))
        return 0.0

    def extract_area(self, text: str) -> float:
        """提取面积"""
        pattern = r"(\d+(?:\.\d+)?)\s*㎡"
        match = re.search(pattern, text)
        return float(match.group(1)) if match else 0.0

    def clean_text(self, text: str) -> str:
        """清洗文本"""
        if not text:
            return ""
        return re.sub(r"\s+", "", text).strip()


class LianJiaParser(BaseParser):
    """链家解析器"""

    def parse_listing(self, html: str) -> Dict[str, Any]:
        """解析房源列表页"""
        selector = Selector(html)
        items = []
        for house in selector.css(".sellListContent li"):
            item = {
                "platform": "lianjia",
                "house_id": house.css("::attr(data-lj_action_housedel_id)").get(),
                "title": self.clean_text(house.css(".title a::text").get()),
                "price": self.extract_price(house.css(".totalPrice span::text").get() or ""),
                "unit_price": self.extract_price(house.css(".unitPrice span::text").get() or ""),
                "area": self.extract_area(house.css(".houseInfo::text").get() or ""),
                "district": self.clean_text(house.css(".positionInfo a::text").get() or ""),
                "community": self.clean_text(
                    house.css(".positionInfo a::text")[1].get()
                    if len(house.css(".positionInfo a::text")) > 1 else ""
                ),
                "url": urljoin("https://sh.lianjia.com", house.css(".title a::attr(href)").get()),
                "tags": [tag.get() for tag in house.css(".tag span::text")]
            }
            items.append(item)

        # 提取分页信息
        pagination = {
            "total_pages": self._extract_total_pages(selector),
            "current_page": self._extract_current_page(selector)
        }
        return {"items": items, "pagination": pagination}

    def _extract_total_pages(self, selector):
        """提取总页数"""
        page_data = selector.css(".page-box.house-lst-page-box::attr(page-data)").get()
        if page_data:
            try:
                return json.loads(page_data).get("totalPage", 1)
            except Exception:
                pass
        return 1

    def _extract_current_page(self, selector):
        """提取当前页数"""
        current = selector.css(".content .page-box a.on::text").get()
        return int(current) if current and current.isdigit() else 1


class AnJuKeParser(BaseParser):
    """安居客解析器"""

    def parse_listing(self, html: str) -> Dict[str, Any]:
        """解析安居客房源列表"""
        selector = Selector(html)
        items = []
        for house in selector.css(".house-list .list-item"):
            item = {
                "platform": "anjuke",
                "house_id": house.css("::attr(data-id)").get(),
                "title": self.clean_text(house.css(".house-title a::text").get()),
                "price": self.extract_price(house.css(".price .price-con::text").get() or ""),
                "unit_price": self.extract_price(house.css(".unit-price::text").get() or ""),
                "area": self.extract_area(house.css(".details-item .area span::text").get() or ""),
                "district": self.clean_text(house.css(".address .area a::text").get() or ""),
                "community": self.clean_text(house.css(".address .comm-address a::text").get() or ""),
                "url": house.css(".house-title a::attr(href)").get(),
                "layout": self.clean_text(house.css(".details-item .area::text").get() or ""),
            }
            items.append(item)
        return {"items": items, "pagination": self._parse_pagination(selector)}

    def _parse_pagination(self, selector):
        """解析分页"""
        # 实现分页解析逻辑
        pass


class BeiKeParser(BaseParser):
    """贝壳解析器"""
    # 贝壳解析器实现类似
    pass
```

### 4.3 Playwright 动态渲染处理

```python
from playwright.async_api import async_playwright
import asyncio


class DynamicPageCrawler:
    """处理动态渲染页面"""

    def __init__(self, headless: bool = True):
        self.headless = headless
        self.browser = None
        self.context = None

    async def __aenter__(self):
        playwright = await async_playwright().start()
        self.browser = await playwright.chromium.launch(
            headless=self.headless,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--disable-dev-shm-usage",
                "--no-sandbox"
            ]
        )
        # 创建上下文，模拟真实浏览器
        self.context = await self.browser.new_context(
            viewport={"width": 1920, "height": 1080},
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            locale="zh-CN",
            timezone_id="Asia/Shanghai",
            permissions=["geolocation"],
            extra_http_headers={
                "Accept-Language": "zh-CN,zh;q=0.9",
            }
        )
        # 屏蔽 WebDriver 特性
        await self.context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            window.chrome = { runtime: {} };
        """)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.browser.close()

    async def crawl_page(self, url: str, wait_for: str = None, timeout: int = 30000):
        """爬取动态页面"""
        page = await self.context.new_page()
        try:
            # 设置请求拦截，优化性能
            await page.route(
                "**/*.{png,jpg,jpeg,gif,svg,woff,woff2}",
                lambda route: route.abort()
            )

            # 监听请求
            page.on("request", lambda request: logger.debug(f"→ {request.method} {request.url}"))
            page.on("response", lambda response: logger.debug(f"← {response.status} {response.url}"))

            # 导航到页面
            await page.goto(url, wait_until="networkidle")

            # 等待特定元素
            if wait_for:
                await page.wait_for_selector(wait_for, timeout=timeout)

            # 随机滚动，模拟用户行为
            await self._simulate_scroll(page)

            # 获取页面内容
            content = await page.content()

            # 提取 JSON-LD 结构化数据
            structured_data = await self._extract_structured_data(page)

            # 截屏（调试用）
            # await page.screenshot(path=f"screenshot_{hashlib.md5(url.encode()).hexdigest()[:8]}.png")

            return {
                "html": content,
                "structured_data": structured_data,
                "url": url,
                "title": await page.title()
            }
        except Exception as e:
            logger.error(f"Error crawling {url}: {str(e)}")
            return None
        finally:
            await page.close()

    async def _simulate_scroll(self, page, scroll_times: int = 3):
        """模拟滚动行为"""
        import random
        for i in range(scroll_times):
            scroll_height = random.randint(300, 800)
            await page.evaluate(f"window.scrollBy(0, {scroll_height})")
            await asyncio.sleep(random.uniform(0.5, 2.0))

    async def _extract_structured_data(self, page):
        """提取结构化数据（JSON-LD）"""
        try:
            return await page.evaluate("""
                () => {
                    const scripts = document.querySelectorAll('script[type="application/ld+json"]');
                    return Array.from(scripts).map(script => {
                        try {
                            return JSON.parse(script.textContent);
                        } catch (e) {
                            return null;
                        }
                    }).filter(data => data !== null);
                }
            """)
        except Exception:
            return []
```

### 4.4 数据存储与管理

```python
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Text, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import pandas as pd

Base = declarative_base()


class HouseListing(Base):
    """房源信息数据模型"""
    __tablename__ = "house_listings"

    id = Column(Integer, primary_key=True)
    house_id = Column(String(100), unique=True, index=True)
    platform = Column(String(50), index=True)
    title = Column(String(500))
    price = Column(Float)
    unit_price = Column(Float)
    area = Column(Float)
    layout = Column(String(50))
    floor = Column(String(100))
    orientation = Column(String(50))
    district = Column(String(100))
    community = Column(String(200))
    address = Column(String(500))
    longitude = Column(Float)
    latitude = Column(Float)
    tags = Column(JSON)
    url = Column(String(1000))
    raw_data = Column(JSON)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    def to_dict(self):
        return {
            field.name: getattr(self, field.name)
            for field in self.__table__.c
            if field.name not in ["raw_data"]
        }


class DataManager:
    """数据管理器"""

    def __init__(self, db_url: str = "sqlite:///houses.db"):
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)
        Base.metadata.create_all(self.engine)

    def save_listings(self, listings: list):
        """保存房源信息"""
        session = self.Session()
        try:
            for listing_data in listings:
                # 检查是否已存在
                existing = session.query(HouseListing).filter_by(
                    house_id=listing_data["house_id"],
                    platform=listing_data["platform"]
                ).first()

                if existing:
                    # 更新现有记录
                    for key, value in listing_data.items():
                        if hasattr(existing, key):
                            setattr(existing, key, value)
                    existing.updated_at = datetime.now()
                else:
                    # 创建新记录
                    house = HouseListing(**listing_data)
                    session.add(house)

            session.commit()
            logger.info(f"Saved/Updated {len(listings)} listings")
        except Exception as e:
            session.rollback()
            logger.error(f"Error saving listings: {str(e)}")
            raise
        finally:
            session.close()

    def export_to_csv(self, filepath: str):
        """导出到CSV"""
        session = self.Session()
        try:
            query = session.query(HouseListing)
            df = pd.read_sql(query.statement, session.bind)
            df.to_csv(filepath, index=False, encoding="utf-8-sig")
            logger.info(f"Exported data to {filepath}")
        finally:
            session.close()
```

### 4.5 主爬虫调度器

```python
import asyncio
from typing import List
import signal
import sys


class HouseSpiderScheduler:
    """爬虫调度器"""

    def __init__(self, platforms: List[str] = None):
        self.platforms = platforms or ["lianjia", "anjuke", "beike"]
        self.request_client = None
        self.dynamic_crawler = None
        self.data_manager = DataManager()
        self.tasks = []

    async def initialize(self):
        """初始化组件"""
        self.request_client = AsyncRequestClient()
        self.dynamic_crawler = DynamicPageCrawler(headless=True)

    async def crawl_platform(self, platform: str, city: str = "sh", max_pages: int = 10):
        """爬取特定平台"""
        urls = self._generate_urls(platform, city, max_pages)

        async with self.request_client as client:
            tasks = [self._crawl_page(client, platform, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)

        # 处理结果
        all_listings = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Crawl error: {str(result)}")
            elif result:
                all_listings.extend(result)

        # 保存数据
        if all_listings:
            self.data_manager.save_listings(all_listings)
        return len(all_listings)

    def _generate_urls(self, platform: str, city: str, max_pages: int):
        """生成爬取URL"""
        templates = {
            "lianjia": f"https://{city}.lianjia.com/ershoufang/pg{{}}/",
            "anjuke": f"https://{city}.anjuke.com/sale/p{{}}/",
            "beike": f"https://{city}.ke.com/ershoufang/pg{{}}/"
        }
        if platform not in templates:
            return []
        return [templates[platform].format(i) for i in range(1, max_pages + 1)]

    async def _crawl_page(self, client, platform: str, url: str):
        """爬取单个页面"""
        try:
            # 首先尝试静态爬取
            html = await client.fetch(url)
            if not html:
                return []

            # 使用相应解析器
            parser = ParserFactory.get_parser(Platform(platform))
            result = parser.parse_listing(html)

            # 如果需要，使用动态爬取获取详情页
            if result.get("items"):
                detail_tasks = [
                    self._crawl_detail(client, parser, item["url"])
                    for item in result["items"][:3]  # 限制并发，防止被封
                ]
                detail_results = await asyncio.gather(*detail_tasks)

                # 合并详情信息
                for item, detail in zip(result["items"], detail_results):
                    if detail:
                        item.update(detail)

            return result.get("items", [])
        except Exception as e:
            logger.error(f"Error crawling {url}: {str(e)}")
            return []

    async def _crawl_detail(self, client, parser, url: str):
        """爬取详情页"""
        try:
            html = await client.fetch(url)
            if not html:
                return {}
            # 这里可以添加详情页解析逻辑
            # 例如解析房源描述、图片、配套设施等
            return {
                "detail_fetched": True,
                "fetched_at": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Error crawling detail {url}: {str(e)}")
            return {}

    async def run(self, cities: List[str] = None):
        """运行爬虫"""
        cities = cities or ["sh", "bj", "gz", "sz"]
        logger.info("Starting house spider scheduler...")
        await self.initialize()

        # 为每个城市和平台创建任务
        tasks = []
        for city in cities:
            for platform in self.platforms:
                task = asyncio.create_task(
                    self.crawl_platform(platform, city, max_pages=3)
                )
                tasks.append(task)

        # 等待所有任务完成
        results = await asyncio.gather(*tasks, return_exceptions=True)

        total_listings = 0
        for result in results:
            if isinstance(result, int):
                total_listings += result

        logger.info(f"Crawl completed. Total listings: {total_listings}")

        # 导出数据
        self.data_manager.export_to_csv(
            f"house_listings_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        )
        return total_listings


def signal_handler(signum, frame):
    """信号处理"""
    logger.info("Received shutdown signal, cleaning up...")
    sys.exit(0)


async def main():
    """主函数"""
    # 注册信号处理
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # 创建调度器并运行
    scheduler = HouseSpiderScheduler()
    try:
        total = await scheduler.run(["sh"])  # 先爬取上海
        print(f"\n爬取完成，共获取 {total} 条房源信息")

        # 显示统计信息
        session = scheduler.data_manager.Session()
        try:
            from sqlalchemy import func
            stats = session.query(
                HouseListing.platform,
                func.count(HouseListing.id).label("count"),
                func.avg(HouseListing.price).label("avg_price")
            ).group_by(HouseListing.platform).all()

            print("\n平台统计:")
            for platform, count, avg_price in stats:
                print(f"  {platform}: {count} 条记录, 平均价格: {avg_price:.2f} 万")
        finally:
            session.close()
    except Exception as e:
        logger.error(f"Main execution error: {str(e)}")


if __name__ == "__main__":
    # 设置事件循环策略（Windows 兼容）
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    # 运行异步主函数
    asyncio.run(main())
```

## 五、高级特性实现

### 5.1 反反爬策略增强

```python
class AntiAntiCrawler:
    """反反爬策略管理器"""

    def __init__(self):
        self.fingerprints = []

    def generate_fingerprint(self):
        """生成浏览器指纹"""
        import random
        return {
            "user_agent": self._random_user_agent(),
            "screen_resolution": self._random_screen_resolution(),
            "timezone_offset": self._random_timezone(),
            "webgl_vendor": self._random_webgl_info(),
            "plugins": self._random_plugins(),
            "fonts": self._random_fonts(),
            "language": "zh-CN,zh;q=0.9",
            "hardware_concurrency": random.choice([4, 8, 16]),
            "device_memory": random.choice([4, 8, 16])
        }

    def _random_user_agent(self):
        """随机用户代理"""
        agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15"
        ]
        import random
        return random.choice(agents)
```

### 5.2 数据去重与质量控制

```python
class DataQualityController:
    """数据质量控制"""

    @staticmethod
    def deduplicate(listings: list, key_fields: list = None):
        """去重"""
        if not key_fields:
            key_fields = ["house_id", "platform", "title"]

        seen = set()
        unique_listings = []
        for listing in listings:
            # 生成唯一标识
            identifier = tuple(str(listing.get(field, "")) for field in key_fields)
            identifier_hash = hashlib.md5("".join(identifier).encode()).hexdigest()
            if identifier_hash not in seen:
                seen.add(identifier_hash)
                unique_listings.append(listing)
        return unique_listings

    @staticmethod
    def validate_listing(listing: dict) -> bool:
        """验证房源数据有效性"""
        required_fields = ["title", "price", "area", "district"]

        # 检查必需字段
        for field in required_fields:
            if not listing.get(field):
                return False

        # 价格合理性检查
        price = listing.get("price", 0)
        area = listing.get("area", 0)
        if price <= 0 or area <= 0:
            return False

        # 单价合理性检查
        unit_price = price / area if area > 0 else 0
        if unit_price < 1000 or unit_price > 200000:  # 假设合理范围
            return False

        return True
```

## 六、部署与监控

### 6.1 使用 Scrapy 框架重构

```python
# scrapy_spider.py
import scrapy
from scrapy_playwright.page import PageMethod
from itemadapter import ItemAdapter


class HouseSpider(scrapy.Spider):
    name = "house_spider"

    def start_requests(self):
        urls = [
            "https://sh.lianjia.com/ershoufang/",
            "https://sh.anjuke.com/sale/"
        ]
        for url in urls:
            yield scrapy.Request(
                url=url,
                callback=self.parse,
                meta=dict(
                    playwright=True,
                    playwright_page_methods=[
                        PageMethod("wait_for_selector", ".house-list"),
                        PageMethod("evaluate", "window.scrollBy(0, document.body.scrollHeight)"),
                    ]
                )
            )

    async def parse(self, response):
        # Scrapy 解析逻辑
        pass
```

### 6.2 分布式爬虫架构

```python
# 使用 Redis 实现分布式任务队列
import redis
from rq import Queue


class DistributedScheduler:
    """分布式调度器"""

    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis_conn = redis.from_url(redis_url)
        self.queue = Queue("house_crawler", connection=self.redis_conn)

    def enqueue_crawl_task(self, platform, city, pages):
        """入队爬取任务"""
        from tasks import crawl_platform_task
        job = self.queue.enqueue(
            crawl_platform_task,
            platform, city, pages,
            job_timeout="30m"
        )
        return job.id
```

## 七、性能优化建议

- 连接池优化：使用 aiohttp.TCPConnector 管理连接池
- 缓存策略：对静态资源实现本地缓存
- 请求限流：使用 asyncio.Semaphore 控制并发数
- 增量爬取：基于时间戳只爬取更新内容
- CDN 优化：识别并直接请求 CDN 资源

## 八、法律与伦理考虑

- 遵守 robots.txt：尊重网站的爬虫协议
- 请求频率控制：避免对目标网站造成压力
- 数据使用限制：仅用于个人学习研究
- 隐私保护：不爬取个人隐私信息
- 版权尊重：不侵犯数据知识产权

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价，让我们帮助您实现业务目标。

立即咨询