Python案例怎么爬取静态网页?

wen python案例 8

本文目录导读:

Python案例怎么爬取静态网页?

  1. 基础方法:使用requests + BeautifulSoup
  2. 完整爬虫案例:爬取新闻列表
  3. 使用lxml解析(性能更好)
  4. 实用工具函数
  5. 反爬虫应对技巧
  6. 注意事项

我来介绍几种爬取静态网页的常用方法和案例。

基础方法:使用requests + BeautifulSoup

这是最常用的组合,适合大多数静态网页。

案例:爬取电影信息

import requests
from bs4 import BeautifulSoup
import time
# Request headers that make the request look like it comes from a desktop browser.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}


def get_movie_info(url):
    """Fetch a page and print its first <h1> and the start of each non-empty <p>.

    Args:
        url: Address of the page to download.

    Returns:
        None. The extracted text, or a failure message, is printed to stdout.
    """
    try:
        response = requests.get(url, headers=headers, timeout=10)
        # Turn 4xx/5xx answers into exceptions so that an error page is not
        # parsed as if it were the requested content.
        response.raise_for_status()
        response.encoding = 'utf-8'
        soup = BeautifulSoup(response.text, 'html.parser')

        # Look the heading up once instead of searching the tree twice.
        heading = soup.find('h1')
        title = heading.text.strip() if heading else '无标题'
        print(f'标题: {title}')

        for i, p in enumerate(soup.find_all('p'), 1):
            text = p.text.strip()
            if not text:
                continue
            # Only show the ellipsis when the text was really cut at 50 chars.
            suffix = '...' if len(text) > 50 else ''
            print(f'段落{i}: {text[:50]}{suffix}')
    except Exception as e:
        # Script-level boundary: report the problem instead of crashing.
        print(f'爬取失败: {e}')
# Usage example: fetch a sample page and print its title and paragraphs.
url = 'https://example.com/movie-page'
get_movie_info(url)

完整爬虫案例:爬取新闻列表

import time
from datetime import datetime
from urllib.parse import urljoin

import pandas as pd
import requests
from bs4 import BeautifulSoup
class NewsScraper:
    """Scrape a paginated news list and export the collected entries to CSV.

    List pages are requested as ``<base_url>/page/<n>``. Every
    ``<div class="news-item">`` found on a page is stored in ``news_list`` as a
    dict with the keys ``title``, ``link``, ``time`` and ``summary``.
    """

    # Column order used for the CSV header when nothing was scraped.
    FIELDS = ('title', 'link', 'time', 'summary')

    def __init__(self, base_url):
        """Remember the site root and start with an empty result list.

        Args:
            base_url: Root URL of the news site, e.g. ``https://host``.
        """
        self.base_url = base_url
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        self.news_list = []

    def get_page_content(self, page_url):
        """Return the HTML text of ``page_url``, or None if the request fails.

        HTTP error statuses (4xx/5xx) count as failures, so an error page is
        never handed to the parser.
        """
        try:
            response = requests.get(page_url, headers=self.headers, timeout=10)
            response.raise_for_status()
            response.encoding = 'utf-8'
            return response.text
        except Exception as e:
            print(f'请求失败: {e}')
            return None

    def parse_news(self, html):
        """Extract all news items from ``html`` and append them to ``news_list``.

        Args:
            html: Markup of one list page.
        """
        soup = BeautifulSoup(html, 'html.parser')
        # The news entries are assumed to live in <div class="news-item">.
        for item in soup.find_all('div', class_='news-item'):
            try:
                anchor = item.find('a')
                title_tag = item.find('h2') or anchor
                title = title_tag.text.strip() if title_tag else '无标题'

                # .get() keeps items whose <a> has no href instead of
                # dropping the whole entry on a KeyError.
                href = anchor.get('href', '') if anchor else ''

                time_tag = item.find('span', class_='time')
                pub_time = time_tag.text.strip() if time_tag else '未知'

                summary_tag = item.find('p', class_='summary')
                summary = summary_tag.text.strip() if summary_tag else ''

                self.news_list.append({
                    'title': title,
                    # urljoin handles absolute links and links without a
                    # leading slash; plain concatenation corrupts both.
                    'link': urljoin(self.base_url, href) if href else '',
                    'time': pub_time,
                    'summary': summary,
                })
            except Exception as e:
                # One malformed entry must not abort the rest of the page.
                print(f'解析条目失败: {e}')

    def scrape_multiple_pages(self, pages=5):
        """Scrape list pages 1..``pages`` and return the accumulated entries.

        Args:
            pages: Number of list pages to request.

        Returns:
            ``news_list``, the list of entry dicts collected so far.
        """
        # Avoid '//page/1' when base_url was given with a trailing slash.
        root = self.base_url.rstrip('/')
        for page in range(1, pages + 1):
            url = f'{root}/page/{page}'
            print(f'正在爬取第{page}页: {url}')
            html = self.get_page_content(url)
            if html:
                self.parse_news(html)
            # Polite delay between requests; nothing follows the last page,
            # so there is no reason to wait after it.
            if page < pages:
                time.sleep(1)
        return self.news_list

    def save_to_csv(self, filename=None):
        """Write ``news_list`` to a CSV file.

        Args:
            filename: Target path; defaults to ``news_<YYYYMMDD>.csv`` in the
                current directory.
        """
        if not filename:
            filename = f'news_{datetime.now().strftime("%Y%m%d")}.csv'
        if self.news_list:
            df = pd.DataFrame(self.news_list)
        else:
            # Still emit a header row so an empty export is a valid CSV.
            df = pd.DataFrame(columns=list(self.FIELDS))
        # utf-8-sig writes a BOM, which lets Excel detect the encoding.
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        print(f'数据已保存到: {filename}')
# Usage example: scrape the first three list pages and export the result.
scraper = NewsScraper('https://example-news-site.com')
news_data = scraper.scrape_multiple_pages(pages=3)
# No filename is passed, so the date-stamped default name is used.
scraper.save_to_csv()

