网页抓取脚本咋写?

wen 实用脚本 48

本文目录导读:

网页抓取脚本咋写?

  1. 基础方案:使用 Python + Requests + BeautifulSoup
  2. 进阶方案:动态页面抓取
  3. 完整项目结构
  4. 实战示例:抓取新闻标题
  5. 注意事项和最佳实践
  6. 学习建议

我来为你详细讲解网页抓取脚本的写法,包括基础到进阶的多种方法。

基础方案:使用 Python + Requests + BeautifulSoup

安装依赖

pip install requests beautifulsoup4 lxml

基础抓取示例

import requests
from bs4 import BeautifulSoup
import time
def basic_crawl(url):
    """Fetch *url* and return a small summary of the parsed page.

    Args:
        url: Address of the page to download.

    Returns:
        A dict with ``title`` (the ``<title>`` text, or '无标题' when the page
        has no usable title), ``links`` (the first 10 ``href`` values found on
        ``<a>`` tags) and ``text_length`` (length of the string returned by
        ``soup.get_text(strip=True)``). ``None`` is returned when the request
        or the parsing fails; the error is printed.
    """
    # Send a browser-like User-Agent with the request.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # turn 4xx/5xx responses into exceptions
        response.encoding = 'utf-8'  # always decode the body as UTF-8
        soup = BeautifulSoup(response.text, 'html.parser')
        # ``.string`` is None when <title> is empty or contains nested markup,
        # so fall back to the placeholder in that case too.
        title = (soup.title.string if soup.title else None) or '无标题'
        all_links = [a.get('href') for a in soup.find_all('a', href=True)]
        all_text = soup.get_text(strip=True)
        return {
            'title': title,
            'links': all_links[:10],  # keep only the first 10 links
            'text_length': len(all_text)
        }
    except requests.RequestException as e:
        print(f"请求失败: {e}")
        return None
    except Exception as e:
        # Best-effort: anything that is not a request error is reported as a
        # parsing failure instead of propagating.
        print(f"解析失败: {e}")
        return None
# Usage example: fetch one page and print the summary dict (or None on failure)
result = basic_crawl('https://example.com')
print(result)

进阶方案:动态页面抓取

对于 JavaScript 渲染的页面,使用 Selenium(需先执行 pip install selenium,并在本机安装 Chrome 浏览器):

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
def dynamic_crawl(url):
    """Load *url* in headless Chrome and return its title and leading text.

    Args:
        url: Address of the page to load.

    Returns:
        A dict with ``title`` (the browser's page title) and ``content``
        (the first 500 characters of the ``<body>`` element's text).

    Raises:
        Exceptions raised by Selenium (for example when the wait times out)
        are not caught here; the browser is closed before they propagate.
    """
    # Configure Chrome: no visible window, plus two flags the original snippet
    # sets for running without a sandbox and without /dev/shm.
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(url)
        # Wait up to 10 seconds for <body> to be present in the DOM.
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )
        # Scroll to the bottom so pages that load more content on scroll
        # get a chance to do so, then pause for 2 seconds.
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(2)
        page_text = driver.find_element(By.TAG_NAME, 'body').text
        page_title = driver.title
        return {
            'title': page_title,
            'content': page_text[:500]  # first 500 characters
        }
    finally:
        # Always shut the browser down, even when an exception is raised.
        driver.quit()
# Usage example: render one page in headless Chrome and keep the result dict
result = dynamic_crawl('https://example.com')

完整项目结构

创建一个专业的爬虫项目:

import csv
import json
import logging
import os
import random
import time
from datetime import datetime
from typing import Dict, List, Optional

