## Introduction: Challenges and Opportunities in Job-Listing Aggregation

In today's digital recruiting landscape, job seekers constantly switch between multiple job boards to search for positions, while the anti-crawling defenses of those sites keep getting more sophisticated and traditional crawling techniques struggle to keep up. This article presents a modern job-listing aggregation crawler built on Python asynchronous coroutines, a smart proxy pool, and machine-learning-based deduplication, with the goal of efficient, stable, and intelligent data collection.

## Technology Stack Overview

- Async framework: asyncio + aiohttp + aiomysql
- Anti-anti-crawling: dynamic proxy pool + request-fingerprint simulation + browser-behavior simulation
- Smart parsing: Playwright automation + XPath/CSS selectors + regular expressions
- Data storage: MySQL 8.0 + Redis with asynchronous database operations
- Deduplication: SimHash + Bloom filter + text-similarity computation
- Monitoring and deployment: Prometheus + Grafana + Docker containerization

## System Architecture Design

```python
# Smart job-listing aggregation crawler: core data model
import hashlib
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class JobPosition:
    """Job posting data model."""
    id: str
    title: str
    company: str
    salary: str
    location: str
    experience: str
    education: str
    tags: List[str]
    description: str
    source: str
    url: str
    publish_time: datetime
    crawl_time: datetime
    hash_value: Optional[str] = None

    def __post_init__(self):
        """Generate a content hash used for deduplication."""
        content = f"{self.title}{self.company}{self.description}"
        self.hash_value = self.generate_simhash(content)

    @staticmethod
    def generate_simhash(content: str, bits: int = 64) -> str:
        """Generate a SimHash document fingerprint (simplified version)."""
        import numpy as np

        # Tokenize (simplified: whitespace split) and hash each token
        words = content.split()
        vector = np.zeros(bits)
        for word in words:
            # Hash the token into a fixed-width bit string
            word_hash = bin(int(hashlib.md5(word.encode()).hexdigest(), 16))[2:].zfill(bits)
            # Weighted accumulation per bit position (truncated to `bits` bits)
            for i, bit in enumerate(word_hash[:bits]):
                vector[i] += 1 if bit == "1" else -1

        # Collapse the weight vector into the final SimHash bit string
        simhash = "".join(["1" if v > 0 else "0" for v in vector])
        return simhash
```
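The model above only produces SimHash fingerprints; deduplication also needs a way to compare them. Below is a minimal sketch of that comparison, assuming the 64-bit fingerprints produced by `generate_simhash` and a Hamming-distance threshold of 3 bits (the threshold is an assumption to be tuned on real data, not something specified in the article):

```python
def hamming_distance(hash_a: str, hash_b: str) -> int:
    """Count the bit positions where two equal-length SimHash strings differ."""
    return sum(1 for a, b in zip(hash_a, hash_b) if a != b)


def is_near_duplicate(hash_a: str, hash_b: str, threshold: int = 3) -> bool:
    """Treat two postings as duplicates when their fingerprints differ in at most `threshold` bits."""
    return hamming_distance(hash_a, hash_b) <= threshold


# Usage: hash_value is filled in by JobPosition.__post_init__
# if is_near_duplicate(job_a.hash_value, job_b.hash_value):
#     skip saving job_b
```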
## Core Crawler Implementation

### 1. Asynchronous Crawler Engine

```python
import asyncio
import hashlib
import logging
import random
from contextlib import asynccontextmanager
from typing import Dict, Optional

import aiohttp
import backoff
from aiohttp import ClientTimeout, TCPConnector

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AsyncSpiderEngine:
    """Asynchronous crawler engine."""

    def __init__(self, max_concurrent: int = 10, request_timeout: int = 30):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = ClientTimeout(total=request_timeout)
        self.session = None
        self.proxy_pool = ProxyPool()
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
        ]

    @asynccontextmanager
    async def create_session(self):
        """Create an aiohttp session."""
        connector = TCPConnector(limit=self.max_concurrent, ssl=False)
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
            headers=self._get_headers()
        ) as session:
            self.session = session
            yield session

    def _get_headers(self) -> Dict:
        """Generate randomized request headers."""
        return {
            "User-Agent": random.choice(self.user_agents),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Cache-Control": "max-age=0",
        }

    @backoff.on_exception(
        backoff.expo,
        (aiohttp.ClientError, asyncio.TimeoutError),
        max_tries=3,
        max_time=30
    )
    async def fetch(self, url: str, use_proxy: bool = True) -> Optional[str]:
        """Fetch a page asynchronously."""
        async with self.semaphore:
            # Fall back to a default session if create_session() was never entered
            if self.session is None:
                self.session = aiohttp.ClientSession(timeout=self.timeout)
            try:
                proxy = await self.proxy_pool.get_proxy() if use_proxy else None
                async with self.session.get(
                    url,
                    proxy=proxy,
                    headers=self._get_headers(),
                    cookies=self._get_cookies()
                ) as response:
                    if response.status == 200:
                        content = await response.text()
                        # Reward the proxy on success
                        if proxy:
                            await self.proxy_pool.update_score(proxy, True)
                        return content
                    else:
                        logger.warning(f"Request failed: {url}, status code: {response.status}")
                        if proxy:
                            await self.proxy_pool.update_score(proxy, False)
                        return None
            except (aiohttp.ClientError, asyncio.TimeoutError):
                # Re-raise so the backoff decorator can retry these errors
                raise
            except Exception as e:
                logger.error(f"Request error {url}: {e}")
                return None

    def _get_cookies(self) -> Dict:
        """Generate simulated cookies."""
        return {
            "session_id": hashlib.md5(str(random.random()).encode()).hexdigest(),
            "user_token": hashlib.md5(str(random.random()).encode()).hexdigest()[:16]
        }
```

### 2. Smart Proxy Pool

```python
import asyncio
import random
from typing import Dict, List, Optional

import aiohttp


class ProxyPool:
    """Smart proxy pool manager."""

    def __init__(self):
        self.proxies = []
        self.proxy_scores = {}
        self.lock = asyncio.Lock()
        self.proxy_sources = [
            "http://www.proxy-list.org/",
            "https://free-proxy-list.net/",
            "http://www.gatherproxy.com/"
        ]

    async def initialize(self):
        """Initialize the proxy pool."""
        await self.refresh_proxies()
        # Start the periodic refresh task
        asyncio.create_task(self._scheduled_refresh())

    async def refresh_proxies(self):
        """Refresh the proxy list."""
        async with self.lock:
            new_proxies = []
            for source in self.proxy_sources:
                proxies = await self._fetch_proxies_from_source(source)
                new_proxies.extend(proxies)

            # Validate proxy availability
            valid_proxies = await self._validate_proxies(new_proxies)
            self.proxies = valid_proxies

            # Initialize scores
            for proxy in valid_proxies:
                self.proxy_scores[proxy] = 100

    async def _fetch_proxies_from_source(self, source: str) -> List[str]:
        """Fetch candidate proxies from one source.

        Each free-proxy site has its own HTML layout, so the parsing logic is
        site-specific and left out here; an empty list is returned by default.
        """
        return []

    async def _validate_proxies(self, proxies: List[str]) -> List[str]:
        """Keep only proxies that can reach a probe URL within a short timeout."""
        valid = []
        timeout = aiohttp.ClientTimeout(total=5)
        for proxy in proxies:
            try:
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    # Probe URL chosen as an example; any lightweight endpoint works
                    async with session.get("http://httpbin.org/ip", proxy=proxy) as resp:
                        if resp.status == 200:
                            valid.append(proxy)
            except Exception:
                continue
        return valid

    async def get_proxy(self) -> Optional[str]:
        """Get a high-quality proxy (weighted random selection by score)."""
        if not self.proxies:
            await self.refresh_proxies()

        weighted_proxies = []
        for proxy in self.proxies:
            weight = self.proxy_scores.get(proxy, 50)
            weighted_proxies.extend([proxy] * weight)

        return random.choice(weighted_proxies) if weighted_proxies else None

    async def update_score(self, proxy: str, success: bool):
        """Update a proxy's score."""
        current_score = self.proxy_scores.get(proxy, 50)
        if success:
            new_score = min(current_score + 10, 200)
        else:
            new_score = max(current_score - 30, 0)

        if new_score == 0:
            # Drop proxies that keep failing
            if proxy in self.proxies:
                self.proxies.remove(proxy)
            self.proxy_scores.pop(proxy, None)
            return

        self.proxy_scores[proxy] = new_score

    async def _scheduled_refresh(self):
        """Refresh the proxy list periodically."""
        while True:
            await asyncio.sleep(3600)  # refresh every hour
            await self.refresh_proxies()
```

### 3. Dynamic Page Rendering with Playwright

```python
import asyncio
import random

from playwright.async_api import async_playwright


class DynamicPageRenderer:
    """Render pages that rely on JavaScript."""

    def __init__(self):
        self.browser = None
        self.context = None

    async def __aenter__(self):
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=True,
            args=["--disable-blink-features=AutomationControlled"]
        )
        self.context = await self.browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            viewport={"width": 1920, "height": 1080}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.context.close()
        await self.browser.close()
        await self.playwright.stop()

    async def render_page(self, url: str, wait_for_selector: str = None) -> str:
        """Render a dynamic page and return its HTML."""
        page = await self.context.new_page()

        # Simulate human-like behavior before navigating
        await self._simulate_human_behavior(page)

        try:
            await page.goto(url, wait_until="networkidle")
            if wait_for_selector:
                await page.wait_for_selector(wait_for_selector, timeout=10000)

            # Scroll randomly to trigger lazy-loaded content
            await self._random_scroll(page)

            # Grab the rendered HTML
            content = await page.content()
            return content
        finally:
            await page.close()

    async def _simulate_human_behavior(self, page):
        """Simulate human browsing behavior."""
        # Random delay
        await asyncio.sleep(random.uniform(1, 3))
        # Random mouse movement
        await page.mouse.move(
            random.randint(100, 500),
            random.randint(100, 500)
        )

    async def _random_scroll(self, page):
        """Scroll the page a random number of times."""
        for _ in range(random.randint(2, 5)):
            scroll_amount = random.randint(300, 800)
            await page.evaluate(f"window.scrollBy(0, {scroll_amount})")
            await asyncio.sleep(random.uniform(0.5, 1.5))
```
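To show how the engine, the proxy pool, and the renderer are meant to fit together, here is a minimal, illustrative sketch of the call pattern: fetch through the async engine first, then fall back to Playwright rendering when the plain request fails. The URL is a placeholder, and `use_proxy=False` is chosen only so the sketch runs without a populated proxy pool:

```python
import asyncio


async def fetch_with_fallback(url: str) -> str:
    """Illustrative only: plain async fetch first, headless rendering as fallback."""
    engine = AsyncSpiderEngine(max_concurrent=5)
    await engine.proxy_pool.initialize()  # populate and score proxies before crawling

    async with engine.create_session():
        html = await engine.fetch(url, use_proxy=False)

    if not html:
        # Heavily protected or JS-rendered pages go through the headless browser
        async with DynamicPageRenderer() as renderer:
            html = await renderer.render_page(url)

    return html


# asyncio.run(fetch_with_fallback("https://example.com/jobs"))  # placeholder URL
```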
### 4. Job-Site Parsers

```python
import hashlib
import re
from datetime import datetime
from typing import Dict, List, Optional

from bs4 import BeautifulSoup


class JobSiteParser:
    """Base class for job-site parsers."""

    def __init__(self):
        self.spider = AsyncSpiderEngine()
        self.renderer = DynamicPageRenderer()

    async def parse_job_list(self, url: str) -> List[Dict]:
        """Parse a job listing page."""
        raise NotImplementedError

    async def parse_job_detail(self, url: str) -> Optional[JobPosition]:
        """Parse a job detail page."""
        raise NotImplementedError

    def extract_salary_range(self, salary_text: str) -> Dict:
        """Extract a salary range from text such as '15k-25k'."""
        pattern = r"(\d+\.?\d*)[kK]?-?(\d+\.?\d*)?[kK]?"
        match = re.search(pattern, salary_text)
        if match:
            min_salary = float(match.group(1)) * 1000
            max_salary = float(match.group(2)) * 1000 if match.group(2) else min_salary
            return {
                "min": min_salary,
                "max": max_salary,
                "text": salary_text
            }
        return {"min": 0, "max": 0, "text": salary_text}


class BossZhiPinParser(JobSiteParser):
    """Parser for Boss直聘."""

    async def parse_job_list(self, url: str) -> List[Dict]:
        """Parse a Boss直聘 listing page."""
        content = await self.spider.fetch(url)
        if not content:
            # Fall back to dynamic rendering
            async with self.renderer:
                content = await self.renderer.render_page(
                    url, wait_for_selector=".job-list-box"
                )

        soup = BeautifulSoup(content, "html.parser")
        jobs = []

        for item in soup.select(".job-card-wrapper"):
            try:
                job = {
                    "title": item.select_one(".job-title").text.strip(),
                    "company": item.select_one(".company-name").text.strip(),
                    "salary": item.select_one(".salary").text.strip(),
                    "location": item.select_one(".job-area").text.strip(),
                    "experience": item.select_one(".tag-list").text.strip(),
                    "link": item.select_one("a")["href"],
                    "source": "boss_zhipin"
                }
                jobs.append(job)
            except Exception as e:
                logger.error(f"Failed to parse job item: {e}")

        return jobs

    async def parse_job_detail(self, url: str) -> Optional[JobPosition]:
        """Parse a Boss直聘 detail page."""
        full_url = f"https://www.zhipin.com{url}" if not url.startswith("http") else url

        async with self.renderer:
            content = await self.renderer.render_page(
                full_url, wait_for_selector=".job-detail"
            )

        soup = BeautifulSoup(content, "html.parser")

        try:
            title = soup.select_one(".job-title").text.strip()
            company = soup.select_one(".company-name").text.strip()
            salary = soup.select_one(".salary").text.strip()

            # Extract secondary fields
            info_items = soup.select(".job-detail-section-item")
            location = info_items[0].text.strip() if len(info_items) > 0 else ""
            experience = info_items[1].text.strip() if len(info_items) > 1 else ""

            description = soup.select_one(".job-sec-text").text.strip()

            return JobPosition(
                id=hashlib.md5(full_url.encode()).hexdigest(),
                title=title,
                company=company,
                salary=salary,
                location=location,
                experience=experience,
                education="",   # extract as needed
                tags=[],        # extract as needed
                description=description,
                source="boss_zhipin",
                url=full_url,
                publish_time=datetime.now(),
                crawl_time=datetime.now()
            )
        except Exception as e:
            logger.error(f"Failed to parse detail page {full_url}: {e}")
            return None


class LagouParser(JobSiteParser):
    """Parser for 拉勾网 (implement analogously to BossZhiPinParser)."""
    pass


class ZhilianParser(JobSiteParser):
    """Parser for 智联招聘 (implement analogously to BossZhiPinParser)."""
    pass
```
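A quick illustration of what `extract_salary_range` returns for typical salary strings, following the regex above (the inputs are assumed examples, not data from the article):

```python
parser = BossZhiPinParser()

print(parser.extract_salary_range("15k-25k"))
# -> {'min': 15000.0, 'max': 25000.0, 'text': '15k-25k'}

print(parser.extract_salary_range("面议"))
# No digits to match, so the fallback is returned:
# -> {'min': 0, 'max': 0, 'text': '面议'}
```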
### 5. Asynchronous Data Storage

```python
import json
from typing import Dict

import aiomysql
from motor.motor_asyncio import AsyncIOMotorClient
from redis import asyncio as aioredis


class AsyncDataStorage:
    """Asynchronous storage manager."""

    def __init__(self, mysql_config: Dict, redis_config: Dict, mongo_config: Dict = None):
        self.mysql_config = mysql_config
        self.redis_config = redis_config
        self.mongo_config = mongo_config
        self.pool = None
        self.redis = None
        self.mongo = None

    async def initialize(self):
        """Initialize database connections."""
        # MySQL connection pool
        self.pool = await aiomysql.create_pool(**self.mysql_config)

        # Redis connection
        self.redis = aioredis.from_url(
            f"redis://{self.redis_config['host']}:{self.redis_config['port']}",
            password=self.redis_config.get("password"),
            db=self.redis_config.get("db", 0)
        )

        # Optional MongoDB connection
        if self.mongo_config:
            self.mongo = AsyncIOMotorClient(self.mongo_config["uri"])

    async def save_job(self, job: JobPosition) -> bool:
        """Persist a job posting."""
        # 1. Deduplicate with the Bloom filter
        if await self.is_duplicate(job.hash_value):
            logger.info(f"Duplicate job detected: {job.title}")
            return False

        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                sql = """
                    INSERT INTO jobs (
                        id, title, company, salary, location, experience,
                        education, description, source, url, publish_time,
                        crawl_time, hash_value
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE crawl_time = VALUES(crawl_time)
                """
                await cursor.execute(sql, (
                    job.id, job.title, job.company, job.salary,
                    job.location, job.experience, job.education,
                    job.description, job.source, job.url,
                    job.publish_time, job.crawl_time, job.hash_value
                ))
                await conn.commit()

        # 2. Add the fingerprint to the Bloom filter
        await self.redis.setbit(
            "job_bloom_filter",
            int(job.hash_value[:8], 16) % (2**20),
            1
        )

        # 3. Cache the job in Redis
        cache_key = f"job:{job.id}"
        await self.redis.setex(
            cache_key,
            3600 * 24,  # cache for 24 hours
            json.dumps(job.__dict__, default=str)
        )

        return True

    async def is_duplicate(self, hash_value: str) -> bool:
        """Check for duplicates with a simplified, single-bit Bloom filter."""
        position = int(hash_value[:8], 16) % (2**20)
        result = await self.redis.getbit("job_bloom_filter", position)
        return bool(result)

    async def close(self):
        """Close database connections."""
        if self.pool:
            self.pool.close()
            await self.pool.wait_closed()
        if self.redis:
            await self.redis.close()
```
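The `is_duplicate` check above sets and tests a single bit per fingerprint, which is effectively a single-hash Bloom filter and produces more false positives than necessary. A closer-to-standard sketch using k derived bit positions over the same Redis bitmap follows; the key name, bit-array size, and k = 5 are assumptions for illustration, not values from the article:

```python
import hashlib


class RedisBloomFilter:
    """Minimal k-hash Bloom filter over a Redis bitmap (illustrative sketch)."""

    def __init__(self, redis_client, key: str = "job_bloom_filter",
                 size: int = 2**24, k: int = 5):
        self.redis = redis_client  # an async redis client, e.g. redis.asyncio.Redis
        self.key = key
        self.size = size
        self.k = k

    def _positions(self, value: str):
        """Derive k bit positions from one value using salted MD5 digests."""
        for i in range(self.k):
            digest = hashlib.md5(f"{i}:{value}".encode()).hexdigest()
            yield int(digest, 16) % self.size

    async def add(self, value: str):
        for pos in self._positions(value):
            await self.redis.setbit(self.key, pos, 1)

    async def contains(self, value: str) -> bool:
        # Only a hit on every derived position counts as "probably seen before"
        for pos in self._positions(value):
            if not await self.redis.getbit(self.key, pos):
                return False
        return True
```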
### 6. Main Scheduler

```python
import asyncio
import random


class JobSpiderScheduler:
    """Crawler scheduler."""

    def __init__(self):
        self.parsers = {
            "boss_zhipin": BossZhiPinParser(),
            "lagou": LagouParser(),
            "zhilian": ZhilianParser()
        }
        self.storage = AsyncDataStorage(
            mysql_config={
                "host": "localhost",
                "port": 3306,
                "user": "root",
                "password": "password",
                "db": "job_spider",
                "minsize": 1,
                "maxsize": 10
            },
            redis_config={
                "host": "localhost",
                "port": 6379,
                "password": "",
                "db": 0
            }
        )
        self.task_queue = asyncio.Queue()
        self.results = []

    async def run(self):
        """Run the crawler."""
        logger.info("Starting the job aggregation crawler...")

        # Initialize storage
        await self.storage.initialize()

        # Define crawl tasks
        tasks = [
            ("boss_zhipin", "https://www.zhipin.com/web/geek/job?query=python&city=101010100"),
            ("lagou", "https://www.lagou.com/jobs/list_python?city=北京"),
            ("zhilian", "https://sou.zhaopin.com/?jl=北京&kw=python")
        ]

        # Start consumer tasks
        consumer_tasks = [
            asyncio.create_task(self._consumer())
            for _ in range(5)  # 5 concurrent consumers
        ]

        # Enqueue tasks
        for task in tasks:
            await self.task_queue.put(task)

        # Wait for the queue to drain
        await self.task_queue.join()

        # Cancel the consumers
        for consumer in consumer_tasks:
            consumer.cancel()

        # Close storage connections
        await self.storage.close()

        logger.info(f"Crawl finished, collected {len(self.results)} jobs")

    async def _consumer(self):
        """Consumer: process crawl tasks from the queue."""
        while True:
            try:
                source, url = await self.task_queue.get()

                parser = self.parsers.get(source)
                if parser:
                    # Parse the listing page
                    jobs = await parser.parse_job_list(url)

                    for job_info in jobs[:10]:  # limit per-site volume
                        # Parse the detail page
                        job = await parser.parse_job_detail(job_info["link"])
                        if job:
                            # Persist to the database
                            success = await self.storage.save_job(job)
                            if success:
                                self.results.append(job)
                                logger.info(f"Saved job: {job.title}")

                        # Polite delay between detail requests
                        await asyncio.sleep(random.uniform(1, 3))

                self.task_queue.task_done()

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Task failed: {e}")
                self.task_queue.task_done()


async def main():
    """Entry point."""
    scheduler = JobSpiderScheduler()
    try:
        await scheduler.run()
    except KeyboardInterrupt:
        logger.info("Interrupt received, shutting down gracefully...")
    except Exception as e:
        logger.error(f"Crawler error: {e}")
    finally:
        logger.info("Crawler stopped")


if __name__ == "__main__":
    # One-off: SQL to create the jobs table (execute it manually or via a migration)
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS jobs (
        id VARCHAR(64) PRIMARY KEY,
        title VARCHAR(255) NOT NULL,
        company VARCHAR(255) NOT NULL,
        salary VARCHAR(100),
        location VARCHAR(100),
        experience VARCHAR(100),
        education VARCHAR(100),
        description TEXT,
        source VARCHAR(50),
        url VARCHAR(500),
        publish_time DATETIME,
        crawl_time DATETIME,
        hash_value VARCHAR(128),
        INDEX idx_title (title(100)),
        INDEX idx_company (company(100)),
        INDEX idx_source (source),
        INDEX idx_crawl_time (crawl_time)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """

    # Run the crawler
    asyncio.run(main())
```

## Advanced Extensions

### 1. Machine-Learning-Based Deduplication

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


class SmartDeduplicator:
    """Similarity-based deduplicator."""

    def __init__(self):
        self.vectorizer = TfidfVectorizer(max_features=5000)
        self.job_vectors = []
        self.job_ids = []

    def calculate_similarity(self, text1: str, text2: str) -> float:
        """Compute cosine similarity between two texts."""
        vectors = self.vectorizer.fit_transform([text1, text2])
        similarity = cosine_similarity(vectors[0:1], vectors[1:2])[0][0]
        return similarity

    def is_similar_job(self, new_job: JobPosition, threshold: float = 0.8) -> bool:
        """Return True if the new job is similar to any previously stored job.

        Note: the vectorizer must already be fitted and job_vectors populated.
        """
        for job_id, vector in zip(self.job_ids, self.job_vectors):
            similarity = cosine_similarity(
                self.vectorizer.transform([new_job.description]),
                vector
            )[0][0]
            if similarity > threshold:
                return True
        return False
```

### 2. Anti-Crawling Strategy Monitoring

```python
class AntiAntiSpiderMonitor:
    """Monitor how often requests are being blocked."""

    def __init__(self):
        self.request_count = 0
        self.blocked_count = 0
        self.success_count = 0

    async def monitor_request(self, url: str, success: bool):
        """Record the outcome of a request."""
        self.request_count += 1
        if success:
            self.success_count += 1
        else:
            self.blocked_count += 1

        # Compute the success rate
        success_rate = self.success_count / max(self.request_count, 1)

        # Raise an alert if the success rate drops too low
        if success_rate < 0.3:
            logger.warning(f"Crawler is being blocked frequently, success rate: {success_rate:.2%}")
            await self.adjust_strategy()

    async def adjust_strategy(self):
        """Adjust the crawling strategy."""
        logger.info("Adjusting strategy: increase delays, rotate proxies, rotate User-Agents")
        # Implement the actual strategy-adjustment logic here
```

## Deployment and Monitoring

### Docker Deployment Configuration

```dockerfile
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install the Playwright browser
RUN playwright install chromium

# Copy the code
COPY . .

# Run the crawler
CMD ["python", "main.py"]
```

### Prometheus Monitoring Configuration

```yaml
# prometheus.yml
scrape_configs:
  - job_name: job_spider
    static_configs:
      - targets: ["localhost:9091"]
```
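The prometheus.yml above scrapes localhost:9091, but the article does not show the crawler exposing any metrics on that port. A minimal sketch of how the spider process could publish counters with the prometheus_client library; the port matches the scrape config, while the metric names and labels are assumptions for illustration:

```python
from prometheus_client import Counter, start_http_server

# Counters the scrape config above could collect (names are illustrative)
REQUESTS_TOTAL = Counter("spider_requests_total", "HTTP requests issued", ["source"])
JOBS_SAVED_TOTAL = Counter("spider_jobs_saved_total", "Job postings persisted", ["source"])


def start_metrics_server(port: int = 9091):
    """Expose /metrics on the port that prometheus.yml points at."""
    start_http_server(port)


# Example: increment counters from inside the crawler
# start_metrics_server()
# REQUESTS_TOTAL.labels(source="boss_zhipin").inc()
# JOBS_SAVED_TOTAL.labels(source="boss_zhipin").inc()
```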
