0
点赞
收藏
分享

微信扫一扫

使用Celery实现计划任务与异步任务

前言

Celery是一个开源的分布式任务队列系统,用于处理异步任务和分布式任务调度。使用消息代理(如RabbitMQ、Redis)来实现任务队列和消息传递。

在使用Python开发web应用过程中,经常使用Celery完成两种任务需求:

  1. 异步任务。将任务提交到任务队列中,然后继续处理其他任务,而不必等待任务完成。
  2. 定时任务。根据需求设置任务的执行时间和频率。

本文的目的就是通过讲解Celery的相关信息,来实现计划任务与异步任务的操作。

Celery的组成

使用Celery实现计划任务与异步任务_linux

Celery由以下几个核心组件组成:

  • Celery应用程序(Celery Application):Celery应用程序是整个Celery系统的核心。它负责任务的创建、调度和分发。应用程序通常在项目的入口处初始化,包括配置Celery的消息代理、结果存储等参数。
  • 任务(Tasks):任务是要执行的操作或函数,可以是任何Python可调用对象。任务函数使用装饰器@task进行修饰,将其注册到Celery应用程序中。任务可以同步执行或异步执行,可以根据需要设置任务的参数、执行时间和其他属性。
  • 消息代理(Message Broker):消息代理是任务队列的基础,负责接收和存储任务消息,并将它们传递给工作节点进行处理。Celery支持多种消息代理,如RabbitMQ、Redis、Amazon SQS等。消息代理负责确保任务的可靠传递和分发。
  • 工作节点(Worker Nodes):工作节点是实际执行任务的计算节点。它们连接到消息代理,接收任务消息并执行任务函数。一个Celery应用程序可以有多个工作节点,允许任务在分布式环境中并行执行。工作节点可以水平扩展,以适应任务负载的增加。
  • 结果存储(Result Backend):结果存储是用于存储任务执行结果的地方。当任务完成后,工作节点将结果存储在结果存储中,供应用程序查询和获取。常见的结果存储包括数据库、缓存、消息队列等。
  • 调度器(Scheduler):调度器负责定时触发任务的执行。Celery中的调度器组件称为Celery Beat(简称beat)。它允许您定义定时任务,指定任务执行的时间和频率。beat会根据您定义的调度配置将任务发送到消息代理中的任务队列。

通过这些组件的协作,Celery实现了异步任务处理、分布式任务调度和执行,并提供了灵活的定时任务功能。开发人员可以使用这些组件来构建可靠、高性能的分布式应用程序。


Celery即相关服务的安装

本文中的代码部署在home/hero/celery_test目录下,在虚拟环境下运行。

cd /home/hero/celery_test
virtualenv celery_test
source venv/bin/activate

使用pip可以直接安装Celery模块

pip install Celery

一般情况下,Celery需要使用一些消息代理服务如redis来实现队列机制。作为示例在本机启动一个redis-server即可。

使用Celery实现计划任务与异步任务_celery_02

Celery的使用

文件结构

使用Celery实现计划任务与异步任务_linux_03

Celery实例

首先定义一个Celery实例,通过此实例来实现Celery的功能。

文件名为celeryobj.py。这段代码主要是定义了一个celery对象,并向其注册了两个包含异步任务的模块以及两个定时任务。

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta


