本文目录导读:基础方案、进阶方案(动态页面抓取)、完整项目结构、实战示例、注意事项和最佳实践、学习建议

我来为你详细讲解网页抓取脚本的写法,涵盖从基础到进阶的多种方法。
基础方案:使用 Python + Requests + BeautifulSoup
安装依赖(lxml 是可选的更快的解析器;下面的示例使用 Python 内置的 html.parser)
pip install requests beautifulsoup4 lxml
基础抓取示例
import requests
from bs4 import BeautifulSoup
import time
def basic_crawl(url):
    """Download a static page and return a short summary of it.

    Args:
        url: Address of the page to fetch.

    Returns:
        A dict with ``title`` (the page title, or the placeholder '无标题'
        when none is available), ``links`` (the first ten ``href`` values
        found on ``<a>`` tags) and ``text_length`` (length of the page text),
        or ``None`` when the request or the parsing fails.
    """
    # Request headers that imitate a browser.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # turn 4xx/5xx answers into exceptions
        response.encoding = 'utf-8'  # decode the body as UTF-8

        soup = BeautifulSoup(response.text, 'html.parser')

        # ``.string`` is None when <title> is empty or has several children,
        # so the placeholder must cover that case as well as a missing tag.
        title_text = soup.title.string if soup.title else None
        title = title_text if title_text else '无标题'
        all_links = [a.get('href') for a in soup.find_all('a', href=True)]
        all_text = soup.get_text(strip=True)
        return {
            'title': title,
            'links': all_links[:10],  # keep only the first ten links
            'text_length': len(all_text)
        }
    except requests.RequestException as e:
        print(f"请求失败: {e}")
        return None
    except Exception as e:
        # Deliberate best-effort boundary: report and return None.
        print(f"解析失败: {e}")
        return None
# Usage example: summarise a demo page and print the resulting dict
# (None is printed when the crawl failed).
result = basic_crawl(url='https://example.com')
print(result)
进阶方案:动态页面抓取
对于 JavaScript 渲染的页面,可以使用 Selenium(需先执行 pip install selenium,并在本机安装 Chrome 浏览器):
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
def dynamic_crawl(url, timeout=10, scroll_pause=2, max_chars=500):
    """Load a page in headless Chrome and return its title and leading text.

    Args:
        url: Address of the page to open.
        timeout: Maximum number of seconds to wait for ``<body>`` to appear.
        scroll_pause: Seconds to sleep after scrolling to the bottom of the
            page, leaving time for additional content to load.
        max_chars: Number of leading characters of the body text to keep.

    Returns:
        A dict with ``title`` (the browser's page title) and ``content``
        (the first ``max_chars`` characters of the ``<body>`` text).

    The browser is always shut down, even when loading or extraction raises.
    """
    # Chrome options: headless mode plus the sandbox / shared-memory switches.
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')

    driver = webdriver.Chrome(options=options)
    try:
        driver.get(url)

        # Block until <body> is present; the located element itself is unused.
        WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )

        # Scroll to the bottom in case more content is loaded on scroll.
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(scroll_pause)

        page_text = driver.find_element(By.TAG_NAME, 'body').text
        page_title = driver.title
        return {
            'title': page_title,
            'content': page_text[:max_chars]
        }
    finally:
        driver.quit()
# Usage example: render a demo page in the headless browser and keep the
# returned summary dict.
result = dynamic_crawl(url='https://example.com')
完整项目结构
创建一个专业的爬虫项目:
import csv
import json
import logging
import os
import random
import time
from datetime import datetime
from typing import Dict, List, Optional