import requests
from bs4 import BeautifulSoup
# Configure logging: INFO level, each record formatted as
# "<timestamp> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger used by the scraper class below.
logger = logging.getLogger(__name__)
class WebScraper:
    """Session-based scraper that extracts page metadata and saves results.

    One ``requests.Session`` is reused for every request, and each fetch is
    preceded by a fixed delay.
    """

    def __init__(self, base_url: str, delay: float = 1.0):
        """Create a scraper.

        Args:
            base_url: Site root. It is stored on the instance; no method of
                this class reads it.
            delay: Seconds to sleep before each request.
        """
        self.base_url = base_url
        self.delay = delay
        # URL of the page currently being processed. crawl() sets it before
        # each fetch; it is defined here so extract_data() can also be called
        # directly without raising AttributeError.
        self.current_url: Optional[str] = None
        self.session = requests.Session()
        # The User-Agent is picked once here and reused for the whole session.
        self.session.headers.update({
            'User-Agent': self._get_random_user_agent()
        })

    def _get_random_user_agent(self) -> str:
        """Return one of a few hard-coded User-Agent strings, chosen at random."""
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]
        return random.choice(user_agents)

    def fetch_page(self, url: str) -> Optional[BeautifulSoup]:
        """Download *url* and parse it.

        Returns:
            The parsed document, or ``None`` when the request fails (the
            failure is logged, not raised).
        """
        try:
            time.sleep(self.delay)  # politeness delay before every request
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            response.encoding = 'utf-8'  # always decode the body as UTF-8
            logger.info("成功获取页面: %s", url)
            return BeautifulSoup(response.text, 'html.parser')
        except requests.RequestException as e:
            logger.error("请求失败 %s: %s", url, e)
            return None

    def extract_data(self, soup: BeautifulSoup) -> Dict:
        """Build the result record for one parsed page.

        Args:
            soup: Parsed page, or ``None`` when the fetch failed.

        Returns:
            An empty dict when *soup* is falsy; otherwise a dict with the keys
            ``url`` (taken from ``self.current_url``), ``title``,
            ``meta_description``, ``headings``, ``links``, ``images`` and
            ``timestamp`` (``datetime.now()`` in ISO format).
        """
        if not soup:
            return {}
        return {
            'url': self.current_url,
            'title': self._extract_title(soup),
            'meta_description': self._extract_meta(soup),
            'headings': self._extract_headings(soup),
            'links': self._extract_links(soup),
            'images': self._extract_images(soup),
            'timestamp': datetime.now().isoformat()
        }

    def _extract_title(self, soup: BeautifulSoup) -> str:
        """Return the ``<title>`` text, or '' when there is none."""
        title_tag = soup.title
        # ``.string`` is None for an empty <title> or one with nested markup;
        # return '' then so the declared ``str`` return type always holds.
        if title_tag is None or title_tag.string is None:
            return ''
        return str(title_tag.string)

    def _extract_meta(self, soup: BeautifulSoup) -> str:
        """Return the content of ``<meta name="description">``, or ''."""
        meta = soup.find('meta', attrs={'name': 'description'})
        return meta.get('content', '') if meta else ''

    def _extract_headings(self, soup: BeautifulSoup) -> Dict:
        """Return the stripped text of every ``<h1>`` and ``<h2>``, by level."""
        return {
            level: [h.text.strip() for h in soup.find_all(level)]
            for level in ('h1', 'h2')
        }

    def _extract_links(self, soup: BeautifulSoup) -> List[Dict]:
        """Return text (at most 50 chars) and href of the first 20 links."""
        return [
            {'text': a.text.strip()[:50], 'href': a['href']}
            for a in soup.find_all('a', href=True)[:20]  # cap the count
        ]

    def _extract_images(self, soup: BeautifulSoup) -> List[str]:
        """Return the ``src`` of the first 10 ``<img>`` tags that have one."""
        return [img['src'] for img in soup.find_all('img', src=True)[:10]]

    def save_to_json(self, data: List[Dict], filename: str = 'output.json'):
        """Write *data* to *filename* as indented, non-ASCII-escaped JSON."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        logger.info("数据已保存到 %s", filename)

    def save_to_csv(self, data: List[Dict], filename: str = 'output.csv'):
        """Write *data* to *filename* as CSV; does nothing for empty *data*.

        The header is taken from the keys of the first record. Nested values
        (such as the ``headings`` dict and ``links`` list produced by
        ``extract_data``) are written using their string representation.
        """
        if not data:
            return
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)
        logger.info("数据已保存到 %s", filename)

    def crawl(self, urls: List[str]) -> List[Dict]:
        """Fetch and extract every URL in *urls*, in order.

        Returns:
            One record per page that was fetched successfully; pages whose
            fetch failed are skipped.
        """
        results = []
        for url in urls:
            self.current_url = url  # read by extract_data() for the 'url' field
            soup = self.fetch_page(url)
            data = self.extract_data(soup)
            if data:
                results.append(data)
        return results
# Usage example
if __name__ == "__main__":
    # Create the scraper; it sleeps 1.5 seconds before each request
    scraper = WebScraper(base_url='https://example.com', delay=1.5)
    # URLs to crawl
    urls = [
        'https://example.com/page1',
        'https://example.com/page2'
    ]
    # Run the crawl
    data = scraper.crawl(urls)
    # Save the results as both JSON and CSV
    scraper.save_to_json(data, 'scraped_data.json')
    scraper.save_to_csv(data, 'scraped_data.csv')

实战示例:抓取新闻标题

import requests
from bs4 import BeautifulSoup
import re
from datetime import datetime
def fetch_news_titles(url):
    """Collect the headline links found inside ``<h2>`` tags on *url*.

    Args:
        url: Address of the news listing page.

    Returns:
        A list of dicts with ``title`` (stripped link text), ``url`` (the
        link target, resolved against *url* so relative links become
        absolute) and ``timestamp`` (``datetime.now()`` in ISO format).
        An empty list is returned on any failure; the error is printed.
    """
    # Local import keeps this snippet self-contained (standard library only).
    from urllib.parse import urljoin

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        # Do not parse 4xx/5xx error pages as if they were the news listing.
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        news_items = []
        # The selector depends on the target site; this example takes the
        # first link inside every <h2>.
        for article in soup.find_all('h2'):
            link = article.find('a')
            if link and link.get('href'):
                news_items.append({
                    'title': link.text.strip(),
                    # Absolute hrefs pass through urljoin unchanged.
                    'url': urljoin(url, link['href']),
                    'timestamp': datetime.now().isoformat()
                })
        return news_items
    except Exception as e:
        # Best-effort: report the problem and return no items.
        print(f"抓取失败: {e}")
        return []
# Usage example: print the titles of the first 10 collected items
news = fetch_news_titles('https://news.ycombinator.com/')
for item in news[:10]:
    print(f"标题: {item['title']}")

注意事项和最佳实践

法律和道德注意事项:

  • 遵守网站的 robots.txt 文件
  • 不要过度请求(添加延迟)
  • 尊重版权和隐私政策
  • 只抓取公开数据

技术建议(下面的重试示例依赖第三方库 tenacity,需先执行 pip install tenacity):

# 1. 添加重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1))
def fetch_with_retry(url):
    """GET *url* with a 10-second timeout, under the retry policy above.

    The decorator allows at most 3 attempts with an exponentially growing
    wait between them. The response status is not checked here, so an HTTP
    error response is returned as-is rather than raising.
    """
    response = requests.get(url, timeout=10)
    return response
# 2. Use proxies and IP rotation
# NOTE: this mapping is only defined here; nothing in this snippet passes it
# to a request (e.g. via the ``proxies=`` argument of requests.get).
proxies = {
    'http': 'http://proxy.example.com:8080',
    'https': 'http://proxy.example.com:8080'
}
# 3. Handle cookies and sessions
session = requests.Session()
session.cookies.update({'key': 'value'})
# 4. Save intermediate results to avoid repeating requests
import pickle
cache_file = 'cache.pkl'
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only load cache files that this script itself wrote.
try:
    with open(cache_file, 'rb') as f:
        cache = pickle.load(f)
except FileNotFoundError:
    # No cache file yet: start with an empty cache.
    cache = {}

学习建议

  1. 从简单开始:先抓取静态HTML页面
  2. 理解HTML结构:学会使用浏览器开发者工具
  3. 渐进式学习:进阶到动态页面抓取
  4. 注意反爬机制:学习处理验证码、IP封禁等问题
  5. 使用框架:熟悉 Scrapy、PySpider 等专业框架

需要我详细介绍某个特定场景的抓取方案吗?比如处理登录、API抓取或者分布式爬虫?

抱歉,评论功能暂时关闭!