0
点赞
收藏
分享

微信扫一扫

【Flask+Celery】- 分布式任务工作使用流程


Celery介绍和使用

一.Celery介绍:

【Flask+Celery】- 分布式任务工作使用流程_Redis


一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。 Celery是一个功能完备即插即用的任务队列

单个 Celery 进程每分钟可处理数以百万计的任务。通过消息进行通信,使用消息队列(broker)在客户端和消费者之间进行协调。

安装Celery:

$ pip install -U Celery

Celery官方文档:https://docs.celeryq.dev/en/latest/index.html

1. 生产者消费者设计模式

最常用的解耦方式之一,寻找中间人(broker)搭桥,保证两个业务没有直接关联。

我们称这一解耦方式为:生产者消费者设计模式

【Flask+Celery】- 分布式任务工作使用流程_celery分布式任务工作使用流程_02


总结:

  • 生产者生成消息,缓存到消息队列中,消费者读取消息队列中的消息并执行。
  • 由生产者生成发送短信消息,缓存到消息队列中,消费者读取消息队列中的发送短信消息并执行。

2.中间人broker

示例:此处演示Redis数据库作为中间人broker
Celery需要一种解决消息的发送和接受的方式,我们把这种用来存储消息的的中间装置叫做message broker, 也可叫做消息中间人。
作为中间人,我们有几种方案可选择:

2.1.RabbitMQ

RabbitMQ是一个功能完备,稳定的并且易于安装的broker. 它是生产环境中最优的选择。

使用RabbitMQ的细节参照以下链接:http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq

如果使用的是Ubuntu或者Debian发行版的Linux,可以直接通过命令安装RabbitMQ:

sudo apt-get install rabbitmq-server

安装完毕之后,RabbitMQ-server服务器就已经在后台运行。

如果用的并不是Ubuntu或Debian, 可以在以下网址:
http://www.rabbitmq.com/download.html 去查找自己所需要的版本软件。

2.2.Redis

Redis也是一款功能完备的broker可选项,但是其更可能因意外中断或者电源故障导致数据丢失的情况。
关于是由那个Redis作为Broker,可访下面网址: http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis

2.3. celery框架伪代码

# 队列,中间人
class Broker(object):
    # 任务队列
    broker_list = []

# 消费者
class Worker(object):
    # 任务执行者

    def run(self, broker, func):
        if func in broker.broker_list:
            func()
        else:
            return 'error'

# Celery 将这3者 串联起来了
class Celery(object):
    def __init__(self):
        self.broker = Broker()
        self.worker = Worker()

    def add(self, func):
        self.broker.broker_list.append(func)

    def work(self, func):
        self.worker.run(self.broker,func)


# 任务(函数),生产者
def send_sms_code():
    print('send_sms_code')

# 1.创建celery实例
app=Celery()
# 2. 添加任务
app.add(send_sms_code)
# 3.执行任务
app.work(send_sms_code)

二. 配合Flask框架的基本使用

1.首先需要确定接口,即需要实现的功能接口并确定其接口名称

2.在main.py文件中定义接口,实现路由的使用

@app.route("/test", methods=['post'])
def test():
	func()
	return "hello world"

3.在task.py文件中定义需要调用的函数func

@celery.task
def func():
    	print("haha")
    	return "hahaha"

到此结束,celery接口实现完成
接下来启动celery任务
4.在虚拟机上进入虚拟环境,work on env_name
进入虚拟机上存放的main.py文件下启动gunicorn

guniron -w 6 --log-level=debug main:app

进入task.py文件目录下启动work

celery -A task.celery work -l info

至此,celery基本调用方法使用完成


举报

相关推荐

0 条评论