import requests
from bs4 import BeautifulSoup
# Logging setup: a module-level logger, plus root logging configured at INFO
# level with "time - level - message" records.
logger = logging.getLogger(__name__)
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class WebScraper:
    """Sequential scraper that collects basic metadata from a list of pages.

    A single ``requests.Session`` carrying a randomly chosen User-Agent is
    reused for every request, and each request is preceded by a fixed delay.
    """

    def __init__(self, base_url: str, delay: float = 1.0):
        """Create the scraper.

        Args:
            base_url: Root address of the target site; stored on the instance.
            delay: Seconds to sleep before every request.
        """
        self.base_url = base_url
        self.delay = delay
        # crawl() updates this before each extraction. It is initialised here
        # so that extract_data() can be called on its own without raising
        # AttributeError.
        self.current_url: Optional[str] = None
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': self._get_random_user_agent()
        })

    def _get_random_user_agent(self) -> str:
        """Return one of the built-in User-Agent strings, chosen at random."""
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]
        return random.choice(user_agents)

    def fetch_page(self, url: str) -> Optional[BeautifulSoup]:
        """Download *url* and parse it.

        Returns:
            The parsed document, or ``None`` when the request fails (the
            failure is logged).
        """
        try:
            time.sleep(self.delay)  # politeness delay before each request
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            response.encoding = 'utf-8'  # decode the body as UTF-8
            logger.info(f"成功获取页面: {url}")
            return BeautifulSoup(response.text, 'html.parser')
        except requests.RequestException as e:
            logger.error(f"请求失败 {url}: {e}")
            return None

    def extract_data(self, soup: Optional[BeautifulSoup]) -> Dict:
        """Build the record for one parsed page.

        Args:
            soup: Parsed document, or ``None`` when fetching failed.

        Returns:
            An empty dict when *soup* is ``None``; otherwise a dict with the
            keys ``url`` (the URL most recently set by ``crawl``, ``None`` if
            there is none), ``title``, ``meta_description``, ``headings``,
            ``links``, ``images`` and ``timestamp`` (ISO-8601 local time).
        """
        if soup is None:
            return {}
        return {
            'url': self.current_url,
            'title': self._extract_title(soup),
            'meta_description': self._extract_meta(soup),
            'headings': self._extract_headings(soup),
            'links': self._extract_links(soup),
            'images': self._extract_images(soup),
            'timestamp': datetime.now().isoformat()
        }

    def _extract_title(self, soup: BeautifulSoup) -> str:
        """Return the page title, or '' when it is missing or has no text."""
        title_tag = soup.title
        if title_tag is None:
            return ''
        # ``.string`` is None for an empty <title> or one with several
        # children; honour the ``str`` return type in that case.
        title_text = title_tag.string
        return title_text if title_text is not None else ''

    def _extract_meta(self, soup: BeautifulSoup) -> str:
        """Return the content of ``<meta name="description">``, or ''."""
        meta = soup.find('meta', attrs={'name': 'description'})
        return meta.get('content', '') if meta else ''

    def _extract_headings(self, soup: BeautifulSoup) -> Dict:
        """Return the stripped text of every ``<h1>`` and ``<h2>``."""
        return {
            'h1': [h.text.strip() for h in soup.find_all('h1')],
            'h2': [h.text.strip() for h in soup.find_all('h2')]
        }

    def _extract_links(self, soup: BeautifulSoup) -> List[Dict]:
        """Return text (at most 50 characters) and href of the first 20 links."""
        return [
            {'text': a.text.strip()[:50], 'href': a['href']}
            for a in soup.find_all('a', href=True)[:20]  # cap the amount
        ]

    def _extract_images(self, soup: BeautifulSoup) -> List[str]:
        """Return the ``src`` of the first 10 images that have one."""
        return [img['src'] for img in soup.find_all('img', src=True)[:10]]

    def save_to_json(self, data: List[Dict], filename: str = 'output.json'):
        """Write *data* to *filename* as indented UTF-8 JSON."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        logger.info(f"数据已保存到 {filename}")

    def save_to_csv(self, data: List[Dict], filename: str = 'output.csv'):
        """Write *data* to *filename* as CSV.

        Column names come from the keys of the first record. Nothing is
        written when *data* is empty.
        """
        if not data:
            return
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)
        logger.info(f"数据已保存到 {filename}")

    def crawl(self, urls: List[str]) -> List[Dict]:
        """Fetch every URL in order and return the records that were extracted.

        URLs whose download failed produce no record.
        """
        results = []
        for url in urls:
            self.current_url = url
            soup = self.fetch_page(url)
            data = self.extract_data(soup)
            if data:
                results.append(data)
        return results
# Usage example
if __name__ == "__main__":
    # Pages to crawl
    targets = [
        'https://example.com/page1',
        'https://example.com/page2'
    ]
    # Scraper that sleeps 1.5 seconds before every request
    scraper = WebScraper(base_url='https://example.com', delay=1.5)
    # Run the crawl, then store the records in both output formats
    records = scraper.crawl(targets)
    scraper.save_to_json(records, filename='scraped_data.json')
    scraper.save_to_csv(records, filename='scraped_data.csv')
实战示例:抓取新闻标题
import requests
from bs4 import BeautifulSoup
import re
from datetime import datetime
def fetch_news_titles(url):
    """Collect the headline links found inside ``<h2>`` elements of a page.

    The ``<h2>`` selector is only an example; adjust it to the markup of the
    site being scraped.

    Args:
        url: Address of the listing page.

    Returns:
        A list of dicts with ``title`` (stripped link text), ``url`` (the
        link target, resolved against the page address so relative links
        become absolute) and ``timestamp`` (ISO-8601 local time at which the
        item was recorded). An empty list is returned when anything fails.
    """
    from urllib.parse import urljoin  # local import: only this function needs it

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        # Without this check a 4xx/5xx error page would be parsed as if it
        # were the news listing.
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        news_items = []
        for heading in soup.find_all('h2'):
            link = heading.find('a')
            if link and link.get('href'):
                news_items.append({
                    'title': link.text.strip(),
                    'url': urljoin(response.url, link['href']),
                    'timestamp': datetime.now().isoformat()
                })
        return news_items
    except Exception as e:
        # Deliberate best-effort boundary: callers always get a list back.
        print(f"抓取失败: {e}")
        return []
# Usage example: print the first ten collected headlines.
# NOTE(review): fetch_news_titles only looks at <h2> elements; if the target
# site does not wrap its headlines in <h2>, nothing is printed — confirm the
# selector against the site's markup.
news = fetch_news_titles(url='https://news.ycombinator.com/')
first_ten = news[:10]
for item in first_ten:
    print(f"标题: {item['title']}")
注意事项和最佳实践
法律和道德注意事项:
- 遵守网站的 robots.txt 文件
- 不要过度请求(添加延迟)
- 尊重版权和隐私政策
- 只抓取公开数据
技术建议:
# 1. 添加重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3))
def fetch_with_retry(url):
    """GET *url* with a 10-second timeout, retried by the ``retry`` decorator.

    The decorator is configured for at most three attempts with exponential
    waits between them.

    NOTE(review): ``raise_for_status()`` is not called here, so presumably
    only raised exceptions (timeouts, connection errors) trigger a retry and
    HTTP error statuses are returned as ordinary responses — confirm this is
    intended.
    """
    response = requests.get(url, timeout=10)
    return response
# 2. Proxies and IP rotation: proxy address to use for each URL scheme
proxies = dict(
    http='http://proxy.example.com:8080',
    https='http://proxy.example.com:8080',
)
# 3. Cookies and sessions: a session object with one cookie preset
session = requests.Session()
session.cookies.set('key', 'value')
# 4. 保存中间结果,避免重复请求
import pickle
cache_file = 'cache.pkl'
# Load the cache saved by an earlier run. A missing, empty or corrupted file
# means starting with an empty cache rather than aborting the script.
# NOTE: unpickling can execute arbitrary code — only load cache files that
# this script wrote itself, never files from an untrusted source.
try:
    with open(cache_file, 'rb') as f:
        cache = pickle.load(f)
except (FileNotFoundError, EOFError, pickle.UnpicklingError):
    cache = {}
学习建议
- 从简单开始:先抓取静态HTML页面
- 理解HTML结构:学会使用浏览器开发者工具
- 渐进式学习:掌握静态页面抓取后,再逐步进阶到动态页面抓取
- 注意反爬机制:学习处理验证码、IP封禁等问题
- 使用框架:熟悉 Scrapy、PySpider 等专业框架
需要我详细介绍某个特定场景的抓取方案吗?比如处理登录、API抓取或者分布式爬虫?