0
点赞
收藏
分享

微信扫一扫

celery精讲和实战

一,什么是celery

1.1celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等,使用redis作为队列用的更多吧,因为使用相比mq更加简单,后面实战介绍也是用的redis

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

另外, Celery还支持不同的并发和序列化的手段

  • 并发:Prefork, Eventlet, gevent, threads/single threaded
  • 序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等


1.2使用场景

celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

比如系统中创建用户成功,用户信息修改后发送邮件通知,用户注册短信验证,执行耗时较长的任务时

定时任务:定时执行某件事情,比如每天数据统计


1.3celery优点

imple(简单)
Celery 使用和维护都非常简单,并且不需要配置文件。

Highly Available(高可用)
woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。

Fast(快速)
单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)

Flexible(灵活)
Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。


1.4celery安装

可以通过python包管理平台pypi安装celery

pip install celery


二,celery执行异步任务

实验前提我在10.0.0.55这台内网机上安装了redis服务并且开通了6379端口

创建一个py文件,文件名为celery_task.py,内容如下:

import celery
import time

backend = 'redis://10.0.0.55:6379/1'
broker = 'redis://10.0.0.55:6379/2'
cel = celery.Celery('test', backend=backend, broker=broker)


@cel.task
def send_email(name):
    print("向%s发送邮件..." % name)
    time.sleep(5)
    print("向%s发送邮件完成" % name)
    return "ok"

编写完成后先别直接运行,先启动celery worker命令为:

celery worker -A celery_task -l info

celery命令,worker开的进程 -A 后面接的是要执行的文件名 -l 指定日志级别,执行后出现:

celery精讲和实战_定时任务

代表启动成功

接下来就是生产者创建任务推送到消息中间件,创建执行任务文件produce_task.py:

from celery_task import send_email
result2 = send_email.delay("alex")
print(result2.id)


生产者调用delay方法会向消息中间件插入队列信息,信息包括你要执行的send_email函数还有alex参数

点击执行produce_task.py,会发现虽然send_email函数里有5秒钟的挂起,但是执行很快就给返回了

celery精讲和实战_异步任务_02

但是这里碰到一个坑,worker进程接收到了执行任务,但是报错,说send_email函数期待接收3个参数,但是我一个都没给,报错如下:

celery精讲和实战_定时任务_03


解决办法:windows python虚拟环境下安装eventlet

celery精讲和实战_redis_04

重新起celery worker 进程监听

celery worker -A celery_task -l info -P eventlet

说是celery 4+开始已经对windows不再支持了,所以我们在windows上实验的话,得安装eventlet库,eventlet是一个python协程模板,重新执行produce_task.py

celery worker收到任务完成执行

celery精讲和实战_redis_05


我们执行完produce_task.py发现返回得是一个类似id的结果,这个就是celery id,那我如何取到send_email函数里return的结果呢?

创建result.py

from celery.result import AsyncResult # 引入AsyncResult
from celery_task import cel   #导入celery app实例

async_result=AsyncResult(id="910dc103-e4e2-48a8-8fc1-1f631d9323b1", app=cel)

# 910dc103-e4e2-48a8-8fc1-1f631d9323b1就是异步返回的执行任务id

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

result里对async_result任务状态进行判断,这里简单说下celery 任务的几种状态:

  • PENDING (waiting for execution or unknown task id)
  • STARTED (task has been started)
  • SUCCESS (task executed successfully)
  • FAILURE (task execution resulted in exception)
  • RETRY (task is being retried)
  • REVOKED (task has been revoked)


执行后返回:

得到return ok返回值

celery精讲和实战_redis_06


以下是项目中实际编写的获取celery result的一个示例:

