Skip to content

Celery 的详细使用指南,包含安装、配置、基本用法及示例代码:


1. 安装 Celery

bash
pip install celery

2. 配置 Celery 应用

创建一个 celery_app.py 文件:

python
from celery import Celery

# 创建 Celery 实例,设置 Broker 和 Backend
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',  # Broker (消息队列)
    backend='redis://localhost:6379/1',  # 结果存储
    include=['tasks']  # 包含任务模块
)

# 可选:配置选项
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_expires=3600,
)

3. 定义任务

创建 tasks.py

python
from celery_app import app


@app.task
def add(x, y):
    return x + y


@app.task
def send_email(to, message):
    # 模拟发送邮件
    return f"Email to {to}: {message}"

4. 启动 Worker

在终端运行 Worker:

bash
celery -A tasks worker --loglevel=info

5. 调用任务

python
from tasks import add, send_email

# 异步调用任务
result = add.delay(4, 6)
email_task = send_email.delay('user@example.com', 'Hello!')

# 获取结果(阻塞,直到任务完成)
print(result.get(timeout=10))  # 输出 10

6. 配置 Flower(监控)

安装并启动 Flower:

bash
pip install flower
celery -A tasks flower --port=5555

访问 http://localhost:5555 查看监控面板。


7. 配置定时任务(周期性任务)

celery_app.py 中添加:

python
app.conf.beat_schedule = {
    'every-10-seconds': {
        'task': 'tasks.add',
        'schedule': 10.0,  # 每10秒执行一次
        'args': (3, 7),
    },
}

启动 Beat 服务:

bash
celery -A tasks beat

8. 处理重试

在任务中自动重试:

python
@app.task(bind=True, max_retries=3)
def send_email(self, to, message):
    try:
    # 发送邮件逻辑
    except Exception as e:
        self.retry(exc=e, countdown=60)  # 60秒后重试

9. 配置结果存储

celery_app.py 中配置结果后端:

python
app.conf.result_backend = 'redis://localhost:6379/1'

10. 使用场景示例

场景:异步发送邮件

python
send_email.delay('user@example.com', 'Welcome!')

常见问题解决

  1. Broker 连接失败:确保 Redis/RabbitMQ 服务运行。
  2. 任务未执行:检查 Worker 是否在线,任务是否注册。
  3. 结果丢失:确保配置了 result_backend

通过以上步骤,可以快速将 Celery 集成到项目中,处理耗时任务(如邮件发送、图片处理、数据分析等),提升应用性能。

✨ 网站运行时间: 3年11月15天 ❤️ 道阻且长,行则将至 - 微信号: heikedreamer