Python 进阶教程:网络爬虫实战

1. 网络爬虫基础概念

1.1 什么是网络爬虫

网络爬虫(Web Spider)是一种自动化的网页数据抓取程序,通过 HTTP 协议向目标服务器发送请求,解析响应内容,提取所需数据。

1.2 爬虫工作流程

┌─────────────┐    HTTP请求    ┌─────────────┐
│   爬虫程序   │ ─────────────→ │   Web服务器  │
│             │ ←───────────── │             │
└─────────────┘    HTTP响应    └─────────────┘
       │
       ▼
┌─────────────┐
│  HTML解析   │ ──→ 提取数据 ──→ 存储到文件/数据库
└─────────────┘

1.3 常用术语

术语英文说明
UAUser-Agent浏览器标识,用于伪装请求
IP 封禁IP Ban频繁访问导致 IP 被服务器封禁
robots.txtrobots.txt网站允许爬取的规则文件
验证码CAPTCHA用于区分人机的高级反爬手段
代理池Proxy Pool多个 IP 地址轮换使用

2. 核心工具对比

工具类型适用场景学习曲线异步支持
urllib标准库简单请求
requests第三方库常规爬虫⭐⭐
aiohttp第三方库高并发爬虫⭐⭐⭐
httpx第三方库同步/异步两用⭐⭐
Scrapy框架大型爬虫项目⭐⭐⭐⭐
Playwright浏览器自动化JavaScript 渲染页面⭐⭐⭐

3. 环境准备与依赖安装

3.1 创建虚拟环境

# 创建专属虚拟环境
python -m venv spider_env
​
# 激活环境(Windows)
spider_env\Scripts\activate
​
# 激活环境(Linux/Mac)
source spider_env/bin/activate

3.2 安装依赖

# 基础依赖
pip install requests beautifulsoup4 lxml
​
# 异步爬虫依赖
pip install aiohttp httpx
​
# 大型爬虫框架
pip install scrapy
​
# 浏览器自动化(处理 JS 渲染页面)
pip install playwright
playwright install chromium
​
# 数据处理
pip install pandas openpyxl

3.3 验证安装

import requests
import bs4
import lxml
​
print(f"requests: {requests.__version__}")
print(f"beautifulsoup4: {bs4.__version__}")
print(f"lxml: {lxml.__version__}")

4. 基础请求:urllib 与 requests

4.1 urllib(标准库,无需安装)

"""
urllib 基本用法演示
作者:erick
"""
​
import urllib.request
import urllib.parse
import urllib.error
import json
​
def basic_get_request():
    """GET 请求基本用法"""
    url = "https://httpbin.org/get"
    
    try:
        # 创建请求对象,可添加 headers
        req = urllib.request.Request(
            url,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        )
        
        # 发送请求
        with urllib.request.urlopen(req, timeout=10) as response:
            data = response.read().decode('utf-8')
            print("响应状态码:", response.status)
            print("响应头:", dict(response.headers))
            return json.loads(data)
            
    except urllib.error.URLError as e:
        print(f"请求失败: {e.reason}")
        return None
​
def post_request_with_data():
    """POST 请求带参数"""
    url = "https://httpbin.org/post"
    data = {
        'username': 'erick',
        'password': '123456'
    }
    
    # URL 编码参数
    encoded_data = urllib.parse.urlencode(data).encode('utf-8')
    
    req = urllib.request.Request(
        url,
        data=encoded_data,
        headers={
            'User-Agent': 'Mozilla/5.0',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    )
    
    with urllib.request.urlopen(req, timeout=10) as response:
        return json.loads(response.read().decode('utf-8'))
​
def download_file():
    """下载文件"""
    url = "https://httpbin.org/image/png"
    output_path = "downloaded_image.png"
    
    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    
    with urllib.request.urlopen(req, timeout=30) as response:
        with open(output_path, 'wb') as f:
            f.write(response.read())
    
    print(f"文件已保存到: {output_path}")
​
if __name__ == "__main__":
    print("=" * 50)
    print("urllib GET 请求演示")
    print("=" * 50)
    result = basic_get_request()
    if result:
        print("请求成功!")

4.2 requests(更人性化的 HTTP 库)

"""
requests 库高级用法演示
作者:erick
"""
​
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import time
import random
​
class HTTPClient:
    """封装 HTTP 客户端,支持重试和代理"""
    
    def __init__(self, proxies=None):
        self.session = requests.Session()
        self.proxies = proxies
        
        # 配置重试策略
        retry_strategy = Retry(
            total=3,                    # 总重试次数
            backoff_factor=1,           # 重试间隔因子
            status_forcelist=[429, 500, 502, 503, 504]
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
    
    def get(self, url, **kwargs):
        """发送 GET 请求"""
        kwargs.setdefault('timeout', 10)
        kwargs.setdefault('headers', self._random_headers())
        
        if self.proxies:
            kwargs['proxies'] = self.proxies
        
        return self.session.get(url, **kwargs)
    
    def post(self, url, **kwargs):
        """发送 POST 请求"""
        kwargs.setdefault('timeout', 10)
        kwargs.setdefault('headers', self._random_headers())
        
        if self.proxies:
            kwargs['proxies'] = self.proxies
        
        return self.session.post(url, **kwargs)
    
    @staticmethod
    def _random_headers():
        """随机生成请求头"""
        ua_list = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
        ]
        return {
            'User-Agent': random.choice(ua_list),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        }
​
def handle_cookies():
    """处理 Cookies 的两种方式"""
    
    # 方式1:Session 自动管理
    session = requests.Session()
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    })
    
    # 模拟登录
    login_url = "https://httpbin.org/post"
    session.post(login_url, data={'username': 'erick', 'token': 'abc123'})
    
    # 后续请求自动携带 cookies
    response = session.get("https://httpbin.org/get")
    print("使用 Session:", response.status_code)
    
    # 方式2:手动设置 Cookies
    cookies = {
        'session_id': 'xyz789',
        'user': 'erick'
    }
    response = requests.get(
        "https://httpbin.org/get",
        cookies=cookies,
        headers={'User-Agent': 'Mozilla/5.0'}
    )
    print("手动 Cookies:", response.status_code)
