Redis 消息队列:LIST vs STREAM 差异与适用场景

Redis 提供了两种根本不同的消息队列方案:经典的 LIST 队列和现代的 STREAM 数据类型(Redis 5.0+)。本指南用真实命令进行正面对比。

问题场景

你的应用需要异步消息处理。你可以用 RabbitMQ 或 Kafka,但你已经有 Redis 了。Redis 能搞定吗?可以——但你需要选对数据结构。

基于 LIST 的队列

Redis 中最简单的消息队列模式。生产者推送,消费者弹出。

基本生产者/消费者

RPUSH queue:tasks '{"type":"send_email","to":"user@example.com"}'
RPUSH queue:tasks '{"type":"process_payment","order_id":5001}'
RPUSH queue:tasks '{"type":"generate_report","report_id":301}'

LPOP queue:tasks
LPOP queue:tasks
LLEN queue:tasks

RPUSH 添加到尾部,LPOP 从头部移除——FIFO 顺序。

阻塞弹出(BLPOP)

LPOP 在队列为空时返回 nil。BLPOP 会阻塞直到消息到达:

RPUSH queue:tasks "job:1"
BLPOP queue:tasks 5

5 是超时秒数。消费者最多阻塞 5 秒等待消息。这比在循环中用 LPOP 轮询高效得多。

可靠队列(RPOPLPUSH)

如果消费者在弹出消息后、处理完成前崩溃了怎么办?消息就丢了。

解决方案:RPOPLPUSH(或 Redis 6.2+ 的 LMOVE)原子性地将消息移到处理列表:

RPUSH queue:tasks "job:1" "job:2" "job:3"
RPOPLPUSH queue:tasks queue:processing
LRANGE queue:processing 0 -1
LRANGE queue:tasks 0 -1

处理成功后,从处理列表中移除:

LREM queue:processing 1 "job:1"

如果消费者崩溃,queue:processing 中的消息可以重新入队。

LIST 队列的局限

  • 没有消费者组——只有一个消费者能弹出消息。
  • 没有内置的消息确认机制。
  • 不能重放消息——弹出后就没了。
  • 没有消息 ID——无法追踪单个消息。

基于 STREAM 的队列

Redis Stream(5.0+)是专为消息传递设计的类日志数据结构。

添加消息

XADD stream:events * type "user_signup" user_id "1001"
XADD stream:events * type "order_placed" order_id "5001"
XADD stream:events * type "payment_received" order_id "5001"

* 自动生成唯一 ID(基于时间戳)。

读取消息

XRANGE stream:events - +
XLEN stream:events

XRANGE - + 从头到尾读取所有消息。

读取新消息(尾部跟踪)

XREAD COUNT 2 STREAMS stream:events 0-0

0-0 从头开始读。使用 $ 只读取新消息(类似 tail -f)。

消费者组

这是 Stream 的亮点。多个消费者可以分担工作负载:

XGROUP CREATE stream:events group1 0

XREADGROUP GROUP group1 consumer-a COUNT 1 STREAMS stream:events >
XREADGROUP GROUP group1 consumer-b COUNT 1 STREAMS stream:events >

> 表示"给我这个组中还没有分发给任何消费者的消息"。

消息确认

处理完成后,确认消息:

XACK stream:events group1 1739600001-0

未确认的消息可以被其他消费者认领(用于崩溃恢复):

XPENDING stream:events group1 - + 10

认领滞留消息

如果消费者挂了,另一个可以认领它的待处理消息:

XAUTOCLAIM stream:events group1 consumer-b 60000 0-0

认领待处理超过 60 秒的消息。

正面对比

特性LISTSTREAM
数据模型简单队列追加日志
消费者组不支持支持
消息确认手动(RPOPLPUSH)内置(XACK)
消息重放不支持(破坏性读取)支持(非破坏性)
消息 ID自动生成
阻塞读取BLPOPXREAD BLOCK
内存效率更高较低(存储 ID + 字段)
最大吞吐量更高略低
复杂度简单中等

何时使用 LIST

  • 单消费者的简单任务队列。
  • 不需要消息重放或确认。
  • 最大吞吐量是关键。
  • 你想要最简单的实现。
RPUSH jobs "task:1" "task:2" "task:3"
LPOP jobs
LPOP jobs
LLEN jobs

何时使用 STREAM

  • 多个消费者需要分担工作负载(消费者组)。
  • 需要消息确认和重试。
  • 想要重放消息(如调试或新消费者加入)。
  • 需要带 ID 的消息排序保证。
XADD events * action "click" page "/home"
XADD events * action "purchase" item "widget"
XRANGE events - +
XLEN events

混合模式:LIST 求速度,STREAM 求可靠

有些架构两者都用:

  1. 热路径:LIST 队列追求最大吞吐。
  2. 审计追踪:STREAM 用于日志和重放。
RPUSH queue:fast "job:1"
XADD stream:audit * job_id "1" status "queued"

LPOP queue:fast
XADD stream:audit * job_id "1" status "processing"

常见坑点

  1. LIST 内存:无界列表会无限增长。监控 LLEN 并设置告警。
  2. STREAM 内存:Stream 也会无限增长。使用 MAXLEN 限制:
XADD stream:events MAXLEN ~ 10000 * type "event" data "payload"
XLEN stream:events

~ 允许近似裁剪以获得更好的性能。

  1. BLPOP 超时:超时设为 0 会永远阻塞。始终设置合理的超时。
  2. 消费者组延迟:用 XPENDING 监控待处理消息。待处理列表不断增长意味着消费者跟不上了。
  3. Stream ID 排序:除非你理解其含义,否则不要使用自定义 ID。自动生成的 ID 保证排序。

在线编辑器试一试

前往 Redis 在线编辑器 对比两种方案:

RPUSH queue:list "msg:1" "msg:2" "msg:3"
LPOP queue:list
LLEN queue:list

XADD stream:demo * greeting "hello" from "alice"
XADD stream:demo * greeting "world" from "bob"
XRANGE stream:demo - +
XLEN stream:demo

注意 LPOP 是怎么移除消息的(破坏性),而 XRANGE 读取但不移除(非破坏性)。这就是根本区别。