1. 网络爬虫基础概念
1.1 什么是网络爬虫
网络爬虫(Web Spider)是一种自动化的网页数据抓取程序,通过 HTTP 协议向目标服务器发送请求,解析响应内容,提取所需数据。
1.2 爬虫工作流程
┌─────────────┐ HTTP请求 ┌─────────────┐
│ 爬虫程序 │ ─────────────→ │ Web服务器 │
│ │ ←───────────── │ │
└─────────────┘ HTTP响应 └─────────────┘
│
▼
┌─────────────┐
│ HTML解析 │ ──→ 提取数据 ──→ 存储到文件/数据库
└─────────────┘
1.3 常用术语
| 术语 | 英文 | 说明 |
|---|---|---|
| UA | User-Agent | 浏览器标识,用于伪装请求 |
| IP 封禁 | IP Ban | 频繁访问导致 IP 被服务器封禁 |
| robots.txt | robots.txt | 网站允许爬取的规则文件 |
| 验证码 | CAPTCHA | 用于区分人机的高级反爬手段 |
| 代理池 | Proxy Pool | 多个 IP 地址轮换使用 |
2. 核心工具对比
| 工具 | 类型 | 适用场景 | 学习曲线 | 异步支持 |
|---|---|---|---|---|
| urllib | 标准库 | 简单请求 | ⭐ | ❌ |
| requests | 第三方库 | 常规爬虫 | ⭐⭐ | ❌ |
| aiohttp | 第三方库 | 高并发爬虫 | ⭐⭐⭐ | ✅ |
| httpx | 第三方库 | 同步/异步两用 | ⭐⭐ | ✅ |
| Scrapy | 框架 | 大型爬虫项目 | ⭐⭐⭐⭐ | ✅ |
| Playwright | 浏览器自动化 | JavaScript 渲染页面 | ⭐⭐⭐ | ✅ |
3. 环境准备与依赖安装
3.1 创建虚拟环境
# 创建专属虚拟环境
python -m venv spider_env
# 激活环境(Windows)
spider_env\Scripts\activate
# 激活环境(Linux/Mac)
source spider_env/bin/activate
3.2 安装依赖
# 基础依赖
pip install requests beautifulsoup4 lxml
# 异步爬虫依赖
pip install aiohttp httpx aiofiles
# 大型爬虫框架
pip install scrapy
# 浏览器自动化(处理 JS 渲染页面)
pip install playwright
playwright install chromium
# 数据处理
pip install pandas openpyxl
3.3 验证安装
import requests
import bs4
import lxml
# Report the installed version of every core dependency to confirm the setup.
for package_name, module in (
    ("requests", requests),
    ("beautifulsoup4", bs4),
    ("lxml", lxml),
):
    print(f"{package_name}: {module.__version__}")