​
if __name__ == "__main__":
    client = HTTPClient()
    
    # 测试 GET 请求
    response = client.get("https://httpbin.org/get")
    print(f"状态码: {response.status_code}")
    print(f"响应内容: {response.json()}")
    
    # 测试 Cookies
    print("\n" + "=" * 50)
    handle_cookies()

5. HTML 解析:BeautifulSoup 与 lxml

5.1 BeautifulSoup 基础

"""
BeautifulSoup 解析 HTML 演示
作者:erick
"""
​
from bs4 import BeautifulSoup
import requests
​
def basic_parse():
    """BeautifulSoup 基本用法"""
    html = """
    <html>
        <head><title>测试页面</title></head>
        <body>
            <div class="container">
                <h1 class="title">欢迎来到 Python 爬虫教程</h1>
                <ul id="course-list">
                    <li class="course">Python 基础</li>
                    <li class="course">Python 进阶</li>
                    <li class="course highlight">Python 爬虫实战</li>
                </ul>
                <a href="https://example.com">访问示例网站</a>
                <a href="https://example.org">另一个链接</a>
            </div>
        </body>
    </html>
    """
    
    # 解析器选择:html.parser / lxml / html5lib
    soup = BeautifulSoup(html, 'lxml')
    
    print("=" * 50)
    print("BeautifulSoup 基本选择器演示")
    print("=" * 50)
    
    # 1. 通过标签名查找(返回第一个匹配)
    title = soup.find('title')
    print(f"标题: {title.text}")
    
    # 2. 通过标签名查找所有
    all_links = soup.find_all('a')
    print(f"\n所有链接 ({len(all_links)} 个):")
    for link in all_links:
        print(f"  - {link.text}: {link.get('href')}")
    
    # 3. 通过 class 查找
    courses = soup.find_all('li', class_='course')
    print(f"\n课程列表:")
    for course in courses:
        print(f"  • {course.text}")
    
    # 4. 通过 id 查找
    course_list = soup.find('ul', id='course-list')
    print(f"\nID 为 course-list 的元素: {course_list.name}")
    
    # 5. CSS 选择器
    print("\n使用 CSS 选择器:")
    print(f"  h1.title: {soup.select_one('h1.title').text}")
    print(f"  li.highlight: {soup.select_one('li.highlight').text}")
    
    # 6. 获取属性值
    first_link = soup.find('a')
    print(f"\n链接文本: {first_link.text}")
    print(f"链接地址: {first_link['href']}")
​
def extract_nested_data():
    """提取嵌套数据结构"""
    html = """
    <div class="product-list">
        <div class="product">
            <span class="price">¥299</span>
            <span class="name">iPhone 15 Pro</span>
            <span class="rating">4.8</span>
        </div>
        <div class="product">
            <span class="price">¥199</span>
            <span class="name">AirPods Pro</span>
            <span class="rating">4.9</span>
        </div>
    </div>
    """
    
    soup = BeautifulSoup(html, 'lxml')
    products = []
    
    for product in soup.select('.product'):
        products.append({
            'name': product.select_one('.name').text,
            'price': product.select_one('.price').text,
            'rating': product.select_one('.rating').text
        })
    
    print("\n提取到的产品数据:")
    for p in products:
        print(f"  {p['name']} - {p['price']} - 评分: {p['rating']}")
​
if __name__ == "__main__":
    basic_parse()
    extract_nested_data()

5.2 解析器性能对比

解析器速度容错性依赖
html.parser中等一般
lxmllibxml2
html5lib最好html5lib
def parser_comparison():
    """解析器性能对比"""
    import time
    
    html = "<div>" * 1000 + "内容" + "</div>" * 1000
    
    parsers = ['html.parser', 'lxml', 'html5lib']
    
    print("\n解析器性能对比:")
    for parser in parsers:
        start = time.time()
        for _ in range(100):
            soup = BeautifulSoup(html, parser)
        elapsed = time.time() - start
        print(f"  {parser:15s}: {elapsed:.4f}秒")
​
if __name__ == "__main__":
    parser_comparison()

6. 异步爬虫:aiohttp 与 httpx

6.1 aiohttp 异步爬虫

