Redis 消息队列:LIST vs STREAM 差异与适用场景
Redis 提供了两种根本不同的消息队列方案:经典的 LIST 队列和现代的 STREAM 数据类型(Redis 5.0+)。本指南用真实命令进行正面对比。
问题场景
你的应用需要异步消息处理。你可以用 RabbitMQ 或 Kafka,但你已经有 Redis 了。Redis 能搞定吗?可以——但你需要选对数据结构。
基于 LIST 的队列
Redis 中最简单的消息队列模式。生产者推送,消费者弹出。
基本生产者/消费者
RPUSH queue:tasks '{"type":"send_email","to":"user@example.com"}'
RPUSH queue:tasks '{"type":"process_payment","order_id":5001}'
RPUSH queue:tasks '{"type":"generate_report","report_id":301}'
LPOP queue:tasks
LPOP queue:tasks
LLEN queue:tasks
RPUSH 添加到尾部,LPOP 从头部移除——FIFO 顺序。
阻塞弹出(BLPOP)
LPOP 在队列为空时返回 nil。BLPOP 会阻塞直到消息到达:
RPUSH queue:tasks "job:1"
BLPOP queue:tasks 5
5 是超时秒数。消费者最多阻塞 5 秒等待消息。这比在循环中用 LPOP 轮询高效得多。
可靠队列(RPOPLPUSH)
如果消费者在弹出消息后、处理完成前崩溃了怎么办?消息就丢了。
解决方案:RPOPLPUSH(或 Redis 6.2+ 的 LMOVE)原子性地将消息移到处理列表:
RPUSH queue:tasks "job:1" "job:2" "job:3"
RPOPLPUSH queue:tasks queue:processing
LRANGE queue:processing 0 -1
LRANGE queue:tasks 0 -1
处理成功后,从处理列表中移除:
LREM queue:processing 1 "job:1"
如果消费者崩溃,queue:processing 中的消息可以重新入队。
LIST 队列的局限
- 没有消费者组——只有一个消费者能弹出消息。
- 没有内置的消息确认机制。
- 不能重放消息——弹出后就没了。
- 没有消息 ID——无法追踪单个消息。
基于 STREAM 的队列
Redis Stream(5.0+)是专为消息传递设计的类日志数据结构。
添加消息
XADD stream:events * type "user_signup" user_id "1001"
XADD stream:events * type "order_placed" order_id "5001"
XADD stream:events * type "payment_received" order_id "5001"
* 自动生成唯一 ID(基于时间戳)。
读取消息
XRANGE stream:events - +
XLEN stream:events
XRANGE - + 从头到尾读取所有消息。
读取新消息(尾部跟踪)
XREAD COUNT 2 STREAMS stream:events 0-0
0-0 从头开始读。使用 $ 只读取新消息(类似 tail -f)。
消费者组
这是 Stream 的亮点。多个消费者可以分担工作负载:
XGROUP CREATE stream:events group1 0
XREADGROUP GROUP group1 consumer-a COUNT 1 STREAMS stream:events >
XREADGROUP GROUP group1 consumer-b COUNT 1 STREAMS stream:events >
> 表示"给我这个组中还没有分发给任何消费者的消息"。
消息确认
处理完成后,确认消息:
XACK stream:events group1 1739600001-0
未确认的消息可以被其他消费者认领(用于崩溃恢复):
XPENDING stream:events group1 - + 10
认领滞留消息
如果消费者挂了,另一个可以认领它的待处理消息:
XAUTOCLAIM stream:events group1 consumer-b 60000 0-0
认领待处理超过 60 秒的消息。
正面对比
| 特性 | LIST | STREAM |
|---|---|---|
| 数据模型 | 简单队列 | 追加日志 |
| 消费者组 | 不支持 | 支持 |
| 消息确认 | 手动(RPOPLPUSH) | 内置(XACK) |
| 消息重放 | 不支持(破坏性读取) | 支持(非破坏性) |
| 消息 ID | 无 | 自动生成 |
| 阻塞读取 | BLPOP | XREAD BLOCK |
| 内存效率 | 更高 | 较低(存储 ID + 字段) |
| 最大吞吐量 | 更高 | 略低 |
| 复杂度 | 简单 | 中等 |
何时使用 LIST
- 单消费者的简单任务队列。
- 不需要消息重放或确认。
- 最大吞吐量是关键。
- 你想要最简单的实现。
RPUSH jobs "task:1" "task:2" "task:3"
LPOP jobs
LPOP jobs
LLEN jobs
何时使用 STREAM
- 多个消费者需要分担工作负载(消费者组)。
- 需要消息确认和重试。
- 想要重放消息(如调试或新消费者加入)。
- 需要带 ID 的消息排序保证。
XADD events * action "click" page "/home"
XADD events * action "purchase" item "widget"
XRANGE events - +
XLEN events
混合模式:LIST 求速度,STREAM 求可靠
有些架构两者都用:
- 热路径:LIST 队列追求最大吞吐。
- 审计追踪:STREAM 用于日志和重放。
RPUSH queue:fast "job:1"
XADD stream:audit * job_id "1" status "queued"
LPOP queue:fast
XADD stream:audit * job_id "1" status "processing"
常见坑点
- LIST 内存:无界列表会无限增长。监控
LLEN并设置告警。 - STREAM 内存:Stream 也会无限增长。使用
MAXLEN限制:
XADD stream:events MAXLEN ~ 10000 * type "event" data "payload"
XLEN stream:events
~ 允许近似裁剪以获得更好的性能。
- BLPOP 超时:超时设为 0 会永远阻塞。始终设置合理的超时。
- 消费者组延迟:用
XPENDING监控待处理消息。待处理列表不断增长意味着消费者跟不上了。 - Stream ID 排序:除非你理解其含义,否则不要使用自定义 ID。自动生成的 ID 保证排序。
在线编辑器试一试
前往 Redis 在线编辑器 对比两种方案:
RPUSH queue:list "msg:1" "msg:2" "msg:3"
LPOP queue:list
LLEN queue:list
XADD stream:demo * greeting "hello" from "alice"
XADD stream:demo * greeting "world" from "bob"
XRANGE stream:demo - +
XLEN stream:demo
注意 LPOP 是怎么移除消息的(破坏性),而 XRANGE 读取但不移除(非破坏性)。这就是根本区别。