def init_celery():
    broker_url = 'redis://127.0.0.1:6379/1'  # 指定 Broker消费者,我们使用redis 1号数据库
    result_backend = 'redis://127.0.0.1:6379/2'  # 指定 Backend,最终消费结果,我们使用redis 2号数据库
    timezone = 'Asia/Shanghai'  # 指定时区,默认是 UTC
    # 用于注册异步任务
    # 注册内容为包含celery装饰器异步任务函数的文件名径字符串列表,
    # 示例内容为 test目录下的work和
    include = ['test.work',
               'test.beat']
    # 用于注册定时任务
    # 字典格式,键为定时任务的名称,值为任务信息。
    # 任务信息依旧字典格式。 键task为包含Celery装饰器定时任务函数模块引入的文件路径函数字符串。键schedule为定时任务执行周期频率
    beat_schedule = {
        'cron_1min': {
          'task': 'test.beat.cron_1min',
          'schedule': crontab(minute='*/1')
        },
        'timedelta_5s': {
            'task': 'test.beat.timedelta_5s',
            'schedule': timedelta(seconds=5)
        }
    }
    # 初始化Celery对象
    cel = Celery('CeleryOBJ', backend=result_backend, broker=broker_url, include=include)  # 创建 Celery 实例
    cel.conf.timezone = timezone
    cel.conf.enable_utc = False
    cel.conf.beat_schedule = beat_schedule
    return cel


c = init_celery()

任务的编写

在test目录下work和beat文件中编写使用celery装饰器的函数代码。

test/work.py

from celeryobj import c
import time
from datetime import datetime


@c.task   # celery 装饰器
def work_test(user, second):
    """
    测试异步任务
    :param user:
    :param second:
    :return:
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print('当前时间:{}'.format(now))
    print('{} 发起了任务,需要执行{}s'.format(user, second))
    time.sleep(second)  # 模拟串行任务长时间处理
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print('当前时间:{}'.format(now))
    print('{} 发起的任务完成'.format(user))

test/beat.py

from celeryobj import c
from datetime import datetime


@c.task   # celery 装饰器
def cron_1min():
    """
    1min 计划任务 crontab方式
    :return:
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print('1min', now)

@c.task
def timedelta_5s():
    """
    5s 计划任务 timedelte方式
    :return:
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print('5s', now)

运行Celery

若只是为了调试,可以使用命令行直接启动,但是后续进入正式环境,为方便启停推荐使用supervisord进行操作,以下是配置文件。

异步任务守护

[program:celery_worker]
user=hero
directory=/home/hero/celery_test
command=/home/hero/celery_test/venv/bin/celery -A celeryobj.c worker -l info  --concurrency=4  --logfile=/home/hero/log/celery_work.log

定时任务守护

[program:celery_beat]
user=hero
directory=/home/hero/celery_test
command=/home/hero/celery_test/venv/bin/celery -A celeryobj.c beat -l info --logfile=/home/hero/log/celery_beat.log

启动Celery

supervisorctl reload
supervisorctl restart all

在supervisord的守护下,前文定义的celery_work 和 celery_beat可以正常运行。

使用命令supervisorctl status可以查看运行状态,并可以通过配置文件中的日志文件查看具体运行状态。

使用Celery实现计划任务与异步任务_异步任务_04

测试Celery

定时任务

celery_beat根据celeryobj.py中beat_schedule定义的计划周期将任务发送给celery_work。从日志上看就是这样的:

celery_beat的日志

使用Celery实现计划任务与异步任务_定时任务_05

celery_work 的日志

使用Celery实现计划任务与异步任务_异步任务_06

异步任务

测试任务使用一下代码

celery_run.py

from test import work

if __name__ == "__main__":
    work.work_test.delay("GuoGuo", 10)
    work.work_test.delay("Guo", 3)
    work.work_test.delay("G", 5)

使用celery装饰器的函数可以通过delay方法来启用celery的功能,将其交给celery worker来执行。如果不加delay仍可以作为普通函数来使用。

如果不使用celery可以使用一下代码来进行对比

run.py

from test import work

if __name__ == "__main__":
    work.work_test("GuoGuo", 10)
    work.work_test("Guo", 3)
    work.work_test("G", 5)

先执行run.py

使用Celery实现计划任务与异步任务_定时任务_07

任务是逐个串行完成的。

运行celery_run.py,同时观察日志可以直观的感受到celery是如何异步执行的。

使用Celery实现计划任务与异步任务_linux_08

同样是三个任务,几乎同时开始。




举报

相关推荐

0 条评论