0
点赞
收藏
分享

微信扫一扫

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图


Celery(芹菜)

是一个分布式异步任务框架,是一个框架。跟其他web框架无关(跟其他web框架无关)

Celery 官网:http://www.celeryproject.org/ Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/ Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统

Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
Celery是一个资金很少的项目,因此我们不支持Microsoft Windows。请不要打开与该平台相关的任何问题。

1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务(我们的项目是django),一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

例子:

人是一个独立运行的服务(django) | 医院也是一个独立运行的服务(celery)
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时(处理异步任务),就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

架构:

broker:任务中间件
		用户提交的任务,存在这个里面(可以用redis、rabbitmq(专业的消息队列))
		就是生产者、消费者模型。比如厨师生产包子,放在盘子里面,等客户来吃,这个任务中间件就是这个盘子
	worker:任务执行者
		就是消费者,真正执行任务的进程(真正干活的人)
	backend:任务结果存储
		任务执行后的结果(可以用redis、rabbitmq(专业的消息队列))
	Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_python


【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_django_02

celery服务为为其他项目服务提供异步解决任务需求的
celery可以做的事:
	异步任务
	延迟任务
	定时任务(其他方式也可以解决,网上有其他框架)

celery的基本使用

先安装一下

pip install celery==5.1.2

celery的快速使用/celery_task.py

# 先安装一下
# pip install celery==5.1.2

from celery import Celery

# Celery里面至少要传这几个参数
# main = None,    这个就是一个名字,可以写成__name__,也可以写其他的
# backend=None,  结果存储,是redis的地址:backend='redis://:123456@127.0.0.1:6379/1'
# 	redis: :redis协议
# 	123456: : 密码
# 	127.0.0.1:6379 :redis 地址
# 	1  :第一个库
# broker=None,  也是redis的地址:broker='redis://:123456@127.0.0.1:6379/2'
broker = "redis://127.0.0.1:6379/1"  # 消息中间件,任务提交到1里面
backend = "redis://127.0.0.1:6379/2"  # 结果存在2里面

# 1、实例化得到celery对象
app = Celery(__name__, backend=backend, broker=broker)  # 拿到对象


# 2、写一堆任务(计算a+b、挖井、砍树),就是函数
# 3、使用装饰器,包裹任务。用Celery管理任务

@app.task()
def add(a, b):
	import time
	time.sleep(2)
	return a + b

# 下面是替提交任务,因为上面的3个部分已经写好了,需要人或者程序去提交任务

celery的快速使用/提交任务.py

import celery_task

# 1、同步执行(既然同步执行的话,就按照之前的写法就可以了,不需要这样写)
# res = celery_task.add(2, 3)  # 这个一个普通的同步任务(会等着2+3执行完了,再返回结果)
# print(res)  # 同步执行的,等两秒钟之后,会出现结果5

# 异步任务:
# 第一步:提交(使用任务名.apply_async(args=[参数,参数])或者任务名.apply_async(kwargs={"a":2,"b":3})
# 结果是任务的ID号,唯一标识这个任务
res = celery_task.add.apply_async(kwargs={"a": 2, "b": 3})  # add.apply_async(2,3),括号里面是add函数的参数
print(res)  # 0e615f80-0aae-4ecd-8bc4-ee0c94545b8a
# 图片3
# 第二步:让worker执行---结果存到redis中
	# 通过命令启动celery(app)服务:
	# 非windows
		# 5.x之前的用的是下面的命令,现在celery用的是5.1.2
			# celery worker -A celery_task -l info
		# 5.x之之后用的是下面的命令(把-A参数放在前面)
			# celery -A celery_task worker -l info
				# -A celery_task:指的是对应的应用程序
				# -l info:对应的日志界别
	# windows:(官方不支持,需要装一个eventlet)
		# pip3 install eventlet(eventlet==0.32.0)(遇到一个坑,装eventlet的时候,会顺带安装dnspython,这个版本用2.0.0)
		# 5.x之前的用的是下面的命令,现在celery用的是5.1.2
			# celery worker -A celery_task -l info -P eventlet
		# 5.x之之后用的是下面的命令(把-A参数放在前面)
			# celery -A celery_task worker -l info -P eventlet

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_缓存_03

