Celery(芹菜)
是一个分布式异步任务框架,是一个框架。跟其他web框架无关(跟其他web框架无关)
Celery 官网:http://www.celeryproject.org/ Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/ Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
Celery是一个资金很少的项目,因此我们不支持Microsoft Windows。请不要打开与该平台相关的任何问题。
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务(我们的项目是django),一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
例子:
人是一个独立运行的服务(django) | 医院也是一个独立运行的服务(celery)
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时(处理异步任务),就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
架构:
broker:任务中间件
用户提交的任务,存在这个里面(可以用redis、rabbitmq(专业的消息队列))
就是生产者、消费者模型。比如厨师生产包子,放在盘子里面,等客户来吃,这个任务中间件就是这个盘子
worker:任务执行者
就是消费者,真正执行任务的进程(真正干活的人)
backend:任务结果存储
任务执行后的结果(可以用redis、rabbitmq(专业的消息队列))
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。
celery服务为为其他项目服务提供异步解决任务需求的
celery可以做的事:
异步任务
延迟任务
定时任务(其他方式也可以解决,网上有其他框架)
celery的基本使用
先安装一下
pip install celery==5.1.2
celery的快速使用/celery_task.py
# 先安装一下
# pip install celery==5.1.2
from celery import Celery
# Celery里面至少要传这几个参数
# main = None, 这个就是一个名字,可以写成__name__,也可以写其他的
# backend=None, 结果存储,是redis的地址:backend='redis://:123456@127.0.0.1:6379/1'
# redis: :redis协议
# 123456: : 密码
# 127.0.0.1:6379 :redis 地址
# 1 :第一个库
# broker=None, 也是redis的地址:broker='redis://:123456@127.0.0.1:6379/2'
broker = "redis://127.0.0.1:6379/1" # 消息中间件,任务提交到1里面
backend = "redis://127.0.0.1:6379/2" # 结果存在2里面
# 1、实例化得到celery对象
app = Celery(__name__, backend=backend, broker=broker) # 拿到对象
# 2、写一堆任务(计算a+b、挖井、砍树),就是函数
# 3、使用装饰器,包裹任务。用Celery管理任务
@app.task()
def add(a, b):
import time
time.sleep(2)
return a + b
# 下面是替提交任务,因为上面的3个部分已经写好了,需要人或者程序去提交任务
celery的快速使用/提交任务.py
import celery_task
# 1、同步执行(既然同步执行的话,就按照之前的写法就可以了,不需要这样写)
# res = celery_task.add(2, 3) # 这个一个普通的同步任务(会等着2+3执行完了,再返回结果)
# print(res) # 同步执行的,等两秒钟之后,会出现结果5
# 异步任务:
# 第一步:提交(使用任务名.apply_async(args=[参数,参数])或者任务名.apply_async(kwargs={"a":2,"b":3})
# 结果是任务的ID号,唯一标识这个任务
res = celery_task.add.apply_async(kwargs={"a": 2, "b": 3}) # add.apply_async(2,3),括号里面是add函数的参数
print(res) # 0e615f80-0aae-4ecd-8bc4-ee0c94545b8a
# 图片3
# 第二步:让worker执行---结果存到redis中
# 通过命令启动celery(app)服务:
# 非windows
# 5.x之前的用的是下面的命令,现在celery用的是5.1.2
# celery worker -A celery_task -l info
# 5.x之之后用的是下面的命令(把-A参数放在前面)
# celery -A celery_task worker -l info
# -A celery_task:指的是对应的应用程序
# -l info:对应的日志界别
# windows:(官方不支持,需要装一个eventlet)
# pip3 install eventlet(eventlet==0.32.0)(遇到一个坑,装eventlet的时候,会顺带安装dnspython,这个版本用2.0.0)
# 5.x之前的用的是下面的命令,现在celery用的是5.1.2
# celery worker -A celery_task -l info -P eventlet
# 5.x之之后用的是下面的命令(把-A参数放在前面)
# celery -A celery_task worker -l info -P eventlet
script/1、celery的快速使用/查看任务结果.py
# 第三步:查看任务执行的结果
# 查看执行结果,在哪里都可以
from celery_task import app
from celery.result import AsyncResult
id = '0e615f80-0aae-4ecd-8bc4-ee0c94545b8a' # 提交任务时,生成的id号
if __name__ == '__main__':
a = AsyncResult(id=id, app=app) # 实例化得到一个对象
if a.successful():
result = a.get() # 如果成功,就获取结果,异步任务得到的结果
print("任务执行成功了")
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
celery的包结构
这种方式是比较常用的,做成一个包,后面可以在其他项目里面直接用
现在把拆开
提交任务,一般是在项目里面,不是在这个包下面;在任意位置都可以
上面就是这个包结构
下面可以把worker启动起来,没有任务的话,就等在那,一有任务的话,就立马执行了(也可以先提交任务)
只要没有任务就一直卡正这,只要服务不停
执行完了之后,结果就在结果存储里面了
我们直接可以通过任务id,查看结果
再提交一个生成订单的任务。效果也是一样的
celery执行延迟任务
延迟任务跟worker没有关系,worker有活就干,没有活就闲置在那边
延迟任务就是提交的方式跟之前不一样了,之前是使用res = order_task.make_order.apply_async();是使用apply_async()去提交的
现在提交的时候,只需要告知几分钟之后执行就行了。是固定用法
这个就是延迟任务的代码,就是加一个时间
代码:
from celery_task import user_task, order_task
###########celery 执行延迟任务#############
# 添加延迟任务
from datetime import datetime, timedelta
eta = datetime.utcnow() + timedelta(seconds=50)
# 50秒后发送短信
res = user_task.send_sms.apply_async(args=(200, 50), eta=eta)
print(res)
执行异步任务
方式一: apply_async后面不加时间,就是立即执行
user_task.send_sms.apply_async(args=(200, 50))
方式二:使用delay
# 使用第二种方式执行异步任务
res = user_task.send_sms.delay("1526589745", "8888") #后面不用传时间,有几个参数就传几个参数
print(res)
执行定时任务
celery能干3件事,执行异步任务、执行延迟任务、执行定时任务
定时任务,是程序启动之前就已经写好了。任务不是人为的去提交了
需要在celery.py里面注册定时任务,因为现在用的是UTC时间,需要修改celery的配置信息
app.conf 就是整个celery的配置信息
现在里面没有配置,我们可以配置一下,配置过之后,我们就不需要使用UTC时间了,直接使用本地的上海的时间就可以了
现在启动worker,任务还是不能执行,还需要在启动一个东西:beat
beat就相当于我们之前手动提交任务给worker执行,现在是定时任务是beat帮我们提交
celery -A celery_task beat -l info
celery_task是包名;启动的时候,需要新开一个窗口,不能在之前的worker停掉
汇总:
1、在包(celery_task)下的celery.py下写入
########## 注册定时任务 ##########
# 需要先修改celery的配置信息
# 时区
app.conf.timezone = "Asia/Shanghai"
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置,通过app.conf.beat_schedule
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
# send_sms_3_seconds是任务名,随便起
'send_sms_3_seconds': {
'task': 'celery_task.user_task.send_sms', # 指定执行的是哪个任务
'schedule': timedelta(seconds=3), # 每3秒钟发一次短信
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': ("15052585635", "5202"), # 指定任务的参数
},
'make_order_5_seconds': {
'task': 'celery_task.order_task.make_order', # 指定执行的是哪个任务
'schedule': timedelta(seconds=3), # 每5秒钟生成一个订单
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
},
'add_every_1_seconds': {
'task': 'celery_task.course_task.add', # 指定执行的是哪个任务
'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (3, 5), # 指定任务的参数
},
}
2、启动worker
celery -A celery_task worker -l info -P eventlet
3、启动beat
如果不启动beat,worker是没有活干的
启动beat和启动worker无先后
celery -A celery_task beat -l info
script/2、celery的包结构/celery_task/celery.py
# 这个文件的名字,必须叫celery;包名可以随便叫
from celery import Celery
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
# 1、实例化得到celery对象
app = Celery(__name__, backend=backend, broker=broker, include=[
"celery_task.course_task", # 重包的位置开始都导
"celery_task.order_task",
"celery_task.user_task",
]) # 拿到对象;include是一个列表;放被管理的task
# 原来,任务写在这个py文件里面
# 后期任务非常多,可能有用户相关的、课程相关的、订单相关的
########## 注册定时任务 ##########
# 需要先修改celery的配置信息
# 时区
app.conf.timezone = "Asia/Shanghai"
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置,通过app.conf.beat_schedule
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
# send_sms_3_seconds是任务名,随便起
'send_sms_3_seconds': {
'task': 'celery_task.user_task.send_sms', # 指定执行的是哪个任务
'schedule': timedelta(seconds=3), # 每3秒钟发一次短信
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': ("15052585635", "5202"), # 指定任务的参数
},
'make_order_5_seconds': {
'task': 'celery_task.order_task.make_order', # 指定执行的是哪个任务
'schedule': timedelta(seconds=3), # 每5秒钟生成一个订单
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
},
'add_every_1_seconds': {
'task': 'celery_task.course_task.add', # 指定执行的是哪个任务
'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (3, 5), # 指定任务的参数
},
}
django中集成celery实现异步任务
现在包已经写好了,只需要把包复制到项目的跟路径下面
了解
我们之前学redis的时候,有单独的redis,也有django_redis
我们现在学的celery,也有第三方的:django-celery,方便我们使用。但是这个第三方写的包的版本,跟celery和django版本完全对应。不然就报错
我们自己使用包结构,集成到django中
把写好的包,直接复制到根路径下
启动django项目,现在访问路由,调用视图函数
现在任务已经到redis里面了,但是没有执行,因为worker没有启动
正常情况下没Django的项目和celery的项目要一起启动起来
我们自己用包模式
第一步:把写好的包,直接复制到根路径下
第二步:在视图类中(视图函数中),正常写就OK
from django.core.cache import cache
# 用户提交一个请求,就发一个短信(异步发送)
from celery_task.user_task import send_sms
def test(request):
mobile = request.GET.get("mobile")
code = "9999"
res = send_sms.delay(mobile, code) # 将发送短信的方法,封装成异步的,res这里就是快速的生成一个id
print(res)
return HttpResponse(res)
django中集成celery实现定时任务
实现定时更新首页轮播图
先要将首页轮播图加入缓存?为啥要干这个事?
因为我们现在写了路飞的首页,现在来一个人,就会去查一次数据库,这样的话,人多了,压力就会大
如果同时来了1w人,就差了1w次的数据库,所以就需要把轮播图加入缓存中,因为轮播图基本上不变,第一个人查的时候,从数据库里面查出来,放在内存里面,以后再拿就从缓存里面拿
总结一下:
-因为如果不加缓存,I每次用户访问首页,都去查一次数据库,对数据库压力大
-第一次访问查数据库,拿到数据,放到缓存(redis)
说一下:双写一直性的问题,面试经常问到
比如,现在图片缓存在缓存里,现在图片变了,后面人来访问的时候,就不是新的了,拿的都是redis里面的老的
双写一致性的问题(redis缓存和mysql数据不同步)
有方案
缓存穿透、缓存击穿、缓存雪崩的问题
先改造一下轮播图这个接口:将首页轮播图加入缓存
luffy_api/apps/home/views.py
from django.shortcuts import render
from rest_framework.viewsets import GenericViewSet, ViewSetMixin
from rest_framework.mixins import ListModelMixin
from rest_framework.generics import ListAPIView
from . import models
from . import serializer
from django.conf import settings # 这个是django的配置文件,优先使用项目的配置信息,如果没有,使用内置的
# 而不是导入from settings import dev
from django.core.cache import cache
from rest_framework.response import Response
# class BannerView(GenericViewSet,ListModelMixin): 和下面的效果是一样的
class BannerView(ViewSetMixin, ListAPIView):
queryset = models.Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
:settings.BANNER_COUNT] # 拿所有
serializer_class = serializer.BannerSerializer
# 上面这个是没有走缓存的,想要走缓存,重写一下list,因为查询所有就会触发list的执行
def list(self, request, *args, **kwargs): # ListModelMixin里面的get方法
# 先去缓存中获取,如果缓存有,直接返回;如果缓存没有,去数据库查询,放到缓存
# 先去缓存中获取
banner_list = cache.get("banner_list_cache")
if not banner_list: # 如果缓存中没有值
# 没有走缓存,查了数据库
print("走了数据库")
res = super().list(request, *args, **kwargs) # 这个就是去数据库中获取,得到的是Response对象
banner_list = res.data # 数据是是在data里面
# 拿到之后,放到缓存里面
cache.set("banner_list_cache", banner_list) # key:value的形式
return Response(data=banner_list) # 如果缓存中有值;直接返回回去;这里没有用封装好的APIResponse,用的是Response,因为前端已经写好了
补充:
Python中的定时任务,延时任务一般使用apscheduler去写,但是写异步任务的话,还是用celery。因为apscheduler比较轻量一些
Python 定时任务框架
pip install apscheduler
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
@scheduler.scheduled_job('cron', hour='8-23')
def request_update_status():
print('Doing job')
scheduler.start()
在项目中,可以自己封装一个视图类,让其实现走缓存
双写一致性问题
讲的是redis和mysql数据一致性的问题
缓存更新策略
策略1:先更新数据库,再更新缓存
极限情况下的问题是:数据库更新的,缓存还没有更新完,用户的请求过来了,用的还是缓存里面老的数据
策略2:先删缓存,再更新数据库
极限情况下的问题是:缓存删了,数据库还没有更新完,用户的请求过来了,会查库里面的数据,再缓存到缓存里面,后面一直用的是老数据
策略3:先更新数据库,再删缓存
极限情况下的问题是:类似于策略1,可靠性都高一点
策略4:定时更新
每隔一段时间更新一下缓存,对实时性要求不高的可以这个用
说到定时就用到了celery
通过定时更新处理首页轮播图缓存不一致问题
理论上现在就是要启动worker和beat就可以实现定时更新轮播图了
但是运行起来之后,就会报错
上面在home_task里面,用到了django的orm、序列化类用了cache,但是celery是一个独立的服务,跟django没有关系。
下面就是这么在脚本中调用django的环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")
import django #导入django
django.setup() # 启动django
现在启动worker和beat就正常了
但是有一个新的问题,访问接口的时候图片没有前面的地址
但是在视图函数那边是有这个地址的,因为视图函数那边有request对象。有这个请求上下文,代码内部会自动的把这个地址拼在前面
在第三方里面,就没有request对象,就缺了前面这个地址,需要手动拼上前面的地址