"""
aiohttp 异步爬虫实战
作者:erick
"""
​
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import json
import time
from typing import List, Dict
​
class AsyncSpider:
    """异步爬虫基类"""
    
    def __init__(self, concurrency: int = 10):
        self.concurrency = concurrency  # 并发数限制
        self.semaphore = None
        self.results: List[Dict] = []
    
    async def fetch(self, session: aiohttp.ClientSession, url: str) -> Dict:
        """抓取单个页面"""
        async with self.semaphore:  # 控制并发数
            try:
                async with session.get(url, timeout=30) as response:
                    if response.status == 200:
                        html = await response.text()
                        return {'url': url, 'html': html, 'status': 200}
                    else:
                        return {'url': url, 'status': response.status}
            except Exception as e:
                return {'url': url, 'error': str(e)}
    
    async def crawl(self, urls: List[str]) -> List[Dict]:
        """并发抓取多个 URL"""
        self.semaphore = asyncio.Semaphore(self.concurrency)
        
        async with aiohttp.ClientSession(
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        ) as session:
            tasks = [self.fetch(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
            return results
    
    def parse(self, html: str) -> Dict:
        """解析 HTML(子类实现)"""
        raise NotImplementedError
​
async def demo_simple():
    """aiohttp 简单示例"""
    
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
    ]
    
    spider = AsyncSpider(concurrency=3)
    
    print("=" * 50)
    print("aiohttp 异步并发演示")
    print("=" * 50)
    
    start = time.time()
    results = await spider.crawl(urls)
    elapsed = time.time() - start
    
    print(f"抓取 {len(urls)} 个页面(每个延迟1秒)")
    print(f"顺序执行预计: 3秒")
    print(f"并发执行实际: {elapsed:.2f}秒")
    print(f"状态码: {[r.get('status') for r in results]}")
​
async def crawl_images():
    """并发下载图片示例"""
    import os
    import aiofiles
    
    image_urls = [
        "https://httpbin.org/image/jpeg",
        "https://httpbin.org/image/png",
    ]
    
    async def download_image(session: aiohttp.ClientSession, url: str, filename: str):
        async with session.get(url) as response:
            if response.status == 200:
                content = await response.read()
                async with aiofiles.open(filename, 'wb') as f:
                    await f.write(content)
                return filename
            return None
    
    async with aiohttp.ClientSession() as session:
        tasks = [
            download_image(session, url, f"image_{i}.jpg")
            for i, url in enumerate(image_urls)
        ]
        results = await asyncio.gather(*tasks)
        print(f"下载完成: {[r for r in results if r]}")
​
if __name__ == "__main__":
    asyncio.run(demo_simple())

6.2 httpx(同步/异步两用)

"""
httpx 同步与异步用法对比
作者:erick
"""
​
import httpx
import asyncio
​
# ==================== 同步方式 ====================
def sync_demo():
    """同步请求"""
    client = httpx.Client(timeout=10.0)
    
    response = client.get("https://httpbin.org/get")
    print(f"同步请求状态: {response.status_code}")
    print(f"响应内容: {response.json()}")
    
    client.close()
​
# ==================== 异步方式 ====================
async def async_demo():
    """异步请求"""
    async with httpx.AsyncClient(timeout=30.0) as client:
        # 并发发送多个请求
        responses = await client.gather(
            client.get("https://httpbin.org/get"),
            client.get("https://httpbin.org/ip"),
            client.get("https://httpbin.org/headers"),
        )
        
        for resp in responses:
            print(f"状态: {resp.status_code}, URL: {resp.url}")
​
# ==================== 实战:批量请求 ====================
async def batch_request_demo():
    """批量请求多个 API"""
    urls = [
        "https://httpbin.org/get?id=1",
        "https://httpbin.org/get?id=2",
        "https://httpbin.org/get?id=3",
        "https://httpbin.org/get?id=4",
        "https://httpbin.org/get?id=5",
    ]
    
    async with httpx.AsyncClient() as client:
        import time
        start = time.time()
        
        # 并发请求
        responses = await asyncio.gather(*[
            client.get(url) for url in urls
        ])
        
        elapsed = time.time() - start
        print(f"批量请求 {len(urls)} 个 URL,耗时: {elapsed:.2f}秒")
        
        for resp in responses:
            data = resp.json()
            print(f"  ID={data['args'].get('id')} -> 状态: {resp.status_code}")
​
if __name__ == "__main__":
    print("同步请求演示:")
    sync_demo()
    
    print("\n异步请求演示:")
    asyncio.run(async_demo())
    
    print("\n批量请求演示:")
    asyncio.run(batch_request_demo())

7. 反爬应对策略

7.1 常用反爬机制与应对

反爬机制说明应对策略
User-Agent 检测检测浏览器标识使用真实 UA 列表
IP 限流限制单 IP 请求频率使用代理池 + 请求间隔
验证码CAPTCHAs 人机验证打码平台 / 机器学习
Cookie 验证验证会话合法性维护登录态 Cookies
请求签名接口带签名参数分析 JS 逆向签名算法
动态加载数据由 JS 动态生成Selenium / Playwright

7.2 UA 轮换与请求间隔

"""
反爬应对策略:UA轮换、代理、请求间隔
作者:erick
"""
​
import random
import time
import requests
from typing import List, Dict, Optional
​
class AntiCrawlerClient:
    """反反爬策略封装"""
    
    def __init__(self, proxies: Optional[List[str]] = None):
        self.session = requests.Session()
        self.proxies = proxies or []
        self.proxy_index = 0
        
        # 常用 User-Agent 列表
        self.ua_list = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        ]
    
    def _get_random_headers(self) -> Dict:
        """生成随机请求头"""
        return {
            'User-Agent': random.choice(self.ua_list),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
    
    def _get_proxy(self) -> Optional[Dict]:
        """获取代理"""
        if not self.proxies:
            return None
        proxy = self.proxies[self.proxy_index % len(self.proxies)]
        self.proxy_index += 1
        return {'http': proxy, 'https': proxy}
    
    def get(self, url: str, delay_range: tuple = (1, 3), **kwargs) -> requests.Response:
        """
        发送 GET 请求
        
        Args:
            url: 目标 URL
            delay_range: 请求间隔随机范围(秒)
            **kwargs: 传递给 requests.get 的其他参数
        """
        # 随机延迟
        delay = random.uniform(*delay_range)
        time.sleep(delay)
        
        # 设置请求头
        kwargs.setdefault('headers', self._get_random_headers())
        
        # 设置代理
        proxy = self._get_proxy()
        if proxy:
            kwargs['proxies'] = proxy
        
        # 发送请求
        response = self.session.get(url, **kwargs)
        
        # 随机延迟(请求后)
        time.sleep(random.uniform(*delay_range))
        
        return response
    
    def post(self, url: str, delay_range: tuple = (1, 3), **kwargs) -> requests.Response:
        """发送 POST 请求"""
        delay = random.uniform(*delay_range)
        time.sleep(delay)
        
        kwargs.setdefault('headers', self._get_random_headers())
        proxy = self._get_proxy()
        if proxy:
            kwargs['proxies'] = proxy
        
        return self.session.post(url, **kwargs)
​
def demo_anti_crawler():
    """反爬策略演示"""
    
    # 初始化客户端(可添加代理列表)
    client = AntiCrawlerClient(
        proxies=[
            # 'http://user:pass@proxy1.com:8080',
            # 'http://user:pass@proxy2.com:8080',
        ]
    )
    
    print("=" * 50)
    print("反爬策略演示")
    print("=" * 50)
    print("将随机更换 User-Agent 并添加请求间隔\n")
    
    for i in range(3):
        print(f"第 {i+1} 次请求:")
        response = client.get(
            "https://httpbin.org/get",
            delay_range=(0.5, 1)  # 演示用较短间隔
        )
        headers = response.json().get('headers', {})
        print(f"  User-Agent: {headers.get('User-Agent', 'N/A')[:50]}...")
        print(f"  状态码: {response.status_code}")
​
if __name__ == "__main__":
    demo_anti_crawler()

7.3 使用代理池

"""
代理池管理
作者:erick
"""
​
import random
import asyncio
import aiohttp
from dataclasses import dataclass
from typing import List, Optional
import time
​
@dataclass
class Proxy:
    """代理信息"""
    ip: str
    port: int
    protocol: str = 'http'
    
    @property
    def url(self) -> str:
        return f"{self.protocol}://{self.ip}:{self.port}"
​
class ProxyPool:
    """代理池管理"""
    
    def __init__(self):
        self.proxies: List[Proxy] = []
        self.failed_proxies: set = set()
        self.check_interval = 300  # 每5分钟检测一次
        self.last_check = 0
    
    def add_proxy(self, proxy: Proxy):
        """添加代理"""
        if proxy.url not in self.failed_proxies:
            self.proxies.append(proxy)
    
    def get_random_proxy(self) -> Optional[Proxy]:
        """随机获取可用代理"""
        available = [p for p in self.proxies if p.url not in self.failed_proxies]
        if not available:
            return None
        return random.choice(available)
    
    def mark_failed(self, proxy: Proxy):
        """标记失败的代理"""
        self.failed_proxies.add(proxy.url)
        if proxy in self.proxies:
            self.proxies.remove(proxy)
        print(f"代理 {proxy.url} 已标记为失败")
    
    async def check_proxy(self, proxy: Proxy, timeout: int = 5) -> bool:
        """检测代理是否可用"""
        try:
            async with aiohttp.ClientSession() as session:
                start = time.time()
                async with session.get(
                    'https://httpbin.org/ip',
                    proxy=proxy.url,
                    timeout=aiohttp.ClientTimeout(total=timeout)
                ) as response:
                    elapsed = time.time() - start
                    return response.status == 200 and elapsed < timeout
        except:
            return False
    
    async def refresh_proxies(self):
        """刷新代理池"""
        print("开始检测代理可用性...")
        
        tasks = []
        for proxy in self.proxies[:]:  # 复制列表避免迭代中修改
            tasks.append(self.check_proxy(proxy))
        
        results = await asyncio.gather(*tasks)
        
        # 移除不可用代理
        for proxy, is_ok in zip(self.proxies[:], results):
            if not is_ok:
                self.mark_failed(proxy)
        
        print(f"代理池刷新完成,剩余可用代理: {len(self.proxies)}")
​
# 使用示例
async def demo_proxy_pool():
    pool = ProxyPool()
    
    # 添加测试代理
    pool.add_proxy(Proxy(ip='127.0.0.1', port=8080))
    pool.add_proxy(Proxy(ip='127.0.0.1', port=8081))
    
    proxy = pool.get_random_proxy()
    if proxy:
        print(f"获取到代理: {proxy.url}")
    else:
        print("无可用代理")
    
    # 检测代理
    if proxy:
        is_ok = await pool.check_proxy(proxy)
        print(f"代理 {proxy.url} 可用性: {'可用' if is_ok else '不可用'}")
​
if __name__ == "__main__":
    asyncio.run(demo_proxy_pool())

8. 实战案例:抓取豆瓣电影 Top250

8.1 项目结构

douban_spider/
├── main.py              # 入口文件
├── spider/
│   ├── __init__.py
│   ├── fetcher.py        # 请求模块
│   ├── parser.py         # 解析模块
│   ├── storage.py        # 存储模块
│   └── config.py         # 配置文件
├── data/                 # 数据存储目录
└── requirements.txt     # 依赖清单

8.2 完整实现代码

"""
豆瓣电影 Top250 爬虫
作者:erick
功能:抓取豆瓣电影排行榜数据并存储为 JSON/CSV
"""
​
import requests
from bs4 import BeautifulSoup
import csv
import json
import os
import time
import random
from dataclasses import dataclass, asdict
from typing import List, Optional
from pathlib import Path
​
# ==================== 数据模型 ====================
@dataclass
class Movie:
    """电影数据模型"""
    rank: int           # 排名
    title: str          # 电影名
    rating: float       # 评分
    quote: str          # 经典语录
    director: str       # 导演
    actor: str          # 主演
    year: str           # 年份
    genre: str          # 类型
    country: str        # 国家/地区
    url: str            # 详情页 URL
​
# ==================== 配置 ====================
class Config:
    BASE_URL = "https://movie.douban.com/top250"
    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Cookie': 'bid=xxxxxxxxxxxxxx; __yadk_uid=xxxxxxxxxxxxxx',  # 建议添加 Cookie
    }
    DELAY_RANGE = (2, 5)  # 请求间隔(秒)
    OUTPUT_DIR = Path(__file__).parent / 'data'