script/1、celery的快速使用/查看任务结果.py

# 第三步:查看任务执行的结果
# 查看执行结果,在哪里都可以
from celery_task import app

from celery.result import AsyncResult

id = '0e615f80-0aae-4ecd-8bc4-ee0c94545b8a'  # 提交任务时,生成的id号
if __name__ == '__main__':
	a = AsyncResult(id=id, app=app)  # 实例化得到一个对象
	if a.successful():
		result = a.get()  # 如果成功,就获取结果,异步任务得到的结果
		print("任务执行成功了")
		print(result)
	elif a.failed():
		print('任务失败')
	elif a.status == 'PENDING':
		print('任务等待中被执行')
	elif a.status == 'RETRY':
		print('任务异常后正在重试')
	elif a.status == 'STARTED':
		print('任务已经开始被执行')

celery的包结构

这种方式是比较常用的,做成一个包,后面可以在其他项目里面直接用

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_python_04

现在把拆开

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_后端_05


【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_redis_06

提交任务,一般是在项目里面,不是在这个包下面;在任意位置都可以

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_django_07


【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_python_08

上面就是这个包结构
下面可以把worker启动起来,没有任务的话,就等在那,一有任务的话,就立马执行了(也可以先提交任务)

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_缓存_09


【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_redis_10

只要没有任务就一直卡正这,只要服务不停

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_后端_11

执行完了之后,结果就在结果存储里面了
我们直接可以通过任务id,查看结果

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_redis_12

再提交一个生成订单的任务。效果也是一样的

celery执行延迟任务

延迟任务跟worker没有关系,worker有活就干,没有活就闲置在那边
延迟任务就是提交的方式跟之前不一样了,之前是使用res = order_task.make_order.apply_async();是使用apply_async()去提交的
现在提交的时候,只需要告知几分钟之后执行就行了。是固定用法

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_缓存_13


【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_django_14


【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_缓存_15


【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_redis_16

这个就是延迟任务的代码,就是加一个时间
代码:

from celery_task import user_task, order_task

###########celery 执行延迟任务#############
# 添加延迟任务
from datetime import datetime, timedelta

eta = datetime.utcnow() + timedelta(seconds=50)
# 50秒后发送短信
res = user_task.send_sms.apply_async(args=(200, 50), eta=eta)
print(res)

执行异步任务

方式一: apply_async后面不加时间,就是立即执行

user_task.send_sms.apply_async(args=(200, 50))

方式二:使用delay

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_缓存_17

# 使用第二种方式执行异步任务
res = user_task.send_sms.delay("1526589745", "8888")    #后面不用传时间,有几个参数就传几个参数
print(res)

执行定时任务

celery能干3件事,执行异步任务、执行延迟任务、执行定时任务
定时任务,是程序启动之前就已经写好了。任务不是人为的去提交了
需要在celery.py里面注册定时任务,因为现在用的是UTC时间,需要修改celery的配置信息

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_后端_18

app.conf 就是整个celery的配置信息
现在里面没有配置,我们可以配置一下,配置过之后,我们就不需要使用UTC时间了,直接使用本地的上海的时间就可以了

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_python_19


【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_python_20

现在启动worker,任务还是不能执行,还需要在启动一个东西:beat
beat就相当于我们之前手动提交任务给worker执行,现在是定时任务是beat帮我们提交
celery -A celery_task beat -l info
celery_task是包名;启动的时候,需要新开一个窗口,不能在之前的worker停掉

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_django_21

汇总:

1、在包(celery_task)下的celery.py下写入

########## 注册定时任务 ##########
# 需要先修改celery的配置信息
# 时区
app.conf.timezone = "Asia/Shanghai"
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置,通过app.conf.beat_schedule
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
	# send_sms_3_seconds是任务名,随便起
	'send_sms_3_seconds': {
		'task': 'celery_task.user_task.send_sms',  # 指定执行的是哪个任务
		'schedule': timedelta(seconds=3),  # 每3秒钟发一次短信
		# 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
		'args': ("15052585635", "5202"),  # 指定任务的参数
	},
	'make_order_5_seconds': {
		'task': 'celery_task.order_task.make_order',  # 指定执行的是哪个任务
		'schedule': timedelta(seconds=3),  # 每5秒钟生成一个订单
		# 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
	},
	'add_every_1_seconds': {
		'task': 'celery_task.course_task.add',  # 指定执行的是哪个任务
		'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
		'args': (3, 5),  # 指定任务的参数
	},
}

