背景
因需要解决博客验证码异步发送的需要,需要用到大名鼎鼎的Celery异步框架,但在windows下测试失败,报错如下。
celery In [8]: s.result Out[8]: ValueError('not enough values to unpack (expected 3, got 0)')
场景还原
本地环境如下:
- Windows 10
- Python 3.10
- Celery 5.2.1
代码tasks.py
:
from celery import Celery
app = Celery('tyj',broker='redis://:@127.0.0.1:6379/2',backend='redis://:@127.0.0.1:6379/3')
@app.task
def task_test(a,b):
print('task is running....')
return a + b
执行worker
D:\PycharmProjects>celery -A tasks worker -l info
输出:
D:\PycharmProjects>celery -A tasks worker -l info
-------------- celery@DESKTOP-041LA6S v5.2.1 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.19043-SP0 2021-12-14 17:19:59
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tyj:0x1b6b08511b0
- ** ---------- .> transport: redis://127.0.0.1:6379/2
- ** ---------- .> results: redis://127.0.0.1:6379/3
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.task_test
[2021-12-14 17:19:59,524: INFO/MainProcess] Connected to redis://127.0.0.1:6379/2
[2021-12-14 17:19:59,537: INFO/MainProcess] mingle: searching for neighbors
[2021-12-14 17:20:00,066: INFO/SpawnPoolWorker-1] child process 9736 calling self.run()
[2021-12-14 17:20:00,100: INFO/SpawnPoolWorker-2] child process 18228 calling self.run()
[2021-12-14 17:20:00,199: INFO/SpawnPoolWorker-3] child process 18712 calling self.run()
[2021-12-14 17:20:00,206: INFO/SpawnPoolWorker-4] child process 17388 calling self.run()
[2021-12-14 17:20:00,210: INFO/SpawnPoolWorker-6] child process 12604 calling self.run()
[2021-12-14 17:20:00,215: INFO/SpawnPoolWorker-5] child process 8404 calling self.run()
[2021-12-14 17:20:00,228: INFO/SpawnPoolWorker-8] child process 12112 calling self.run()
[2021-12-14 17:20:00,239: INFO/SpawnPoolWorker-7] child process 5076 calling self.run()
[2021-12-14 17:20:00,584: INFO/MainProcess] mingle: all alone
[2021-12-14 17:20:00,616: INFO/MainProcess] celery@DESKTOP-041LA6S ready.
调用任务代码:
C:\Users\66907>d:
D:\>cd pycharmProjects
D:\PycharmProjects>ipython
Python 3.10.0 (tags/v3.10.0:b494f59, Oct 4 2021, 19:00:18) [MSC v.1929 64 bit (AMD64)]
Type 'copyright', 'credits' or 'license' for more information
IPython 7.30.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: from tasks import task_test
In [2]: s = task_test.delay(10,100)
In [3]: s.result
celery In [4]: s.result Out[8]: ValueError('not enough values to unpack (expected 3, got 0)')
- Traceback (most recent call last):
- File "d:\programmingsoftware\python35\lib\site-packages\billiard\pool.py", line 358, in workloop
- result = (True, prepare_result(fun(*args, **kwargs)))
- File "d:\programmingsoftware\python35\lib\site-packages\celery\app\trace.py", line 525, in _fast_trace_task
- tasks, accept, hostname = _loc
- ValueError: not enough values to unpack (expected 3, got 0)
备注:报错代码不选
解决:
1安装一个库
D:\PycharmProjects>pip install eventlet
2然后启动worker的时候添加一个参数:
#D:\PycharmProjects>celery -A tasks worker -l info
D:\PycharmProjects>celery -A tasks worker -l info -P eventlet #window下必须要加-P eventlet
测试:
1新建一文件tasks.py,内容如下:
from celery import Celery
app = Celery('tyj',broker='redis://:@127.0.0.1:6379/2',backend='redis://:@127.0.0.1:6379/3')
@app.task
def task_test(a,b):
print('task is running....')
return a + b
2打开cmd并切换至该目录,然后执行worker:
D:\PycharmProjects>celery -A tasks worker -l info -P eventlet
-------------- celery@DESKTOP-041LA6S v5.2.1 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.19043-SP0 2021-12-14 17:34:14
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tyj:0x1cfd58dffa0
- ** ---------- .> transport: redis://127.0.0.1:6379/2
- ** ---------- .> results: redis://127.0.0.1:6379/3
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.task_test
[2021-12-14 17:34:14,428: INFO/MainProcess] Connected to redis://127.0.0.1:6379/2
[2021-12-14 17:34:14,435: INFO/MainProcess] mingle: searching for neighbors
[2021-12-14 17:34:15,472: INFO/MainProcess] mingle: all alone
[2021-12-14 17:34:15,499: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/2.
[2021-12-14 17:34:15,502: INFO/MainProcess] celery@DESKTOP-041LA6S ready.
3再打开一个cmd窗口并切换至文件目录然后在该目录下打开ipython(pip insyall ipython)
#模拟生产者
C:\Users\66907>d:
D:\>cd pycharmProjects
D:\PycharmProjects>ipython
Python 3.10.0 (tags/v3.10.0:b494f59, Oct 4 2021, 19:00:18) [MSC v.1929 64 bit (AMD64)]
Type 'copyright', 'credits' or 'license' for more information
IPython 7.30.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: from tasks import task_test
In [2]: s = task_test.delay(10,100)
In [3]: s.result
Out[3]: 110
名词解释:
broker:存储消息队列的容器,例如redis-list
backend:存储执行的结果
worker:执行broker任务的进程
django把任务推给broker后就干别的事情了,而worker发现broker里有队列就会主动去执行,并把结果存储到backenf里。
异步框架celery介绍:
celery分布式异步消息任务队列主要适用于两大类场景:
1异步:
针对并发量高的任务,可将任务放入celery任务队列中,并从Celery获取一个任务ID。后续通过询问Celery来得知任务执行状态和进度
定时:
需要定时执行同样的任务,Celery任务队列支持定时触发,可以按照时间间隔或者crontab表达式来触发任务。
缺点:
可能是对一些即时性任务比较强的业务场景不太适合吧!