Python案例怎么批量新增数据?

wen python案例 24

本文目录导读:

Python案例怎么批量新增数据?

  1. 批量新增到Excel文件
  2. 批量新增到数据库(MySQL示例)
  3. 批量新增到JSON文件
  4. 批量生成测试数据
  5. 批量新增到CSV文件
  6. 性能优化的批量插入

我可以帮你提供几种Python批量新增数据的方法,下面我给出几个常见场景的示例:

批量新增到Excel文件

import pandas as pd
from openpyxl import load_workbook
# 方法1:使用pandas
data = {
    '姓名': ['张三', '李四', '王五', '赵六'],
    '年龄': [25, 30, 28, 35],
    '城市': ['北京', '上海', '广州', '深圳']
}
df = pd.DataFrame(data)
df.to_excel('新增数据.xlsx', index=False, sheet_name='Sheet1')
# 方法2:追加到已有Excel
book = load_workbook('已有文件.xlsx')
writer = pd.ExcelWriter('已有文件.xlsx', engine='openpyxl')
writer.book = book
df.to_excel(writer, sheet_name='Sheet1', startrow=writer.sheets['Sheet1'].max_row)
writer.save()

批量新增到数据库(MySQL示例)

import mysql.connector
from mysql.connector import Error
def batch_insert_to_mysql(data_list):
    try:
        connection = mysql.connector.connect(
            host='localhost',
            database='your_database',
            user='your_user',
            password='your_password'
        )
        cursor = connection.cursor()
        # 批量插入SQL
        sql = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
        # 准备数据
        records = [
            ('张三', 25, 'zhangsan@email.com'),
            ('李四', 30, 'lisi@email.com'),
            ('王五', 28, 'wangwu@email.com')
        ]
        # 批量执行
        cursor.executemany(sql, records)
        connection.commit()
        print(f"成功插入 {cursor.rowcount} 条数据")
    except Error as e:
        print(f"错误: {e}")
        connection.rollback()
    finally:
        if connection.is_connected():
            cursor.close()
            connection.close()

批量新增到JSON文件

import json
def batch_add_to_json(new_data, json_file='data.json'):
    # 读取现有数据
    try:
        with open(json_file, 'r', encoding='utf-8') as f:
            existing_data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        existing_data = []
    # 添加新数据
    existing_data.extend(new_data)
    # 写回文件
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(existing_data, f, ensure_ascii=False, indent=2)
    print(f"成功添加 {len(new_data)} 条数据,总计 {len(existing_data)} 条")
# 使用示例
new_students = [
    {'id': 4, 'name': '小明', 'score': 95},
    {'id': 5, 'name': '小红', 'score': 88},
    {'id': 6, 'name': '小刚', 'score': 92}
]
batch_add_to_json(new_students)

批量生成测试数据

import random
import string
import pandas as pd
def generate_test_data(count=100):
    """生成测试数据"""
    data = []
    for i in range(count):
        record = {
            'ID': i + 1,
            '姓名': ''.join(random.choices('张三丰李四王五赵六', k=2)),
            '年龄': random.randint(18, 60),
            '工资': random.randint(5000, 20000),
            '部门': random.choice(['技术部', '市场部', '行政部', '财务部']),
            '入职日期': f'2024-{random.randint(1,12):02d}-{random.randint(1,28):02d}'
        }
        data.append(record)
    return data
# 生成并保存
test_data = generate_test_data(1000)
df = pd.DataFrame(test_data)
df.to_excel('测试数据.xlsx', index=False)
print("测试数据生成完成!")

批量新增到CSV文件

import csv
def batch_append_to_csv(data_list, filename='data.csv'):
    """批量追加数据到CSV文件"""
    # 检查文件是否存在
    file_exists = False
    try:
        with open(filename, 'r') as f:
            file_exists = True
    except FileNotFoundError:
        pass
    # 写入数据
    with open(filename, 'a', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=data_list[0].keys())
        # 如果文件不存在,写入表头
        if not file_exists:
            writer.writeheader()
        # 批量写入数据
        writer.writerows(data_list)
    print(f"成功追加 {len(data_list)} 条数据到 {filename}")
# 使用示例
new_records = [
    {'name': 'Tom', 'age': 25, 'city': 'Beijing'},
    {'name': 'Jerry', 'age': 28, 'city': 'Shanghai'},
    {'name': 'Alice', 'age': 23, 'city': 'Guangzhou'}
]
batch_append_to_csv(new_records)

性能优化的批量插入

import time
import sqlite3
def batch_insert_optimized(data_list, batch_size=1000):
    """优化批量插入性能"""
    conn = sqlite3.connect('example.db')
    cursor = conn.cursor()
    # 创建表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            age INTEGER
        )
    ''')
    start_time = time.time()
    # 分批插入
    for i in range(0, len(data_list), batch_size):
        batch = data_list[i:i+batch_size]
        # 使用事务
        conn.execute('BEGIN TRANSACTION')
        for data in batch:
            cursor.execute(
                "INSERT INTO users (name, age) VALUES (?, ?)",
                (data['name'], data['age'])
            )
        conn.commit()
    end_time = time.time()
    print(f"插入 {len(data_list)} 条数据耗时: {end_time - start_time:.2f} 秒")
    conn.close()
# 生成大量测试数据
test_data = [{'name': f'User{i}', 'age': i % 50 + 18} for i in range(10000)]
batch_insert_optimized(test_data)
  1. 选择合适的工具:根据数据存储位置选择对应的方法
  2. 使用事务:数据库操作使用事务提高性能
  3. 分批处理:处理大量数据时分批进行
  4. 异常处理:添加适当的错误处理
  5. 性能优化:使用executemany()等批量操作方法

你需要针对哪种具体场景的批量新增?我可以提供更针对性的代码示例。

抱歉,评论功能暂时关闭!