2、启动worker

celery -A celery_task worker -l info -P eventlet

3、启动beat

如果不启动beat,worker是没有活干的
		启动beat和启动worker无先后
		celery -A celery_task beat -l info

script/2、celery的包结构/celery_task/celery.py

# 这个文件的名字,必须叫celery;包名可以随便叫


from celery import Celery

broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"

# 1、实例化得到celery对象
app = Celery(__name__, backend=backend, broker=broker, include=[
	"celery_task.course_task",  # 重包的位置开始都导
	"celery_task.order_task",
	"celery_task.user_task",
])  # 拿到对象;include是一个列表;放被管理的task
# 原来,任务写在这个py文件里面
# 后期任务非常多,可能有用户相关的、课程相关的、订单相关的


########## 注册定时任务 ##########
# 需要先修改celery的配置信息
# 时区
app.conf.timezone = "Asia/Shanghai"
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置,通过app.conf.beat_schedule
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
	# send_sms_3_seconds是任务名,随便起
	'send_sms_3_seconds': {
		'task': 'celery_task.user_task.send_sms',  # 指定执行的是哪个任务
		'schedule': timedelta(seconds=3),  # 每3秒钟发一次短信
		# 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
		'args': ("15052585635", "5202"),  # 指定任务的参数
	},
	'make_order_5_seconds': {
		'task': 'celery_task.order_task.make_order',  # 指定执行的是哪个任务
		'schedule': timedelta(seconds=3),  # 每5秒钟生成一个订单
		# 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
	},
	'add_every_1_seconds': {
		'task': 'celery_task.course_task.add',  # 指定执行的是哪个任务
		'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
		'args': (3, 5),  # 指定任务的参数
	},
}

django中集成celery实现异步任务

现在包已经写好了,只需要把包复制到项目的跟路径下面

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_后端_22

了解

我们之前学redis的时候,有单独的redis,也有django_redis
	我们现在学的celery,也有第三方的:django-celery,方便我们使用。但是这个第三方写的包的版本,跟celery和django版本完全对应。不然就报错
	我们自己使用包结构,集成到django中

把写好的包,直接复制到根路径下

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_redis_23

启动django项目,现在访问路由,调用视图函数

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_python_24

现在任务已经到redis里面了,但是没有执行,因为worker没有启动
正常情况下没Django的项目和celery的项目要一起启动起来

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_缓存_25

我们自己用包模式

第一步:把写好的包,直接复制到根路径下
第二步:在视图类中(视图函数中),正常写就OK

from django.core.cache import cache

# 用户提交一个请求,就发一个短信(异步发送)
from celery_task.user_task import send_sms


def test(request):
	mobile = request.GET.get("mobile")
	code = "9999"
	res = send_sms.delay(mobile, code)  # 将发送短信的方法,封装成异步的,res这里就是快速的生成一个id
	print(res)
	return HttpResponse(res)

django中集成celery实现定时任务

实现定时更新首页轮播图
先要将首页轮播图加入缓存?为啥要干这个事?
因为我们现在写了路飞的首页,现在来一个人,就会去查一次数据库,这样的话,人多了,压力就会大
如果同时来了1w人,就差了1w次的数据库,所以就需要把轮播图加入缓存中,因为轮播图基本上不变,第一个人查的时候,从数据库里面查出来,放在内存里面,以后再拿就从缓存里面拿
总结一下:
	-因为如果不加缓存,I每次用户访问首页,都去查一次数据库,对数据库压力大
	-第一次访问查数据库,拿到数据,放到缓存(redis)

说一下:双写一直性的问题,面试经常问到

