# Writing scheduled (periodic) tasks.
from celery import Celery
from celery.schedules import crontab
# Celery application that owns the periodic tasks below; the broker is
# Redis database 1 on localhost.
crontab_app = Celery("celery_config_tasks", broker="redis://127.0.0.1:6379/1")

# Settings are applied as one mapping so each option sits on its own line.
crontab_app.conf.update(
    {
        # Explicitly left unset (None).
        "broker_heartbeat": None,
        # Per the Celery docs, a pool worker process is replaced after it
        # has executed this many tasks.
        "worker_max_tasks_per_child": 20,
    }
)
@crontab_app.task
def cron_task_one(number: int) -> int:
    """Add ``number`` to itself, print the result, and return it.

    Args:
        number: Value to be doubled.

    Returns:
        ``number + number``.
    """
    doubled = number + number
    print(f"sum_number--------------{doubled}")
    return doubled
@crontab_app.task
def cron_task_two(x: int, y: int) -> int:
    """Add ``x`` and ``y``, print the result, and return it.

    Args:
        x: First operand.
        y: Second operand.

    Returns:
        ``x + y``.
    """
    total = x + y
    print(f"sum_xy--------------{total}")
    return total
# Static beat schedule: each entry names a task, how often to run it
# (a plain number is an interval in seconds), and its positional arguments.
# NOTE(review): the "celery_conrtab_task." prefix must match the module
# these tasks are registered under; "conrtab" looks like a misspelling of
# "crontab" -- confirm against the actual module/file name.
crontab_app.conf.beat_schedule = {
    # cron_task_one(100), every 1.0 seconds.
    "task_one": {
        "task": "celery_conrtab_task.cron_task_one",
        "schedule": 1.0,
        "args": (100,),
    },
    # cron_task_two(1, 3), every 1.0 seconds.
    "task_two": {
        "task": "celery_conrtab_task.cron_task_two",
        "schedule": 1.0,
        "args": (1, 3),
    },
}
@crontab_app.on_after_configure.connect
def crontab_task(sender, **kwargs):
    """Register periodic tasks when the ``on_after_configure`` signal fires.

    Args:
        sender: Object emitting the signal; ``add_periodic_task`` is called
            on it to register each entry.
        **kwargs: Extra signal arguments; unused.
    """
    # Each entry needs its own ``name``: the name is the key of the entry in
    # the beat schedule, so two entries sharing a name overwrite one another
    # and only the last one registered would actually run.
    sender.add_periodic_task(
        1.0,
        cron_task_one.s(100),
        name="celery_config_tasks_one",
    )
    sender.add_periodic_task(
        3.0,
        cron_task_two.s(10, 20),
        name="celery_config_tasks_two",
    )