4. 基础请求:urllib 与 requests
4.1 urllib(标准库,无需安装)
"""
urllib 基本用法演示
作者:erick
"""
import urllib.request
import urllib.parse
import urllib.error
import json
def basic_get_request():
    """Send a GET request with urllib and return the decoded JSON body.

    Prints the response status and headers on success.

    Returns:
        The parsed JSON payload, or None when the request fails with a
        ``urllib.error.URLError``.
    """
    # A browser-like User-Agent is attached through an explicit Request object.
    request = urllib.request.Request(
        "https://httpbin.org/get",
        headers={
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        },
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as resp:
            body = resp.read().decode('utf-8')
            print("响应状态码:", resp.status)
            print("响应头:", dict(resp.headers))
    except urllib.error.URLError as err:
        print(f"请求失败: {err.reason}")
        return None
    return json.loads(body)
def post_request_with_data():
    """Send a form-encoded POST request and return the decoded JSON reply."""
    form_fields = {
        'username': 'erick',
        'password': '123456',
    }
    # urllib expects the request body as URL-encoded bytes.
    payload = urllib.parse.urlencode(form_fields).encode('utf-8')
    request = urllib.request.Request(
        "https://httpbin.org/post",
        data=payload,
        headers={
            'User-Agent': 'Mozilla/5.0',
            'Content-Type': 'application/x-www-form-urlencoded',
        },
    )
    with urllib.request.urlopen(request, timeout=10) as resp:
        raw_body = resp.read()
        return json.loads(raw_body.decode('utf-8'))
def download_file(url="https://httpbin.org/image/png",
                  output_path="downloaded_image.png"):
    """Download ``url`` and write the response body to ``output_path``.

    The body is streamed to disk in chunks rather than read into memory in
    one piece, so large files do not inflate memory usage.

    Args:
        url: Resource to download. Defaults to the httpbin sample PNG.
        output_path: Destination file path. Defaults to
            ``downloaded_image.png`` in the current directory.
    """
    import shutil

    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    with urllib.request.urlopen(req, timeout=30) as response, \
            open(output_path, 'wb') as f:
        # copyfileobj copies in fixed-size chunks from the response to the file.
        shutil.copyfileobj(response, f)
    print(f"文件已保存到: {output_path}")
# Script entry point: run the urllib GET demo under a banner.
if __name__ == "__main__":
    banner = "=" * 50
    print(banner)
    print("urllib GET 请求演示")
    print(banner)
    if basic_get_request():
        print("请求成功!")
4.2 requests(更人性化的 HTTP 库)
"""
requests 库高级用法演示
作者:erick
"""
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import time
import random
class HTTPClient:
    """Thin wrapper around ``requests.Session`` with retries and optional proxies."""

    def __init__(self, proxies=None):
        self.session = requests.Session()
        self.proxies = proxies
        # Retry up to three times on throttling/server errors, with backoff.
        adapter = HTTPAdapter(
            max_retries=Retry(
                total=3,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
            )
        )
        for scheme in ("http://", "https://"):
            self.session.mount(scheme, adapter)

    def _with_defaults(self, options):
        """Fill in the default timeout, headers and proxies for one request."""
        options.setdefault('timeout', 10)
        options.setdefault('headers', self._random_headers())
        if self.proxies:
            options['proxies'] = self.proxies
        return options

    def get(self, url, **kwargs):
        """Send a GET request; ``kwargs`` are forwarded to ``Session.get``."""
        return self.session.get(url, **self._with_defaults(kwargs))

    def post(self, url, **kwargs):
        """Send a POST request; ``kwargs`` are forwarded to ``Session.post``."""
        return self.session.post(url, **self._with_defaults(kwargs))

    @staticmethod
    def _random_headers():
        """Return request headers carrying a randomly chosen User-Agent."""
        user_agents = (
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
        )
        headers = {'User-Agent': random.choice(user_agents)}
        headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
        headers['Accept-Language'] = 'zh-CN,zh;q=0.9,en;q=0.8'
        return headers
def handle_cookies():
    """Demonstrate two ways of sending cookies with requests.

    Approach 1 uses a ``Session``, which stores cookies set by the server and
    sends them on later requests. Approach 2 passes a cookie dict explicitly
    on a single request. Both print the resulting HTTP status code.
    """
    # Approach 1: let the Session manage cookies. The context manager closes
    # the session's pooled connections once the demo is finished.
    with requests.Session() as session:
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        # Simulated login request.
        login_url = "https://httpbin.org/post"
        session.post(
            login_url,
            data={'username': 'erick', 'token': 'abc123'},
            timeout=10,  # never wait indefinitely on a stalled server
        )
        # Later requests on the same session reuse its cookie jar.
        response = session.get("https://httpbin.org/get", timeout=10)
        print("使用 Session:", response.status_code)

    # Approach 2: attach cookies by hand to a one-off request.
    cookies = {
        'session_id': 'xyz789',
        'user': 'erick'
    }
    response = requests.get(
        "https://httpbin.org/get",
        cookies=cookies,
        headers={'User-Agent': 'Mozilla/5.0'},
        timeout=10,
    )
    print("手动 Cookies:", response.status_code)
# Script entry point: exercise the HTTP client, then the cookie demo.
if __name__ == "__main__":
    http_client = HTTPClient()
    # Smoke-test a GET request.
    reply = http_client.get("https://httpbin.org/get")
    print(f"状态码: {reply.status_code}")
    print(f"响应内容: {reply.json()}")
    # Cookie handling demo, separated by a rule.
    rule = "=" * 50
    print("\n" + rule)
    handle_cookies()
5. HTML 解析:BeautifulSoup 与 lxml
5.1 BeautifulSoup 基础
"""
BeautifulSoup 解析 HTML 演示
作者:erick
"""
from bs4 import BeautifulSoup
import requests
def basic_parse():
    """Walk through the basic BeautifulSoup lookup APIs on a sample document.

    Covers lookup by tag, by class, by id, CSS selectors and attribute access,
    printing the result of each step.
    """
    html = """
<html>
<head><title>测试页面</title></head>
<body>
<div class="container">
<h1 class="title">欢迎来到 Python 爬虫教程</h1>
<ul id="course-list">
<li class="course">Python 基础</li>
<li class="course">Python 进阶</li>
<li class="course highlight">Python 爬虫实战</li>
</ul>
<a href="https://example.com">访问示例网站</a>
<a href="https://example.org">另一个链接</a>
</div>
</body>
</html>
"""
    # Available parsers: html.parser / lxml / html5lib
    soup = BeautifulSoup(html, 'lxml')

    rule = "=" * 50
    print(rule)
    print("BeautifulSoup 基本选择器演示")
    print(rule)

    # 1. find() returns the first element with the given tag name.
    page_title = soup.find('title')
    print(f"标题: {page_title.text}")

    # 2. find_all() returns every element with the given tag name.
    anchors = soup.find_all('a')
    print(f"\n所有链接 ({len(anchors)} 个):")
    for anchor in anchors:
        print(f" - {anchor.text}: {anchor.get('href')}")

    # 3. Filter by CSS class.
    print(f"\n课程列表:")
    for entry in soup.find_all('li', class_='course'):
        print(f" • {entry.text}")

    # 4. Filter by element id.
    list_node = soup.find('ul', id='course-list')
    print(f"\nID 为 course-list 的元素: {list_node.name}")

    # 5. CSS selectors.
    print("\n使用 CSS 选择器:")
    heading = soup.select_one('h1.title')
    print(f" h1.title: {heading.text}")
    highlighted = soup.select_one('li.highlight')
    print(f" li.highlight: {highlighted.text}")

    # 6. Attribute access on a tag.
    leading_anchor = soup.find('a')
    print(f"\n链接文本: {leading_anchor.text}")
    print(f"链接地址: {leading_anchor['href']}")
def extract_nested_data():
    """Extract one record per product from a nested HTML fragment and print them."""
    html = """
<div class="product-list">
<div class="product">
<span class="price">¥299</span>
<span class="name">iPhone 15 Pro</span>
<span class="rating">4.8</span>
</div>
<div class="product">
<span class="price">¥199</span>
<span class="name">AirPods Pro</span>
<span class="rating">4.9</span>
</div>
</div>
"""
    soup = BeautifulSoup(html, 'lxml')

    # One dict per .product node, each field read from its own child span.
    products = [
        {
            'name': node.select_one('.name').text,
            'price': node.select_one('.price').text,
            'rating': node.select_one('.rating').text,
        }
        for node in soup.select('.product')
    ]

    print("\n提取到的产品数据:")
    for record in products:
        print(f" {record['name']} - {record['price']} - 评分: {record['rating']}")
# Run both BeautifulSoup demos when this snippet is executed as a script.
if __name__ == "__main__":
    basic_parse()
    extract_nested_data()
5.2 解析器性能对比
| 解析器 | 速度 | 容错性 | 依赖 |
|---|---|---|---|
| html.parser | 中等 | 一般 | 无 |
| lxml | 快 | 好 | libxml2 |
| html5lib | 慢 | 最好 | html5lib |
def parser_comparison():
    """Time how long each BeautifulSoup parser takes on a deeply nested document.

    Each parser parses the same document 100 times and the total elapsed time
    is printed. A parser whose backing library is not installed is reported
    and skipped instead of aborting the whole comparison.
    """
    import time
    from bs4 import FeatureNotFound

    html = "<div>" * 1000 + "内容" + "</div>" * 1000
    parsers = ['html.parser', 'lxml', 'html5lib']
    print("\n解析器性能对比:")
    for parser in parsers:
        # perf_counter is monotonic and high-resolution; time.time() can jump
        # when the system clock is adjusted.
        start = time.perf_counter()
        try:
            for _ in range(100):
                BeautifulSoup(html, parser)
        except FeatureNotFound:
            print(f" {parser:15s}: 未安装,已跳过")
            continue
        elapsed = time.perf_counter() - start
        print(f" {parser:15s}: {elapsed:.4f}秒")
# Run the parser benchmark when this snippet is executed as a script.
if __name__ == "__main__":
    parser_comparison()
6. 异步爬虫:aiohttp 与 httpx
6.1 aiohttp 异步爬虫
"""
aiohttp 异步爬虫实战
作者:erick
"""
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import json
import time
from typing import List, Dict
class AsyncSpider:
    """Base class for asynchronous crawlers with bounded concurrency.

    Attributes:
        concurrency: Maximum number of requests in flight at once.
        semaphore: Semaphore enforcing ``concurrency``; created lazily inside
            the running event loop.
        results: Results of the most recent ``crawl`` call.
    """

    def __init__(self, concurrency: int = 10):
        self.concurrency = concurrency
        self.semaphore = None
        self.results: List[Dict] = []

    async def fetch(self, session: aiohttp.ClientSession, url: str) -> Dict:
        """Fetch a single page.

        Returns:
            ``{'url', 'html', 'status'}`` on HTTP 200, ``{'url', 'status'}``
            for any other status, or ``{'url', 'error'}`` when the request
            raises.
        """
        if self.semaphore is None:
            # Allows fetch() to be used on its own, without going through crawl().
            self.semaphore = asyncio.Semaphore(self.concurrency)
        async with self.semaphore:  # cap the number of concurrent requests
            try:
                # aiohttp expects a ClientTimeout object, not a bare number.
                timeout = aiohttp.ClientTimeout(total=30)
                async with session.get(url, timeout=timeout) as response:
                    if response.status == 200:
                        html = await response.text()
                        return {'url': url, 'html': html, 'status': 200}
                    return {'url': url, 'status': response.status}
            except Exception as e:
                # Best effort: one failing URL must not abort the whole batch.
                return {'url': url, 'error': str(e)}

    async def crawl(self, urls: List[str]) -> List[Dict]:
        """Fetch all ``urls`` concurrently and return results in input order."""
        self.semaphore = asyncio.Semaphore(self.concurrency)
        async with aiohttp.ClientSession(
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        ) as session:
            tasks = [self.fetch(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        self.results = list(results)
        return results

    def parse(self, html: str) -> Dict:
        """Parse one HTML page; subclasses must override this."""
        raise NotImplementedError
async def demo_simple():
    """Fetch three one-second endpoints concurrently and report the timing."""
    urls = ["https://httpbin.org/delay/1" for _ in range(3)]
    spider = AsyncSpider(concurrency=3)

    banner = "=" * 50
    print(banner)
    print("aiohttp 异步并发演示")
    print(banner)

    started_at = time.time()
    results = await spider.crawl(urls)
    elapsed = time.time() - started_at

    print(f"抓取 {len(urls)} 个页面(每个延迟1秒)")
    print(f"顺序执行预计: 3秒")
    print(f"并发执行实际: {elapsed:.2f}秒")
    statuses = [result.get('status') for result in results]
    print(f"状态码: {statuses}")
async def crawl_images():
    """Download several images concurrently and save them to the current directory.

    Each file is named ``image_<index><ext>``. The extension comes from the
    response's Content-Type, so a PNG is no longer saved under a ``.jpg`` name.
    """
    import mimetypes

    import aiofiles  # third-party: pip install aiofiles

    image_urls = [
        "https://httpbin.org/image/jpeg",
        "https://httpbin.org/image/png",
    ]

    async def download_image(session: aiohttp.ClientSession, url: str, stem: str):
        """Download ``url`` to ``stem`` plus a matching extension; return the path or None."""
        async with session.get(url) as response:
            if response.status != 200:
                return None
            content = await response.read()
            # Fall back to a neutral extension when the MIME type is unknown.
            extension = mimetypes.guess_extension(response.content_type) or ".bin"
        filename = f"{stem}{extension}"
        async with aiofiles.open(filename, 'wb') as f:
            await f.write(content)
        return filename

    async with aiohttp.ClientSession() as session:
        tasks = [
            download_image(session, url, f"image_{i}")
            for i, url in enumerate(image_urls)
        ]
        results = await asyncio.gather(*tasks)
    print(f"下载完成: {[r for r in results if r]}")
# Run the concurrency demo when this snippet is executed as a script.
if __name__ == "__main__":
    asyncio.run(demo_simple())
6.2 httpx(同步/异步两用)
"""
httpx 同步与异步用法对比
作者:erick
"""
import httpx
import asyncio
# ==================== 同步方式 ====================
def sync_demo():
    """Issue one blocking GET with httpx and print the status and JSON body."""
    # The context manager closes the client even if the request or JSON
    # decoding raises.
    with httpx.Client(timeout=10.0) as client:
        response = client.get("https://httpbin.org/get")
        print(f"同步请求状态: {response.status_code}")
        print(f"响应内容: {response.json()}")
# ==================== 异步方式 ====================
async def async_demo():
    """Send three GET requests concurrently over one ``httpx.AsyncClient``."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        # httpx.AsyncClient has no gather() method; concurrency comes from
        # asyncio.gather scheduling the request coroutines together.
        responses = await asyncio.gather(
            client.get("https://httpbin.org/get"),
            client.get("https://httpbin.org/ip"),
            client.get("https://httpbin.org/headers"),
        )
        for resp in responses:
            print(f"状态: {resp.status_code}, URL: {resp.url}")
# ==================== 实战:批量请求 ====================
async def batch_request_demo():
    """Request five httpbin URLs concurrently and print the timing and each result."""
    import time

    urls = [f"https://httpbin.org/get?id={n}" for n in range(1, 6)]
    async with httpx.AsyncClient() as client:
        started_at = time.time()
        # Schedule every request at once and wait for all of them.
        pending = [client.get(target) for target in urls]
        responses = await asyncio.gather(*pending)
        elapsed = time.time() - started_at

        print(f"批量请求 {len(urls)} 个 URL,耗时: {elapsed:.2f}秒")
        for resp in responses:
            payload = resp.json()
            print(f" ID={payload['args'].get('id')} -> 状态: {resp.status_code}")
# Script entry point: run the synchronous demo, then the two async demos.
if __name__ == "__main__":
    print("同步请求演示:")
    sync_demo()
    for heading, demo in (
        ("\n异步请求演示:", async_demo),
        ("\n批量请求演示:", batch_request_demo),
    ):
        print(heading)
        asyncio.run(demo())
7. 反爬应对策略
7.1 常用反爬机制与应对
| 反爬机制 | 说明 | 应对策略 |
|---|---|---|
| User-Agent 检测 | 检测浏览器标识 | 使用真实 UA 列表 |
| IP 限流 | 限制单 IP 请求频率 | 使用代理池 + 请求间隔 |
| 验证码 | CAPTCHAs 人机验证 | 打码平台 / 机器学习 |
| Cookie 验证 | 验证会话合法性 | 维护登录态 Cookies |
| 请求签名 | 接口带签名参数 | 分析 JS 逆向签名算法 |
| 动态加载 | 数据由 JS 动态生成 | Selenium / Playwright |
7.2 UA 轮换与请求间隔
"""
反爬应对策略:UA轮换、代理、请求间隔
作者:erick
"""
import random
import time
import requests
from typing import List, Dict, Optional
class AntiCrawlerClient:
    """HTTP client bundling common anti-blocking measures.

    Every request gets a random User-Agent, a random delay before it is sent,
    an optional proxy picked round-robin, and a default timeout.
    """

    # Applied when the caller passes no timeout, so a stalled server cannot
    # hang the crawler forever.
    DEFAULT_TIMEOUT = 10

    def __init__(self, proxies: Optional[List[str]] = None):
        self.session = requests.Session()
        self.proxies = proxies or []
        self.proxy_index = 0
        # Pool of common desktop browser User-Agent strings.
        self.ua_list = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        ]

    def _get_random_headers(self) -> Dict:
        """Build browser-like headers with a randomly chosen User-Agent."""
        return {
            'User-Agent': random.choice(self.ua_list),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    def _get_proxy(self) -> Optional[Dict]:
        """Return the next proxy in round-robin order, or None if none are configured."""
        if not self.proxies:
            return None
        proxy = self.proxies[self.proxy_index % len(self.proxies)]
        self.proxy_index += 1
        return {'http': proxy, 'https': proxy}

    def _request(self, send, url: str, delay_range: tuple, **kwargs):
        """Shared request path for ``get`` and ``post``.

        Sleeps once for a random interval from ``delay_range``, fills in the
        default headers, timeout and proxy, then calls ``send(url, **kwargs)``.
        """
        # One delay per request. The pause before the next request already
        # spaces calls out, so a second sleep after the response would only
        # double the configured interval.
        time.sleep(random.uniform(*delay_range))
        kwargs.setdefault('headers', self._get_random_headers())
        kwargs.setdefault('timeout', self.DEFAULT_TIMEOUT)
        proxy = self._get_proxy()
        if proxy:
            kwargs['proxies'] = proxy
        return send(url, **kwargs)

    def get(self, url: str, delay_range: tuple = (1, 3), **kwargs) -> "requests.Response":
        """Send a GET request.

        Args:
            url: Target URL.
            delay_range: ``(min, max)`` seconds for the random pre-request delay.
            **kwargs: Extra arguments forwarded to ``Session.get``.
        """
        return self._request(self.session.get, url, delay_range, **kwargs)

    def post(self, url: str, delay_range: tuple = (1, 3), **kwargs) -> "requests.Response":
        """Send a POST request; arguments mirror :meth:`get`."""
        return self._request(self.session.post, url, delay_range, **kwargs)
def demo_anti_crawler():
    """Send three requests through AntiCrawlerClient and print the User-Agent of each."""
    # Proxies are optional; uncomment entries to enable rotation.
    proxy_urls = [
        # 'http://user:pass@proxy1.com:8080',
        # 'http://user:pass@proxy2.com:8080',
    ]
    client = AntiCrawlerClient(proxies=proxy_urls)

    banner = "=" * 50
    print(banner)
    print("反爬策略演示")
    print(banner)
    print("将随机更换 User-Agent 并添加请求间隔\n")

    for attempt in range(1, 4):
        print(f"第 {attempt} 次请求:")
        # A short delay range keeps the demo quick.
        response = client.get("https://httpbin.org/get", delay_range=(0.5, 1))
        echoed_headers = response.json().get('headers', {})
        user_agent = echoed_headers.get('User-Agent', 'N/A')
        print(f" User-Agent: {user_agent[:50]}...")
        print(f" 状态码: {response.status_code}")
# Run the anti-blocking demo when this snippet is executed as a script.
if __name__ == "__main__":
    demo_anti_crawler()
7.3 使用代理池
"""
代理池管理
作者:erick
"""
import random
import asyncio
import aiohttp
from dataclasses import dataclass
from typing import List, Optional
import time
@dataclass
class Proxy:
    """Address of a single proxy server."""

    ip: str
    port: int
    protocol: str = 'http'

    @property
    def url(self) -> str:
        """Proxy address rendered as ``protocol://ip:port``."""
        return "{}://{}:{}".format(self.protocol, self.ip, self.port)
class ProxyPool:
    """In-memory pool of proxies with failure tracking and health checks.

    Attributes:
        proxies: Proxies currently considered usable.
        failed_proxies: URLs of proxies that have been marked as failed.
        check_interval: Suggested number of seconds between health checks
            (not enforced by this class).
        last_check: ``time.time()`` of the last completed ``refresh_proxies``
            call, or 0 if none has run.
    """

    def __init__(self):
        self.proxies: List["Proxy"] = []
        self.failed_proxies: set = set()
        self.check_interval = 300  # five minutes
        self.last_check = 0

    def add_proxy(self, proxy: "Proxy"):
        """Add ``proxy`` unless it is already pooled or previously failed."""
        if proxy.url in self.failed_proxies:
            return
        # Duplicates would bias random selection, and mark_failed() would only
        # remove one copy of them.
        if any(existing.url == proxy.url for existing in self.proxies):
            return
        self.proxies.append(proxy)

    def get_random_proxy(self) -> Optional["Proxy"]:
        """Return a random usable proxy, or None when the pool is empty."""
        available = [p for p in self.proxies if p.url not in self.failed_proxies]
        if not available:
            return None
        return random.choice(available)

    def mark_failed(self, proxy: "Proxy"):
        """Record ``proxy`` as failed and remove it from the pool."""
        self.failed_proxies.add(proxy.url)
        if proxy in self.proxies:
            self.proxies.remove(proxy)
        print(f"代理 {proxy.url} 已标记为失败")

    async def check_proxy(self, proxy: "Proxy", timeout: int = 5) -> bool:
        """Return True if a test request through ``proxy`` gets HTTP 200 within ``timeout`` seconds."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    'https://httpbin.org/ip',
                    proxy=proxy.url,
                    # The total timeout bounds the whole request, so no
                    # separate elapsed-time check is needed.
                    timeout=aiohttp.ClientTimeout(total=timeout)
                ) as response:
                    return response.status == 200
        except Exception:
            # Any request failure means "unusable". Unlike a bare ``except:``
            # this lets KeyboardInterrupt and task cancellation propagate.
            return False

    async def refresh_proxies(self):
        """Health-check every pooled proxy concurrently and drop the ones that fail."""
        print("开始检测代理可用性...")
        # Snapshot once: the pool may change while the checks are awaited, and
        # the results must be paired with the proxies that were actually tested.
        snapshot = list(self.proxies)
        results = await asyncio.gather(*(self.check_proxy(p) for p in snapshot))
        for proxy, is_ok in zip(snapshot, results):
            if not is_ok:
                self.mark_failed(proxy)
        self.last_check = time.time()
        print(f"代理池刷新完成,剩余可用代理: {len(self.proxies)}")
# 使用示例
async def demo_proxy_pool():
    """Build a small proxy pool, pick one proxy and report whether it works."""
    pool = ProxyPool()
    # Register two local test proxies.
    for port in (8080, 8081):
        pool.add_proxy(Proxy(ip='127.0.0.1', port=port))

    proxy = pool.get_random_proxy()
    if not proxy:
        print("无可用代理")
        return

    print(f"获取到代理: {proxy.url}")
    # Probe the chosen proxy.
    is_ok = await pool.check_proxy(proxy)
    verdict = '可用' if is_ok else '不可用'
    print(f"代理 {proxy.url} 可用性: {verdict}")
# Run the proxy pool demo when this snippet is executed as a script.
if __name__ == "__main__":
    asyncio.run(demo_proxy_pool())
8. 实战案例:抓取豆瓣电影 Top250
8.1 项目结构
douban_spider/
├── main.py # 入口文件
├── spider/
│ ├── __init__.py
│ ├── fetcher.py # 请求模块
│ ├── parser.py # 解析模块
│ ├── storage.py # 存储模块
│ └── config.py # 配置文件
├── data/ # 数据存储目录
└── requirements.txt # 依赖清单
8.2 完整实现代码
"""
豆瓣电影 Top250 爬虫
作者:erick
功能:抓取豆瓣电影排行榜数据并存储为 JSON/CSV
"""
import requests
from bs4 import BeautifulSoup
import csv
import json
import os
import time
import random
from dataclasses import dataclass, asdict
from typing import List, Optional
from pathlib import Path
# ==================== 数据模型 ====================
@dataclass
class Movie:
    """One entry of the Douban Top 250 list."""

    # Position in the ranking
    rank: int
    # Movie title
    title: str
    # Rating score
    rating: float
    # One-line quote shown with the entry
    quote: str
    # Director credit
    director: str
    # Leading cast credit
    actor: str
    # Release year
    year: str
    # Genre
    genre: str
    # Country / region
    country: str
    # Detail page URL
    url: str
# ==================== 配置 ====================
class Config:
    """Static settings for the Douban Top 250 crawler."""

    # Entry URL of the ranking list.
    BASE_URL = "https://movie.douban.com/top250"

    # Headers sent with every request.
    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        # Placeholder value: replace with a real browser Cookie.
        'Cookie': 'bid=xxxxxxxxxxxxxx; __yadk_uid=xxxxxxxxxxxxxx',
    }

    # (min, max) seconds to wait before each request.
    DELAY_RANGE = (2, 5)

    # Output directory, located next to this file.
    OUTPUT_DIR = Path(__file__).parent.joinpath('data')
# ==================== 请求模块 ====================
class DoubanFetcher:
    """Downloads Douban Top 250 list pages over a shared session."""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update(Config.HEADERS)

    def fetch_page(self, page: int = 0) -> Optional[str]:
        """Download one list page.

        Args:
            page: Zero-based page index (0-9, 25 movies per page).

        Returns:
            The page HTML, or None if the request failed.
        """
        # Page 0 is the bare list URL; later pages are addressed by offset.
        target = f"{Config.BASE_URL}?start={page * 25}" if page > 0 else Config.BASE_URL
        human_page = page + 1
        try:
            # Random pause before the request.
            time.sleep(random.uniform(*Config.DELAY_RANGE))
            response = self.session.get(target, timeout=15)
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"✗ 第 {human_page} 页抓取失败: {e}")
            return None
        response.encoding = 'utf-8'
        print(f"✓ 第 {human_page} 页抓取成功 (状态码: {response.status_code})")
        return response.text

    def close(self):
        """Close the underlying session."""
        self.session.close()
# ==================== 解析模块 ====================
class DoubanParser:
    """Parses Douban Top 250 list pages into ``Movie`` records."""

    @staticmethod
    def _split_info(info_text: str) -> dict:
        """Split the info paragraph of one entry into its fields.

        ``info_text`` is the text of the first ``<p>`` inside ``div.bd``, one
        source line per text line. The first line is expected to hold the
        credits ("导演: ... 主演: ...") and the last line
        "year / country / genre".

        Returns:
            Dict with keys ``director``, ``actor``, ``year``, ``country`` and
            ``genre``; fields that cannot be found are empty strings.
        """
        # Normalise non-breaking spaces before splitting.
        lines = [
            line.strip()
            for line in info_text.replace('\xa0', ' ').splitlines()
            if line.strip()
        ]
        info = {'director': '', 'actor': '', 'year': '', 'country': '', 'genre': ''}
        if lines:
            credits, _, cast = lines[0].partition('主演:')
            info['director'] = credits.replace('导演:', '').strip()
            info['actor'] = cast.strip()
        if len(lines) >= 2:
            fields = [field.strip() for field in lines[-1].split('/')]
            if len(fields) >= 3:
                # Entries with several release years put extra fields in the
                # middle, so country and genre are read from the end.
                info['year'] = fields[0]
                info['country'] = fields[-2]
                info['genre'] = fields[-1]
        return info

    @staticmethod
    def parse_movies(html: str) -> List["Movie"]:
        """Parse the movie entries of one list page.

        Args:
            html: HTML of a list page.

        Returns:
            One ``Movie`` per entry that could be parsed; entries that fail
            are reported and skipped.
        """
        soup = BeautifulSoup(html, 'lxml')
        movies = []
        for item in soup.select('div.item'):
            try:
                rank = int(item.select_one('em').text)

                # Several span.title nodes may exist; the first one is used.
                title_elem = item.select_one('span.title')
                title = title_elem.text if title_elem else "未知"

                rating_elem = item.select_one('span.rating_num')
                rating = float(rating_elem.text) if rating_elem else 0.0

                quote_elem = item.select_one('span.inq')
                quote = quote_elem.text if quote_elem else ""

                link_elem = item.select_one('a.nbg')
                url = link_elem.get('href', '') if link_elem else ""

                # Read only the info paragraph. Using the text of the whole
                # div.bd would append the rating, vote count and quote to the
                # genre field.
                info_elem = item.select_one('div.bd p')
                info_text = (
                    info_elem.get_text(separator='\n', strip=True)
                    if info_elem else ""
                )
                info = DoubanParser._split_info(info_text)

                movies.append(Movie(
                    rank=rank,
                    title=title,
                    rating=rating,
                    quote=quote,
                    director=info['director'],
                    actor=info['actor'],
                    year=info['year'],
                    genre=info['genre'],
                    country=info['country'],
                    url=url
                ))
            except (AttributeError, TypeError, ValueError) as e:
                # A malformed entry must not abort the rest of the page.
                print(f"解析电影信息失败: {e}")
                continue
        return movies
# ==================== 存储模块 ====================
class DoubanStorage:
    """Writes scraped movies to JSON, CSV and Markdown files."""

    def __init__(self, output_dir: Path = Config.OUTPUT_DIR):
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def save_json(self, movies: List[Movie], filename: str = "douban_top250.json"):
        """Dump all movies as a pretty-printed JSON array."""
        target = self.output_dir / filename
        with open(target, 'w', encoding='utf-8') as f:
            records = [asdict(movie) for movie in movies]
            json.dump(records, f, ensure_ascii=False, indent=2)
        print(f"✓ JSON 数据已保存: {target}")

    def save_csv(self, movies: List[Movie], filename: str = "douban_top250.csv"):
        """Write all movies to a CSV file (UTF-8 with BOM) with a header row."""
        target = self.output_dir / filename
        header = ['排名', '电影名', '评分', '经典语录', '导演', '主演', '年份', '类型', '国家/地区', '详情链接']
        with open(target, 'w', encoding='utf-8-sig', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(header)
            for movie in movies:
                row = [
                    movie.rank, movie.title, movie.rating, movie.quote,
                    movie.director, movie.actor, movie.year, movie.genre,
                    movie.country, movie.url,
                ]
                writer.writerow(row)
        print(f"✓ CSV 数据已保存: {target}")

    def save_markdown(self, movies: List[Movie], filename: str = "douban_top250.md"):
        """Write a Markdown table with rank, title, rating, year and genre."""
        target = self.output_dir / filename
        with open(target, 'w', encoding='utf-8') as f:
            f.write("# 豆瓣电影 Top250\n\n")
            f.write("| 排名 | 电影名 | 评分 | 年份 | 类型 |\n")
            f.write("|------|--------|------|------|------|\n")
            for movie in movies:
                cells = (movie.rank, movie.title, movie.rating, movie.year, movie.genre)
                f.write("| " + " | ".join(str(cell) for cell in cells) + " |\n")
        print(f"✓ Markdown 数据已保存: {target}")
# ==================== 主程序 ====================
def main():
    """Crawl all ten list pages, save the results and print the first ten entries."""
    divider = "=" * 50
    print(divider)
    print("豆瓣电影 Top250 爬虫")
    print(divider)

    fetcher = DoubanFetcher()
    parser = DoubanParser()
    storage = DoubanStorage()
    all_movies = []
    try:
        # Ten pages of 25 entries make up the full Top 250.
        for page in range(10):
            print(f"\n正在抓取第 {page + 1}/10 页...")
            html = fetcher.fetch_page(page)
            if not html:
                continue
            page_movies = parser.parse_movies(html)
            all_movies.extend(page_movies)
            print(f" 本页解析到 {len(page_movies)} 部电影")
        print(f"\n总计抓取 {len(all_movies)} 部电影")

        # Persist the results in every supported format.
        print("\n" + divider)
        print("保存数据")
        print(divider)
        for save in (storage.save_json, storage.save_csv, storage.save_markdown):
            save(all_movies)
        print("\n✓ 爬取完成!")

        # Preview the first ten entries.
        print("\n📊 Top 10 电影预览:")
        for position, movie in enumerate(all_movies[:10], 1):
            print(f" {position:2d}. {movie.title} ({movie.year}) - 评分: {movie.rating}")
    finally:
        fetcher.close()
# Run the crawler when this file is executed as a script.
if __name__ == "__main__":
    main()
8.3 运行效果
==================================================
豆瓣电影 Top250 爬虫
==================================================
正在抓取第 1/10 页...
✓ 第 1 页抓取成功 (状态码: 200)
本页解析到 25 部电影
正在抓取第 2/10 页...
✓ 第 2 页抓取成功 (状态码: 200)
本页解析到 25 部电影
...
==================================================
保存数据
==================================================
✓ JSON 数据已保存: data\douban_top250.json
✓ CSV 数据已保存: data\douban_top250.csv
✓ Markdown 数据已保存: data\douban_top250.md
✓ 爬取完成!
📊 Top 10 电影预览:
1. 肖申克的救赎 (1994) - 评分: 9.7
2. 霸王别姬 (1993) - 评分: 9.6
3. 阿甘正传 (1994) - 评分: 9.5
...
9. 数据存储:JSON/CSV/数据库
9.1 多种存储方式对比
| 存储方式 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| JSON | 小规模、层级数据 | 格式清晰、易读 | 不适合大规模数据 |
| CSV | 表格数据、分析 | 体积小、易导入 Excel | 不支持嵌套结构 |
| SQLite | 中等规模、结构化 | 零配置、跨平台 | 并发写入差 |
| MySQL/PostgreSQL | 生产环境 | 强大、成熟 | 需要部署 |
| MongoDB | 非结构化数据 | 灵活、易扩展 | 查询性能略低 |
9.2 数据存储工具类
"""
数据存储工具类
作者:erick
"""
import json
import csv
import sqlite3
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class MovieRecord:
    """A stored movie row.

    ``created_at`` defaults to the ISO-8601 timestamp of construction when it
    is not supplied.
    """

    id: Optional[int]
    title: str
    rating: float
    year: int
    genre: str
    # None is a valid "not set yet" value, so the annotation must be Optional.
    created_at: Optional[str] = None

    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now().isoformat()
class DataStorage:
    """Interface that every storage backend implements."""

    def save(self, data: List[Dict]):
        """Persist ``data``; backends must override this."""
        raise NotImplementedError

    def load(self) -> List[Dict]:
        """Return the stored records; backends must override this."""
        raise NotImplementedError
class JSONStorage(DataStorage):
    """Stores records as a single JSON file."""

    def __init__(self, filepath: str):
        self.filepath = Path(filepath)

    def save(self, data: List[Dict]):
        """Overwrite the file with ``data``, creating parent directories as needed."""
        self.filepath.parent.mkdir(parents=True, exist_ok=True)
        with self.filepath.open('w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"✓ 已保存 {len(data)} 条数据到 {self.filepath}")

    def load(self) -> List[Dict]:
        """Return the stored records, or an empty list if the file does not exist."""
        if self.filepath.exists():
            with self.filepath.open('r', encoding='utf-8') as f:
                return json.load(f)
        return []
class CSVStorage(DataStorage):
    """Stores records as rows of a CSV file, appending on every save."""

    def __init__(self, filepath: str, fieldnames: List[str] = None):
        self.filepath = Path(filepath)
        default_columns = ['id', 'title', 'rating', 'year', 'genre']
        self.fieldnames = fieldnames or default_columns

    def save(self, data: List[Dict]):
        """Append ``data`` to the file, writing the header only when creating it."""
        self.filepath.parent.mkdir(parents=True, exist_ok=True)
        # Must be evaluated before open(): append mode creates the file.
        is_new_file = not self.filepath.exists()
        with self.filepath.open('a', encoding='utf-8-sig', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=self.fieldnames)
            if is_new_file:
                writer.writeheader()
            writer.writerows(data)
        print(f"✓ 已追加 {len(data)} 条数据到 {self.filepath}")

    def load(self) -> List[Dict]:
        """Return every row as a dict, or an empty list if the file does not exist."""
        if self.filepath.exists():
            with self.filepath.open('r', encoding='utf-8-sig', newline='') as f:
                return [row for row in csv.DictReader(f)]
        return []
class SQLiteStorage(DataStorage):
    """Stores records in a table of a SQLite database file."""

    def __init__(self, db_path: str, table_name: str = 'movies'):
        """Open (or create) the database and make sure the table exists.

        Args:
            db_path: Path of the SQLite database file.
            table_name: Name of the table to use.

        Raises:
            ValueError: If ``table_name`` is not a plain identifier.
        """
        # The table name is interpolated into SQL text (identifiers cannot be
        # bound as parameters), so it must be validated first.
        if not table_name.isidentifier():
            raise ValueError(f"invalid table name: {table_name!r}")
        self.db_path = Path(db_path)
        self.table_name = table_name
        # Matches JSONStorage/CSVStorage: sqlite3 does not create missing
        # parent directories itself.
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    def _run(self, action):
        """Run ``action(conn)`` in a transaction and always close the connection.

        ``with conn`` only commits or rolls back; it does not close the
        connection, so that is done explicitly here.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                return action(conn)
        finally:
            conn.close()

    def _init_db(self):
        """Create the table if it does not exist yet."""
        ddl = f"""
        CREATE TABLE IF NOT EXISTS {self.table_name} (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        rating REAL,
        year INTEGER,
        genre TEXT,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        """
        self._run(lambda conn: conn.execute(ddl))

    def save(self, data: List[Dict]):
        """Insert all records of ``data`` in a single transaction."""
        sql = f"""
        INSERT INTO {self.table_name} (title, rating, year, genre)
        VALUES (?, ?, ?, ?)
        """
        rows = [
            (
                item.get('title', ''),
                item.get('rating', 0),
                item.get('year', 0),
                item.get('genre', ''),
            )
            for item in data
        ]
        self._run(lambda conn: conn.executemany(sql, rows))
        print(f"✓ 已插入 {len(data)} 条数据到 {self.table_name}")

    def load(self, limit: int = 100) -> List[Dict]:
        """Return up to ``limit`` rows, highest rating first."""
        return self.query(
            f"SELECT * FROM {self.table_name} ORDER BY rating DESC LIMIT ?",
            (limit,)
        )

    def query(self, sql: str, params: tuple = ()) -> List[Dict]:
        """Run a custom query and return the rows as dicts.

        ``sql`` is executed as given; pass values through ``params`` rather
        than formatting them into the statement.
        """
        def fetch(conn):
            conn.row_factory = sqlite3.Row
            return [dict(row) for row in conn.execute(sql, params).fetchall()]

        return self._run(fetch)
# 使用示例
def demo_storage():
    """Save sample records with each backend, then query the SQLite copy."""
    sample_rows = [
        {'title': '肖申克的救赎', 'rating': 9.7, 'year': 1994, 'genre': '犯罪'},
        {'title': '霸王别姬', 'rating': 9.6, 'year': 1993, 'genre': '剧情'},
        {'title': '阿甘正传', 'rating': 9.5, 'year': 1994, 'genre': '爱情'},
    ]

    banner = "=" * 50
    print(banner)
    print("数据存储演示")
    print(banner)

    # JSON backend
    JSONStorage('data/movies.json').save(sample_rows)

    # CSV backend
    CSVStorage('data/movies.csv').save(sample_rows)

    # SQLite backend
    db_storage = SQLiteStorage('data/movies.db')
    db_storage.save(sample_rows)

    # Query the rows back with a parameterised filter.
    print("\n查询评分大于 9.0 的电影:")
    top_rated = db_storage.query(
        "SELECT * FROM movies WHERE rating > ? ORDER BY rating DESC",
        (9.0,)
    )
    for record in top_rated:
        print(f" {record['title']} - {record['rating']}")
# Run the storage demo when this snippet is executed as a script.
if __name__ == "__main__":
    demo_storage()
10. 爬虫框架:Scrapy 快速入门
10.1 Scrapy 项目结构
myproject/
├── scrapy.cfg # 项目配置
└── myproject/
├── __init__.py
├── items.py # 数据模型
├── pipelines.py # 数据管道
├── settings.py # 配置
└── spiders/
└── movie_spider.py # 爬虫文件
10.2 Scrapy 核心组件
| 组件 | 作用 |
|---|---|
| Engine | 核心引擎,协调各组件工作 |
| Scheduler | 调度器,管理请求队列 |
| Downloader | 下载器,发起 HTTP 请求 |
| Spider | 爬虫,解析响应、提取数据 |
| Item Pipeline | 管道,处理爬取的数据 |
| Middleware | 中间件,请求/响应的预处理 |
10.3 Scrapy 爬虫代码
"""
Scrapy 爬虫示例:豆瓣电影 Top250
作者:erick
"""
# items.py
import scrapy
class DoubanMovieItem(scrapy.Item):
    """Scrapy item describing one movie entry."""

    # Position in the ranking
    rank = scrapy.Field()
    # Movie title
    title = scrapy.Field()
    # Rating score
    rating = scrapy.Field()
    # One-line quote
    quote = scrapy.Field()
    # Director credit
    director = scrapy.Field()
    # Leading cast credit
    actor = scrapy.Field()
    # Release year
    year = scrapy.Field()
    # Genre
    genre = scrapy.Field()
    # Detail page link
    url = scrapy.Field()
# movie_spider.py
import scrapy
from douban.items import DoubanMovieItem
class DoubanTop250Spider(scrapy.Spider):
    """Spider that walks the Douban Top 250 list pages."""

    name = 'douban_top250'
    allowed_domains = ['movie.douban.com']
    # First list page; later pages are reached through the "next" link.
    start_urls = ['https://movie.douban.com/top250']

    def parse(self, response):
        """Yield one item per movie on a list page, then follow the next page."""
        for item in response.css('div.item'):
            rank_text = item.css('em::text').get()
            rating_text = item.css('span.rating_num::text').get()
            if rank_text is None or rating_text is None:
                # int(None)/float(None) would abort the whole page, so
                # incomplete entries are skipped instead.
                self.logger.warning('Skipping entry without rank or rating on %s', response.url)
                continue

            movie = DoubanMovieItem()
            movie['rank'] = int(rank_text)
            movie['title'] = item.css('span.title::text').get()
            movie['rating'] = float(rating_text)
            movie['quote'] = item.css('span.inq::text').get() or ''
            movie['url'] = item.css('a.nbg::attr(href)').get()

            # The info paragraph is expected to yield two text nodes: the
            # credits ("导演: ... 主演: ...") and "year / country / genre".
            # Whitespace-only nodes from other paragraphs are dropped.
            info = [
                text.replace('\xa0', ' ').strip()
                for text in item.css('div.bd p::text').getall()
            ]
            info = [text for text in info if text]
            if len(info) >= 2:
                credits, _, cast = info[0].partition('主演:')
                movie['director'] = credits.replace('导演:', '').strip()
                movie['actor'] = cast.strip()
                fields = [field.strip() for field in info[1].split('/')]
                # Year is the first field of the second line and genre the
                # last; neither belongs to the credits line.
                movie['year'] = fields[0]
                movie['genre'] = fields[-1]
            yield movie

        # Pagination: follow the "next page" link when there is one.
        next_page = response.css('span.next a::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

    def parse_detail(self, response):
        """Parse a movie detail page (optional; not scheduled by ``parse``)."""
        movie = DoubanMovieItem()
        movie['url'] = response.url
        movie['title'] = response.css('h1 span::text').get()
        # Further detail fields can be extracted here.
        yield movie
# pipelines.py
class DataCleanPipeline:
    """Item pipeline that normalises scraped field values."""

    def process_item(self, item, spider):
        """Coerce the rating to float and strip whitespace around the title."""
        rating = item.get('rating')
        if rating:
            item['rating'] = float(rating)

        title = item.get('title')
        if title:
            item['title'] = title.strip()

        return item
class JSONExportPipeline:
    """Item pipeline that streams items into ``movies.json`` as a JSON array."""

    def open_spider(self, spider):
        """Open the output file and write the opening bracket."""
        self.file = open('movies.json', 'w', encoding='utf-8')
        self.file.write('[\n')
        self.count = 0

    def close_spider(self, spider):
        """Write the closing bracket, close the file and log the item count."""
        self.file.write('\n]')
        self.file.close()
        spider.logger.info(f'共导出 {self.count} 条数据')

    def process_item(self, item, spider):
        """Append one item to the array, comma-separated from the previous one."""
        import json

        # Every item except the first is preceded by a separator.
        separator = ',\n' if self.count > 0 else ''
        if separator:
            self.file.write(separator)
        serialized = json.dumps(dict(item), ensure_ascii=False)
        self.file.write(' ' + serialized)
        self.count += 1
        return item
# settings.py 配置
BOT_NAME = 'douban'

SPIDER_MODULES = ['douban.spiders']
NEWSPIDER_MODULE = 'douban.spiders'

# Do not apply robots.txt rules.
ROBOTSTXT_OBEY = False

# Headers attached to every request by default.
DEFAULT_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
}

# Enabled item pipelines with their order values.
ITEM_PIPELINES = {
    'douban.pipelines.DataCleanPipeline': 300,
    'douban.pipelines.JSONExportPipeline': 400,
}

# Delay between downloads, in seconds.
DOWNLOAD_DELAY = 2
10.4 运行 Scrapy 爬虫
# 进入项目目录
cd myproject
# 运行爬虫
scrapy crawl douban_top250
# 指定输出文件
scrapy crawl douban_top250 -o movies.json
# 指定输出格式(csv/json/xml)
scrapy crawl douban_top250 -o movies.csv -t csv
# 限速运行(每秒 0.5 个请求)
scrapy crawl douban_top250 -s DOWNLOAD_DELAY=2 -s CONCURRENT_REQUESTS_PER_DOMAIN=1
11. 排错指南与最佳实践
11.1 常见错误与解决方案
| 错误类型 | 错误信息 | 原因 | 解决方案 |
|---|---|---|---|
| 编码错误 | UnicodeDecodeError | 响应编码识别错误 | 设置 response.encoding='utf-8' |
| 超时错误 | TimeoutError | 网络慢/目标站慢 | 增加 timeout 参数 |
| 403 Forbidden | 访问被拒绝 | 反爬检测 | 添加/更换 User-Agent、使用代理 |
| 404 Not Found | 资源不存在 | URL 错误/页面重构 | 检查 URL、更新选择器 |
| 编码乱码 | 显示乱码 | 页面编码与解析编码不一致 | 使用 chardet 库检测编码 |
| 解析为空 | 选择器返回 None | CSS/XPath 选择器错误 | 使用浏览器开发者工具检查 |
| 连接重置 | ConnectionResetError | 请求过于频繁 | 降低频率、使用代理 |
| SSL 错误 | SSLError | 证书问题 | 优先更新 certifi 证书库或指定 CA 证书;仅在调试时临时设置 verify=False(会关闭证书校验,存在安全风险) |
11.2 编码问题处理
"""
编码问题处理
作者:erick
"""
import chardet
import requests
def detect_encoding(response: requests.Response) -> str:
    """Guess the text encoding of a response body.

    Three strategies are applied in order:

    1. ``chardet`` inspects the raw bytes; its guess is only printed.
    2. A fixed list of common encodings is tried, and the first one that
       decodes the whole body without error is returned.
    3. Otherwise ``response.apparent_encoding`` is printed and returned.

    Args:
        response: The response whose ``content`` bytes are inspected.

    Returns:
        The name of the chosen encoding.
    """
    raw_data = response.content

    # Method 1: chardet detection (reported for diagnostics)
    detected = chardet.detect(raw_data)
    encoding = detected['encoding']
    confidence = detected['confidence']
    print(f"检测到的编码: {encoding} (置信度: {confidence:.2%})")

    # Method 2: try common encodings first
    common_encodings = ['utf-8', 'gbk', 'gb2312', 'gb18030', 'big5']
    for enc in common_encodings:
        try:
            raw_data.decode(enc)
        except UnicodeDecodeError:
            # Only a decoding failure means "try the next candidate";
            # anything else (e.g. KeyboardInterrupt) must propagate.
            continue
        print(f"成功使用 {enc} 解码")
        return enc

    # Method 3: response.apparent_encoding
    print(f" apparent_encoding: {response.apparent_encoding}")
    return response.apparent_encoding
def safe_request(url: str, timeout: float = 10) -> str:
    """Fetch ``url`` and return its body decoded to text.

    The charset declared in the ``Content-Type`` header is preferred. When the
    header carries no charset, or names a codec Python does not know, the
    encoding is taken from ``detect_encoding`` with ``utf-8`` as the last
    resort. Undecodable bytes are replaced rather than raising.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up, so the call
            cannot block forever.

    Returns:
        The decoded response body.
    """
    response = requests.get(url, timeout=timeout)
    body = response.content

    # Prefer the encoding declared in the response header
    content_type = response.headers.get('Content-Type', '')
    marker = 'charset='
    position = content_type.lower().find(marker)
    header_charset = ''
    if position != -1:
        # Keep only the charset token: drop trailing parameters and quotes,
        # e.g. 'text/html; charset="utf-8"; foo=bar' -> 'utf-8'.
        remainder = content_type[position + len(marker):]
        header_charset = remainder.split(';', 1)[0].strip().strip('"\'')

    encoding = header_charset or detect_encoding(response) or 'utf-8'
    try:
        return body.decode(encoding, errors='replace')
    except LookupError:
        # The server advertised a codec name Python does not recognise.
        fallback = detect_encoding(response) or 'utf-8'
        return body.decode(fallback, errors='replace')
# Example: show the encoding requests infers for a sample response
if __name__ == "__main__":
    print("编码检测演示:")
    # A timeout keeps the demo from hanging if the server never answers.
    response = requests.get("https://httpbin.org/get", timeout=10)
    print(f"apparent_encoding: {response.apparent_encoding}")
11.3 请求重试装饰器
"""
请求重试装饰器
作者:erick
"""
import time
import functools
import requests
from typing import Callable, Any
def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """
    Build a decorator that retries a call when a requests error occurs.

    Args:
        max_attempts: Total number of calls allowed, including the first one;
            must be at least 1.
        delay: Seconds to wait before the first retry.
        backoff: Factor the wait is multiplied by after each failed attempt
            (exponential backoff).

    Returns:
        A decorator. The wrapped function returns whatever the original
        returns and re-raises the last ``requests.RequestException`` once the
        attempts are used up. Other exception types propagate immediately.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1; otherwise the
            wrapped function would never be called and ``None`` would be
            returned silently.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            current_delay = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except requests.RequestException as e:
                    # Out of attempts: surface the original error to the caller.
                    if attempt == max_attempts:
                        raise
                    print(f"请求失败 ({attempt}/{max_attempts}): {e}")
                    print(f" {current_delay:.1f} 秒后重试...")
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator
# 使用示例
@retry(max_attempts=3, delay=2, backoff=2)
def fetch_with_retry(url: str) -> dict:
    """GET ``url`` with a 10 second timeout and return the parsed JSON body.

    An HTTP error status raises through ``raise_for_status``.
    """
    reply = requests.get(url, timeout=10)
    reply.raise_for_status()
    payload = reply.json()
    return payload
# Usage: run one request and report either the payload or the final error
if __name__ == "__main__":
    demo_url = "https://httpbin.org/get"
    try:
        payload = fetch_with_retry(demo_url)
        print(f"成功: {payload}")
    except Exception as error:
        print(f"最终失败: {error}")
11.4 最佳实践清单
"""
爬虫最佳实践
作者:erick
"""
# ==================== Best-practice checklist ====================
# Human-readable checklist kept as one multi-line string; it is printed
# verbatim when this snippet is run as a script.
BEST_PRACTICES = """
📋 Python 网络爬虫最佳实践
✅ 必须遵守:
------------
1. 遵守 robots.txt
- 使用 scrapy 时设置 ROBOTSTXT_OBEY = True
- 手动爬虫也应尊重网站规则
2. 设置合理的请求间隔
- 建议 2-5 秒间隔
- 使用 random.uniform() 避免固定间隔
3. 设置合理的超时时间
- timeout=10~30 秒
- 避免无限等待
4. 错误处理与日志记录
- try-except 包裹所有网络操作
- 记录成功/失败状态
5. User-Agent 伪装
- 使用真实浏览器 UA
- 定期轮换 UA 列表
⚠️ 推荐做法:
-----------
1. 使用 Session 保持连接
- 复用 TCP 连接
- 自动管理 Cookies
2. 数据及时存储
- 边爬边存,避免数据丢失
- 使用事务保证完整性
3. 增量爬取
- 记录爬取进度
- 支持断点续爬
4. 并发控制
- 单域名并发不超过 2-3
- 全局并发不超过 10-20
❌ 避免事项:
-----------
1. 禁止:
- 爬取个人信息用于商业目的
- 绕过付费墙或验证码
- 高频请求导致服务器瘫痪
2. 不要:
- 使用固定 IP 频繁请求
- 忽略异常继续执行
- 在生产环境使用调试代码
"""
# Print the checklist only when executed directly
if __name__ == "__main__":
    print(BEST_PRACTICES)
11.5 调试技巧
"""
爬虫调试技巧
作者:erick
"""
import requests
from bs4 import BeautifulSoup
def debug_request():
    """Print the outgoing request and the incoming response headers for a test URL.

    The request is prepared through a ``requests.Session`` so the headers
    shown are the ones really sent: a bare ``Request.prepare()`` does not
    include the session defaults such as ``User-Agent``. The same prepared
    request is then sent, so the printed request and the response belong
    together.
    """
    url = "https://httpbin.org/headers"
    with requests.Session() as session:
        # 1. Print the complete request information
        prepared = session.prepare_request(requests.Request('GET', url))
        print("=" * 50)
        print("请求详情")
        print("=" * 50)
        print(f"URL: {prepared.url}")
        print(f"Method: {prepared.method}")
        print("Headers:")
        for k, v in prepared.headers.items():
            print(f" {k}: {v}")

        # 2. Send that exact request and inspect the response headers.
        # Session.send() skips environment settings (proxies, CA bundle),
        # so merge them explicitly to behave like requests.get().
        settings = session.merge_environment_settings(
            prepared.url, {}, None, None, None
        )
        # The timeout prevents the demo from blocking forever.
        response = session.send(prepared, timeout=10, **settings)

    print(f"\n状态码: {response.status_code}")
    print("响应头:")
    for k, v in response.headers.items():
        print(f" {k}: {v}")
def debug_parser():
    """Run a set of CSS selectors against a sample document and print the matches.

    For each selector the number of matched elements is printed, followed by
    the stripped text of every match.
    """
    markup = """
<div class="container">
<div class="item first">
<span class="text">Item 1</span>
</div>
<div class="item">
<span class="text">Item 2</span>
</div>
</div>
"""
    document = BeautifulSoup(markup, 'lxml')

    banner = "=" * 50
    print("\n" + banner)
    print("CSS 选择器调试")
    print(banner)

    candidates = (
        '.container',
        '.item',
        '.item.first',
        '.item .text',
        'div.item span.text',
        'span.text:first-child',
    )
    for css in candidates:
        matches = document.select(css)
        print(f"\n选择器: {css}")
        print(f" 结果: {len(matches)} 个")
        for index, node in enumerate(matches):
            print(f" [{index}] {node.text.strip()}")
def debug_ajax():
    """Print a checklist for spotting AJAX requests in the browser's Network panel."""
    banner = "=" * 50
    checklist = """
1. 检查 Network 面板中:
- XHR / Fetch 类型请求
- URL 包含 ?callback=, api/, /json/ 等
2. 常见数据接口模式:
- https://api.xxx.com/data
- https://xxx.com/api/list?page=1
- https://xxx.com/data.json
3. POST 请求检查:
- Form Data 或 Request Payload
- JSON 格式参数
"""
    # Heading framed by two rules, then the checklist body
    for line in ("\n" + banner, "常见 AJAX 请求识别", banner, checklist):
        print(line)
# Run the three debugging demos in order when executed directly
if __name__ == "__main__":
    for demo in (debug_request, debug_parser, debug_ajax):
        demo()
📚 总结
核心要点回顾
| 模块 | 核心技能 |
|---|---|
| 请求 | requests / aiohttp / httpx |
| 解析 | BeautifulSoup / lxml / 正则表达式 |
| 反爬 | UA 轮换 / 代理池 / 请求间隔 |
| 存储 | JSON / CSV / SQLite |
| 框架 | Scrapy 适合大型项目 |
| 最佳实践 | 遵守 robots.txt、控制频率、错误处理 |
暂无评论内容