比如,现在图片缓存在缓存里,现在图片变了,后面人来访问的时候,就不是新的了,拿的都是redis里面的老的
		双写一致性的问题(redis缓存和mysql数据不同步)
			有方案
		缓存穿透、缓存击穿、缓存雪崩的问题

先改造一下轮播图这个接口:将首页轮播图加入缓存

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_redis_26


【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_django_27

luffy_api/apps/home/views.py

from django.shortcuts import render
from rest_framework.viewsets import GenericViewSet, ViewSetMixin
from rest_framework.mixins import ListModelMixin
from rest_framework.generics import ListAPIView
from . import models
from . import serializer

from django.conf import settings  # 这个是django的配置文件,优先使用项目的配置信息,如果没有,使用内置的

# 而不是导入from settings import dev
from django.core.cache import cache
from rest_framework.response import Response


# class BannerView(GenericViewSet,ListModelMixin): 和下面的效果是一样的
class BannerView(ViewSetMixin, ListAPIView):
	queryset = models.Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
	           :settings.BANNER_COUNT]  # 拿所有
	serializer_class = serializer.BannerSerializer

	# 上面这个是没有走缓存的,想要走缓存,重写一下list,因为查询所有就会触发list的执行
	def list(self, request, *args, **kwargs):  # ListModelMixin里面的get方法
		# 先去缓存中获取,如果缓存有,直接返回;如果缓存没有,去数据库查询,放到缓存

		# 先去缓存中获取
		banner_list = cache.get("banner_list_cache")
		if not banner_list:  # 如果缓存中没有值
			# 没有走缓存,查了数据库
			print("走了数据库")
			res = super().list(request, *args, **kwargs)  # 这个就是去数据库中获取,得到的是Response对象
			banner_list = res.data  # 数据是是在data里面
			# 拿到之后,放到缓存里面
			cache.set("banner_list_cache", banner_list)  # key:value的形式
		return Response(data=banner_list)  # 如果缓存中有值;直接返回回去;这里没有用封装好的APIResponse,用的是Response,因为前端已经写好了

补充:

Python中的定时任务,延时任务一般使用apscheduler去写,但是写异步任务的话,还是用celery。因为apscheduler比较轻量一些
Python 定时任务框架

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_redis_28

pip install apscheduler

from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
@scheduler.scheduled_job('cron', hour='8-23')
def request_update_status():
    print('Doing job')

scheduler.start()

在项目中,可以自己封装一个视图类,让其实现走缓存

双写一致性问题

讲的是redis和mysql数据一致性的问题
缓存更新策略

策略1:先更新数据库,再更新缓存
		极限情况下的问题是:数据库更新的,缓存还没有更新完,用户的请求过来了,用的还是缓存里面老的数据
	策略2:先删缓存,再更新数据库
		极限情况下的问题是:缓存删了,数据库还没有更新完,用户的请求过来了,会查库里面的数据,再缓存到缓存里面,后面一直用的是老数据
	策略3:先更新数据库,再删缓存
		极限情况下的问题是:类似于策略1,可靠性都高一点
	策略4:定时更新
		每隔一段时间更新一下缓存,对实时性要求不高的可以这个用
		说到定时就用到了celery

通过定时更新处理首页轮播图缓存不一致问题

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_django_29

理论上现在就是要启动worker和beat就可以实现定时更新轮播图了
但是运行起来之后,就会报错

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_缓存_30

上面在home_task里面,用到了django的orm、序列化类用了cache,但是celery是一个独立的服务,跟django没有关系。
下面就是这么在脚本中调用django的环境

import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")
import django   #导入django
django.setup()  # 启动django

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_缓存_31

现在启动worker和beat就正常了
但是有一个新的问题,访问接口的时候图片没有前面的地址

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_python_32

但是在视图函数那边是有这个地址的,因为视图函数那边有request对象。有这个请求上下文,代码内部会自动的把这个地址拼在前面
在第三方里面,就没有request对象,就缺了前面这个地址,需要手动拼上前面的地址

【实战项目】Django-Vue009---Celery、celery的基本使用、celery的包结构、celery执行延迟、异步任务、定时任务、双写一致性问题、定时更新处理首页轮播图_python_33


举报

相关推荐

0 条评论