Python案例怎么查询数据库数据?

wen python案例 8

本文目录导读:

Python案例怎么查询数据库数据?

  1. MySQL数据库查询
  2. SQLite数据库查询
  3. PostgreSQL查询
  4. 高级查询示例
  5. 使用ORM(以SQLAlchemy为例)
  6. 实用技巧

我来介绍几种常用的Python查询数据库数据的方法:

MySQL数据库查询

使用mysql-connector-python

import mysql.connector
# 连接数据库
conn = mysql.connector.connect(
    host="localhost",
    user="your_username",
    password="your_password",
    database="your_database"
)
cursor = conn.cursor()
# 查询数据
cursor.execute("SELECT * FROM users WHERE age > %s", (18,))
results = cursor.fetchall()
for row in results:
    print(f"ID: {row[0]}, Name: {row[1]}, Age: {row[2]}")
# 关闭连接
cursor.close()
conn.close()

使用PyMySQL

import pymysql
# 连接数据库
connection = pymysql.connect(
    host='localhost',
    user='root',
    password='password',
    database='testdb',
    charset='utf8mb4'
)
try:
    with connection.cursor() as cursor:
        # 查询所有数据
        sql = "SELECT id, name, email FROM users"
        cursor.execute(sql)
        results = cursor.fetchall()
        for row in results:
            print(f"ID: {row[0]}, Name: {row[1]}, Email: {row[2]}")
        # 带条件的查询
        sql = "SELECT * FROM users WHERE name LIKE %s"
        cursor.execute(sql, ('%张%',))
        search_results = cursor.fetchall()
finally:
    connection.close()

SQLite数据库查询

import sqlite3
# 连接数据库(如果不存在会自动创建)
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
# 创建表(如果不存在)
cursor.execute('''
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        age INTEGER,
        email TEXT
    )
''')
# 插入示例数据
cursor.execute("INSERT INTO users (name, age, email) VALUES (?, ?, ?)",
               ('张三', 25, 'zhangsan@example.com'))
conn.commit()
# 查询数据
# 1. 查询所有数据
cursor.execute("SELECT * FROM users")
all_users = cursor.fetchall()
print("所有用户:", all_users)
# 2. 带条件查询
cursor.execute("SELECT * FROM users WHERE age > ?", (20,))
older_users = cursor.fetchall()
print("年龄大于20的用户:", older_users)
# 3. 模糊查询
cursor.execute("SELECT * FROM users WHERE name LIKE ?", ('%张%',))
zhang_users = cursor.fetchall()
# 4. 使用fetchone()
cursor.execute("SELECT * FROM users LIMIT 1")
first_user = cursor.fetchone()
print("第一个用户:", first_user)
conn.close()

PostgreSQL查询

import psycopg2
# 连接数据库
conn = psycopg2.connect(
    host="localhost",
    database="testdb",
    user="postgres",
    password="password"
)
cursor = conn.cursor()
# 查询数据
cursor.execute("SELECT id, name, age FROM users WHERE age BETWEEN %s AND %s", (20, 30))
results = cursor.fetchall()
# 使用列名访问
for row in results:
    print(f"ID: {row[0]}, Name: {row[1]}, Age: {row[2]}")
# 使用DictCursor获取字典格式结果
from psycopg2.extras import DictCursor
cursor = conn.cursor(cursor_factory=DictCursor)
cursor.execute("SELECT * FROM users")
for row in cursor:
    print(f"Name: {row['name']}, Age: {row['age']}")
cursor.close()
conn.close()

高级查询示例

使用上下文管理器(推荐)

import sqlite3
from contextlib import closing
def query_database():
    with closing(sqlite3.connect('example.db')) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        # 多表连接查询
        cursor.execute('''
            SELECT u.name, o.order_date, o.total
            FROM users u
            LEFT JOIN orders o ON u.id = o.user_id
            WHERE o.total > ?
        ''', (100,))
        for row in cursor:
            print(f"用户: {row['name']}, 订单日期: {row['order_date']}, 金额: {row['total']}")

使用参数化查询(防止SQL注入)

# 不推荐(容易SQL注入)
cursor.execute("SELECT * FROM users WHERE name = '" + user_input + "'")
# 推荐(参数化查询)
cursor.execute("SELECT * FROM users WHERE name = %s", (user_input,))
# SQLite的占位符是 ?
cursor.execute("SELECT * FROM users WHERE name = ?", (user_input,))

查询结果转换为字典

import sqlite3
def query_to_dict(conn, query, params=None):
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    if params:
        cursor.execute(query, params)
    else:
        cursor.execute(query)
    rows = cursor.fetchall()
    return [dict(row) for row in rows]
# 使用示例
conn = sqlite3.connect('example.db')
results = query_to_dict(conn, "SELECT * FROM users WHERE age > ?", (20,))
print(results[0]['name'])  # 使用列名访问

使用ORM(以SQLAlchemy为例)

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)
    email = Column(String)
# 创建引擎和会话
engine = create_engine('sqlite:///example.db', echo=True)
Session = sessionmaker(bind=engine)
session = Session()
# 查询数据
# 1. 查询所有用户
users = session.query(User).all()
for user in users:
    print(f"Name: {user.name}, Age: {user.age}")
# 2. 带条件的查询
young_users = session.query(User).filter(User.age < 30).all()
# 3. 单个结果
user = session.query(User).filter_by(name='张三').first()
# 4. 排序
ordered_users = session.query(User).order_by(User.age.desc()).all()
session.close()

实用技巧

连接池管理

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
    'mysql+mysqlconnector://user:password@localhost/dbname',
    poolclass=QueuePool,
    pool_size=5,
    max_overflow=10
)

错误处理

try:
    cursor.execute(query, params)
    conn.commit()
except Exception as e:
    conn.rollback()
    print(f"查询失败: {e}")
finally:
    cursor.close()
    conn.close()

选择哪个方案取决于你的具体需求:

  • 小型项目:使用SQLite + 原生SQL
  • Web应用:使用SQLAlchemy ORM
  • 数据量大:使用连接池 + 参数化查询
  • 需要高性能:考虑使用异步数据库驱动

记得在生产环境中总是使用参数化查询来防止SQL注入!

抱歉,评论功能暂时关闭!