Project structure
 

 
Project dependencies
 
celery==5.1.2   
Django==3.2.12  
django-celery-beat==2.2.1  
django-celery-results==2.2.0  
django-redis==5.2.0  
eventlet==0.33.0  
greenlet==1.1.2   
redis==4.1.4  
 
Installing packages
 
- Install celery
pip install celery
 
- Install django-celery-results
 Used to store celery task results
pip install django-celery-results
 
- Install django-celery-beat
 Used to run periodic tasks
pip install django-celery-beat
 
 
- Install eventlet
pip install eventlet
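Because the pitfalls later in this post are version-sensitive, it can help to confirm that the installed packages match the pinned versions listed under the project dependencies. The following is a minimal sketch using only the standard library; the helper name check_pins and the PINNED dict are illustrative assumptions, not part of the original project.

```python
from importlib.metadata import PackageNotFoundError, version

# Pinned versions copied from the dependency list above.
PINNED = {
    "celery": "5.1.2",
    "Django": "3.2.12",
    "django-celery-beat": "2.2.1",
    "django-celery-results": "2.2.0",
    "django-redis": "5.2.0",
    "eventlet": "0.33.0",
    "redis": "4.1.4",
}


def check_pins(pins):
    """Return {name: (wanted, installed_or_None, matches)} for each package."""
    report = {}
    for name, wanted in pins.items():
        try:
            installed = version(name)
        except PackageNotFoundError:
            installed = None  # package is not installed in this environment
        report[name] = (wanted, installed, installed == wanted)
    return report


report = check_pins(PINNED)
for name, (wanted, installed, ok) in report.items():
    status = "ok" if ok else "MISMATCH"
    print(f"{name}: wanted {wanted}, found {installed} [{status}]")
```

Running this inside the project's virtual environment prints one line per package, so a mismatched or missing package stands out before the worker is started.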
 
Using celery in a Django project
 
- Configure celery in the project settings file, settings.py
 settings.py
"""  
Django settings for demo project.  
  
Generated by 'django-admin startproject' using Django 3.2.12.  
  
For more information on this file, see  
https://docs.djangoproject.com/en/3.2/topics/settings/  
  
For the full list of settings and their values, see  
https://docs.djangoproject.com/en/3.2/ref/settings/  
"""  
  
from pathlib import Path  
  
BASE_DIR = Path(__file__).resolve().parent.parent  
  
  
SECRET_KEY = 'django-insecure-%8q+wpy=)*@237)o#!pgkn&^$qr9-h!w12jociuv4i^f#&z70d'  
  
DEBUG = True  
  
ALLOWED_HOSTS = []  
  
  
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app1',
    'django_celery_results',
    'django_celery_beat',
]
  
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
  
ROOT_URLCONF = 'demo.urls'  
  
TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [BASE_DIR / 'templates'],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    },
]
  
WSGI_APPLICATION = 'demo.wsgi.application'  
  
  
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'db.sqlite3',
    }
}
  
  
AUTH_PASSWORD_VALIDATORS = [
    {
        'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
    },
]
  
  
LANGUAGE_CODE = 'zh-hans'  
  
TIME_ZONE = 'Asia/Shanghai'  
  
USE_I18N = True  
  
USE_L10N = True  
  
USE_TZ = False  
  
  
STATIC_URL = '/static/'  
  
  
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'  
  
CELERY_RESULT_BACKEND = 'django-db'  
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'  
  
CELERY_ACCEPT_CONTENT = ['json']  
CELERY_TASK_SERIALIZER = 'json'  
  
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/0',
    },
    'celery': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',
    },
}
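The CELERY_ prefix on the settings above matters because celery.py loads them with namespace='CELERY': only names with that prefix are picked up, and the remainder of the name corresponds to Celery's lowercase setting (for example CELERY_BROKER_URL becomes broker_url). The snippet below is only a simplified illustration of that naming rule, not Celery's actual loader; the function name celery_keys is a hypothetical helper.

```python
def celery_keys(settings, namespace="CELERY"):
    """Illustrate the naming rule: strip the namespace prefix and lowercase the rest."""
    prefix = namespace + "_"
    return {
        name[len(prefix):].lower(): value
        for name, value in settings.items()
        if name.startswith(prefix)
    }


# A few values copied from settings.py above, plus one unrelated Django setting.
django_settings = {
    "DEBUG": True,
    "CELERY_RESULT_BACKEND": "django-db",
    "CELERY_BROKER_URL": "redis://127.0.0.1:6379/2",
    "CELERY_TASK_SERIALIZER": "json",
}

mapped = celery_keys(django_settings)
for key, value in mapped.items():
    print(f"{key} = {value!r}")
```

A setting written without the prefix (say BROKER_URL) would simply be ignored under this rule, which is a common reason a worker silently falls back to its default broker.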
 
- Create celery.py in the directory that has the same name as the project (next to settings.py)
 celery.py
import os
from datetime import timedelta

from celery import Celery

# Tell Celery which Django settings module to use.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo.settings')

app = Celery('demo')
# Use local time, matching TIME_ZONE and USE_TZ = False in settings.py.
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False

# Load every setting that starts with CELERY_ from settings.py.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Periodic tasks that beat hands to the worker.
app.conf.beat_schedule = {
    'add-every-10-seconds': {
        'task': 'app1.tasks.func2',
        'schedule': timedelta(seconds=10),
        'args': (),
    },
}
# Look for a tasks.py module in each installed app.
app.autodiscover_tasks()
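As an alternative sketch, the same schedule could live in settings.py instead of celery.py, since config_from_object reads CELERY_BEAT_SCHEDULE through the CELERY namespace and a plain number is treated as an interval in seconds. The entry name below is illustrative. The commented-out scheduler line is the value django-celery-beat's documentation gives for storing schedules in the database; whether this project wants that is an assumption, as the original setup does not configure it.

```python
# settings.py fragment (sketch): periodic tasks declared as plain data.
CELERY_BEAT_SCHEDULE = {
    "func2-every-10-seconds": {       # illustrative entry name
        "task": "app1.tasks.func2",   # dotted path of the task in tasks.py
        "schedule": 10.0,             # interval in seconds
        "args": (),
    },
}

# Optional, assumption: let django-celery-beat keep schedules in the database
# so they can be edited from the Django admin.
# CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
```

If this form is used, the app.conf.beat_schedule assignment in celery.py should be dropped so the two definitions do not compete.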
 
- Add the following to __init__.py in the same directory. This ensures the Celery app is loaded when Django starts, so that the @shared_task decorator uses it
 __init__.py
from .celery import app as celery_app  
  
__all__ = ('celery_app',)
 
- Create tasks.py in the app directory
 tasks.py
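from celery import shared_task


@shared_task
def func1():
    # Triggered on demand from a view via func1.delay().
    return "async task"


@shared_task
def func2():
    # Dispatched by beat according to beat_schedule in celery.py.
    return "periodic task"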
 
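- Invoking tasks: asynchronous tasks are triggered explicitly by the caller, while scheduled and periodic tasks need beat to dispatch them and a worker to execute them
 views.py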
from django.http import JsonResponse

from .tasks import func1


def async_task(request):
    # delay() only puts the task on the queue; a worker runs it later.
    func1.delay()
    return JsonResponse(
        {"message": "This is a celery asynchronous task!"},
        json_dumps_params={"ensure_ascii": False},
    )
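The division of labour described above (a caller or beat puts tasks on a queue, a worker takes them off and runs them) can be pictured with a toy model built only from the standard library. This is not how Celery is implemented: queue.Queue stands in for the Redis broker, a list stands in for the result backend, and delay and beat here are hypothetical stand-ins for func1.delay() and the celery beat process.

```python
import queue
import threading

broker = queue.Queue()  # stands in for the Redis broker
results = []            # stands in for the result backend


def func1():
    return "async task"


def func2():
    return "periodic task"


def worker():
    """Run queued tasks in order until a None sentinel arrives."""
    while True:
        task = broker.get()
        if task is None:
            break
        results.append(task())


def delay(task):
    """Toy version of task.delay(): enqueue and return immediately."""
    broker.put(task)


def beat(task, times):
    """Toy version of beat: enqueue the same task repeatedly (no real timer)."""
    for _ in range(times):
        broker.put(task)


worker_thread = threading.Thread(target=worker)
worker_thread.start()

delay(func1)          # what the view does
beat(func2, times=2)  # what the beat process does on each tick
broker.put(None)      # tell the toy worker to stop
worker_thread.join()

print(results)
```

The point of the model is that neither delay nor beat executes anything themselves; without a running worker the queue just fills up, which is why both beat and a worker have to be started for periodic tasks.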
 
Running the project
 
 
- Start the Django project
python manage.py runserver 127.0.0.1:8000
 
- Start celery beat (only needed when scheduled or periodic tasks have to run)
celery -A demo beat -l INFO
 

 
 
- Start the celery worker
celery -A demo worker -l INFO --pool=solo
 

 
Pitfalls
 
- Using django-celery is not recommended; it tends to cause many package version compatibility problems
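- Starting the worker with celery -A demo worker -l INFO fails with
ValueError: not enough values to unpack (expected 3, got 0)
 This error is commonly reported on Windows, where the default prefork pool is not supported. Starting the worker with --pool=solo, as in the run command above, avoids it.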
 

 
- Starting the worker with celery -A demo worker -l INFO -P eventlet fails with
django.db.utils.DatabaseError: DatabaseWrapper objects created in a thread can only be used in that same thread
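The worker command that does work in this post is the one with --pool=solo. Assuming the first error above comes from the default prefork pool on Windows (the post does not state the platform, so this is an assumption), a small launcher script could pick the pool by platform. The helper name worker_command is hypothetical; only the celery flags already shown in this post are used.

```python
import sys


def worker_command(project="demo", platform=sys.platform):
    """Build the worker command line, adding --pool=solo on Windows (assumption)."""
    cmd = ["celery", "-A", project, "worker", "-l", "INFO"]
    if platform.startswith("win"):
        # Single-process pool, as used in the run command earlier in this post.
        cmd.append("--pool=solo")
    return cmd


print(" ".join(worker_command()))
```

The returned list can be passed to subprocess.run from a start script, so the same script works on both a Windows development machine and a Linux server.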
 
