
Python Web Scraping in Practice: A Complete Guide to Data Collection, from Beginner to Advanced

Web scraping with Python is a core data-collection technique. This article works through the full stack, from basic HTTP requests to advanced anti-scraping countermeasures, with hands-on examples of the mainstream tools Requests, BeautifulSoup, Scrapy, and Selenium.

1. Core Concepts of Python Web Scraping

A web crawler is an automated program that mimics browser behavior to fetch data from the internet in bulk. In 2026, Python remains the language of choice for scraping: its rich ecosystem and concise syntax make data collection efficient and elegant.

The basic scraping workflow:

  1. Send a request: issue an HTTP request to the target site and retrieve the page content
  2. Parse the content: extract the target data from the HTML/JSON
  3. Store the data: save it to a file or database
  4. Handle anti-scraping: work around the site's countermeasures to keep collection stable
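The four steps above can be sketched end to end. The example below is a minimal, self-contained illustration: step 1 is stubbed with an inline HTML string instead of a live request, step 2 uses the standard library's html.parser, and step 3 serializes the result as JSON; the tag names and sample data are made up for demonstration.

```python
import json
from html.parser import HTMLParser

# Step 2 (parse): a tiny parser that collects the text of every <h2> tag.
class TitleParser(HTMLParser):
    def __init__(self):
        super().__init__()
        self.titles = []
        self._in_h2 = False

    def handle_starttag(self, tag, attrs):
        if tag == "h2":
            self._in_h2 = True

    def handle_endtag(self, tag):
        if tag == "h2":
            self._in_h2 = False

    def handle_data(self, data):
        if self._in_h2:
            self.titles.append(data.strip())

# Step 1 (request) is stubbed with an inline page; in real use this
# would be response.text from requests/httpx.
html = "<html><body><h2>Item A</h2><h2>Item B</h2></body></html>"

parser = TitleParser()
parser.feed(html)

# Step 3 (store): serialize the extracted data as JSON.
record = json.dumps({"titles": parser.titles}, ensure_ascii=False)
print(record)  # {"titles": ["Item A", "Item B"]}
```

In a real crawler the inline string is replaced by the response body, and step 4 (anti-scraping) wraps the request with headers, delays, and retries, as shown in the sections below.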

2. Core Libraries in Detail

2.1 HTTP Request Libraries

Requests - the classic choice

Requests is Python's most popular HTTP library, known for its clean, elegant API:

import requests
from fake_useragent import UserAgent

# Create a session object; cookies are managed automatically
session = requests.Session()

# Set the request headers
headers = {
    "User-Agent": UserAgent().chrome,
    "Referer": "https://example.com",
    "Accept": "text/html,application/xhtml+xml"
}

# Send a GET request
response = session.get(
    "https://example.com/api/data",
    headers=headers,
    timeout=10  # set a timeout so the request cannot hang forever
)

# Check the response status
if response.status_code == 200:
    # Parse the JSON body
    data = response.json()
    print(f"Data fetched successfully: {data}")
else:
    print(f"Request failed: {response.status_code}")

Httpx - the modern async option

Httpx supports async/await, making it a good fit for high-concurrency scenarios:

import httpx
import asyncio

async def fetch_multiple_urls(urls):
    """Fetch multiple URLs concurrently"""
    async with httpx.AsyncClient() as client:
        tasks = [client.get(url, timeout=10) for url in urls]
        responses = await asyncio.gather(*tasks, return_exceptions=True)

        results = []
        for response in responses:
            if isinstance(response, httpx.Response):
                results.append(response.text)
        return results

# Run the async task
urls = [f"https://example.com/page/{i}" for i in range(1, 11)]
results = asyncio.run(fetch_multiple_urls(urls))
print(f"Fetched {len(results)} pages")

2.2 HTML Parsing Libraries

BeautifulSoup - beginner friendly

from bs4 import BeautifulSoup
import requests

# Fetch the page
response = requests.get("https://example.com/products")
soup = BeautifulSoup(response.text, "lxml")  # the lxml parser is fast

# Find a single element
title = soup.find("h1", class_="product-title").text.strip()

# Find multiple elements
products = soup.find_all("div", class_="product-item")
for product in products:
    name = product.find("span", class_="name").text
    price = product.find("span", class_="price").text
    print(f"Product: {name}, Price: {price}")

# CSS selectors (more flexible)
links = soup.select("a.product-link[href]")
for link in links:
    print(link["href"])

# XPath (requires lxml)
from lxml import etree
tree = etree.HTML(response.text)
prices = tree.xpath('//span[@class="price"]/text()')

2.3 Scrapy - a Production-Grade Framework

Scrapy is the most fully featured scraping framework and suits large-scale data collection:

# items.py - define the data structure
import scrapy

class ProductItem(scrapy.Item):
    name = scrapy.Field()
    price = scrapy.Field()
    rating = scrapy.Field()
    url = scrapy.Field()

# spiders/products.py - spider logic
import scrapy
from myproject.items import ProductItem

class ProductSpider(scrapy.Spider):
    name = "products"
    start_urls = ["https://example.com/products"]

    # Custom settings
    custom_settings = {
        "DOWNLOAD_DELAY": 1,  # 1-second download delay
        "CONCURRENT_REQUESTS": 8,  # number of concurrent requests
        "USER_AGENT": "Mozilla/5.0..."
    }

    def parse(self, response):
        """Parse the product listing page"""
        products = response.css("div.product-item")

        for product in products:
            item = ProductItem()
            item["name"] = product.css("h3.title::text").get()
            item["price"] = product.css("span.price::text").get()
            item["url"] = response.urljoin(
                product.css("a::attr(href)").get()
            )
            yield item

        # Follow the next page
        next_page = response.css("a.next-page::attr(href)").get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

# pipelines.py - item processing pipelines
import json

class JsonPipeline:
    """Save items as JSON lines"""
    def open_spider(self, spider):
        self.file = open("products.json", "w", encoding="utf-8")

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(line)
        return item

# Run the spider: scrapy crawl products

2.4 Selenium/Playwright - Handling Dynamic Content

Sites rendered with JavaScript require browser-automation tools:

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

# Configure browser options
options = Options()
options.add_argument("--headless")  # headless mode
options.add_argument("--disable-gpu")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")

# Launch the browser
driver = webdriver.Chrome(options=options)

try:
    # Open the page
    driver.get("https://example.com/dynamic-page")

    # Wait for the element to load (explicit wait)
    wait = WebDriverWait(driver, 10)
    element = wait.until(
        EC.presence_of_element_located((By.CLASS_NAME, "product-list"))
    )

    # Scroll the page to trigger lazy loading
    driver.execute_script(
        "window.scrollTo(0, document.body.scrollHeight);"
    )

    # Extract the data
    products = driver.find_elements(By.CLASS_NAME, "product-item")
    for product in products:
        name = product.find_element(By.CLASS_NAME, "name").text
        print(name)

finally:
    driver.quit()

# Playwright - a more modern choice (async, multi-browser support)
# pip install playwright && playwright install
from playwright.async_api import async_playwright

async def scrape_with_playwright():
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()

        await page.goto("https://example.com")
        await page.wait_for_selector(".product-list")

        # Extract the data
        products = await page.query_selector_all(".product-item")
        for product in products:
            name = await product.inner_text()
            print(name)

        await browser.close()

# Run with: asyncio.run(scrape_with_playwright())

3. Countering Anti-Scraping Mechanisms

3.1 User-Agent Rotation

from fake_useragent import UserAgent
import random

ua = UserAgent()

# Option 1: a random User-Agent
headers = {"User-Agent": ua.random}

# Option 2: a custom User-Agent pool
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"
]

def get_random_ua():
    return random.choice(USER_AGENTS)

3.2 Proxy IP Pools

import requests
from itertools import cycle

# Proxy pool (in practice, obtain these from a proxy provider)
PROXIES = [
    "http://ip1:port",
    "http://ip2:port",
    "http://ip3:port"
]

proxy_pool = cycle(PROXIES)

def request_with_proxy(url, max_retries=3):
    """Send a request through a proxy, rotating to the next one on failure"""
    for _ in range(max_retries):
        proxy = next(proxy_pool)
        try:
            response = requests.get(
                url,
                proxies={"http": proxy, "https": proxy},
                timeout=10
            )
            if response.status_code == 200:
                return response
        except requests.RequestException as e:
            print(f"Proxy {proxy} failed: {e}")
            continue
    return None

# Verify that a proxy works
def check_proxy(proxy):
    try:
        response = requests.get(
            "https://httpbin.org/ip",
            proxies={"http": proxy, "https": proxy},
            timeout=5
        )
        return response.status_code == 200
    except requests.RequestException:
        return False

3.3 Request Rate Control

import time
import random

def smart_delay(min_delay=1, max_delay=3):
    """Randomized delay to mimic human browsing"""
    delay = random.uniform(min_delay, max_delay)
    time.sleep(delay)

# Delay settings in Scrapy
custom_settings = {
    "DOWNLOAD_DELAY": 2,  # base delay
    "RANDOMIZE_DOWNLOAD_DELAY": True,  # randomize the delay
    "AUTOTHROTTLE_ENABLED": True,  # automatic throttling
}
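The same idea can be packaged as a decorator so that every call to a fetch function is throttled automatically. This is a sketch, not part of any library; `throttled` and the interval values are illustrative choices.

```python
import time
import random
import functools

def throttled(min_interval=1.0, jitter=1.0):
    """Decorator: enforce a randomized minimum interval between calls."""
    def decorator(func):
        last_call = [0.0]  # mutable closure cell holding the last call time

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Sleep until at least min_interval plus random jitter has passed
            # since the previous call.
            wait = (last_call[0] + min_interval
                    + random.uniform(0, jitter) - time.monotonic())
            if wait > 0:
                time.sleep(wait)
            last_call[0] = time.monotonic()
            return func(*args, **kwargs)
        return wrapper
    return decorator

@throttled(min_interval=0.05, jitter=0.05)
def fetch(url):
    # Placeholder for a real request, e.g. requests.get(url)
    return f"fetched {url}"

start = time.monotonic()
results = [fetch(f"https://example.com/{i}") for i in range(3)]
elapsed = time.monotonic() - start
print(f"3 calls took {elapsed:.2f}s")
```

With realistic intervals (1-3 seconds) this gives every call site the same pacing behavior without scattering time.sleep across the code.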

3.4 Cookie and Session Management

import requests

session = requests.Session()

# Simulate a login
login_data = {
    "username": "your_username",
    "password": "your_password"
}

response = session.post(
    "https://example.com/login",
    data=login_data
)

# After a successful login the session manages cookies automatically,
# so later requests need no manual Cookie header
profile = session.get("https://example.com/profile")

# Save the cookies for next time
import pickle
with open("cookies.pkl", "wb") as f:
    pickle.dump(session.cookies, f)

# Load the cookies
with open("cookies.pkl", "rb") as f:
    session.cookies.update(pickle.load(f))

4. Hands-On Case Study: Scraping an E-Commerce Catalog

"""
Complete example of collecting e-commerce product data
Fields: product name, price, rating, review count
"""
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import random
from fake_useragent import UserAgent

class EcommerceScraper:
    def __init__(self):
        self.session = requests.Session()
        self.ua = UserAgent()
        self.results = []

    def get_headers(self):
        """Generate randomized request headers"""
        return {
            "User-Agent": self.ua.random,
            "Accept": "text/html,application/xhtml+xml",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
        }

    def scrape_product(self, url):
        """Scrape a single product page"""
        try:
            response = self.session.get(
                url,
                headers=self.get_headers(),
                timeout=10
            )
            response.raise_for_status()

            soup = BeautifulSoup(response.text, "lxml")

            # Extract the product fields
            product = {
                "name": self._extract_text(soup, "h1.product-title"),
                "price": self._extract_text(soup, "span.price"),
                "rating": self._extract_text(soup, "span.rating-score"),
                "reviews": self._extract_text(soup, "span.review-count"),
                "url": url
            }

            self.results.append(product)
            print(f"Scraped: {product['name']}")

        except Exception as e:
            print(f"Failed to scrape {url}: {e}")

    def _extract_text(self, soup, selector):
        """Extract text safely"""
        element = soup.select_one(selector)
        return element.text.strip() if element else "N/A"

    def scrape_category(self, base_url, pages=10):
        """Scrape a category listing"""
        for page in range(1, pages + 1):
            url = f"{base_url}?page={page}"
            print(f"Scraping page {page}...")

            try:
                response = self.session.get(
                    url,
                    headers=self.get_headers()
                )
                soup = BeautifulSoup(response.text, "lxml")

                # Collect the product links
                links = soup.select("a.product-link")
                for link in links[:20]:  # cap at 20 per page
                    product_url = link["href"]
                    self.scrape_product(product_url)
                    time.sleep(random.uniform(1, 2))

            except Exception as e:
                print(f"Page failed: {e}")
                continue

    def save_to_csv(self, filename="products.csv"):
        """Save the results to CSV"""
        df = pd.DataFrame(self.results)
        df.to_csv(filename, index=False, encoding="utf-8-sig")
        print(f"Saved {len(self.results)} records to {filename}")

# Usage example
scraper = EcommerceScraper()
scraper.scrape_category("https://example.com/products", pages=5)
scraper.save_to_csv()

5. Data Storage Options

5.1 Saving to CSV/JSON

import json
import csv

# JSON storage
def save_to_json(data, filename):
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

# CSV storage
def save_to_csv(data, filename):
    with open(filename, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)

5.2 Saving to SQLite

import sqlite3

def save_to_sqlite(data, db_name="crawler.db"):
    """Save data to a SQLite database"""
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()

    # Create the table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            price REAL,
            rating REAL,
            url TEXT UNIQUE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    # Insert the rows
    for item in data:
        cursor.execute("""
            INSERT OR IGNORE INTO products (name, price, rating, url)
            VALUES (?, ?, ?, ?)
        """, (item["name"], item["price"], item["rating"], item["url"]))

    conn.commit()
    conn.close()
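The UNIQUE constraint on url combined with INSERT OR IGNORE deduplicates pages that get re-crawled. A quick self-contained check with an in-memory database (the sample rows are made up):

```python
import sqlite3

# In-memory database; the table layout mirrors the products table above.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE products (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        price REAL,
        rating REAL,
        url TEXT UNIQUE
    )
""")

rows = [
    ("Widget", 9.99, 4.5, "https://example.com/p/1"),
    ("Widget", 9.99, 4.5, "https://example.com/p/1"),  # duplicate URL
]
conn.executemany(
    "INSERT OR IGNORE INTO products (name, price, rating, url) VALUES (?, ?, ?, ?)",
    rows,
)
conn.commit()

# The second row violates the UNIQUE constraint and is silently skipped.
count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
print(count)  # 1
conn.close()
```

This makes re-running a crawl idempotent: already-seen URLs are skipped rather than duplicated or raising an error.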

6. Scraping Ethics and Legal Compliance

When building scrapers, always follow these principles:

  1. Honor robots.txt: check the site's robots protocol and skip pages it disallows
  2. Control request rates: set reasonable delays to avoid straining the server
  3. Respect copyright: do not redistribute scraped content; comply with intellectual-property law
  4. Protect privacy: do not collect personally identifiable data
  5. Use lawfully: scrape for learning and research, not unfair commercial competition

The robots.txt check can be automated with the standard library:

import urllib.robotparser
from urllib.parse import urljoin

def check_robots_txt(url):
    """Check whether a URL may be crawled"""
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(urljoin(url, "/robots.txt"))  # robots.txt lives at the site root
    rp.read()
    return rp.can_fetch("*", url)

Summary

The Python scraping stack is highly mature: the simple Requests + BeautifulSoup combination, the powerful Scrapy framework, and Selenium/Playwright for dynamic content together cover every common scenario. In real projects, pick the approach that fits the target site, and plan for anti-scraping countermeasures and compliance from the start.

Recommended tool choices:

  • Simple static pages: Requests + BeautifulSoup
  • Large-scale collection: the Scrapy framework
  • Dynamically rendered pages: Playwright (recommended) or Selenium
  • High-concurrency scenarios: Httpx + asyncio

Master these techniques and you can handle the vast majority of data-collection needs. Remember: the technology is there to solve problems, but it must be used legally, compliantly, and ethically.

Permalink: https://www.kkkliao.cn/?id=951 - reprinting requires authorization.


Copyright notice: published by 廖万里的博客 (Liao Wanli's blog); please credit the source when reprinting.

