网易首页 > 网易号 > 正文 申请入驻

任务队列神器:Celery 入门到进阶指南

0
分享至

△点击上方“Python猫”关注 ,回复“1”领取电子书

作者:wedo实验君

来源:Python中文社区

1.什么是celery

celery是一个简单,灵活、可靠的分布式任务执行框架,可以支持大量任务的并发执行。celery采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行。

1.1 celery架构

Celery由以下三部分构成:消息中间件(Broker)、任务执行单元Worker、结果存储(Backend)

  • 任务调用提交任务执行请求给Broker队列

  • 如果是异步任务,worker会立即从队列中取出任务并执行,执行结果保存在Backend中

  • 如果是定时任务,任务由Celery Beat进程周期性地将任务发往Broker队列,Worker实时监视消息队列获取队列中的任务执行

1.2 应用场景
  • 大量的长时间任务的异步执行, 如上传大文件

  • 大规模实时任务执行,支持集群部署,如支持高并发的机器学习推理

  • 定时任务执行,如定时发送邮件,定时扫描机器运行情况

2.安装

celery安装非常简单, 除了安装celery,本文中使用redis作为消息队列即Broker

# celery 安装
pip install celery
# celery 监控 flower
pip install flower
pip install redis
# redis 安装
yum install redis
# redis启动
redis-server /etc/redis.conf
3. 完整例子

celery的应用开发涉及四个部分

  • celery 实例初始化

  • 任务的定义(定时和实时任务)

  • 任务worker的启动

  • 任务的调用

3.1 项目目录# 项目目录
wedo
.
├── config.py
├── __init__.py
├── period_task.py
└── tasks.py
3.2 celery 实例初始化

celery的实例化,主要包括执行Broker和backend的访问方式,任务模块的申明等

# celery 实例初始化
# __init__.py
from celery import Celery
app = Celery('wedo') # 创建 Celery 实例
app.config_from_object('wedo.config')

# 配置 wedo.config
# config.py
BROKER_URL = 'redis://10.8.238.2:6379/0' # Broker配置,使用Redis作为消息中间件
CELERY_RESULT_BACKEND = 'redis://10.8.238.2:6379/0' # BACKEND配置,这里使用redis
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_TIMEZONE='Asia/Shanghai' # 时区配置
CELERY_IMPORTS = ( # 指定导入的任务模块,可以指定多个
'wedo.tasks',
'wedo.period_task'
)
3.3 任务的定义

celery中通过@task的装饰器来进行申明celery任务,其他操作无任何差别

# 任务的定义
# 简单任务 tasks.py
import celery
import time
from celery.utils.log import get_task_logger
from wedo import app

@app.task
def sum(x, y):
return x + y

@app.task
def mul(x, y):
time.sleep(5)
return x * y

定时任务和实时任务的区别主要是要申明何时执行任务,任务本身也是通过task装饰器来申明 何时执行任务有2种

  • 指定频率执行:sender.add_periodic_task(时间频率单位s, 任务函数, name='to_string')

  • crontab方式:分钟/小时/天/月/周粒度, 可以支持多种调度

# 任务的定义
# 定时任务 period_task.py
from wedo import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(5.0, to_string.s("celery peroid task"), name='to_string') # 每5秒执行add
sender.add_periodic_task(
crontab(minute='*/10'), #每10分钟执行一次
send_mail.s('hello, this is a celery'), name='send_mail'
)

@app.task
def send_mail(content):
print('send mail, content is %s' % content)

@app.task
def to_string(text):
return 'this is a %s' % text
3.4 任务worker的启动

任务启动分为worker启动和定时任务beat启动

# -A wedo为应用模块
# -l为日志level
# -c 为进程数
celery worker -A wedo -l debug -c 4

# 后台启动
nohup celery worker -A wedo -l debug -c 4 > ./log.log 2>&1

# 从下面的日志可以看出启动了4个任务
# . wedo.period_task.send_mail
# . wedo.period_task.to_string
# . wedo.tasks.mul
# . wedo.tasks.sum

