A Complete Practical Guide to Python Web Scraping: Data Collection from Beginner to Advanced
Python web scraping is a core data collection technique. This article walks through the full technology stack, from basic HTTP requests to advanced anti-scraping countermeasures, with hands-on use of the mainstream tools Requests, BeautifulSoup, Scrapy, and Selenium.
1. Core Concepts of Python Web Scraping
A web crawler is an automated program that mimics browser behavior to fetch data from the internet in bulk. In 2026, Python remains the language of choice for scraping; its rich ecosystem and concise syntax make data collection efficient and elegant.
The basic scraping workflow (a minimal end-to-end sketch follows the list):
- Send requests: issue HTTP requests to the target site and fetch the page content
- Parse content: extract the target data from the HTML/JSON
- Store data: save the data to a file or a database
- Handle anti-scraping: cope with the site's anti-scraping measures so collection stays stable
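A minimal sketch of this workflow, assuming a static listing page; the URL and the h2.title selector are placeholders for illustration, not a real site:

import csv
import requests
from bs4 import BeautifulSoup

# 1. Send the request (placeholder URL)
response = requests.get("https://example.com/list", timeout=10)
# 2. Parse the content (h2.title is an assumed selector)
soup = BeautifulSoup(response.text, "lxml")
rows = [{"title": tag.get_text(strip=True)} for tag in soup.select("h2.title")]
# 3. Store the data
with open("titles.csv", "w", newline="", encoding="utf-8-sig") as f:
    writer = csv.DictWriter(f, fieldnames=["title"])
    writer.writeheader()
    writer.writerows(rows)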
2. Core Libraries in Detail
2.1 HTTP Request Libraries
Requests - the classic choice
Requests is the most popular HTTP library in Python, known for its clean, elegant API:
import requests
from fake_useragent import UserAgent

# Create a session object; cookies are managed automatically
session = requests.Session()

# Set the request headers
headers = {
    "User-Agent": UserAgent().chrome,
    "Referer": "https://example.com",
    "Accept": "text/html,application/xhtml+xml"
}

# Send a GET request
response = session.get(
    "https://example.com/api/data",
    headers=headers,
    timeout=10  # set a timeout so the request cannot hang forever
)

# Check the response status
if response.status_code == 200:
    # Parse the JSON body
    data = response.json()
    print(f"Data fetched successfully: {data}")
else:
    print(f"Request failed: {response.status_code}")
Httpx - the modern async option
Httpx supports async/await and suits high-concurrency scenarios:
import httpx
import asyncio

async def fetch_multiple_urls(urls):
    """Fetch multiple URLs concurrently."""
    async with httpx.AsyncClient() as client:
        tasks = [client.get(url, timeout=10) for url in urls]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        results = []
        for response in responses:
            if isinstance(response, httpx.Response):
                results.append(response.text)
        return results

# Run the async task
urls = [f"https://example.com/page/{i}" for i in range(1, 11)]
results = asyncio.run(fetch_multiple_urls(urls))
print(f"Fetched {len(results)} pages successfully")
2.2 HTML Parsing Libraries
BeautifulSoup - beginner-friendly
from bs4 import BeautifulSoup
import requests

# Fetch the page
response = requests.get("https://example.com/products")
soup = BeautifulSoup(response.text, "lxml")  # the lxml parser is fast

# Find a single element
title = soup.find("h1", class_="product-title").text.strip()

# Find multiple elements
products = soup.find_all("div", class_="product-item")
for product in products:
    name = product.find("span", class_="name").text
    price = product.find("span", class_="price").text
    print(f"Product: {name}, price: {price}")

# Use CSS selectors (more flexible)
links = soup.select("a.product-link[href]")
for link in links:
    print(link["href"])

# Use XPath (requires lxml)
from lxml import etree
tree = etree.HTML(response.text)
prices = tree.xpath('//span[@class="price"]/text()')
2.3 Scrapy - the production-grade framework
Scrapy is the most fully featured scraping framework and suits large-scale data collection:
# items.py - define the data structure
import scrapy

class ProductItem(scrapy.Item):
    name = scrapy.Field()
    price = scrapy.Field()
    rating = scrapy.Field()
    url = scrapy.Field()

# spiders/products.py - spider logic
import scrapy
from myproject.items import ProductItem

class ProductSpider(scrapy.Spider):
    name = "products"
    start_urls = ["https://example.com/products"]

    # Per-spider settings
    custom_settings = {
        "DOWNLOAD_DELAY": 1,        # wait 1 second between downloads
        "CONCURRENT_REQUESTS": 8,   # number of concurrent requests
        "USER_AGENT": "Mozilla/5.0..."
    }

    def parse(self, response):
        """Parse the product listing page."""
        products = response.css("div.product-item")
        for product in products:
            item = ProductItem()
            item["name"] = product.css("h3.title::text").get()
            item["price"] = product.css("span.price::text").get()
            item["url"] = response.urljoin(
                product.css("a::attr(href)").get()
            )
            yield item

        # Follow the next page
        next_page = response.css("a.next-page::attr(href)").get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

# pipelines.py - item processing pipeline
import json

class JsonPipeline:
    """Write items to a JSON Lines file."""
    def open_spider(self, spider):
        self.file = open("products.json", "w", encoding="utf-8")

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(line)
        return item

# Run the spider: scrapy crawl products
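Besides running "scrapy crawl products" from the command line, the spider can also be started from a plain Python script with Scrapy's CrawlerProcess. A minimal sketch, assuming the project layout above; the module paths myproject.spiders.products and myproject.pipelines are placeholders matching that layout:

from scrapy.crawler import CrawlerProcess
from myproject.spiders.products import ProductSpider  # assumed module path

process = CrawlerProcess(settings={
    # Register the pipeline defined above; 300 is an arbitrary priority
    "ITEM_PIPELINES": {"myproject.pipelines.JsonPipeline": 300},
})
process.crawl(ProductSpider)
process.start()  # blocks until the crawl finishes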
2.4 Selenium/Playwright - handling dynamic content
For sites rendered with JavaScript, a browser automation tool is required:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

# Configure the browser options
options = Options()
options.add_argument("--headless")  # headless mode
options.add_argument("--disable-gpu")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")

# Launch the browser
driver = webdriver.Chrome(options=options)

try:
    # Open the page
    driver.get("https://example.com/dynamic-page")

    # Wait for the element to load (explicit wait)
    wait = WebDriverWait(driver, 10)
    element = wait.until(
        EC.presence_of_element_located((By.CLASS_NAME, "product-list"))
    )

    # Scroll the page to trigger lazy loading
    driver.execute_script(
        "window.scrollTo(0, document.body.scrollHeight);"
    )

    # Extract the data
    products = driver.find_elements(By.CLASS_NAME, "product-item")
    for product in products:
        name = product.find_element(By.CLASS_NAME, "name").text
        print(name)
finally:
    driver.quit()
# Playwright - a more modern choice (async, multi-browser support)
# pip install playwright && playwright install
import asyncio
from playwright.async_api import async_playwright

async def scrape_with_playwright():
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto("https://example.com")
        await page.wait_for_selector(".product-list")
        # Extract the data
        products = await page.query_selector_all(".product-item")
        for product in products:
            name = await product.inner_text()
            print(name)
        await browser.close()

asyncio.run(scrape_with_playwright())
3. Anti-Scraping Countermeasures
3.1 User-Agent Rotation
from fake_useragent import UserAgent
import random

ua = UserAgent()

# Option 1: a random User-Agent
headers = {"User-Agent": ua.random}

# Option 2: a custom User-Agent pool
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"
]

def get_random_ua():
    return random.choice(USER_AGENTS)
3.2 Proxy IP Pool
import requests
from itertools import cycle

# Proxy pool (in practice, obtained from a proxy provider)
PROXIES = [
    "http://ip1:port",
    "http://ip2:port",
    "http://ip3:port"
]
proxy_pool = cycle(PROXIES)

def request_with_proxy(url, max_retries=3):
    """Send a request through a proxy, switching automatically on failure."""
    for _ in range(max_retries):
        proxy = next(proxy_pool)
        try:
            response = requests.get(
                url,
                proxies={"http": proxy, "https": proxy},
                timeout=10
            )
            if response.status_code == 200:
                return response
        except Exception as e:
            print(f"Proxy {proxy} failed: {e}")
            continue
    return None

# Check whether a proxy is usable
def check_proxy(proxy):
    try:
        response = requests.get(
            "https://httpbin.org/ip",
            proxies={"http": proxy, "https": proxy},
            timeout=5
        )
        return response.status_code == 200
    except requests.RequestException:
        return False
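Before a crawl it is worth trimming the pool down to proxies that actually respond; a quick sketch using the helper above (the addresses in PROXIES are placeholders):

valid_proxies = [p for p in PROXIES if check_proxy(p)]
print(f"{len(valid_proxies)} of {len(PROXIES)} proxies are usable")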
3.3 Request Rate Control
import time
import random

def smart_delay(min_delay=1, max_delay=3):
    """Random delay to mimic human behavior."""
    delay = random.uniform(min_delay, max_delay)
    time.sleep(delay)

# Delay settings in Scrapy
custom_settings = {
    "DOWNLOAD_DELAY": 2,                  # base delay
    "RANDOMIZE_DOWNLOAD_DELAY": True,     # randomize the delay
    "AUTOTHROTTLE_ENABLED": True,         # automatic throttling
}
3.4 Cookie and Session Management
import requests

session = requests.Session()

# Simulate a login
login_data = {
    "username": "your_username",
    "password": "your_password"
}
response = session.post(
    "https://example.com/login",
    data=login_data
)

# After a successful login the session manages cookies automatically,
# so later requests need no manual Cookie header
profile = session.get("https://example.com/profile")

# Save the cookies for next time
import pickle
with open("cookies.pkl", "wb") as f:
    pickle.dump(session.cookies, f)

# Load the cookies
with open("cookies.pkl", "rb") as f:
    session.cookies.update(pickle.load(f))
4. Hands-On Example: Scraping E-Commerce Products
"""
电商商品信息采集完整示例
功能:采集商品名称、价格、评分、评论数
"""
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import random
from fake_useragent import UserAgent
class EcommerceScraper:
def __init__(self):
self.session = requests.Session()
self.ua = UserAgent()
self.results = []
def get_headers(self):
"""生成随机请求头"""
return {
"User-Agent": self.ua.random,
"Accept": "text/html,application/xhtml+xml",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
}
def scrape_product(self, url):
"""采集单个商品页面"""
try:
response = self.session.get(
url,
headers=self.get_headers(),
timeout=10
)
response.raise_for_status()
soup = BeautifulSoup(response.text, "lxml")
# 提取商品信息
product = {
"name": self._extract_text(soup, "h1.product-title"),
"price": self._extract_text(soup, "span.price"),
"rating": self._extract_text(soup, "span.rating-score"),
"reviews": self._extract_text(soup, "span.review-count"),
"url": url
}
self.results.append(product)
print(f"成功采集: {product['name']}")
except Exception as e:
print(f"采集失败 {url}: {e}")
def _extract_text(self, soup, selector):
"""安全提取文本"""
element = soup.select_one(selector)
return element.text.strip() if element else "N/A"
def scrape_category(self, base_url, pages=10):
"""采集分类页面"""
for page in range(1, pages + 1):
url = f"{base_url}?page={page}"
print(f"正在采集第 {page} 页...")
try:
response = self.session.get(
url,
headers=self.get_headers()
)
soup = BeautifulSoup(response.text, "lxml")
# 获取商品链接
links = soup.select("a.product-link")
for link in links[:20]: # 每页限制 20 个
product_url = link["href"]
self.scrape_product(product_url)
time.sleep(random.uniform(1, 2))
except Exception as e:
print(f"页面采集失败: {e}")
continue
def save_to_csv(self, filename="products.csv"):
"""保存结果到 CSV"""
df = pd.DataFrame(self.results)
df.to_csv(filename, index=False, encoding="utf-8-sig")
print(f"已保存 {len(self.results)} 条数据到 {filename}")
# 使用示例
scraper = EcommerceScraper()
scraper.scrape_category("https://example.com/products", pages=5)
scraper.save_to_csv()
5. Data Storage Options
5.1 Saving to CSV/JSON
import json
import csv

# Save to JSON
def save_to_json(data, filename):
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

# Save to CSV
def save_to_csv(data, filename):
    with open(filename, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)
5.2 Saving to SQLite
import sqlite3

def save_to_sqlite(data, db_name="crawler.db"):
    """Save data to a SQLite database."""
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    # Create the table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            price REAL,
            rating REAL,
            url TEXT UNIQUE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # Insert the rows
    for item in data:
        cursor.execute("""
            INSERT OR IGNORE INTO products (name, price, rating, url)
            VALUES (?, ?, ?, ?)
        """, (item["name"], item["price"], item["rating"], item["url"]))
    conn.commit()
    conn.close()
6. Scraping Ethics and Legal Compliance
When developing scrapers, always follow these principles:
- Respect robots.txt: check the site's robots protocol and do not crawl pages it disallows
- Control request rates: set reasonable delays and avoid putting pressure on the server
- Respect copyright: do not redistribute scraped content at will; comply with intellectual-property law
- Protect privacy: do not collect data involving personal privacy
- Use it legally: restrict use to learning and research, not unfair commercial competition
import urllib.robotparser
from urllib.parse import urlparse

def check_robots_txt(url):
    """Check whether a URL may be crawled."""
    parsed = urlparse(url)
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(f"{parsed.scheme}://{parsed.netloc}/robots.txt")
    rp.read()
    return rp.can_fetch("*", url)
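A short usage sketch of the helper above (the URL is a placeholder):

target = "https://example.com/products"
if check_robots_txt(target):
    print("Allowed by robots.txt, safe to fetch")
else:
    print("Disallowed by robots.txt, skipping")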
Conclusion
The Python scraping stack is very mature: from the simple Requests + BeautifulSoup combination, to the powerful Scrapy framework, to Selenium/Playwright for dynamically rendered content, there is a suitable tool for every scenario. In real projects, choose the approach that fits the target site, and handle anti-scraping measures and compliance along the way.
Technology selection guide:
- Simple static pages: Requests + BeautifulSoup
- Large-scale collection: the Scrapy framework
- Dynamically rendered pages: Playwright (recommended) or Selenium
- High-concurrency scenarios: Httpx + asyncio
With these techniques you can handle the vast majority of data collection needs. Remember: technology exists to solve problems, but it must be used legally, compliantly, and ethically.
Original link: https://www.kkkliao.cn/?id=951 (reposting requires permission).
Copyright notice: published on 廖万里的博客; please credit the source when reposting.