使用lxml解析(性能更好)

import requests
from lxml import etree
import re
class XPathScraper:
    """Fetch a page and pull single values out of it with XPath expressions."""

    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }

    @staticmethod
    def _first_value(matches):
        """Reduce one XPath result to a single value ('未找到' if nothing matched)."""
        if not matches:
            return '未找到'
        if not isinstance(matches, list):
            # Expressions such as count() or string() yield a scalar rather
            # than a node list.
            return matches.strip() if isinstance(matches, str) else matches
        first = matches[0]
        if isinstance(first, str):
            # text() and @attr results are strings; trim them like element text.
            return first.strip()
        if hasattr(first, 'text'):
            # Element node: .text is None when the element has no leading text.
            return (first.text or '').strip()
        return first

    def scrape_by_xpath(self, url, xpath_dict, timeout=10):
        """Download ``url`` and evaluate every XPath in ``xpath_dict`` against it.

        Args:
            url: Page to fetch.
            xpath_dict: Mapping of result name -> XPath expression.
            timeout: Seconds to wait for the server before giving up. Without
                a timeout the request could block indefinitely.

        Returns:
            Dict mapping each name to the first match of its expression
            ('未找到' when there is no match), or None if the page could not be
            fetched or parsed.
        """
        try:
            response = requests.get(url, headers=self.headers, timeout=timeout)
            response.raise_for_status()
            response.encoding = 'utf-8'
            tree = etree.HTML(response.text)
            return {
                name: self._first_value(tree.xpath(xpath))
                for name, xpath in xpath_dict.items()
            }
        except Exception as e:
            print(f'爬取失败: {e}')
            return None
# Usage example: map each result name to the XPath that selects it.
scraper = XPathScraper()
xpath_rules = {
    'title': '//h1/text()',
    'content': '//div[@class="content"]/p/text()',
    'author': '//span[@class="author"]/text()',
}
data = scraper.scrape_by_xpath('https://example.com/article', xpath_rules)
print(data)

实用工具函数

import requests
from bs4 import BeautifulSoup
import json
import re
def clean_text(text):
    """Normalise the whitespace of a scraped text fragment.

    Runs of spaces, tabs and other horizontal whitespace become one space,
    runs of line breaks (including blank or whitespace-only lines) become one
    newline, and leading/trailing whitespace is removed.

    Args:
        text: Text to clean; None or an empty string is accepted.

    Returns:
        The cleaned text, or '' for empty input.
    """
    if not text:
        return ''
    # Collapse everything except '\n' first; matching plain \s+ here would
    # swallow the line breaks and make the newline step below a no-op.
    text = re.sub(r'[^\S\n]+', ' ', text)
    # Merge consecutive line breaks together with the spaces around them.
    text = re.sub(r' ?\n[ \n]*', '\n', text)
    return text.strip()
def get_all_links(url, domain_filter=None):
    """Collect the distinct absolute and root-relative links of a page.

    Args:
        url: Page to download.
        domain_filter: Optional substring (typically a domain name). When
            given, only links whose resolved absolute URL contains it are
            kept.

    Returns:
        List of unique href values as they appear in the page (order is not
        defined), or an empty list if the page could not be fetched.
    """
    from urllib.parse import urljoin

    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        response = requests.get(url, headers=headers, timeout=10)
        soup = BeautifulSoup(response.text, 'html.parser')
        links = set()
        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            # Skip anchors, mailto:, javascript: and similar non-page links.
            if not href.startswith(('http', '/')):
                continue
            # Test the filter against the resolved URL: a relative link such
            # as '/about' belongs to the page's own domain but never contains
            # the domain name itself.
            if domain_filter and domain_filter not in urljoin(url, href):
                continue
            links.add(href)
        return list(links)
    except Exception as e:
        print(f'获取链接失败: {e}')
        return []
