Celery 的详细使用指南,包含安装、配置、基本用法及示例代码:
1. 安装 Celery
bash
pip install celery2. 配置 Celery 应用
创建一个 celery_app.py 文件:
python
from celery import Celery
# 创建 Celery 实例,设置 Broker 和 Backend
app = Celery(
'tasks',
broker='redis://localhost:6379/0', # Broker (消息队列)
backend='redis://localhost:6379/1', # 结果存储
include=['tasks'] # 包含任务模块
)
# 可选:配置选项
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_expires=3600,
)3. 定义任务
创建 tasks.py:
python
from celery_app import app
@app.task
def add(x, y):
return x + y
@app.task
def send_email(to, message):
# 模拟发送邮件
return f"Email to {to}: {message}"4. 启动 Worker
在终端运行 Worker:
bash
celery -A tasks worker --loglevel=info5. 调用任务
python
from tasks import add, send_email
# 异步调用任务
result = add.delay(4, 6)
email_task = send_email.delay('user@example.com', 'Hello!')
# 获取结果(阻塞,直到任务完成)
print(result.get(timeout=10)) # 输出 106. 配置 Flower(监控)
安装并启动 Flower:
bash
pip install flower
celery -A tasks flower --port=5555访问 http://localhost:5555 查看监控面板。
7. 配置定时任务(周期性任务)
在 celery_app.py 中添加:
python
app.conf.beat_schedule = {
'every-10-seconds': {
'task': 'tasks.add',
'schedule': 10.0, # 每10秒执行一次
'args': (3, 7),
},
}启动 Beat 服务:
bash
celery -A tasks beat8. 处理重试
在任务中自动重试:
python
@app.task(bind=True, max_retries=3)
def send_email(self, to, message):
try:
# 发送邮件逻辑
except Exception as e:
self.retry(exc=e, countdown=60) # 60秒后重试9. 配置结果存储
在 celery_app.py 中配置结果后端:
python
app.conf.result_backend = 'redis://localhost:6379/1'10. 使用场景示例
场景:异步发送邮件
python
send_email.delay('user@example.com', 'Welcome!')常见问题解决
- Broker 连接失败:确保 Redis/RabbitMQ 服务运行。
- 任务未执行:检查 Worker 是否在线,任务是否注册。
- 结果丢失:确保配置了
result_backend。
通过以上步骤,可以快速将 Celery 集成到项目中,处理耗时任务(如邮件发送、图片处理、数据分析等),提升应用性能。