Appearance
课 2 · 限流与队列
本课目标
用 Redis 实现请求限流中间件,并了解 BullMQ 在任务队列场景下的 Redis 使用方式。
关键理解:限流保护的不只是 API 服务器,更重要的是保护背后的 LLM API 账单。
为什么要限流
AI 应用有一个独特的成本结构:每次 LLM 调用都产生费用。没有限流,一个恶意用户或程序 Bug 可能几分钟内跑掉几百块。
进阶项目需要两种限流:
| 类型 | 场景 | 实现 |
|---|---|---|
| IP 限流 | 防止单 IP 刷接口 | 滑动窗口计数 |
| 用户限流 | 限制单用户 LLM 调用频率 | 令牌桶(简化版) |
滑动窗口限流
滑动窗口比固定窗口更平滑,用 Redis Sorted Set 实现:
typescript
// apps/api/src/middleware/rate-limit.ts
import { getRedis } from '@shared/lib/redis'
import type { MiddlewareHandler } from 'hono'
interface RateLimitOptions {
windowMs: number // 时间窗口(毫秒)
max: number // 窗口内最大请求数
keyPrefix: string // Redis key 前缀
}
export function rateLimit(options: RateLimitOptions): MiddlewareHandler {
return async (c, next) => {
const redis = getRedis()
const ip = c.req.header('x-forwarded-for') ?? 'unknown'
const key = `${options.keyPrefix}:${ip}`
const now = Date.now()
const windowStart = now - options.windowMs
// Sorted Set:score = 时间戳,member = 时间戳(唯一)
const pipeline = redis.pipeline()
pipeline.zremrangebyscore(key, 0, windowStart) // 清除窗口外的记录
pipeline.zadd(key, now, `${now}-${Math.random()}`) // 记录本次请求
pipeline.zcard(key) // 当前窗口请求数
pipeline.expire(key, Math.ceil(options.windowMs / 1000)) // 设置过期
const results = await pipeline.exec()
const count = results?.[2]?.[1] as number
if (count > options.max) {
return c.json(
{ error: '请求过于频繁,请稍后再试' },
429,
{ 'Retry-After': String(Math.ceil(options.windowMs / 1000)) }
)
}
await next()
}
}挂载到 chat 路由:
typescript
// apps/api/src/routes/chat.ts
chatRouter.use(
rateLimit({
windowMs: 60 * 1000, // 1 分钟
max: 10, // 每分钟最多 10 次问答
keyPrefix: 'rl:chat',
})
)BullMQ 的 Redis 使用方式
BullMQ 在 Redis 里用三种数据结构:
| 数据结构 | 用途 |
|---|---|
| Sorted Set | queue:wait — 等待任务(score = 优先级/延迟时间) |
| Hash | queue:job:id — 任务详情 |
| List | queue:active — 正在处理的任务 ID |
理解这个结构对面试很有帮助,因为面试官常问"BullMQ 底层是怎么用 Redis 的"。
可以用 Redis CLI 直接观察:
bash
# 连接到 Redis
docker exec -it <container> redis-cli
# 查看等待队列
ZRANGE bull:ingest:wait 0 -1 WITHSCORES
# 查看任务详情
HGETALL bull:ingest:job:1任务状态用 Redis 缓存(避免频繁查 SQLite)
任务进度轮询可能很频繁,用 Redis 缓存当前状态:
typescript
// apps/worker/src/processor.ts
async function updateProgress(jobId: string, progress: number, total: number) {
await db.updateJob(jobId, { processedChunks: progress })
// 同时写入 Redis,供前端高频轮询
await getRedis().setex(
`job:progress:${jobId}`,
300, // 5 分钟 TTL
JSON.stringify({ progress, total, percent: Math.round((progress / total) * 100) })
)
}前端进度接口优先读 Redis,降低 SQLite 压力:
typescript
// apps/api/src/routes/docs.ts
docsRouter.get('/jobs/:id/progress', async (c) => {
const jobId = c.req.param('id')
// 先查 Redis 缓存
const cached = await getRedis().get(`job:progress:${jobId}`)
if (cached) return c.json(JSON.parse(cached))
// 缓存 miss,降级查 SQLite
const job = await db.getJob(jobId)
if (!job) return c.json({ error: '任务不存在' }, 404)
return c.json({
progress: job.processedChunks ?? 0,
total: job.totalChunks ?? 0,
percent: job.totalChunks
? Math.round(((job.processedChunks ?? 0) / job.totalChunks) * 100)
: 0,
})
})本节产物
apps/api/src/middleware/
rate-limit.ts # 滑动窗口限流中间件
apps/api/src/routes/
docs.ts # 任务进度接口(Redis 缓存优先)面试追问
Redis 在你的项目里用来做什么?
进阶项目里 Redis 承担四个角色:1)会话消息存储(List + TTL),用来替代 SQLite 存会话,支持 TTL 自动过期;2)问答结果缓存(String + TTL),相同问题 1 小时内直接命中缓存,不走 LLM;3)请求限流(Sorted Set 滑动窗口),保护 LLM API 不被滥用;4)BullMQ 任务队列的底层存储。
滑动窗口和令牌桶限流有什么区别?
滑动窗口:统计过去 N 秒内的请求总数,超过阈值就拒绝。简单直观,但不能"攒额度"。令牌桶:以固定速率补充令牌,请求消耗令牌,允许短时间突发。适合"平均限速但允许偶尔突发"的场景。进阶课用滑动窗口,更容易理解和调试。