1. Project Overview: Building an Intelligent Job Information Aggregation Platform

In today's highly competitive job market, job seekers often have to switch between multiple recruitment platforms, wasting a great deal of time repeatedly searching for the same positions. This post walks through how to build an efficient, intelligent multi-source job aggregation crawler in Python that fetches listings from mainstream recruitment sites concurrently and then cleans, deduplicates, and analyzes the data.

2. Technology Stack: Embracing the Modern Async Crawling Ecosystem

The crawler is built on the following stack:

- Async framework: aiohttp + asyncio for high-concurrency fetching
- Parsing engine: Playwright for dynamically rendered pages, BeautifulSoup4 for static content
- Data storage: MongoDB for the unstructured job documents, Redis for distributed deduplication
- Anti-anti-crawling: proxy pool, request-header rotation, behavior simulation
- Task scheduling: Celery + Redis as a distributed task queue
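Before diving into the full implementation, here is a minimal, self-contained sketch of the core pattern this stack relies on: one shared aiohttp session feeding many concurrent requests through asyncio.gather. The URL is a placeholder; throttling, retries, and header rotation are added later in the real spider classes.

```python
import asyncio
import aiohttp


async def fetch_many(urls: list) -> list:
    """Fetch several pages concurrently over one shared connection pool."""
    timeout = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession(timeout=timeout) as session:

        async def fetch_one(url: str) -> str:
            async with session.get(url) as resp:
                resp.raise_for_status()
                return await resp.text()

        # gather() schedules all requests at once and preserves input order
        return await asyncio.gather(*(fetch_one(u) for u in urls))


if __name__ == "__main__":
    pages = asyncio.run(fetch_many(["https://example.com"] * 3))
    print(f"fetched {len(pages)} pages")
```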
3. System Architecture Design

3.1 Modular Architecture

```text
Job Aggregation Crawler System
├── Scheduler
├── Spider Engine
│   ├── Boss直聘 spider
│   ├── 拉勾网 (Lagou) spider
│   ├── 智联招聘 (Zhilian) spider
│   └── 猎聘网 (Liepin) spider
├── Data Processor
├── Storage Layer
└── Monitoring
```

3.2 Core Class Design

```python
# requirements.txt
# Install the following dependencies first:
# aiohttp>=3.8.0
# playwright>=1.40.0
# beautifulsoup4>=4.12.0
# pymongo>=4.6.0
# redis>=5.0.0
# celery>=5.3.0
# pydantic>=2.5.0
# lxml>=4.9.0
```

4. Complete Code Implementation

4.1 Data Model Definition

```python
# models.py
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
from enum import Enum


class JobPlatform(str, Enum):
    BOSS = "boss"
    LAGOU = "lagou"
    ZHILIAN = "zhilian"
    LIEPIN = "liepin"


class JobInfo(BaseModel):
    """Job posting data model."""
    id: str = Field(..., description="Unique job ID")
    platform: JobPlatform = Field(..., description="Source platform")
    title: str = Field(..., description="Job title")
    company: str = Field(..., description="Company name")
    salary: str = Field(..., description="Salary range")
    location: str = Field(..., description="Work location")
    experience: Optional[str] = Field(None, description="Required experience")
    education: Optional[str] = Field(None, description="Required education")
    tags: List[str] = Field(default_factory=list, description="Job tags")
    description: str = Field(..., description="Job description")
    url: str = Field(..., description="Original listing URL")
    published_at: datetime = Field(default_factory=datetime.now)
    crawled_at: datetime = Field(default_factory=datetime.now)

    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }
```
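A quick sanity check of the model (not part of the original article): construct a JobInfo by hand and serialize it. The field values below are made up. With the pydantic 2.x pin from requirements.txt, the v1-style .dict()/.json() calls used later in the article still run but emit deprecation warnings; model_dump_json() is the v2-native equivalent.

```python
from models import JobInfo, JobPlatform

job = JobInfo(
    id="boss_123456",                      # made-up sample values
    platform=JobPlatform.BOSS,
    title="Python后端工程师",
    company="Example Tech",
    salary="25-40K",
    location="北京",
    description="负责职位聚合爬虫系统的开发与维护。",
    url="https://www.zhipin.com/job_detail/example.html",
)
print(job.model_dump_json(indent=2))       # pydantic v2; use job.json() on v1
```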
4.2 Async Spider Base Class

```python
# base_spider.py
import asyncio
import aiohttp
from abc import ABC, abstractmethod
from typing import Dict, List, Any
from dataclasses import dataclass
import random
import logging
from playwright.async_api import async_playwright

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class ProxyConfig:
    """Proxy configuration."""
    enabled: bool = False
    proxy_url: str = ""
    max_retries: int = 3


class BaseAsyncSpider(ABC):
    """Async spider base class."""

    def __init__(self, name: str, base_url: str, proxy_config: ProxyConfig = None):
        self.name = name
        self.base_url = base_url
        self.proxy_config = proxy_config or ProxyConfig()
        self.session = None
        self.headers_pool = self._init_headers_pool()

    def _init_headers_pool(self) -> List[Dict]:
        """Initialize the request-header pool."""
        return [
            {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Accept-Encoding": "gzip, deflate, br",
                "Connection": "keep-alive",
            },
            {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Accept-Encoding": "gzip, deflate, br",
                "Connection": "keep-alive",
            }
        ]

    async def create_session(self):
        """Create an aiohttp session."""
        timeout = aiohttp.ClientTimeout(total=30)
        connector = aiohttp.TCPConnector(limit=100, ssl=False)
        if self.proxy_config.enabled:
            # NOTE: aiohttp applies proxies per request; pass proxy=... in fetch() if needed
            self.session = aiohttp.ClientSession(
                timeout=timeout,
                connector=connector,
                headers=random.choice(self.headers_pool)
            )
        else:
            self.session = aiohttp.ClientSession(
                timeout=timeout,
                connector=connector,
                headers=random.choice(self.headers_pool)
            )

    async def fetch(self, url: str, method: str = "GET", **kwargs) -> str:
        """Send an HTTP request with retries."""
        if not self.session:
            await self.create_session()
        for attempt in range(self.proxy_config.max_retries):
            try:
                async with self.session.request(method, url, **kwargs) as response:
                    if response.status == 200:
                        return await response.text()
                    elif response.status == 429:
                        await asyncio.sleep(2 ** attempt)  # exponential backoff
                    else:
                        logger.error(f"Request failed: {url}, status: {response.status}")
            except Exception as e:
                logger.error(f"Request error: {url}, error: {e}")
                await asyncio.sleep(1)
        return ""

    async def fetch_with_playwright(self, url: str, wait_for_selector: str = None) -> str:
        """Render a dynamic page with Playwright and return its HTML."""
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent=random.choice(self.headers_pool)["User-Agent"]
            )
            page = await context.new_page()
            try:
                await page.goto(url, wait_until="networkidle")
                if wait_for_selector:
                    await page.wait_for_selector(wait_for_selector, timeout=10000)
                # Scroll the page to trigger lazy-loaded content
                await self._auto_scroll(page)
                content = await page.content()
                await browser.close()
                return content
            except Exception as e:
                logger.error(f"Playwright fetch failed: {url}, error: {e}")
                await browser.close()
                return ""

    async def _auto_scroll(self, page):
        """Scroll to the bottom of the page automatically."""
        await page.evaluate(
            """
            async () => {
                await new Promise((resolve) => {
                    let totalHeight = 0;
                    const distance = 100;
                    const timer = setInterval(() => {
                        const scrollHeight = document.body.scrollHeight;
                        window.scrollBy(0, distance);
                        totalHeight += distance;
                        if (totalHeight >= scrollHeight) {
                            clearInterval(timer);
                            resolve();
                        }
                    }, 100);
                });
            }
            """
        )

    @abstractmethod
    async def search_jobs(self, keyword: str, city: str, page: int = 1) -> List[Dict]:
        """Search for jobs; subclasses must implement this."""
        pass

    @abstractmethod
    def parse_job_list(self, html: str) -> List[Dict]:
        """Parse a job-list page."""
        pass

    @abstractmethod
    async def parse_job_detail(self, job_url: str) -> Dict:
        """Parse a job-detail page."""
        pass

    async def close(self):
        """Close the HTTP session."""
        if self.session:
            await self.session.close()
```

4.3 Boss直聘 Spider Implementation

```python
# boss_spider.py
from base_spider import BaseAsyncSpider
from bs4 import BeautifulSoup
import re
from urllib.parse import quote
import json
from typing import Dict, List
import asyncio


class BossZhiPinSpider(BaseAsyncSpider):
    """Boss直聘 spider."""

    def __init__(self):
        super().__init__(
            name="boss",
            base_url="https://www.zhipin.com"
        )
        self.search_url = "https://www.zhipin.com/web/geek/job"

    async def search_jobs(self, keyword: str, city: str, page: int = 1) -> List[Dict]:
        """Search Boss直聘 job listings."""
        params = {
            "query": keyword,
            "city": self._get_city_code(city),
            "page": page
        }
        # Boss直聘 serves its search through an API-style URL, so build the query string
        api_url = f"{self.search_url}?{self._build_query(params)}"
        # Use Playwright to render the dynamic content
        html = await self.fetch_with_playwright(
            api_url,
            wait_for_selector=".job-list-box"
        )
        if html:
            return self.parse_job_list(html)
        return []

    def _build_query(self, params: Dict) -> str:
        """Build the query string."""
        return "&".join([f"{k}={quote(str(v))}" for k, v in params.items()])

    def _get_city_code(self, city: str) -> int:
        """Map a city name to its code (simplified)."""
        city_map = {
            "北京": 101010100,
            "上海": 101020100,
            "深圳": 101280600,
            "广州": 101280100,
            "杭州": 101210100,
        }
        return city_map.get(city, 101010100)

    def parse_job_list(self, html: str) -> List[Dict]:
        """Parse the job-list page."""
        soup = BeautifulSoup(html, "lxml")
        jobs = []
        job_items = soup.select(".job-card-wrapper")
        for item in job_items:
            try:
                job = {
                    "platform": "boss",
                    "title": item.select_one(".job-name").text.strip() if item.select_one(".job-name") else "",
                    "company": item.select_one(".company-name").text.strip() if item.select_one(".company-name") else "",
                    "salary": item.select_one(".salary").text.strip() if item.select_one(".salary") else "",
                    "location": item.select_one(".job-area").text.strip() if item.select_one(".job-area") else "",
                    "experience": "",
                    "education": "",
                    "url": f"{self.base_url}{item.select_one('a')['href']}" if item.select_one("a") else "",
                    "tags": [tag.text.strip() for tag in item.select(".tag-list li")]
                }
                # Extract experience and education requirements
                info_list = item.select(".job-info.clearfix span")
                if len(info_list) >= 3:
                    job["experience"] = info_list[1].text.strip()
                    job["education"] = info_list[2].text.strip()
                jobs.append(job)
            except Exception:
                continue
        return jobs

    async def parse_job_detail(self, job_url: str) -> Dict:
        """Parse the job-detail page."""
        html = await self.fetch_with_playwright(
            job_url,
            wait_for_selector=".job-detail"
        )
        if not html:
            return {}
        soup = BeautifulSoup(html, "lxml")
        detail = {
            "description": "",
            "requirements": "",
            "benefits": []
        }
        # Job description
        desc_elem = soup.select_one(".job-sec-text")
        if desc_elem:
            detail["description"] = desc_elem.get_text(strip=True)
        # Job requirements
        req_elem = soup.select_one(".job-sec.require")
        if req_elem:
            detail["requirements"] = req_elem.get_text(strip=True)
        # Company benefits
        benefit_elems = soup.select(".job-tags span")
        detail["benefits"] = [elem.text.strip() for elem in benefit_elems]
        return detail
```

4.4 Spider Scheduler

```python
# scheduler.py
import asyncio
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor
import logging
from datetime import datetime
from models import JobInfo

logger = logging.getLogger(__name__)


class JobSpiderScheduler:
    """Spider scheduler."""

    def __init__(self):
        self.spiders = []
        self.max_concurrent = 5

    def register_spider(self, spider):
        """Register a spider."""
        self.spiders.append(spider)
        logger.info(f"Registered spider: {spider.name}")

    async def search_all_platforms(self, keyword: str, city: str, max_pages: int = 3) -> List[JobInfo]:
        """Search all platforms for jobs."""
        all_jobs = []
        # Run every spider/page combination concurrently
        tasks = []
        for spider in self.spiders:
            for page in range(1, max_pages + 1):
                task = self._search_single_spider(spider, keyword, city, page)
                tasks.append(task)
        # Wait for all tasks to finish
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Collect results
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Spider task raised: {result}")
                continue
            if result and isinstance(result, list):
                all_jobs.extend(result)
        # Deduplicate
        unique_jobs = self._deduplicate_jobs(all_jobs)
        return unique_jobs

    async def _search_single_spider(self, spider, keyword: str, city: str, page: int):
        """Search one page on one spider."""
        try:
            jobs = await spider.search_jobs(keyword, city, page)
            # Fetch job details
            detailed_jobs = []
            for job in jobs[:10]:  # limit to 10 jobs per page
                if job.get("url"):
                    detail = await spider.parse_job_detail(job["url"])
                    job.update(detail)
                # Convert to the JobInfo model; id and platform are set explicitly,
                # so exclude them from the unpacked dict to avoid duplicate kwargs
                job_info = JobInfo(
                    id=f"{spider.name}_{hash(job['url'])}",
                    platform=spider.name,
                    **{k: v for k, v in job.items()
                       if k in JobInfo.__fields__ and k not in ("id", "platform")}
                )
                detailed_jobs.append(job_info)
            logger.info(f"{spider.name} page {page}: collected {len(detailed_jobs)} jobs")
            return detailed_jobs
        except Exception as e:
            logger.error(f"{spider.name} search failed: {e}")
            return []

    def _deduplicate_jobs(self, jobs: List[JobInfo]) -> List[JobInfo]:
        """Deduplicate jobs by company, title and salary."""
        seen = set()
        unique_jobs = []
        for job in jobs:
            # Simple composite key; a real system would use a similarity measure
            job_key = f"{job.company}_{job.title}_{job.salary}"
            if job_key not in seen:
                seen.add(job_key)
                unique_jobs.append(job)
        return unique_jobs

    async def close_all(self):
        """Close every registered spider."""
        for spider in self.spiders:
            await spider.close()
```

4.5 Data Storage and Deduplication

```python
# storage.py
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
import redis
from typing import List
import hashlib
import json
from models import JobInfo


class JobStorage:
    """Job data storage manager."""

    def __init__(self, mongo_uri: str, redis_uri: str):
        self.mongo_client = MongoClient(mongo_uri)
        self.db = self.mongo_client.job_aggregator
        self.jobs_collection = self.db.jobs
        # Create indexes
        self.jobs_collection.create_index([("id", 1)], unique=True)
        self.jobs_collection.create_index([("platform", 1)])
        self.jobs_collection.create_index([("title", "text"), ("company", "text")])
        # Redis connection for deduplication and caching
        self.redis_client = redis.from_url(redis_uri)

    def save_jobs(self, jobs: List[JobInfo]):
        """Persist jobs to MongoDB."""
        saved_count = 0
        for job in jobs:
            try:
                # Convert to a plain dict
                job_dict = job.dict()
                job_dict["_id"] = job.id
                # Insert or update
                self.jobs_collection.update_one(
                    {"_id": job.id},
                    {"$set": job_dict},
                    upsert=True
                )
                saved_count += 1
                # Record in the (simplified) Redis "bloom filter"
                self._add_to_bloom_filter(job.id)
            except DuplicateKeyError:
                continue
            except Exception as e:
                print(f"Failed to save job: {e}")
        return saved_count

    def _add_to_bloom_filter(self, job_id: str):
        """Add to the bloom filter (simplified stand-in, see note below)."""
        key = f"job_bloom:{hashlib.md5(job_id.encode()).hexdigest()[:8]}"
        self.redis_client.setbit(key, 0, 1)

    def is_duplicate(self, job_id: str) -> bool:
        """Check whether a job has been seen before."""
        key = f"job_bloom:{hashlib.md5(job_id.encode()).hexdigest()[:8]}"
        return self.redis_client.getbit(key, 0) == 1

    def search_jobs(self, keyword: str, limit: int = 50) -> List[dict]:
        """Full-text search over stored jobs."""
        query = {"$text": {"$search": keyword}}
        results = self.jobs_collection.find(query).limit(limit)
        return list(results)

    def close(self):
        """Close connections."""
        self.mongo_client.close()
        self.redis_client.close()
```
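Note that _add_to_bloom_filter above is only a stand-in: setting a single bit under an 8-character MD5 prefix is not a real Bloom filter, and prefix collisions will produce false "duplicate" hits. A simpler exact alternative (my own sketch, not part of the article's modules) is a plain Redis set, where SADD atomically reports whether the ID was new:

```python
# redis_set_dedup.py  (hypothetical helper, not part of the article's code)
import redis


class RedisSetDeduplicator:
    """Exact-match deduplication backed by a single Redis set."""

    def __init__(self, redis_uri: str = "redis://localhost:6379/0",
                 key: str = "job_aggregator:job_ids"):
        self.client = redis.from_url(redis_uri)
        self.key = key

    def seen_before(self, job_id: str) -> bool:
        """Record job_id and return True if it had already been recorded."""
        # SADD returns how many members were actually added; 0 means it already existed
        return self.client.sadd(self.key, job_id) == 0
```

The set is exact at the cost of memory proportional to the number of IDs; a production Bloom filter would more likely use the RedisBloom module instead.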
4.6 Main Program Entry Point

```python
# main.py
import asyncio
import sys
import logging
from boss_spider import BossZhiPinSpider
from scheduler import JobSpiderScheduler
from storage import JobStorage

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


async def main():
    """Main entry point."""
    # Initialize components
    scheduler = JobSpiderScheduler()
    storage = JobStorage(
        mongo_uri="mongodb://localhost:27017/",
        redis_uri="redis://localhost:6379/0"
    )
    # Register spiders (only Boss直聘 here; other platforms can be added the same way)
    boss_spider = BossZhiPinSpider()
    scheduler.register_spider(boss_spider)
    try:
        # Search for jobs
        keyword = "Python开发"
        city = "北京"
        logger.info(f"Starting search: {keyword} - {city}")
        jobs = await scheduler.search_all_platforms(
            keyword=keyword,
            city=city,
            max_pages=2
        )
        logger.info(f"Collected {len(jobs)} jobs in total")
        # Save to the database
        saved_count = storage.save_jobs(jobs)
        logger.info(f"Saved {saved_count} jobs to the database")
        # Query example
        search_results = storage.search_jobs("Python", limit=10)
        logger.info(f"Search returned {len(search_results)} results")
    finally:
        # Clean up resources
        await scheduler.close_all()
        storage.close()


if __name__ == "__main__":
    # Windows needs the selector event loop policy
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main())
```

4.7 Distributed Task Queue (Celery) Configuration

```python
# celery_app.py
from celery import Celery
from celery.schedules import crontab

app = Celery(
    "job_crawler",
    broker="redis://localhost:6379/1",
    backend="redis://localhost:6379/2",
    include=["crawl_tasks"]
)

# Periodic task schedule
app.conf.beat_schedule = {
    "crawl-jobs-every-two-hours": {
        "task": "crawl_tasks.crawl_all_jobs",
        "schedule": crontab(minute=0, hour="*/2"),  # every 2 hours
        "args": ("Python", "北京", 3)
    },
}
app.conf.timezone = "Asia/Shanghai"
```

```python
# crawl_tasks.py
import asyncio
from celery_app import app


@app.task
def crawl_all_jobs(keyword: str, city: str, pages: int):
    """Celery task: crawl all platforms."""
    # NOTE: the async pipeline has to be driven from a synchronous Celery task
    loop = asyncio.get_event_loop()
    if loop.is_running():
        # An event loop is already running in this worker
        asyncio.create_task(crawl_main_wrapper(keyword, city, pages))
    else:
        # Start a fresh event loop
        loop.run_until_complete(crawl_main_wrapper(keyword, city, pages))


async def crawl_main_wrapper(keyword: str, city: str, pages: int):
    """Wrapper around the crawl pipeline."""
    # main() from main.py should be refactored to accept these parameters;
    # the extracted core logic would be called here.
    pass
```
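crawl_main_wrapper above is left as a stub, and juggling get_event_loop() inside a worker is fragile. As a hedged alternative (my own sketch reusing the classes defined earlier, not the article's final design), a prefork Celery worker normally has no running event loop, so the task can simply drive the pipeline with asyncio.run():

```python
# crawl_tasks.py (alternative sketch)
import asyncio

from celery_app import app
from boss_spider import BossZhiPinSpider
from scheduler import JobSpiderScheduler
from storage import JobStorage


@app.task(name="crawl_tasks.crawl_all_jobs")
def crawl_all_jobs(keyword: str, city: str, pages: int = 3) -> int:
    """Blocking Celery task that drives the async crawl pipeline."""

    async def _run() -> int:
        scheduler = JobSpiderScheduler()
        scheduler.register_spider(BossZhiPinSpider())
        storage = JobStorage("mongodb://localhost:27017/", "redis://localhost:6379/0")
        try:
            jobs = await scheduler.search_all_platforms(keyword, city, max_pages=pages)
            return storage.save_jobs(jobs)
        finally:
            await scheduler.close_all()
            storage.close()

    # Prefork workers execute tasks synchronously, so starting a fresh loop is safe here
    return asyncio.run(_run())
```

The worker and the periodic scheduler would then be started with the usual `celery -A celery_app worker` and `celery -A celery_app beat` commands.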
4.8 Configuration File

```python
# config.py
import os
from dataclasses import dataclass


@dataclass
class CrawlerConfig:
    """Crawler configuration."""
    # Request settings
    REQUEST_TIMEOUT: int = 30
    MAX_CONCURRENT: int = 10
    RETRY_TIMES: int = 3
    # Proxy settings
    PROXY_ENABLED: bool = False
    PROXY_POOL: list = None
    # Crawl limits
    DELAY_BETWEEN_REQUESTS: float = 1.0
    MAX_PAGES_PER_SEARCH: int = 5
    MAX_JOBS_PER_PAGE: int = 15
    # Storage settings
    MONGO_URI: str = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
    REDIS_URI: str = os.getenv("REDIS_URI", "redis://localhost:6379/0")
    # Logging settings
    LOG_LEVEL: str = "INFO"
    LOG_FILE: str = "job_crawler.log"

    def __post_init__(self):
        if self.PROXY_POOL is None:
            self.PROXY_POOL = []


# Global configuration instance
config = CrawlerConfig()
```

5. Advanced Feature Extensions

5.1 Smart Deduplication Algorithm

```python
# deduplicator.py
from datasketch import MinHash, MinHashLSH
import jieba
from typing import List


class JobDeduplicator:
    """Near-duplicate detection based on MinHash + LSH."""

    def __init__(self, threshold: float = 0.8):
        self.threshold = threshold
        self.lsh = MinHashLSH(threshold=threshold, num_perm=128)

    def create_minhash(self, text: str) -> MinHash:
        """Build a MinHash signature for a piece of text."""
        m = MinHash(num_perm=128)
        words = jieba.lcut(text)
        for word in words:
            m.update(word.encode("utf-8"))
        return m

    def deduplicate(self, jobs: List[dict]) -> List[dict]:
        """Remove near-duplicate jobs."""
        unique_jobs = []
        job_signatures = {}
        for job in jobs:
            # Build a feature string from title, company and the start of the description
            feature_text = f"{job['title']} {job['company']} {job['description'][:200]}"
            minhash = self.create_minhash(feature_text)
            # Query for similar, already-seen jobs
            result = self.lsh.query(minhash)
            if not result:
                # No similar entry yet: keep this job
                job_id = len(job_signatures)
                job_signatures[job_id] = minhash
                self.lsh.insert(f"job_{job_id}", minhash)
                unique_jobs.append(job)
        return unique_jobs
```
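A brief usage sketch for the deduplicator. The two postings below are invented, near-identical samples; datasketch and jieba are extra dependencies not listed in the requirements.txt above. Because MinHash is probabilistic, a pair this similar should usually (though not always) be collapsed at a 0.8 threshold.

```python
from deduplicator import JobDeduplicator

jobs = [
    {"title": "Python开发工程师", "company": "ACME",
     "description": "负责多源职位数据的抓取、清洗与去重系统开发。"},
    {"title": "Python开发工程师（爬虫方向）", "company": "ACME",
     "description": "负责多源职位数据的抓取、清洗与去重系统开发。"},
]

dedup = JobDeduplicator(threshold=0.8)
unique = dedup.deduplicate(jobs)
print(f"{len(jobs)} postings in, {len(unique)} left after near-duplicate removal")
```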
5.2 Anti-Crawling Countermeasures

```python
# anti_anti_crawler.py
import random
import time
from fp.fp import FreeProxy


class AntiAntiCrawler:
    """Anti-anti-crawling strategy manager."""

    def __init__(self):
        self.proxy_list = []
        self.user_agents = [
            # a large pool of User-Agent strings goes here
        ]

    async def get_proxy(self) -> str:
        """Return a random proxy, refreshing the pool if it is empty."""
        if not self.proxy_list:
            await self.refresh_proxies()
        return random.choice(self.proxy_list) if self.proxy_list else None

    async def refresh_proxies(self):
        """Refresh the proxy pool."""
        try:
            proxy = FreeProxy()
            self.proxy_list = proxy.get_proxy_list()
        except Exception:
            pass

    def get_random_delay(self) -> float:
        """Random delay between requests."""
        return random.uniform(1.0, 3.0)

    def get_random_headers(self) -> dict:
        """Random request headers."""
        return {
            "User-Agent": random.choice(self.user_agents),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": random.choice(["zh-CN,zh;q=0.9", "en-US,en;q=0.8"]),
            "Referer": random.choice(["https://www.google.com/", "https://www.baidu.com/"]),
        }
```

6. Deployment and Monitoring

6.1 Docker Deployment Configuration

```dockerfile
# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
        wget \
        gnupg \
    && wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" > /etc/apt/sources.list.d/google.list \
    && apt-get update && apt-get install -y google-chrome-stable \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
RUN playwright install chromium

# Copy the application code
COPY . .

CMD ["python", "main.py"]
```

6.2 Prometheus Monitoring Configuration

```python
# metrics.py
from prometheus_client import start_http_server, Counter, Gauge, Histogram
import time


class CrawlerMetrics:
    """Crawler monitoring metrics."""

    def __init__(self, port: int = 8000):
        self.port = port
        # Metric definitions
        self.requests_total = Counter(
            "crawler_requests_total",
            "Total requests made",
            ["spider", "status"]
        )
        self.jobs_collected = Counter(
            "crawler_jobs_collected_total",
            "Total jobs collected",
            ["spider"]
        )
        self.request_duration = Histogram(
            "crawler_request_duration_seconds",
            "Request duration in seconds",
            ["spider"]
        )
        self.queue_size = Gauge(
            "crawler_queue_size",
            "Current queue size"
        )
        # Start the Prometheus HTTP endpoint
        start_http_server(self.port)

    def record_request(self, spider: str, status: str, duration: float):
        """Record metrics for one request."""
        self.requests_total.labels(spider=spider, status=status).inc()
        self.request_duration.labels(spider=spider).observe(duration)
```
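A short usage sketch (not from the original article) showing how the metrics object would be wired into a spider's request path; the spider label and status value are placeholders, and Prometheus is assumed to scrape the /metrics endpoint exposed on the chosen port.

```python
import time

from metrics import CrawlerMetrics

metrics = CrawlerMetrics(port=8000)    # exposes /metrics on :8000

start = time.perf_counter()
# ... perform one search request here, e.g. await spider.fetch(url) ...
status = "200"                         # placeholder outcome
metrics.record_request(spider="boss", status=status,
                       duration=time.perf_counter() - start)
metrics.jobs_collected.labels(spider="boss").inc(25)   # 25 jobs parsed from the page
```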
7. Best Practices and Caveats

7.1 Legality and Compliance

- Respect robots.txt: always check the target site's robots.txt before crawling
- Throttle requests: avoid putting excessive load on the target sites
- Data usage: use the data for personal study and research only, never commercially
- Privacy: do not collect personally identifiable information

7.2 Performance Optimization Tips

- Connection pooling: reuse HTTP connections to avoid repeated TCP handshakes
- Async I/O: lean on asyncio to maximize concurrency
- Caching: cache static content where reasonable
- Incremental crawling: fetch only new or updated postings

7.3 Error Handling and Retries

- Exponential backoff for failed requests
- Circuit breaking: pause crawling when the target site misbehaves
- Fallback strategies: switch to a backup approach when the primary one fails
- Monitoring and alerting: track crawler health in real time

8. Summary

This post walked through the design and implementation of a complete multi-source job aggregation crawler. Its key characteristics:

- Modern stack: aiohttp, Playwright and other current async tooling
- Extensible: modular design makes it easy to add new recruitment platforms
- Smart processing: built-in deduplication, classification and ranking
- Robust: thorough error handling and anti-anti-crawling measures
- Deployable: Docker and monitoring configurations included