-------------- celery@localhost.localdomain v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Linux-3.10.0-327.28.3.el7.x86_64-x86_64-with-centos-7.2.1511-Core 2020-04-25 23:35:26
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: wedo:0x7f05af30d320
- ** ---------- .> transport: redis://10.8.238.2:6379/0
- ** ---------- .> results: redis://10.8.238.2:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery

[tasks]
. celery.accumulate
. celery.backend_cleanup
...
. wedo.period_task.send_mail
. wedo.period_task.to_string
. wedo.tasks.mul
. wedo.tasks.sum
...
[2020-04-25 23:35:27,617: INFO/MainProcess] celery@localhost.localdomain ready.
[2020-04-25 23:35:27,617: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2020-04-25 23:35:27,655: DEBUG/MainProcess] celery@12103675 joined the party

celery beat -A wedo.period_task

celery beat v4.4.2 (cliffs) is starting.
__ - ... __ - _
LocalTime -> 2020-04-25 23:37:08
Configuration ->
. broker -> redis://10.8.238.2:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)
# worker启动是4个进程
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4
\_ /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4

worker和beat的停止

ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9
ps auxww | awk '/celery beat/ {print $2}' | xargs kill -9
3.5 任务的调用

任务worker已经启动好了,通过任务调用传递给broker(redis),并返回任务执行结果 任务调用主要有两种,本质是一致的,delay是apply_async的封装,apply_async可以支持更多的任务调用配置

  • task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

  • task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

apply_async和delay会返回一个异步的任务结果 ,AsyncResult中存储了任务的执行状态和结果,常用的操作

value = result.get() # 任务返回值
print(result.__dict__) # 结果信息
print(result.successful()) # 是否成功
print(result.fail()) # 是否失败
print(result.ready()) # 是否执行完成
print(result.state) # 状态 PENDING -> STARTED -> SUCCESS/FAIL

常规任务:

from celery.utils.log import get_logger
from wedo.tasks import sum, mul, post_file
from celery import group, chain, chord
logger = get_logger(__name__)
try:
result = mul.apply_async(args=(2, 2))
value = result.get() # 等待任务执行完毕后,才会返回任务返回值
print(value)
except mul.OperationalError as exc: # 任务异常处理
logger.exception('Sending task raised: %r', exc)

组合任务:

  • 多个任务并行执行, group

  • 多个任务链式执行,chain:第一个任务的返回值作为第二个的输入参数,以此类推

result = group(sum.s(i, i) for i in range(5))()
result.get()
# [0, 2, 4, 6, 8]
result = chain(sum.s(1,2), sum.s(3), mul.s(3))()
result.get()
# ((1+2)+3)*3=18
4. 分布式集群部署

celery作为分布式的任务队列框架,worker是可以执行在不同的服务器上的。部署过程和单机上启动是一样。只要把项目代码copy到其他服务器,使用相同命令就可以了。可以思考下,这个是怎么实现的?对了,就是通过共享Broker队列。使用合适的队列,如redis,单进程单线程的方式可以有效的避免同个任务被不同worker同时执行的情况。

celery worker -A wedo -l debug -c 4

  • 分布式集群如下:

5. 进阶使用

在前面已经了解了celery的主要的功能了。celery还为一些特别的场景提供了需要扩展的功能

5.1 任务状态跟踪和日志

有时候我们需要对任务的执行情况做一些监控,比如失败后报警通知。

  • celery在装饰器@app.task中提供了base参数,传入重写的Task模块,重新on_*函数就可以控制不同的任务结果

  • 在@app.task提供bind=True,可以通过self获取Task中各种参数

    • self.request:任务的各种参数

    • self.update_state: 自定义任务状态, 原有的任务状态:PENDING -> STARTED -> SUCCESS, 如果你想了解STARTED -> SUCCESS之间的一个状态,比如执行的百分比之类,可以通过自定义状态来实现

    • self.retry: 重试

import celery
import time
from celery.utils.log import get_task_logger
from wedo import app

