本文目录导读:

- 使用
time.sleep()简单循环(轻量级) - 使用
schedule库(最常用) - 使用
APScheduler(高级调度器) - 使用
threading.Timer或threading模块 - 使用
asyncio异步方案(适合异步IO密集任务) - 选择建议
- 完整示例:带配置的定时任务系统
在Python中设置任务周期(定时任务),主要取决于你的应用场景和运行环境,以下是几种常见的实现方式:
使用 time.sleep() 简单循环(轻量级)
适用于不需要精确时间、简单的周期性任务:
import time
def periodic_task():
print("任务执行中...", time.strftime("%Y-%m-%d %H:%M:%S"))
# 每5秒执行一次
while True:
periodic_task()
time.sleep(5)
缺点:任务执行时间会影响周期精度,不适合长时间运行。
使用 schedule 库(最常用)
需要安装:pip install schedule
import schedule
import time
def job():
print("任务执行...", time.strftime("%Y-%m-%d %H:%M:%S"))
# 设置不同的周期方式
schedule.every(10).seconds.do(job) # 每10秒
schedule.every(1).minutes.do(job) # 每分钟
schedule.every(2).hours.do(job) # 每2小时
schedule.every().day.at("10:30").do(job) # 每天10:30
schedule.every().monday.at("08:00").do(job) # 每周一08:00
schedule.every().hour.at(":15").do(job) # 每小时的第15分钟
while True:
schedule.run_pending()
time.sleep(1) # 避免CPU占用过高
使用 APScheduler(高级调度器)
更强大,支持持久化存储和更复杂的调度策略。
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def job():
print(f"任务执行时间: {datetime.now()}")
scheduler = BlockingScheduler()
# 添加定时任务
scheduler.add_job(job, 'interval', seconds=5) # 每5秒
scheduler.add_job(job, 'interval', minutes=30) # 每30分钟
scheduler.add_job(job, 'cron', hour=8, minute=30) # 每天8:30
scheduler.add_job(job, 'cron', day_of_week='mon-fri', hour=9) # 工作日9点
# 启动调度器
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
APScheduler优势:
- 支持cron表达式
- 任务可以持久化到数据库
- 支持分布式任务
- 错误处理更完善
使用 threading.Timer 或 threading 模块
import threading
import time
def job():
print("定时任务执行", time.strftime("%Y-%m-%d %H:%M:%S"))
# 重新设置下一个定时器
threading.Timer(5, job).start()
# 启动第一次任务
job()
# 保持主线程运行
while True:
time.sleep(1)
使用 asyncio 异步方案(适合异步IO密集任务)
import asyncio
async def periodic_task():
while True:
print("异步任务执行...", time.strftime("%Y-%m-%d %H:%M:%S"))
await asyncio.sleep(5) # 每5秒执行一次
async def main():
# 可以同时运行多个周期性任务
task1 = asyncio.create_task(periodic_task())
task2 = asyncio.create_task(periodic_task())
await asyncio.gather(task1, task2)
asyncio.run(main())
选择建议
| 场景 | 推荐方案 |
|---|---|
| 简单脚本、演示 | time.sleep() |
| 中小型项目、快速开发 | schedule |
| 企业级应用、复杂调度 | APScheduler |
| 异步IO密集型任务 | asyncio |
| 需要精确到毫秒级 | threading.Timer |
完整示例:带配置的定时任务系统
import schedule
import time
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
class TaskScheduler:
def __init__(self):
self.tasks = {}
def add_task(self, name, func, interval_type='seconds', interval_value=10, args=None):
"""添加定时任务
interval_type: seconds, minutes, hours, days
"""
if interval_type == 'seconds':
job = schedule.every(interval_value).seconds.do(func)
elif interval_type == 'minutes':
job = schedule.every(interval_value).minutes.do(func)
elif interval_type == 'hours':
job = schedule.every(interval_value).hours.do(func)
elif interval_type == 'days':
job = schedule.every(interval_value).days.do(func)
else:
raise ValueError("不支持的间隔类型")
self.tasks[name] = job
logging.info(f"添加任务: {name}, 间隔: {interval_value} {interval_type}")
def run(self):
"""启动调度器"""
logging.info("调度器启动")
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logging.info("调度器停止")
# 使用示例
def my_task():
logging.info("执行定时任务")
scheduler = TaskScheduler()
scheduler.add_task("task1", my_task, 'seconds', 5)
scheduler.add_task("task2", my_task, 'minutes', 1)
scheduler.run()
选择哪种方式取决于你的具体需求:任务数量、精度要求、是否需要持久化、是否分布式等因素,对于大多数Python应用,schedule 和 APScheduler 是最常用的选择。