Python案例如何设置任务周期?

wen python案例 10

本文目录导读:

Python案例如何设置任务周期?

  1. 使用 time.sleep() 简单循环(轻量级)
  2. 使用 schedule 库(最常用)
  3. 使用 APScheduler(高级调度器)
  4. 使用 threading.Timerthreading 模块
  5. 使用 asyncio 异步方案(适合异步IO密集任务)
  6. 选择建议
  7. 完整示例:带配置的定时任务系统

在Python中设置任务周期(定时任务),主要取决于你的应用场景和运行环境,以下是几种常见的实现方式:

使用 time.sleep() 简单循环(轻量级)

适用于不需要精确时间、简单的周期性任务:

import time
def periodic_task():
    print("任务执行中...", time.strftime("%Y-%m-%d %H:%M:%S"))
# 每5秒执行一次
while True:
    periodic_task()
    time.sleep(5)

缺点:任务执行时间会影响周期精度,不适合长时间运行。

使用 schedule 库(最常用)

需要安装:pip install schedule

import schedule
import time
def job():
    print("任务执行...", time.strftime("%Y-%m-%d %H:%M:%S"))
# 设置不同的周期方式
schedule.every(10).seconds.do(job)          # 每10秒
schedule.every(1).minutes.do(job)           # 每分钟
schedule.every(2).hours.do(job)             # 每2小时
schedule.every().day.at("10:30").do(job)    # 每天10:30
schedule.every().monday.at("08:00").do(job) # 每周一08:00
schedule.every().hour.at(":15").do(job)     # 每小时的第15分钟
while True:
    schedule.run_pending()
    time.sleep(1)  # 避免CPU占用过高

使用 APScheduler(高级调度器)

更强大,支持持久化存储和更复杂的调度策略。

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def job():
    print(f"任务执行时间: {datetime.now()}")
scheduler = BlockingScheduler()
# 添加定时任务
scheduler.add_job(job, 'interval', seconds=5)  # 每5秒
scheduler.add_job(job, 'interval', minutes=30) # 每30分钟
scheduler.add_job(job, 'cron', hour=8, minute=30)  # 每天8:30
scheduler.add_job(job, 'cron', day_of_week='mon-fri', hour=9)  # 工作日9点
# 启动调度器
try:
    scheduler.start()
except (KeyboardInterrupt, SystemExit):
    pass

APScheduler优势

  • 支持cron表达式
  • 任务可以持久化到数据库
  • 支持分布式任务
  • 错误处理更完善

使用 threading.Timerthreading 模块

import threading
import time
def job():
    print("定时任务执行", time.strftime("%Y-%m-%d %H:%M:%S"))
    # 重新设置下一个定时器
    threading.Timer(5, job).start()
# 启动第一次任务
job()
# 保持主线程运行
while True:
    time.sleep(1)

使用 asyncio 异步方案(适合异步IO密集任务)

import asyncio
async def periodic_task():
    while True:
        print("异步任务执行...", time.strftime("%Y-%m-%d %H:%M:%S"))
        await asyncio.sleep(5)  # 每5秒执行一次
async def main():
    # 可以同时运行多个周期性任务
    task1 = asyncio.create_task(periodic_task())
    task2 = asyncio.create_task(periodic_task())
    await asyncio.gather(task1, task2)
asyncio.run(main())

选择建议

场景 推荐方案
简单脚本、演示 time.sleep()
中小型项目、快速开发 schedule
企业级应用、复杂调度 APScheduler
异步IO密集型任务 asyncio
需要精确到毫秒级 threading.Timer

完整示例:带配置的定时任务系统

import schedule
import time
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
class TaskScheduler:
    def __init__(self):
        self.tasks = {}
    def add_task(self, name, func, interval_type='seconds', interval_value=10, args=None):
        """添加定时任务
        interval_type: seconds, minutes, hours, days
        """
        if interval_type == 'seconds':
            job = schedule.every(interval_value).seconds.do(func)
        elif interval_type == 'minutes':
            job = schedule.every(interval_value).minutes.do(func)
        elif interval_type == 'hours':
            job = schedule.every(interval_value).hours.do(func)
        elif interval_type == 'days':
            job = schedule.every(interval_value).days.do(func)
        else:
            raise ValueError("不支持的间隔类型")
        self.tasks[name] = job
        logging.info(f"添加任务: {name}, 间隔: {interval_value} {interval_type}")
    def run(self):
        """启动调度器"""
        logging.info("调度器启动")
        try:
            while True:
                schedule.run_pending()
                time.sleep(1)
        except KeyboardInterrupt:
            logging.info("调度器停止")
# 使用示例
def my_task():
    logging.info("执行定时任务")
scheduler = TaskScheduler()
scheduler.add_task("task1", my_task, 'seconds', 5)
scheduler.add_task("task2", my_task, 'minutes', 1)
scheduler.run()

选择哪种方式取决于你的具体需求:任务数量、精度要求、是否需要持久化、是否分布式等因素,对于大多数Python应用,scheduleAPScheduler 是最常用的选择。

抱歉,评论功能暂时关闭!