​
# ==================== 请求模块 ====================
class DoubanFetcher:
    """豆瓣Fetcher"""
    
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update(Config.HEADERS)
    
    def fetch_page(self, page: int = 0) -> Optional[str]:
        """
        抓取单页 HTML
        
        Args:
            page: 页码(0-9,每页25部电影)
        Returns:
            HTML 字符串,失败返回 None
        """
        url = Config.BASE_URL
        if page > 0:
            url = f"{Config.BASE_URL}?start={page * 25}"
        
        try:
            # 添加随机延迟
            time.sleep(random.uniform(*Config.DELAY_RANGE))
            
            response = self.session.get(url, timeout=15)
            response.raise_for_status()
            response.encoding = 'utf-8'
            
            print(f"✓ 第 {page + 1} 页抓取成功 (状态码: {response.status_code})")
            return response.text
            
        except requests.RequestException as e:
            print(f"✗ 第 {page + 1} 页抓取失败: {e}")
            return None
    
    def close(self):
        self.session.close()
​
# ==================== 解析模块 ====================
class DoubanParser:
    """豆瓣 HTML 解析器"""
    
    @staticmethod
    def parse_movies(html: str) -> List[Movie]:
        """
        解析电影列表
        
        Args:
            html: HTML 字符串
        Returns:
            Movie 对象列表
        """
        soup = BeautifulSoup(html, 'lxml')
        movie_items = soup.select('div.item')
        movies = []
        
        for item in movie_items:
            try:
                # 排名
                rank = int(item.select_one('em').text)
                
                # 电影标题(可能有多个)
                title_elem = item.select_one('span.title')
                title = title_elem.text if title_elem else "未知"
                
                # 评分
                rating_elem = item.select_one('span.rating_num')
                rating = float(rating_elem.text) if rating_elem else 0.0
                
                # 经典语录
                quote_elem = item.select_one('span.inq')
                quote = quote_elem.text if quote_elem else ""
                
                # 详情链接
                link_elem = item.select_one('a.nbg')
                url = link_elem.get('href', '') if link_elem else ""
                
                # 导演、演员、年份等(从 bd div 获取)
                bd = item.select_one('div.bd')
                info_text = bd.get_text(separator=' ', strip=True) if bd else ""
                
                # 解析信息
                parts = info_text.split('/')
                
                if len(parts) >= 3:
                    # 格式: "导演: xxx / 主演: xxx / 2024 / xxx / xxx"
                    year = parts[-3].strip() if len(parts) >= 3 else ""
                    country_genre = parts[-2].strip() if len(parts) >= 2 else ""
                    genre = parts[-1].strip()
                    # 提取导演和演员
                    info_parts = parts[0].split('主演:')
                    director = info_parts[0].replace('导演:', '').strip()
                    actor = info_parts[1].strip() if len(info_parts) > 1 else ""
                else:
                    director = actor = year = genre = country_genre = ""
                
                movie = Movie(
                    rank=rank,
                    title=title,
                    rating=rating,
                    quote=quote,
                    director=director,
                    actor=actor,
                    year=year,
                    genre=genre,
                    country=country_genre,
                    url=url
                )
                movies.append(movie)
                
            except Exception as e:
                print(f"解析电影信息失败: {e}")
                continue
        
        return movies
