题目
设计一个使用Celery处理用户上传图片的异步任务系统
信息
- 类型:问答
- 难度:⭐⭐
考点
Celery任务定义, 任务队列配置, 错误处理, 任务状态追踪
快速回答
实现一个图片处理异步任务的核心步骤:
- 定义Celery任务函数,包含图片处理逻辑
- 配置消息代理(如RabbitMQ/Redis)和结果后端
- 添加错误处理机制(重试、日志)
- 实现任务状态追踪和结果查询
- 在视图函数中异步调用任务
示例任务定义:
@app.task(bind=True, max_retries=3)
def process_image(self, image_path):
try:
# 图片处理逻辑
except Exception as e:
self.retry(exc=e)
## 解析
场景需求
在Web应用中,用户上传图片后需要执行耗时操作(如缩略图生成、EXIF信息提取、内容审核等)。使用Celery将这些操作异步化可避免阻塞HTTP请求,提升用户体验。
解决方案
1. 核心组件配置
# celery_config.py
broker_url = 'redis://localhost:6379/0' # 消息代理
result_backend = 'redis://localhost:6379/1' # 结果存储
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']2. 任务定义(关键代码)
# tasks.py
from celery import Celery
import PIL.Image # 实际项目需安装Pillow
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task(bind=True, max_retries=3, default_retry_delay=30)
def process_uploaded_image(self, image_path, user_id):
try:
# 1. 打开图片
img = PIL.Image.open(image_path)
# 2. 生成缩略图
img.thumbnail((300, 300))
thumb_path = f"thumbs/{user_id}_thumb.jpg"
img.save(thumb_path)
# 3. 返回处理结果(实际项目需存储到数据库)
return {"status": "success", "thumb_path": thumb_path}
except PIL.UnidentifiedImageError as e:
# 特殊处理无效图片
return {"status": "error", "message": "Invalid image format"}
except Exception as e:
# 通用错误重试
self.retry(exc=e)3. Django视图调用(示例)
# views.py
from .tasks import process_uploaded_image
def upload_view(request):
if request.method == 'POST':
image_file = request.FILES['image']
user_id = request.user.id
# 保存原始文件(实际项目需使用安全存储)
image_path = f"uploads/{user_id}_{image_file.name}"
with open(image_path, 'wb+') as f:
for chunk in image_file.chunks():
f.write(chunk)
# 异步调用Celery任务
task = process_uploaded_image.delay(image_path, user_id)
# 返回任务ID供前端查询状态
return JsonResponse({"task_id": task.id})最佳实践
- 错误处理:
- 使用
bind=True获取任务上下文(self) max_retries限制重试次数,避免无限循环- 针对不同异常类型差异化处理(如无效图片无需重试)
- 使用
- 任务设计:
- 任务函数保持幂等性(多次执行结果相同)
- 传递文件路径而非文件内容,减少消息队列负载
- 设置任务超时:
@app.task(timeout=120)
- 结果处理:
- 重要任务使用
ignore_result=False(默认)存储结果 - 定期清理结果后端,避免Redis内存溢出
- 重要任务使用
常见错误
- 序列化问题:传递不可序列化对象(如PIL.Image对象)导致异常
- 资源泄漏:未关闭文件句柄或数据库连接
- 配置错误:开发/生产环境共用Redis数据库导致数据污染
- 阻塞主进程:误用
.get()同步等待结果
扩展知识
- 任务状态追踪:
task = process_uploaded_image.delay(...)
# 前端通过task.id查询状态
from celery.result import AsyncResult
result = AsyncResult(task_id)
print(result.status) # PENDING/SUCCESS/FAILURE/RETRY - 工作流编排:
chain(
upload_image.s(user_id),
create_thumbnail.s() | extract_metadata.s()
).delay() - 性能监控:使用Flower(
celery -A proj flower)查看任务队列和Worker状态