Skip to content

课 2 · 限流与队列

本课目标

用 Redis 实现请求限流中间件,并了解 BullMQ 在任务队列场景下的 Redis 使用方式。

关键理解:限流保护的不只是 API 服务器,更重要的是保护背后的 LLM API 账单。

为什么要限流

AI 应用有一个独特的成本结构:每次 LLM 调用都产生费用。没有限流,一个恶意用户或程序 Bug 可能几分钟内跑掉几百块。

进阶项目需要两种限流:

类型场景实现
IP 限流防止单 IP 刷接口滑动窗口计数
用户限流限制单用户 LLM 调用频率令牌桶(简化版)

滑动窗口限流

滑动窗口比固定窗口更平滑,用 Redis Sorted Set 实现:

typescript
// apps/api/src/middleware/rate-limit.ts
import { getRedis } from '@shared/lib/redis'
import type { MiddlewareHandler } from 'hono'

interface RateLimitOptions {
  windowMs: number    // 时间窗口(毫秒)
  max: number         // 窗口内最大请求数
  keyPrefix: string   // Redis key 前缀
}

export function rateLimit(options: RateLimitOptions): MiddlewareHandler {
  return async (c, next) => {
    const redis = getRedis()
    const ip = c.req.header('x-forwarded-for') ?? 'unknown'
    const key = `${options.keyPrefix}:${ip}`
    const now = Date.now()
    const windowStart = now - options.windowMs

    // Sorted Set:score = 时间戳,member = 时间戳(唯一)
    const pipeline = redis.pipeline()
    pipeline.zremrangebyscore(key, 0, windowStart)           // 清除窗口外的记录
    pipeline.zadd(key, now, `${now}-${Math.random()}`)       // 记录本次请求
    pipeline.zcard(key)                                       // 当前窗口请求数
    pipeline.expire(key, Math.ceil(options.windowMs / 1000)) // 设置过期

    const results = await pipeline.exec()
    const count = results?.[2]?.[1] as number

    if (count > options.max) {
      return c.json(
        { error: '请求过于频繁,请稍后再试' },
        429,
        { 'Retry-After': String(Math.ceil(options.windowMs / 1000)) }
      )
    }

    await next()
  }
}

挂载到 chat 路由:

typescript
// apps/api/src/routes/chat.ts
chatRouter.use(
  rateLimit({
    windowMs: 60 * 1000,  // 1 分钟
    max: 10,              // 每分钟最多 10 次问答
    keyPrefix: 'rl:chat',
  })
)

BullMQ 的 Redis 使用方式

BullMQ 在 Redis 里用三种数据结构:

数据结构用途
Sorted Setqueue:wait — 等待任务(score = 优先级/延迟时间)
Hashqueue:job:id — 任务详情
Listqueue:active — 正在处理的任务 ID

理解这个结构对面试很有帮助,因为面试官常问"BullMQ 底层是怎么用 Redis 的"。

可以用 Redis CLI 直接观察:

bash
# 连接到 Redis
docker exec -it <container> redis-cli

# 查看等待队列
ZRANGE bull:ingest:wait 0 -1 WITHSCORES

# 查看任务详情
HGETALL bull:ingest:job:1

任务状态用 Redis 缓存(避免频繁查 SQLite)

任务进度轮询可能很频繁,用 Redis 缓存当前状态:

typescript
// apps/worker/src/processor.ts

async function updateProgress(jobId: string, progress: number, total: number) {
  await db.updateJob(jobId, { processedChunks: progress })

  // 同时写入 Redis,供前端高频轮询
  await getRedis().setex(
    `job:progress:${jobId}`,
    300, // 5 分钟 TTL
    JSON.stringify({ progress, total, percent: Math.round((progress / total) * 100) })
  )
}

前端进度接口优先读 Redis,降低 SQLite 压力:

typescript
// apps/api/src/routes/docs.ts
docsRouter.get('/jobs/:id/progress', async (c) => {
  const jobId = c.req.param('id')

  // 先查 Redis 缓存
  const cached = await getRedis().get(`job:progress:${jobId}`)
  if (cached) return c.json(JSON.parse(cached))

  // 缓存 miss,降级查 SQLite
  const job = await db.getJob(jobId)
  if (!job) return c.json({ error: '任务不存在' }, 404)

  return c.json({
    progress: job.processedChunks ?? 0,
    total: job.totalChunks ?? 0,
    percent: job.totalChunks
      ? Math.round(((job.processedChunks ?? 0) / job.totalChunks) * 100)
      : 0,
  })
})

本节产物

apps/api/src/middleware/
  rate-limit.ts         # 滑动窗口限流中间件
apps/api/src/routes/
  docs.ts               # 任务进度接口(Redis 缓存优先)

面试追问

Redis 在你的项目里用来做什么?

进阶项目里 Redis 承担四个角色:1)会话消息存储(List + TTL),用来替代 SQLite 存会话,支持 TTL 自动过期;2)问答结果缓存(String + TTL),相同问题 1 小时内直接命中缓存,不走 LLM;3)请求限流(Sorted Set 滑动窗口),保护 LLM API 不被滥用;4)BullMQ 任务队列的底层存储。

滑动窗口和令牌桶限流有什么区别?

滑动窗口:统计过去 N 秒内的请求总数,超过阈值就拒绝。简单直观,但不能"攒额度"。令牌桶:以固定速率补充令牌,请求消耗令牌,允许短时间突发。适合"平均限速但允许偶尔突发"的场景。进阶课用滑动窗口,更容易理解和调试。

面向前端工程师和独立开发者的 AI 应用工程课程