0
点赞
收藏
分享

微信扫一扫

django apscheduler 定时任务,轮询执行某个任务(上篇)

这是一个 Django 应用程序,它为 APScheduler 添加了一个轻量级的包装器。它允许使用 Django 的 ORM 在数据库中存储持久作业。

django-apscheduler 是一个很好的选择,可以快速轻松地将基本调度功能添加到您的 Django 应用程序中,并且依赖最少,附加配置很少。理想的用例可能涉及按固定执行计划运行少量任务。

这种简单性的代价是您需要小心确保只有一个调度程序在特定时间点处于活动状态。

这种限制源于这样一个事实,即 APScheduler 当前没有任何​​进程间同步和信令方案​​ ,可以在作业已添加、修改或从作业存储中删除时通知调度程序。

似乎计划在​​即将发布的 APScheduler 4.0 版本中​​​支持多个调度程序之间的持久作业存储共享。在此之前,一个典型的Django​​部署在生产中​​​会启动多个工作进程,如果每个工作进程结束了运行其自己的调度那么这可能会导致工作中被遗漏或多次执行,以及重复的条目​​DjangoJobExecution​​表正在创建。

所以现在你的选择是:

  1. 使用自定义 Django 管理命令在其自己的专用进程中启动单个调度程序(推荐- 请参见​​runapscheduler.py​​下面的示例);或者
  2. 实现您自己的​​远程处理​​ 逻辑,以确保​​DjangoJobStore​​所有 Web 服务器的工作进程都可以以协调和同步的方式使用单个进程(对于大多数用例而言,额外的努力和增加的复杂性可能不值得);或者
  3. 选择的替代任务处理库确实使用某种像的Redis,RabbitMQ的,亚马逊SQS或类似共享消息代理的支持进程间通信(参见: ​​https://djangopackages.org/grids/g/workers-queues-tasks /​​对于流行的选项)。

该软件包的特点包括:

  • 自定义​​DjangoJobStore​​:一个​​APScheduler 作业存储​​ ,将预定作业保存到 Django 数据库。您可以直接通过 Django 管理界面查看计划作业并监控作业执行情况:
    ​​​

    django  apscheduler 定时任务,轮询执行某个任务(上篇)_调度程序

    ​​
  • 作业存储还维护当前计划作业的所有作业执行历史,以及状态代码和异常(如果有):
    ​​​

    django  apscheduler 定时任务,轮询执行某个任务(上篇)_django_02

    ​​
  • 注意: APScheduler 会 在作业的最后一次计划执行被触发后​​自动​​从作业存储中​​删除作业​​。这也会从数据库中删除相应的作业执行条目(即,作业执行日志仅针对“活动”作业进行维护。)
  • 也可以通过​​DjangoJob​​管理页面手动触发作业执行:
    ​​​​​
  • 注意:为了防止长时间运行的作业导致 Django HTTP 请求超时,通过 Django 管理站点启动的所有 APScheduler 作业的组合最大运行时间为 25 秒。这个超时值可以通过​​APSCHEDULER_RUN_NOW_TIMEOUT​​设置来配置。

安装

pip  install  django - apscheduler

​​​​快速开始

  • 像这样添加​​django_apscheduler​​到您的​​INSTALLED_APPS​​设置中:

INSTALLED_APPS  = (
# ...
"django_apscheduler" ,
)

  • django-apscheduler 带有开箱即用的合理配置默认值。可以通过将以下设置添加到 Django​​settings.py​​文件来覆盖默认值:

  • # Format string for displaying run time timestamps in the Django admin site. The default
    # just adds seconds to the standard Django format, which is useful for displaying the timestamps
    # for jobs that are scheduled to run on intervals of less than one minute.
    #
    # See https://docs.djangoproject.com/en/dev/ref/settings/#datetime-format for format string
    # syntax details.
    APSCHEDULER_DATETIME_FORMAT = "N j, Y, f:s a"

    # Maximum run time allowed for jobs that are triggered manually via the Django admin site, which
    # prevents admin site HTTP requests from timing out.
    #
    # Longer running jobs should probably be handed over to a background task processing library
    # that supports multiple background worker processes instead (e.g. Dramatiq, Celery, Django-RQ,
    # etc. See: https://djangopackages.org/grids/g/workers-queues-tasks/ for popular options).
    APSCHEDULER_RUN_NOW_TIMEOUT = 25 # Seconds


      


    # runapscheduler.py
    import logging

    from django.conf import settings

    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.triggers.cron import CronTrigger
    from django.core.management.base import BaseCommand
    from django_apscheduler.jobstores import DjangoJobStore
    from django_apscheduler.models import DjangoJobExecution


    logger = logging.getLogger(__name__)


    def my_job():
    # Your job processing logic here...
    pass

    def delete_old_job_executions(max_age=604_800):
    """This job deletes all apscheduler job executions older than `max_age` from the database."""
    DjangoJobExecution.objects.delete_old_job_executions(max_age)


    class Command(BaseCommand):
    help = "Runs apscheduler."

    def handle(self, *args, **options):
    scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
    scheduler.add_jobstore(DjangoJobStore(), "default")

    scheduler.add_job(
    my_job,
    trigger=CronTrigger(second="*/10"), # Every 10 seconds
    id="my_job", # The `id` assigned to each job MUST be unique
    max_instances=1,
    replace_existing=True,
    )
    logger.info("Added job 'my_job'.")

    scheduler.add_job(
    delete_old_job_executions,
    trigger=CronTrigger(
    day_of_week="mon", hour="00", minute="00"
    ), # Midnight on Monday, before start of the next work week.
    id="delete_old_job_executions",
    max_instances=1,
    replace_existing=True,
    )
    logger.info(
    "Added weekly job: 'delete_old_job_executions'."
    )

    try:
    logger.info("Starting scheduler...")
    scheduler.start()
    except KeyboardInterrupt:
    logger.info("Stopping scheduler...")
    scheduler.shutdown()
    logger.info("Scheduler shut down successfully!")


      

    • ​./manage.py runapscheduler​​每当启动为 Django 应用程序提供服务的 Web 服务器时,都应调用此管理命令。如何以及在何处完成此操作的详细信息取决于具体实现,并取决于您使用的 Web 服务器以及您如何将应用程序部署到生产环境。对于大多数人来说,这应该涉及配置各种​​主管​​进程。

    • 像往常一样注册任何 APScheduler 作业。请注意,如果您尚未设置​​DjangoJobStore​​为​​'default'​​ 作业存储,则需要将其包含​​jobstore='djangojobstore'​​在您的​​scheduler.add_job()​​调用中。

    ​​​​高级用法

    django-apscheduler 假设您已经熟悉 APScheduler 及其正确使用。如果没有,请转到项目页面并查看​​APScheduler 文档​​。

    根据您的环境和用例,可以使用​​不同类型的调度程序​​。如果您更喜欢运行 a​​BackgroundScheduler​​而不是使用 a ​​BlockingScheduler​​,那么您应该知道将 APScheduler 与 uWSGI 一起使用需要一些额外的 ​​配置步骤​​才能重新启用线程支持。

    ​​​​支持的数据库

    请注意​​Django 官方支持​​的数据库列表。django-apscheduler 可能不适用于不受支持的数据库,如 Microsoft SQL Server、MongoDB 等。

    ​​​​数据库连接和超时

    django-apscheduler 依赖于标准的 Django 数据库​​配置设置​​。这些设置与您的数据库的配置方式相结合,决定了您的特定部署将如何进行连接管理。

    如果您遇到任何类型的“丢失连接”错误,则可能意味着:

    • 您的数据库连接已超时。可能是时候开始考虑部署连接池程序(如 ​​pgbouncer​​)来为您管理数据库连接。
    • 您的数据库服务器已崩溃/已重新启动。Django​​不会自动重新连接​​。



举报

相关推荐

0 条评论