def cel_result(task_info, webhook, timeout=60):
    """
    :param timeout: 超时时间
    :param task_info: 列表,元素为字典,字典包含task_id  host_info update_method
    :param webhook:
    :return:
    """
    done = 0  # 操作完成计数
    count = 0  # 循环等待次数
    result = {}  # 最终结果
    success = True  # 是否全部成功
    print("任务执行检查,超时时间为:", timeout)
    while True:  #
        print("第{}次查询".format(str(count)))

        for task in task_info:
            if str(task['task_id']) in result:  # 已出结果的任务 pass
                continue

            update_method = task['update_method']
            host_name = task['host_info']['host_name']
            host_ip = task['host_info']['host_ip']

            async_result = AsyncResult(id=task['task_id'], app=c)
            print("task_id", task['task_id'], "状态", async_result.status)
            if async_result.successful():
                msg = async_result.get()
                print(msg)
                if not msg['success']:  # 执行成功 但是结果失败,认为失败
                    success = False
                result[str(task['task_id'])] = msg
                # async_result.forget()  # 将结果删除
                done += 1
                progress = '已完成 {}/{} {}'.format(str(done), str(len(task_info)), str(msg))
                webhook_msg(webhook, progress)
                print(progress)

            elif async_result.failed():
                msg = "{} {} {} 操作失败".format(host_name, host_ip, update_method)
                result[str(task['task_id'])] = msg
                done += 1
                progress = '已完成 {}/{} {}'.format(str(done), str(len(task_info)), msg)
                webhook_msg(webhook, progress)
                print(progress)
                success = False
            # elif async_result.status == 'PENDING':  # 有问题,执行完了也是pending状态
            #     result[str(task['task_id'])] = '任务等待中被执行'
            # elif async_result.status == 'RETRY':
            #     result[str(task['task_id'])] = '任务异常后正在重试'
            # elif async_result.status == 'STARTED':
            #     result[str(task['task_id'])] = '任务已经开始被执行'

        if done >= len(task_info):
            return result, success
        else:  # 等待1
            time.sleep(1)
        count += 1
        if count >= timeout:  # 超时处理
            print("timeout超时")
            return result, False


多任务结构:

celery精讲和实战_定时任务_07

celery_setting.py celery配置文件,最终返回celery实例,这里取名为celery_setting而不是celery的文件名是为了反正产生循环导入的问题

from celery import Celery

cel = Celery('celery_demo',
             broker='redis://10.0.0.55:6379/1',
             backend='redis://10.0.0.55:6379/2',
             # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
             include=['celery_tasks.task01',
                      'celery_tasks.task02'
                      ])

# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

celery_demo,celery app 实例名字

borker,中间件代理地址

backend,异步任务结果存储代理地址

include,寻找异步任务模板路径,如果有多个异步任务,就写多个进去,列表包起来


task01.py,发送邮件任务

import time
from celery_tasks.celery_setting import cel


@cel.task
def send_email(res):
    time.sleep(5)
    return "完成向%s发送邮件任务" % res


task02.py,发送短信任务

import time
from celery_tasks.celery_setting import cel


@cel.task
def send_msg(name):
    time.sleep(5)
    return "完成向%s发送短信任务" % name


开启work进程

celery worker -A celery_tasks.celery_setting -l info -P eventlet

celery精讲和实战_定时任务_08

发现已经扫描到我这两个异步任务


编写生产者produce_task.py

from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_msg

# 立即告知celery去执行test_celery任务,并传入一个参数
result = send_email.delay('peng')
print(result.id)
result = send_msg.delay('peng')
print(result.id)


Celery定时任务

produce_task.py

from celery_task import send_email
from datetime import datetime

# 方式一
v1 = datetime(2023, 5, 7, 21, 57, 00)
print(v1)
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
result = send_email.apply_async(args=["egon",], eta=v1)
print(result.id)

# 方式二
# ctime = datetime.now()
# # 默认用utc时间
# utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
# from datetime import timedelta
# time_delay = timedelta(seconds=10)
# task_time = utc_ctime + time_delay
#
# # 使用apply_async并设定时间
# result = send_email.apply_async(args=["egon"], eta=task_time)
# print(result.id)

通过apply_async来执行定时任务,args里面是向send_email函数传递的参数,eta是传递的国标日期时间


多层次目录下我们修改celery_setting.py

from datetime import timedelta

from celery import Celery

cel = Celery('celery_demo',
             broker='redis://10.0.0.55:6379/1',
             backend='redis://10.0.0.55:6379/2',
             # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
             include=['celery_tasks.task01',
                      'celery_tasks.task02'
                      ])

# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字随意命名
    'add-every-10-seconds': {
        # 执行tasks1下的test_celery函数
        'task': 'celery_tasks.task01.send_email',
        # 每隔2秒执行一次
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=6),
        # 传递参数
        'args': ('张三',)
    },
}

这里cel celery实例增加了beat_schedule选项

task:执行定时任务模块路径

schedule:日程,上面意思是每隔6秒钟

args:传递给定时任务函数的参数


启动定时器

celery beat -A celery_tasks.celery_setting

每6秒钟往队列里插入任务


worker消费端检测到任务执行