logger = logger = get_task_logger(__name__)
class TaskMonitor(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""failed callback"""
logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))

def on_success(self, retval, task_id, args, kwargs):
"""success callback"""
logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))

def on_retry(self, exc, task_id, args, kwargs, einfo):
"""retry callback"""
logger.info('task id:{} , arg:{} , retry ! einfo: {}'.format(task_id, args, exc))

@app.task(base=TaskMonitor, bind=True, name='post_file')
def post_file(self, file_names):
logger.info(self.request.__dict__)
try:
for i, file in enumerate(file_names):
print('the file %s is posted' % file)
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(file_names)})
time.sleep(2)
except Exception as exec:
raise self.retry(exc=exec, countdown=3, max_retries=5)
5.2 任务指定特定的worker执行

celery做为支持分布式,理论上可以无限扩展worker。默认情况下celery提交任务后,任务会放入名为celery的队列,所有在线的worker都会从任务队列中获取任务,任一个worker都有可能执行这个任务。有时候,有时候任务的特殊性或者机器本身的限制,某些任务只能跑在某些worker上。celery提供了queue在区别不同的worker,很好的支持这种情况。

  • 启动worker时,-Q 指定worker支持的任务列队名, 可以支持多个队列名哦

celery worker -A wedo -l debug -c 4 -Q celery,hipri
  • 任务调用时,queue=*来指定需要执行worker

result = mul.apply_async(args=(2, 2), queue='hipri')
6. 任务队列监控

如果你想通过可视化的方式,查看celery的一切。flower提供可行的解决方案,十分的方便

flower -A wedo --port=6006
# web访问 http://10.8.238.2:6006/

7. 总结

本文和大家了介绍了分布式的队列celery, 妥妥的很全吧, 欢迎交流。总结下内容:

  • celery为分布式队列, 通过消息队列连接任务提交和执行者worker, 松耦合模式,可扩展

  • celery消息队列建议为redis

  • celery通过@app.task装饰把普通任务变成celery Task

  • celery worker 通过不同queue支持特定的worker消费特定的任务

  • @app.task中可以同步base和bind参数获取更过的控制任务生命周期

  • flower监控celery全过程

  • celery doc:https://docs.celeryproject.org/en/master/getting-started/index.html

Python猫技术交流群开放啦!群里既有国内一二线大厂在职员工,也有国内外高校在读学生,既有十多年码龄的编程老鸟,也有中小学刚刚入门的新人,学习氛围良好!想入群的同学,请在公号内回复『交流群』,获取猫哥的微信 (谢绝广告党,非诚勿扰!)~

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
小沈阳买新车了,身价过亿的人买个这么便宜的车。太低调了

小沈阳买新车了,身价过亿的人买个这么便宜的车。太低调了

老吴教育课堂
2026-05-25 01:48:31
输给广厦出局后!周鹏去向曝光,深圳寻求交易广东,租借黄明依?

输给广厦出局后!周鹏去向曝光,深圳寻求交易广东,租借黄明依?

绯雨儿
2026-05-24 12:14:05
武契奇刚抵京,塞尔维亚万人涌上街头喊下台,有人不愿看到他访华

武契奇刚抵京,塞尔维亚万人涌上街头喊下台,有人不愿看到他访华

叮当当科技
2026-05-24 22:54:29
活久见!在校女大学生追求年轻未婚男教师引争议,网友:一票否决

活久见!在校女大学生追求年轻未婚男教师引争议,网友:一票否决

火山詩话
2026-05-22 09:44:44
泪目了!她要上太空那天,丈夫先“下岗”了

泪目了!她要上太空那天,丈夫先“下岗”了

Thurman在昆明
2026-05-25 01:32:49
许家印的靠山,被起诉了

许家印的靠山,被起诉了

哲空空
2026-05-22 13:40:53
太疯狂,7家半导体公司集中减持,澜起科技股东套现33亿

太疯狂,7家半导体公司集中减持,澜起科技股东套现33亿

金石随笔
2026-05-25 00:06:18
砸12亿!中国第一座“星穹大球”,上海人沸腾!

砸12亿!中国第一座“星穹大球”,上海人沸腾!

GA环球建筑
2026-05-24 23:31:02
女生主动起来有多黏人?网友:这些女的太开放了

女生主动起来有多黏人?网友:这些女的太开放了

带你感受人间冷暖
2026-01-27 00:20:06
谈判失败,轮到中方掀桌子,拒绝美军高层访华,中方已定调统一

谈判失败,轮到中方掀桌子,拒绝美军高层访华,中方已定调统一

掉了颗大白兔糖
2026-05-22 11:52:45
中国人吃了几千年的碳水,怎么就丢脸了?

中国人吃了几千年的碳水,怎么就丢脸了?

人间像素
2026-05-22 16:06:05
上海交大的“天才少女”,让当代女大学生成为“最差的一届”

上海交大的“天才少女”,让当代女大学生成为“最差的一届”

将军箭
2026-05-21 14:07:13
《剑星》体模申才恩再穿"战袍":这身材比例是扫描出来的?

《剑星》体模申才恩再穿"战袍":这身材比例是扫描出来的?

队友祭天法力无边
2026-05-22 17:11:46
50岁李小冉机场吃面,褪去滤镜才懂,普通人的衰老藏不住

50岁李小冉机场吃面,褪去滤镜才懂,普通人的衰老藏不住

庭小娱
2026-05-13 12:06:40
56岁的王菲怎么也想不到,29岁的窦靖童如今又开始为她争光了

56岁的王菲怎么也想不到,29岁的窦靖童如今又开始为她争光了

雅儿姐游世界
2026-05-24 11:41:58
Netflix这部科幻神剧,让《环太平洋》都显得小了

Netflix这部科幻神剧,让《环太平洋》都显得小了

时光慢旅人
2026-05-25 01:31:25
汪小菲陪玥儿上课,晚上一家三口吃大餐,家中疑放着他与大S的画

汪小菲陪玥儿上课,晚上一家三口吃大餐,家中疑放着他与大S的画

手工制作阿歼
2026-05-25 01:30:09
俄国佬滚回去! 1976年4月1日《人民日报》

俄国佬滚回去! 1976年4月1日《人民日报》

那些看得见的老照片
2026-05-21 11:48:55
汪峰携全家观看女儿醒醒演出,21岁小苹果舞台风采惊艳

汪峰携全家观看女儿醒醒演出,21岁小苹果舞台风采惊艳

调侃国际观点
2026-05-25 07:39:49
高市亲信突然飞到苏州,当面要求中方,把一件“不正当”的事改改

高市亲信突然飞到苏州,当面要求中方,把一件“不正当”的事改改

云舟史策
2026-05-24 07:08:49
2026-05-25 08:28:49
Python猫 incentive-icons
Python猫
人生苦短,我用Python。博客:https://pythoncat.top
729文章数 8120关注度
往期回顾 全部

科技要闻

神舟二十三号航天员乘组顺利进驻“天宫”

头条要闻

暴雨突袭17岁少年赤膊上阵 让村民踩着他大腿肩膀转移

头条要闻

暴雨突袭17岁少年赤膊上阵 让村民踩着他大腿肩膀转移

体育要闻

唐斯发牌,大头逆袭:骑士跌向残忍夏季

娱乐要闻

王鹤棣掉粉超20万!代言和作品遭抵制

财经要闻

退市!33年“A股不死鸟”落幕

汽车要闻

国民家轿再上新 帝豪向上系列限时5.59万起

态度原创

手机
本地
时尚
教育
公开课

手机要闻

OPPO开始发力:Reno新机再次被确认,超清云台主摄来袭!

本地新闻

用云锦的方式,打开江苏南京

《低智商犯罪》一半惊喜,一半可惜

教育要闻

在这个拼人脉的世界,会说话真的很重要!

公开课

李玫瑾:为什么性格比能力更重要?

无障碍浏览 进入关怀版