def download_images(url, save_dir='images'):
    """Download every image referenced by an <img> tag on a page.

    Args:
        url: Page whose images should be downloaded.
        save_dir: Directory the files are written to; created if missing.

    Returns:
        None. Progress and failures are printed to stdout.
    """
    import os
    from urllib.parse import urljoin, urlparse

    headers = {'User-Agent': 'Mozilla/5.0'}
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(save_dir, exist_ok=True)

    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
    except Exception as e:
        print(f'页面访问失败: {e}')
        return

    for index, img in enumerate(soup.find_all('img'), 1):
        img_url = img.get('src')
        if not img_url:
            continue
        # Resolve relative paths against the page URL.
        img_url = urljoin(url, img_url)
        try:
            img_response = requests.get(img_url, headers=headers, timeout=10)
            # Do not store a 404/500 error page under an image file name.
            img_response.raise_for_status()
            # Build the name from the URL path only, so a query string
            # ('a.jpg?w=200') does not end up in the file name; fall back to a
            # numbered name when the path ends with '/'.
            img_name = os.path.basename(urlparse(img_url).path) or f'image_{index}'
            with open(os.path.join(save_dir, img_name), 'wb') as f:
                f.write(img_response.content)
            print(f'下载图片: {img_name}')
        except Exception as e:
            # A single broken image should not stop the remaining downloads.
            print(f'下载图片失败 {img_url}: {e}')
# Usage example: list the links of a page that match a domain, then download
# the images of a gallery page.
links = get_all_links('https://example.com', domain_filter='example.com')
print(f'发现 {len(links)} 个链接')
# save_dir is not passed, so its default ('images') is used.
download_images('https://example.com/gallery')

反爬虫应对技巧

import requests
import random
import time
class AntiCrawlScraper:
    """HTTP fetcher with rotating User-Agents, optional proxies and retries."""

    DEFAULT_USER_AGENTS = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
    )

    def __init__(self, user_agents=None, proxies=None):
        """Set up the User-Agent pool and the proxy pool.

        Args:
            user_agents: Optional iterable of User-Agent strings; defaults to
                ``DEFAULT_USER_AGENTS``.
            proxies: Optional iterable of requests-style proxy mappings, e.g.
                ``{'http': 'http://host:8080', 'https': 'http://host:8080'}``.
                Defaults to no proxies, because a placeholder entry would make
                every request fail.
        """
        self.user_agents = list(user_agents) if user_agents else list(self.DEFAULT_USER_AGENTS)
        self.proxies = list(proxies) if proxies else []

    def get_random_headers(self):
        """Return browser-like request headers with a randomly picked User-Agent."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }

    def request_with_retry(self, url, max_retries=3):
        """GET ``url``, retrying on errors and non-200 answers.

        Args:
            url: Address to request.
            max_retries: Maximum number of attempts.

        Returns:
            The response of the first attempt answered with status 200, or
            None when all attempts failed.
        """
        for attempt in range(1, max_retries + 1):
            backoff = 0
            try:
                # Random pause so the request rhythm looks less mechanical.
                time.sleep(random.uniform(1, 3))
                # Route through a random proxy when a pool was configured.
                proxy = random.choice(self.proxies) if self.proxies else None
                response = requests.get(
                    url,
                    headers=self.get_random_headers(),
                    proxies=proxy,
                    timeout=10,
                )
                if response.status_code == 200:
                    return response
                if response.status_code == 403:
                    print('被拒绝访问,等待后重试...')
                    backoff = 5
                else:
                    print(f'状态码: {response.status_code}')
            except Exception as e:
                print(f'请求失败 (尝试 {attempt}/{max_retries}): {e}')
                backoff = 2
            # Waiting after the final attempt would only delay the caller.
            if backoff and attempt < max_retries:
                time.sleep(backoff)
        return None
# Usage example: request a page with retries and report success.
scraper = AntiCrawlScraper()
response = scraper.request_with_retry('https://example.com')
# request_with_retry returns None when every attempt failed.
if response:
    print('爬取成功')

注意事项

  1. 遵守robots.txt:爬取前检查网站的robots.txt
  2. 控制爬取频率:添加适当的延迟,避免给服务器造成压力
  3. 尊重版权:不要批量下载受版权保护的内容
  4. 数据使用:遵守网站的使用条款和数据保护法规

这些示例涵盖了大多数静态网页爬取场景,你可以根据实际需求选择合适的方案。使用时请记得根据目标网站的HTML结构调整选择器。

抱歉,评论功能暂时关闭!