跳转到内容

Django Celery

来自代码酷

Django Celery[编辑 | 编辑源代码]

Django Celery 是一个用于处理异步任务和定时任务的分布式任务队列系统,它与 Django 框架深度集成。Celery 允许开发者将耗时的任务(如发送电子邮件、处理文件或调用外部API)从主请求-响应周期中分离出来,从而提高Web应用的性能和用户体验。本指南将详细介绍如何在Django项目中使用Celery,包括基本配置、任务定义、任务调度以及实际应用案例。

简介[编辑 | 编辑源代码]

Celery 是一个基于分布式消息传递的异步任务队列/作业队列,专注于实时处理和任务调度。它支持多种消息代理(如 RabbitMQRedisAmazon SQS),并可以与Django无缝集成。Celery 的核心组件包括:

  • 任务 (Task):需要异步执行的函数。
  • 消息代理 (Broker):存储任务队列的中间件(如Redis或RabbitMQ)。
  • 工作者 (Worker):执行任务的进程。
  • 结果后端 (Result Backend):存储任务执行结果的数据库(如Django的数据库或Redis)。

Celery 的主要优势包括:

  • 提高Web应用的响应速度。
  • 支持定时任务(周期性任务)。
  • 分布式任务处理,适用于高并发场景。

安装与配置[编辑 | 编辑源代码]

在开始使用Django Celery之前,需要安装必要的依赖包:

pip install celery django-celery-results

接下来,在Django项目的 `settings.py` 文件中配置Celery:

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 使用Redis作为消息代理
CELERY_RESULT_BACKEND = 'django-db'  # 使用Django数据库存储任务结果
CELERY_CACHE_BACKEND = 'django-cache'

然后,在Django项目的根目录下创建 `celery.py` 文件:

# celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

最后,在 `__init__.py` 中导入Celery应用:

# __init__.py
from .celery import app as celery_app

__all__ = ['celery_app']

定义异步任务[编辑 | 编辑源代码]

Celery 任务是一个被 `@app.task` 装饰器标记的函数。以下是一个简单的示例:

# tasks.py
from celery import shared_task
from time import sleep

@shared_task
def send_email(to, subject, message):
    sleep(5)  # 模拟耗时操作
    print(f"Email sent to {to}: {subject} - {message}")
    return True

在视图中调用该任务:

# views.py
from django.http import HttpResponse
from .tasks import send_email

def trigger_email(request):
    send_email.delay("user@example.com", "Hello", "This is a test email.")
    return HttpResponse("Email task has been queued!")

执行任务时,需要启动Celery Worker:

celery -A myproject worker --loglevel=info

定时任务(周期性任务)[编辑 | 编辑源代码]

Celery 支持定时任务(也称为周期性任务),可以通过 `celery beat` 调度。在 `settings.py` 中配置:

# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'send-daily-report': {
        'task': 'myapp.tasks.send_daily_report',
        'schedule': crontab(hour=8, minute=0),  # 每天上午8点执行
    },
}

启动Celery Beat:

celery -A myproject beat --loglevel=info

实际应用案例[编辑 | 编辑源代码]

以下是一个真实场景的示例:一个电子商务网站使用Celery异步处理订单确认邮件和大规模数据分析。

1. 异步发送订单确认邮件

   @shared_task
   def send_order_confirmation(order_id):
       order = Order.objects.get(id=order_id)
       email_content = render_order_email(order)
       send_mail(
           "Order Confirmation",
           email_content,
           "noreply@example.com",
           [order.customer_email]
       )

2. 周期性生成销售报告

   @shared_task
   def generate_daily_sales_report():
       sales_data = Order.objects.filter(created_at__date=timezone.now().date())
       report = generate_report(sales_data)
       store_report_in_database(report)

高级特性[编辑 | 编辑源代码]

任务链(Chaining)[编辑 | 编辑源代码]

Celery 支持将多个任务串联执行:

from celery import chain

chain(
    task1.s(arg1),
    task2.s(arg2),
    task3.s(arg3)
).apply_async()

任务重试[编辑 | 编辑源代码]

如果任务失败,可以自动重试:

@shared_task(bind=True, max_retries=3)
def process_data(self, data):
    try:
        return heavy_computation(data)
    except Exception as e:
        self.retry(exc=e, countdown=60)  # 60秒后重试

任务结果追踪[编辑 | 编辑源代码]

使用 `django-celery-results` 存储任务结果:

from django_celery_results.models import TaskResult

def check_task_status(task_id):
    task = TaskResult.objects.get(task_id=task_id)
    return task.status

性能优化[编辑 | 编辑源代码]

  • 使用多个Worker进程提高并发能力。
  • 合理设置任务超时时间(`task_time_limit`)。
  • 避免在任务中执行阻塞操作(如长时间睡眠或同步网络请求)。

常见问题与解决方案[编辑 | 编辑源代码]

问题 解决方案
Worker 不执行任务 检查消息代理(Redis/RabbitMQ)是否运行正常
任务结果未保存 确保 `CELERY_RESULT_BACKEND` 配置正确
定时任务未触发 检查 `celery beat` 是否运行,并验证 `CELERY_BEAT_SCHEDULE`

总结[编辑 | 编辑源代码]

Django Celery 是一个强大的异步任务处理工具,适用于需要后台任务、定时任务或分布式任务处理的Web应用。通过合理配置和优化,可以显著提升Django应用的性能和可扩展性。