侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

如何在FastAPI中实现带限流和认证的异步端点

2025-12-14 / 0 评论 / 4 阅读

题目

如何在FastAPI中实现带限流和认证的异步端点

信息

  • 类型:问答
  • 难度:⭐⭐

考点

异步编程,依赖注入,中间件,安全认证,性能优化

快速回答

实现带限流和认证的异步端点需要:

  • 使用async def定义异步端点
  • 通过依赖注入系统实现JWT认证
  • 使用中间件实现请求限流
  • 结合Redis存储访问计数
  • 正确处理异步数据库操作
## 解析

问题场景

在真实应用中,我们需要保护敏感端点防止滥用。假设有一个获取用户数据的端点,要求:

  1. 用户必须通过JWT认证
  2. 每个用户每分钟最多10次请求
  3. 异步查询数据库

解决方案

1. 核心依赖项实现

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from redis import asyncio as aioredis

app = FastAPI()

# JWT认证依赖
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, "SECRET_KEY", algorithms=["HS256"])
        return payload["sub"]  # 返回用户名
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials"
        )

# Redis限流依赖
async def rate_limiter(user: str = Depends(get_current_user)):
    redis = await aioredis.from_url("redis://localhost")
    key = f"rate_limit:{user}"

    # 原子操作:增加计数并设置过期时间
    current = await redis.incr(key)
    if current == 1:
        await redis.expire(key, 60)  # 60秒过期

    if current > 10:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Rate limit exceeded"
        )
    return user

2. 实现异步端点

from fastapi import APIRouter
from databases import Database

router = APIRouter()
database = Database("postgresql://user:password@localhost/dbname")

@router.get("/user-data")
async def get_user_data(
    user: str = Depends(rate_limiter)  # 链式依赖
):
    query = "SELECT * FROM users WHERE username = :username"
    return await database.fetch_one(query, {"username": user})

app.include_router(router)

原理说明

  • 依赖注入系统:FastAPI的依赖树自动处理认证和限流逻辑
  • 异步Redis操作:使用aioredis进行原子计数操作
  • JWT认证:基于OAuth2标准验证Bearer Token
  • 链式依赖rate_limiter依赖get_current_user确保执行顺序

最佳实践

  1. 将密钥和限流配置放在环境变量中
  2. 使用单独的Redis数据库进行限流计数
  3. 为不同端点设置差异化的限流策略
  4. 异步数据库连接使用连接池

常见错误

错误后果解决方案
同步阻塞操作阻塞事件循环确保所有I/O操作使用异步库
未处理JWT异常认证绕过风险捕获所有JWTError异常
限流未原子操作计数不准确使用Redis的INCR+EXPIRE组合

扩展知识

  • 分布式限流:在微服务架构中使用Redis集群共享计数
  • 滑动窗口算法:更精确的限流控制(使用Redis的ZSET实现)
  • 安全增强:在JWT中加入jti(JWT ID)实现令牌吊销
  • 性能优化:使用@cached_property缓存重复计算

滑动窗口限流示例

async def sliding_window_limiter(user: str):
    redis = await aioredis.from_url("redis://localhost")
    now = int(time.time())
    window = 60  # 秒
    key = f"sliding:{user}"

    # 移除60秒前的记录
    await redis.zremrangebyscore(key, 0, now - window)

    # 添加当前时间戳
    await redis.zadd(key, {now: now})

    # 获取当前计数
    count = await redis.zcard(key)

    if count > 10:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")