跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Django Celery
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Django Celery = '''Django Celery''' 是一个用于处理异步任务和定时任务的分布式任务队列系统,它与 [[Django]] 框架深度集成。Celery 允许开发者将耗时的任务(如发送电子邮件、处理文件或调用外部API)从主请求-响应周期中分离出来,从而提高Web应用的性能和用户体验。本指南将详细介绍如何在Django项目中使用Celery,包括基本配置、任务定义、任务调度以及实际应用案例。 == 简介 == Celery 是一个基于分布式消息传递的异步任务队列/作业队列,专注于实时处理和任务调度。它支持多种消息代理(如 [[RabbitMQ]]、[[Redis]] 和 [[Amazon SQS]]),并可以与Django无缝集成。Celery 的核心组件包括: * '''任务 (Task)''':需要异步执行的函数。 * '''消息代理 (Broker)''':存储任务队列的中间件(如Redis或RabbitMQ)。 * '''工作者 (Worker)''':执行任务的进程。 * '''结果后端 (Result Backend)''':存储任务执行结果的数据库(如Django的数据库或Redis)。 Celery 的主要优势包括: * 提高Web应用的响应速度。 * 支持定时任务(周期性任务)。 * 分布式任务处理,适用于高并发场景。 == 安装与配置 == 在开始使用Django Celery之前,需要安装必要的依赖包: <syntaxhighlight lang="bash"> pip install celery django-celery-results </syntaxhighlight> 接下来,在Django项目的 `settings.py` 文件中配置Celery: <syntaxhighlight lang="python"> # settings.py CELERY_BROKER_URL = 'redis://localhost:6379/0' # 使用Redis作为消息代理 CELERY_RESULT_BACKEND = 'django-db' # 使用Django数据库存储任务结果 CELERY_CACHE_BACKEND = 'django-cache' </syntaxhighlight> 然后,在Django项目的根目录下创建 `celery.py` 文件: <syntaxhighlight lang="python"> # celery.py import os from celery import Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings') app = Celery('myproject') app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks() </syntaxhighlight> 最后,在 `__init__.py` 中导入Celery应用: <syntaxhighlight lang="python"> # __init__.py from .celery import app as celery_app __all__ = ['celery_app'] </syntaxhighlight> == 定义异步任务 == Celery 任务是一个被 `@app.task` 装饰器标记的函数。以下是一个简单的示例: <syntaxhighlight lang="python"> # tasks.py from celery import shared_task from time import sleep @shared_task def send_email(to, subject, message): sleep(5) # 模拟耗时操作 print(f"Email sent to {to}: {subject} - {message}") return True </syntaxhighlight> 在视图中调用该任务: <syntaxhighlight lang="python"> # views.py from django.http import HttpResponse from .tasks import send_email def trigger_email(request): send_email.delay("user@example.com", "Hello", "This is a test email.") return HttpResponse("Email task has been queued!") </syntaxhighlight> 执行任务时,需要启动Celery Worker: <syntaxhighlight lang="bash"> celery -A myproject worker --loglevel=info </syntaxhighlight> == 定时任务(周期性任务) == Celery 支持定时任务(也称为周期性任务),可以通过 `celery beat` 调度。在 `settings.py` 中配置: <syntaxhighlight lang="python"> # settings.py from celery.schedules import crontab CELERY_BEAT_SCHEDULE = { 'send-daily-report': { 'task': 'myapp.tasks.send_daily_report', 'schedule': crontab(hour=8, minute=0), # 每天上午8点执行 }, } </syntaxhighlight> 启动Celery Beat: <syntaxhighlight lang="bash"> celery -A myproject beat --loglevel=info </syntaxhighlight> == 实际应用案例 == 以下是一个真实场景的示例:一个电子商务网站使用Celery异步处理订单确认邮件和大规模数据分析。 1. '''异步发送订单确认邮件''': <syntaxhighlight lang="python"> @shared_task def send_order_confirmation(order_id): order = Order.objects.get(id=order_id) email_content = render_order_email(order) send_mail( "Order Confirmation", email_content, "noreply@example.com", [order.customer_email] ) </syntaxhighlight> 2. '''周期性生成销售报告''': <syntaxhighlight lang="python"> @shared_task def generate_daily_sales_report(): sales_data = Order.objects.filter(created_at__date=timezone.now().date()) report = generate_report(sales_data) store_report_in_database(report) </syntaxhighlight> == 高级特性 == === 任务链(Chaining) === Celery 支持将多个任务串联执行: <syntaxhighlight lang="python"> from celery import chain chain( task1.s(arg1), task2.s(arg2), task3.s(arg3) ).apply_async() </syntaxhighlight> === 任务重试 === 如果任务失败,可以自动重试: <syntaxhighlight lang="python"> @shared_task(bind=True, max_retries=3) def process_data(self, data): try: return heavy_computation(data) except Exception as e: self.retry(exc=e, countdown=60) # 60秒后重试 </syntaxhighlight> === 任务结果追踪 === 使用 `django-celery-results` 存储任务结果: <syntaxhighlight lang="python"> from django_celery_results.models import TaskResult def check_task_status(task_id): task = TaskResult.objects.get(task_id=task_id) return task.status </syntaxhighlight> == 性能优化 == * 使用多个Worker进程提高并发能力。 * 合理设置任务超时时间(`task_time_limit`)。 * 避免在任务中执行阻塞操作(如长时间睡眠或同步网络请求)。 == 常见问题与解决方案 == {| class="wikitable" |- ! 问题 !! 解决方案 |- | Worker 不执行任务 || 检查消息代理(Redis/RabbitMQ)是否运行正常 |- | 任务结果未保存 || 确保 `CELERY_RESULT_BACKEND` 配置正确 |- | 定时任务未触发 || 检查 `celery beat` 是否运行,并验证 `CELERY_BEAT_SCHEDULE` |} == 总结 == Django Celery 是一个强大的异步任务处理工具,适用于需要后台任务、定时任务或分布式任务处理的Web应用。通过合理配置和优化,可以显著提升Django应用的性能和可扩展性。 [[Category:后端框架]] [[Category:Django]] [[Category:Django扩展应用]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)