​
# ==================== 存储模块 ====================
class DoubanStorage:
    """数据存储"""
    
    def __init__(self, output_dir: Path = Config.OUTPUT_DIR):
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)
    
    def save_json(self, movies: List[Movie], filename: str = "douban_top250.json"):
        """保存为 JSON"""
        filepath = self.output_dir / filename
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(
                [asdict(m) for m in movies],
                f,
                ensure_ascii=False,
                indent=2
            )
        print(f"✓ JSON 数据已保存: {filepath}")
    
    def save_csv(self, movies: List[Movie], filename: str = "douban_top250.csv"):
        """保存为 CSV"""
        filepath = self.output_dir / filename
        with open(filepath, 'w', encoding='utf-8-sig', newline='') as f:
            writer = csv.writer(f)
            # 写入表头
            writer.writerow(['排名', '电影名', '评分', '经典语录', '导演', '主演', '年份', '类型', '国家/地区', '详情链接'])
            # 写入数据
            for m in movies:
                writer.writerow([
                    m.rank, m.title, m.rating, m.quote,
                    m.director, m.actor, m.year, m.genre,
                    m.country, m.url
                ])
        print(f"✓ CSV 数据已保存: {filepath}")
    
    def save_markdown(self, movies: List[Movie], filename: str = "douban_top250.md"):
        """保存为 Markdown 表格"""
        filepath = self.output_dir / filename
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write("# 豆瓣电影 Top250\n\n")
            f.write("| 排名 | 电影名 | 评分 | 年份 | 类型 |\n")
            f.write("|------|--------|------|------|------|\n")
            for m in movies:
                f.write(f"| {m.rank} | {m.title} | {m.rating} | {m.year} | {m.genre} |\n")
        print(f"✓ Markdown 数据已保存: {filepath}")
​
# ==================== 主程序 ====================
def main():
    """豆瓣 Top250 爬虫主程序"""
    print("=" * 50)
    print("豆瓣电影 Top250 爬虫")
    print("=" * 50)
    
    fetcher = DoubanFetcher()
    parser = DoubanParser()
    storage = DoubanStorage()
    
    all_movies = []
    
    try:
        # 抓取全部 10 页(Top250)
        for page in range(10):
            print(f"\n正在抓取第 {page + 1}/10 页...")
            
            html = fetcher.fetch_page(page)
            if html:
                movies = parser.parse_movies(html)
                all_movies.extend(movies)
                print(f"  本页解析到 {len(movies)} 部电影")
        
        print(f"\n总计抓取 {len(all_movies)} 部电影")
        
        # 保存数据
        print("\n" + "=" * 50)
        print("保存数据")
        print("=" * 50)
        storage.save_json(all_movies)
        storage.save_csv(all_movies)
        storage.save_markdown(all_movies)
        
        print("\n✓ 爬取完成!")
        
        # 显示前 10 名
        print("\n📊 Top 10 电影预览:")
        for i, m in enumerate(all_movies[:10], 1):
            print(f"  {i:2d}. {m.title} ({m.year}) - 评分: {m.rating}")
    
    finally:
        fetcher.close()
​
if __name__ == "__main__":
    main()

8.3 运行效果

==================================================
豆瓣电影 Top250 爬虫
==================================================
​
正在抓取第 1/10 页...
✓ 第 1 页抓取成功 (状态码: 200)
  本页解析到 25 部电影
​
正在抓取第 2/10 页...
✓ 第 2 页抓取成功 (状态码: 200)
  本页解析到 25 部电影
...
​
==================================================
保存数据
==================================================
✓ JSON 数据已保存: data\douban_top250.json
✓ CSV 数据已保存: data\douban_top250.csv
✓ Markdown 数据已保存: data\douban_top250.md
​
✓ 爬取完成!
​
📊 Top 10 电影预览:
   1. 肖申克的救赎 (1994) - 评分: 9.7
   2. 霸王别姬 (1993) - 评分: 9.6
   3. 阿甘正传 (1994) - 评分: 9.5
   ...

