Django: How to Use Celery in a Windows Environment

Background

My blog needed to send verification codes asynchronously, which calls for the well-known Celery async framework. But a test on Windows failed with the following error:

In [8]: s.result
Out[8]: ValueError('not enough values to unpack (expected 3, got 0)')

Reproducing the Issue

The local environment:

  • Windows 10
  • Python 3.10
  • Celery 5.2.1

Code (tasks.py):

from celery import Celery

app = Celery('tyj',
             broker='redis://:@127.0.0.1:6379/2',
             backend='redis://:@127.0.0.1:6379/3')

@app.task
def task_test(a, b):
    print('task is running....')
    return a + b

Start the worker:

D:\PycharmProjects>celery -A tasks worker -l info

Output:

D:\PycharmProjects>celery -A tasks worker -l info

-------------- celery@DESKTOP-041LA6S v5.2.1 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.19043-SP0 2021-12-14 17:19:59
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tyj:0x1b6b08511b0
- ** ---------- .> transport: redis://127.0.0.1:6379/2
- ** ---------- .> results: redis://127.0.0.1:6379/3
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. tasks.task_test

[2021-12-14 17:19:59,524: INFO/MainProcess] Connected to redis://127.0.0.1:6379/2
[2021-12-14 17:19:59,537: INFO/MainProcess] mingle: searching for neighbors
[2021-12-14 17:20:00,066: INFO/SpawnPoolWorker-1] child process 9736 calling self.run()
[2021-12-14 17:20:00,100: INFO/SpawnPoolWorker-2] child process 18228 calling self.run()
[2021-12-14 17:20:00,199: INFO/SpawnPoolWorker-3] child process 18712 calling self.run()
[2021-12-14 17:20:00,206: INFO/SpawnPoolWorker-4] child process 17388 calling self.run()
[2021-12-14 17:20:00,210: INFO/SpawnPoolWorker-6] child process 12604 calling self.run()
[2021-12-14 17:20:00,215: INFO/SpawnPoolWorker-5] child process 8404 calling self.run()
[2021-12-14 17:20:00,228: INFO/SpawnPoolWorker-8] child process 12112 calling self.run()
[2021-12-14 17:20:00,239: INFO/SpawnPoolWorker-7] child process 5076 calling self.run()
[2021-12-14 17:20:00,584: INFO/MainProcess] mingle: all alone
[2021-12-14 17:20:00,616: INFO/MainProcess] celery@DESKTOP-041LA6S ready.

Calling the task:

C:\Users\66907>d:

D:\>cd pycharmProjects

D:\PycharmProjects>ipython
Python 3.10.0 (tags/v3.10.0:b494f59, Oct 4 2021, 19:00:18) [MSC v.1929 64 bit (AMD64)]
Type 'copyright', 'credits' or 'license' for more information
IPython 7.30.1 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from tasks import task_test

In [2]: s = task_test.delay(10,100)

In [3]: s.result
Out[3]: ValueError('not enough values to unpack (expected 3, got 0)')

Traceback (most recent call last):
  File "d:\programmingsoftware\python35\lib\site-packages\billiard\pool.py", line 358, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "d:\programmingsoftware\python35\lib\site-packages\celery\app\trace.py", line 525, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)


Solution:

The root cause is that Celery's default prefork pool does not work on Windows (official Windows support was dropped in Celery 4), so the worker has to be started with a different execution pool.

1. Install eventlet:

D:\PycharmProjects>pip install eventlet

2. Then add a parameter when starting the worker:

# before: celery -A tasks worker -l info
D:\PycharmProjects>celery -A tasks worker -l info -P eventlet  # on Windows, -P eventlet is required
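
As an aside, eventlet is not the only pool that works here: Celery's built-in solo pool (single-threaded, no extra package needed) is another common choice on Windows, at the cost of handling one task at a time:

D:\PycharmProjects>celery -A tasks worker -l info -P solo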

 

Test:

1. Create a file tasks.py with the following content:

from celery import Celery

app = Celery('tyj',
             broker='redis://:@127.0.0.1:6379/2',
             backend='redis://:@127.0.0.1:6379/3')

@app.task
def task_test(a, b):
    print('task is running....')
    return a + b

2. Open a cmd window, change to that directory, and start the worker:

D:\PycharmProjects>celery -A tasks worker -l info -P eventlet

-------------- celery@DESKTOP-041LA6S v5.2.1 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.19043-SP0 2021-12-14 17:34:14
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tyj:0x1cfd58dffa0
- ** ---------- .> transport: redis://127.0.0.1:6379/2
- ** ---------- .> results: redis://127.0.0.1:6379/3
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. tasks.task_test

[2021-12-14 17:34:14,428: INFO/MainProcess] Connected to redis://127.0.0.1:6379/2
[2021-12-14 17:34:14,435: INFO/MainProcess] mingle: searching for neighbors
[2021-12-14 17:34:15,472: INFO/MainProcess] mingle: all alone
[2021-12-14 17:34:15,499: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/2.
[2021-12-14 17:34:15,502: INFO/MainProcess] celery@DESKTOP-041LA6S ready.

3. Open another cmd window, change to the same directory, and start ipython there (pip install ipython):

# acting as the producer
C:\Users\66907>d:

D:\>cd pycharmProjects

D:\PycharmProjects>ipython
Python 3.10.0 (tags/v3.10.0:b494f59, Oct 4 2021, 19:00:18) [MSC v.1929 64 bit (AMD64)]
Type 'copyright', 'credits' or 'license' for more information
IPython 7.30.1 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from tasks import task_test

In [2]: s = task_test.delay(10,100)

In [3]: s.result
Out[3]: 110
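
Besides reading s.result (which stays None until the task finishes), the returned AsyncResult has blocking and status helpers; continuing the same session:

In [4]: s.ready()           # True once the worker has finished
Out[4]: True

In [5]: s.get(timeout=10)   # block until the result arrives (raises on timeout)
Out[5]: 110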

 

Terminology:

broker: the container holding the message queue, e.g. a Redis list

backend: stores the results of executed tasks

worker: the process that executes the tasks from the broker

After Django pushes a task to the broker, it moves on to other work; the worker notices the queued task in the broker, executes it on its own, and stores the result in the backend.
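
A minimal sketch of that flow, reusing the app from tasks.py above: the producer keeps only the task ID, and any process can later look the result up in the backend.

from celery.result import AsyncResult

from tasks import app, task_test

# producer: push the task to the broker and remember only its ID
s = task_test.delay(10, 100)
task_id = s.id

# later, possibly in another process: query the backend by task ID
res = AsyncResult(task_id, app=app)
print(res.state)   # PENDING / STARTED / SUCCESS ...
print(res.result)  # 110 once the worker has stored the result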

An introduction to the Celery async framework:

Celery is a distributed asynchronous message task queue; it mainly suits two kinds of scenarios:

1. Asynchronous tasks:

For high-concurrency workloads, tasks can be placed on the Celery task queue, which returns a task ID right away; the task's status and progress can then be queried from Celery later (a hypothetical Django sketch follows below).
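
Applied to the verification-code use case from the background, a hypothetical sketch might look like this; send_code, the addresses, and the code value are made-up names, and Django's mail settings are assumed to be configured:

# tasks.py: hypothetical sketch, not the blog's actual code
from celery import Celery
from django.core.mail import send_mail  # assumes Django settings are configured

app = Celery('tyj', broker='redis://:@127.0.0.1:6379/2',
             backend='redis://:@127.0.0.1:6379/3')

@app.task
def send_code(email, code):
    # the worker performs the slow SMTP call instead of the web process
    send_mail('Verification code', f'Your code is {code}',
              'noreply@example.com', [email])

# in a Django view: return immediately, keep the task ID for later polling
# result = send_code.delay('user@example.com', '123456')
# request.session['code_task_id'] = result.id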

2. Scheduled tasks:

When the same task needs to run on a schedule, the Celery task queue supports periodic triggering, either at fixed time intervals or via crontab expressions (see the beat_schedule sketch below).
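
Celery's scheduler is the separate beat process (started with celery -A tasks beat alongside a worker); a minimal sketch of such a schedule, added to the tasks.py app above:

from celery.schedules import crontab

app.conf.beat_schedule = {
    # run task_test every 30 seconds
    'add-every-30-seconds': {
        'task': 'tasks.task_test',
        'schedule': 30.0,
        'args': (10, 100),
    },
    # run task_test every day at 07:30
    'add-every-morning': {
        'task': 'tasks.task_test',
        'schedule': crontab(hour=7, minute=30),
        'args': (1, 2),
    },
}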

Drawbacks:

It is probably not well suited to business scenarios with strong real-time requirements.