celery精讲和实战_redis_09



Django中使用celery示例:

项目中celery配置文件:

celery精讲和实战_redis_10


autodiscover_tasks自动发现任务,settings.INSTALL_APPS,表示会扫描所有settings配置的项目目录下的tasks文件,app.config_from_object表示会读取settings文件中关于celery的一些配置

celery精讲和实战_定时任务_11


celery精讲和实战_redis_12

from .celery import app as celery_app 导入了名为app的Celery实例对象,并将其命名为celery_app,这样其他模块可以通过导入这个模块来使用该Celery实例对象。

__all__ = ('celery_app',) 定义了__all__变量,它是一个字符串列表,表示模块中可以被导出的变量、类和函数。这里只导出了celery_app,也就是说只有这个变量可以被其他模块导入。


同样我们通过在服务器上启动celery woker进程作为消费端


celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!

同时引入celery的task作为装饰器去修饰我们要执行的celery任务函数

@task(name=xxx)  name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名

celery精讲和实战_异步任务_13


生产端我们在view视图里去通过delay实现异步任务调用

celery精讲和实战_redis_14


flask使用celery

celery配置文件示例:


from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
from config import configs
from getenv import getenv

env = getenv()


def init_celery():
    broker_url = configs[env].BROKER_URL
    result_backend = configs[env].CELERY_RESULT_BACKEND
    include = configs[env].CELERY_INCLUDE  # CELERY_INCLUDE = ['celery_tasks.update.tasks']
    timezone = configs[env].CELERY_TIMEZONE

    beat_schedule = {
        'purchse_ecs_check': {
          'task': 'celery_tasks.cron_task.purchse_ecs_check',
          'schedule': crontab(minute='*/20')
        },
        # 'check_dcdn_used': {
        #    'task': 'celery_tasks.check_dcnd_used_task.check_dcdn_used_task',
        #    'schedule': crontab(hour='*/1')
        # },
        'save_onlin_data': {
            'task': 'celery_tasks.cron_task.save_online',
            'schedule': crontab(minute='*/1')
        },
        'serial_task_check': {
          'task': 'celery_tasks.cron_task.serial_check',
          'schedule': crontab(minute='*/1')
        },
        # 'test_webhook': {
        #     'task': 'celery_tasks.webhooktest.webhooktest',
        #     'schedule': crontab(minute="*")
        # }
    }
    cel = Celery('crontab', backend=result_backend, broker=broker_url, include=include)  # 创建 Celery 实例
    cel.conf.timezone = timezone
    cel.conf.enable_utc = False
    cel.conf.beat_schedule = beat_schedule

    # class ContextTask(c.Task):
    #     def __call__(self, *args, **kwargs):
    #         with app.app_context():
    #             return self.run(*args, **kwargs)
    #
    # cel.Task = ContextTask
    return cel


# flask_app = init_app()
c = init_celery()


include异步任务模块路径发现,我们配置在config里

celery精讲和实战_定时任务_15


celery beat和worker进程我们通过supervisor进行管控,supervisor配置文件

beat的

[include]
files=/etc/supervisord.conf
[program:celery_beat]
user=hero 
directory=/home/hero/heroes_assemble
# command=/home/hero/heroes_assemble/venv/bin/celery -A celery_tasks.manager.c beat -l info
command=/home/hero/heroes_assemble/venv/bin/celery -A celery_tasks.celery.c beat -l info --logfile=/home/hero/log/celery_beat.log



worker的

[include]
files=/etc/supervisord.conf
[program:celery_worker]
environment=HOME="/home/hero",ANSIBLE_FORCE_COLOR=True
user=hero 
directory=/home/hero/heroes_assemble
# command=/home/hero/heroes_assemble/venv/bin/celery -A celery_tasks.manager.c worker -l info
command=/home/hero/heroes_assemble/venv/bin/celery -A celery_tasks.celery.c worker -l info  --concurrency=4  --logfile=/home/hero/log/celery_work.log

这个--concurrency等于就是一次并发执行4个celery任务


通过引入celery里的c 实例来修饰你需要作为celery任务的函数

celery精讲和实战_redis_16


生产端通过delay发起异步任务调用

celery精讲和实战_redis_17


定时任务也是通过引用celery里c 实例来修饰你需要作为celery任务的函数,只是最后配置 beat_schedule需要将这个所修饰的函数名模块路径带上

celery精讲和实战_redis_18

举报

相关推荐

0 条评论