9. 数据存储:JSON/CSV/数据库

9.1 多种存储方式对比

存储方式适用场景优点缺点
JSON小规模、层级数据格式清晰、易读不适合大规模数据
CSV表格数据、分析体积小、易导入 Excel不支持嵌套结构
SQLite中等规模、结构化零配置、跨平台并发写入差
MySQL/PostgreSQL生产环境强大、成熟需要部署
MongoDB非结构化数据灵活、易扩展查询性能略低

9.2 数据存储工具类

"""
数据存储工具类
作者:erick
"""
​
import json
import csv
import sqlite3
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
​
@dataclass
class MovieRecord:
    """电影记录"""
    id: Optional[int]
    title: str
    rating: float
    year: int
    genre: str
    created_at: str = None
    
    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now().isoformat()
​
class DataStorage:
    """数据存储基类"""
    
    def save(self, data: List[Dict]):
        raise NotImplementedError
    
    def load(self) -> List[Dict]:
        raise NotImplementedError
​
class JSONStorage(DataStorage):
    """JSON 文件存储"""
    
    def __init__(self, filepath: str):
        self.filepath = Path(filepath)
    
    def save(self, data: List[Dict]):
        """保存为 JSON"""
        self.filepath.parent.mkdir(parents=True, exist_ok=True)
        with open(self.filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"✓ 已保存 {len(data)} 条数据到 {self.filepath}")
    
    def load(self) -> List[Dict]:
        """读取 JSON"""
        if not self.filepath.exists():
            return []
        with open(self.filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
​
class CSVStorage(DataStorage):
    """CSV 文件存储"""
    
    def __init__(self, filepath: str, fieldnames: List[str] = None):
        self.filepath = Path(filepath)
        self.fieldnames = fieldnames or ['id', 'title', 'rating', 'year', 'genre']
    
    def save(self, data: List[Dict]):
        """追加保存为 CSV"""
        self.filepath.parent.mkdir(parents=True, exist_ok=True)
        
        write_header = not self.filepath.exists()
        
        with open(self.filepath, 'a', encoding='utf-8-sig', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=self.fieldnames)
            if write_header:
                writer.writeheader()
            writer.writerows(data)
        
        print(f"✓ 已追加 {len(data)} 条数据到 {self.filepath}")
    
    def load(self) -> List[Dict]:
        """读取 CSV"""
        if not self.filepath.exists():
            return []
        with open(self.filepath, 'r', encoding='utf-8-sig', newline='') as f:
            reader = csv.DictReader(f)
            return list(reader)
​
class SQLiteStorage(DataStorage):
    """SQLite 数据库存储"""
    
    def __init__(self, db_path: str, table_name: str = 'movies'):
        self.db_path = Path(db_path)
        self.table_name = table_name
        self._init_db()
    
    def _init_db(self):
        """初始化数据库表"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(f"""
                CREATE TABLE IF NOT EXISTS {self.table_name} (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title TEXT NOT NULL,
                    rating REAL,
                    year INTEGER,
                    genre TEXT,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
            """)
    
    def save(self, data: List[Dict]):
        """批量插入数据"""
        with sqlite3.connect(self.db_path) as conn:
            for item in data:
                conn.execute(f"""
                    INSERT INTO {self.table_name} (title, rating, year, genre)
                    VALUES (?, ?, ?, ?)
                """, (
                    item.get('title', ''),
                    item.get('rating', 0),
                    item.get('year', 0),
                    item.get('genre', '')
                ))
            conn.commit()
        print(f"✓ 已插入 {len(data)} 条数据到 {self.table_name}")
    
    def load(self, limit: int = 100) -> List[Dict]:
        """查询数据"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(
                f"SELECT * FROM {self.table_name} ORDER BY rating DESC LIMIT ?",
                (limit,)
            )
            return [dict(row) for row in cursor.fetchall()]
    
    def query(self, sql: str, params: tuple = ()) -> List[Dict]:
        """执行自定义查询"""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(sql, params)
            return [dict(row) for row in cursor.fetchall()]
​
# 使用示例
def demo_storage():
    """存储演示"""
    test_data = [
        {'title': '肖申克的救赎', 'rating': 9.7, 'year': 1994, 'genre': '犯罪'},
        {'title': '霸王别姬', 'rating': 9.6, 'year': 1993, 'genre': '剧情'},
        {'title': '阿甘正传', 'rating': 9.5, 'year': 1994, 'genre': '爱情'},
    ]
    
    print("=" * 50)
    print("数据存储演示")
    print("=" * 50)
    
    # JSON 存储
    json_storage = JSONStorage('data/movies.json')
    json_storage.save(test_data)
    
    # CSV 存储
    csv_storage = CSVStorage('data/movies.csv')
    csv_storage.save(test_data)
    
    # SQLite 存储
    db_storage = SQLiteStorage('data/movies.db')
    db_storage.save(test_data)
    
    # 查询数据
    print("\n查询评分大于 9.0 的电影:")
    results = db_storage.query(
        "SELECT * FROM movies WHERE rating > ? ORDER BY rating DESC",
        (9.0,)
    )
    for row in results:
        print(f"  {row['title']} - {row['rating']}")
​
if __name__ == "__main__":
    demo_storage()

10. 爬虫框架:Scrapy 快速入门

10.1 Scrapy 项目结构

myproject/
├── scrapy.cfg              # 项目配置
└── myproject/
    ├── __init__.py
    ├── items.py            # 数据模型
    ├── pipelines.py        # 数据管道
    ├── settings.py         # 配置
    └── spiders/
        └── movie_spider.py # 爬虫文件

10.2 Scrapy 核心组件

组件作用
Engine核心引擎,协调各组件工作
Scheduler调度器,管理请求队列
Downloader下载器,发起 HTTP 请求
Spider爬虫,解析响应、提取数据
Item Pipeline管道,处理爬取的数据
Middleware中间件,请求/响应的预处理

10.3 Scrapy 爬虫代码

"""
Scrapy 爬虫示例:豆瓣电影 Top250
作者:erick
"""
​
# items.py
import scrapy
​
class DoubanMovieItem(scrapy.Item):
    """电影数据项"""
    rank = scrapy.Field()       # 排名
    title = scrapy.Field()      # 标题
    rating = scrapy.Field()     # 评分
    quote = scrapy.Field()       # 语录
    director = scrapy.Field()    # 导演
    actor = scrapy.Field()       # 主演
    year = scrapy.Field()        # 年份
    genre = scrapy.Field()       # 类型
    url = scrapy.Field()         # 链接
​
# movie_spider.py
import scrapy
from douban.items import DoubanMovieItem
​
class DoubanTop250Spider(scrapy.Spider):
    """豆瓣 Top250 爬虫"""
    
    name = 'douban_top250'
    allowed_domains = ['movie.douban.com']
    
    # 初始 URL
    start_urls = ['https://movie.douban.com/top250']
    
    def parse(self, response):
        """解析列表页"""
        # 提取当前页的电影条目
        for item in response.css('div.item'):
            movie = DoubanMovieItem()
            
            movie['rank'] = int(item.css('em::text').get())
            movie['title'] = item.css('span.title::text').get()
            movie['rating'] = float(item.css('span.rating_num::text').get())
            movie['quote'] = item.css('span.inq::text').get() or ''
            movie['url'] = item.css('a.nbg::attr(href)').get()
            
            # 解析详细信息
            info = item.css('div.bd p::text').getall()
            if len(info) >= 2:
                basic_info = info[0].strip()
                movie['director'] = basic_info.split('/')[0].replace('导演', '').strip()
                movie['year'] = basic_info.split('/')[-1].strip()
                movie['genre'] = info[1].strip()
            
            yield movie
        
        # 翻页:找到下一页链接
        next_page = response.css('span.next a::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)
    
    def parse_detail(self, response):
        """解析详情页(可选)"""
        movie = DoubanMovieItem()
        movie['url'] = response.url
        movie['title'] = response.css('h1 span::text').get()
        
        # 更多详情提取...
        yield movie
​
# pipelines.py
class DataCleanPipeline:
    """数据清洗管道"""
    
    def process_item(self, item, spider):
        # 清洗评分
        if item.get('rating'):
            item['rating'] = float(item['rating'])
        
        # 清洗标题
        if item.get('title'):
            item['title'] = item['title'].strip()
        
        return item
​
class JSONExportPipeline:
    """JSON 导出管道"""
    
    def open_spider(self, spider):
        self.file = open('movies.json', 'w', encoding='utf-8')
        self.file.write('[\n')
        self.count = 0
    
    def close_spider(self, spider):
        self.file.write('\n]')
        self.file.close()
        spider.logger.info(f'共导出 {self.count} 条数据')
    
    def process_item(self, item, spider):
        if self.count > 0:
            self.file.write(',\n')
        
        import json
        line = json.dumps(dict(item), ensure_ascii=False)
        self.file.write('  ' + line)
        self.count += 1
        return item
​
# settings.py 配置
BOT_NAME = 'douban'
SPIDER_MODULES = ['douban.spiders']
NEWSPIDER_MODULE = 'douban.spiders'
​
# 禁用 robots.txt 规则
ROBOTSTXT_OBEY = False
​
# 设置请求头
DEFAULT_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
}
​
# 启用管道
ITEM_PIPELINES = {
    'douban.pipelines.DataCleanPipeline': 300,
    'douban.pipelines.JSONExportPipeline': 400,
}
​
# 下载延迟(秒)
DOWNLOAD_DELAY = 2

10.4 运行 Scrapy 爬虫

# 进入项目目录
cd myproject
​
# 运行爬虫
scrapy crawl douban_top250
​
# 指定输出文件
scrapy crawl douban_top250 -o movies.json
​
# 指定输出格式(csv/json/xml)
scrapy crawl douban_top250 -o movies.csv -t csv
​
# 限速运行(每秒 0.5 个请求)
scrapy crawl douban_top250 -s DOWNLOAD_DELAY=2 -s CONCURRENT_REQUESTS_PER_DOMAIN=1

11. 排错指南与最佳实践

11.1 常见错误与解决方案

错误类型错误信息原因解决方案
编码错误UnicodeDecodeError响应编码识别错误设置 response.encoding='utf-8'
超时错误TimeoutError网络慢/目标站慢增加 timeout 参数
403 Forbidden访问被拒绝反爬检测添加/更换 User-Agent、使用代理
404 Not Found资源不存在URL 错误/页面重构检查 URL、更新选择器
编码乱码显示乱码页面编码与解析编码不一致使用 chardet 库检测编码
解析为空选择器返回 NoneCSS/XPath 选择器错误使用浏览器开发者工具检查
连接重置ConnectionResetError请求过于频繁降低频率、使用代理
SSL 错误SSLError证书问题设置 verify=False

11.2 编码问题处理

"""
编码问题处理
作者:erick
"""
​
import chardet
import requests
​
def detect_encoding(response: requests.Response) -> str:
    """自动检测响应编码"""
    # 方法1:使用 chardet 检测
    raw_data = response.content
    detected = chardet.detect(raw_data)
    encoding = detected['encoding']
    confidence = detected['confidence']
    
    print(f"检测到的编码: {encoding} (置信度: {confidence:.2%})")
    
    # 方法2:常见编码优先尝试
    common_encodings = ['utf-8', 'gbk', 'gb2312', 'gb18030', 'big5']
    
    for enc in common_encodings:
        try:
            text = raw_data.decode(enc)
            print(f"成功使用 {enc} 解码")
            return enc
        except:
            continue
    
    # 方法3:response.apparent_encoding
    print(f" apparent_encoding: {response.apparent_encoding}")
    return response.apparent_encoding
​
def safe_request(url: str) -> str:
    """安全请求,自动处理编码"""
    response = requests.get(url)
    
    # 优先使用响应头中的编码
    content_type = response.headers.get('Content-Type', '')
    if 'charset=' in content_type:
        encoding = content_type.split('charset=')[-1]
    else:
        encoding = detect_encoding(response)
    
    return response.content.decode(encoding, errors='replace')
​
# 示例
if __name__ == "__main__":
    print("编码检测演示:")
    response = requests.get("https://httpbin.org/get")
    print(f"apparent_encoding: {response.apparent_encoding}")

11.3 请求重试装饰器

"""
请求重试装饰器
作者:erick
"""
​
import time
import functools
import requests
from typing import Callable, Any
​
def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """
    请求重试装饰器
    
    Args:
        max_attempts: 最大尝试次数
        delay: 初始延迟(秒)
        backoff: 延迟倍数(指数退避)
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            current_delay = delay
            
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except requests.RequestException as e:
                    if attempt == max_attempts - 1:
                        raise
                    
                    print(f"请求失败 ({attempt + 1}/{max_attempts}): {e}")
                    print(f"  {current_delay:.1f} 秒后重试...")
                    time.sleep(current_delay)
                    current_delay *= backoff
            
            return None
        
        return wrapper
    return decorator
​
# 使用示例
@retry(max_attempts=3, delay=2, backoff=2)
def fetch_with_retry(url: str) -> dict:
    """带重试的请求"""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()
​
# 使用
if __name__ == "__main__":
    try:
        result = fetch_with_retry("https://httpbin.org/get")
        print(f"成功: {result}")
    except Exception as e:
        print(f"最终失败: {e}")

11.4 最佳实践清单

"""
爬虫最佳实践
作者:erick
"""
​
# ==================== 最佳实践清单 ====================
​
BEST_PRACTICES = """
📋 Python 网络爬虫最佳实践
​
✅ 必须遵守:
------------
1. 遵守 robots.txt
   - 使用 scrapy 时设置 ROBOTSTXT_OBEY = True
   - 手动爬虫也应尊重网站规则
​
2. 设置合理的请求间隔
   - 建议 2-5 秒间隔
   - 使用 random.uniform() 避免固定间隔
​
3. 设置合理的超时时间
   - timeout=10~30 秒
   - 避免无限等待
​
4. 错误处理与日志记录
   - try-except 包裹所有网络操作
   - 记录成功/失败状态
​
5. User-Agent 伪装
   - 使用真实浏览器 UA
   - 定期轮换 UA 列表
​
⚠️ 推荐做法:
-----------
1. 使用 Session 保持连接
   - 复用 TCP 连接
   - 自动管理 Cookies
​
2. 数据及时存储
   - 边爬边存,避免数据丢失
   - 使用事务保证完整性
​
3. 增量爬取
   - 记录爬取进度
   - 支持断点续爬
​
4. 并发控制
   - 单域名并发不超过 2-3
   - 全局并发不超过 10-20
​
❌ 避免事项:
-----------
1. 禁止:
   - 爬取个人信息用于商业目的
   - 绕过付费墙或验证码
   - 高频请求导致服务器瘫痪
​
2. 不要:
   - 使用固定 IP 频繁请求
   - 忽略异常继续执行
   - 在生产环境使用调试代码
"""
​
if __name__ == "__main__":
    print(BEST_PRACTICES)

11.5 调试技巧

"""
爬虫调试技巧
作者:erick
"""
​
import requests
from bs4 import BeautifulSoup
​
def debug_request():
    """调试请求"""
    url = "https://httpbin.org/headers"
    
    # 1. 打印完整请求信息
    req = requests.Request('GET', url)
    prepared = req.prepare()
    
    print("=" * 50)
    print("请求详情")
    print("=" * 50)
    print(f"URL: {prepared.url}")
    print(f"Method: {prepared.method}")
    print("Headers:")
    for k, v in prepared.headers.items():
        print(f"  {k}: {v}")
    
    # 2. 发送请求并检查响应头
    response = requests.get(url)
    print(f"\n状态码: {response.status_code}")
    print("响应头:")
    for k, v in response.headers.items():
        print(f"  {k}: {v}")
​
def debug_parser():
    """调试解析器"""
    html = """
    <div class="container">
        <div class="item first">
            <span class="text">Item 1</span>
        </div>
        <div class="item">
            <span class="text">Item 2</span>
        </div>
    </div>
    """
    
    soup = BeautifulSoup(html, 'lxml')
    
    print("\n" + "=" * 50)
    print("CSS 选择器调试")
    print("=" * 50)
    
    selectors = [
        '.container',
        '.item',
        '.item.first',
        '.item .text',
        'div.item span.text',
        'span.text:first-child',
    ]
    
    for selector in selectors:
        result = soup.select(selector)
        print(f"\n选择器: {selector}")
        print(f"  结果: {len(result)} 个")
        for i, elem in enumerate(result):
            print(f"    [{i}] {elem.text.strip()}")
​
def debug_ajax():
    """调试 AJAX 请求(模拟浏览器抓包)"""
    
    # 常见 AJAX 请求特征
    print("\n" + "=" * 50)
    print("常见 AJAX 请求识别")
    print("=" * 50)
    print("""
1. 检查 Network 面板中:
   - XHR / Fetch 类型请求
   - URL 包含 ?callback=, api/, /json/ 等
   
2. 常见数据接口模式:
   - https://api.xxx.com/data
   - https://xxx.com/api/list?page=1
   - https://xxx.com/data.json
   
3. POST 请求检查:
   - Form Data 或 Request Payload
   - JSON 格式参数
""")
​
if __name__ == "__main__":
    debug_request()
    debug_parser()
    debug_ajax()

📚 总结

核心要点回顾

模块核心技能
请求requests / aiohttp / httpx
解析BeautifulSoup / lxml / 正则表达式
反爬UA 轮换 / 代理池 / 请求间隔
存储JSON / CSV / SQLite
框架Scrapy 适合大型项目
最佳实践遵守 robots.txt、控制频率、错误处理
© 版权声明
THE END
喜欢就支持一下吧
点赞8 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容