使用Redis做消息队列

本文总阅读量

前记

Redis拥有很多数据结构, 是一个非常灵活的内存数据库, 基本上很多后端业务都会用到Redis, 在网络社区中, 经常听到可以使用Redis来当做XXX的问题, 在第一次接触消息队列时, 会有一个疑问, 可以使用Redis当做消息队列吗? Redis容易部署与维护, 而且一般都业务已经有一套Redis在跑了, 开发人员也拥有了一些使用经验, 为什么一定要重新去用Kafka或者RabbitMQ呢? 如果选用Redis做消息队列, 哪用哪种数据结构会比较好呢? 在Redis中, 可以使用Stream当做消息队列, 但是由于Redis是一个内存型数据结构, 所以他并不是一个完备的消息队列, 还是有丢日志的可能, 如果要确保一个日志都不丢, 那还是得上硬盘型的消息队列.

1.消息队列:

在内存中实现一个消息队列很简单, 消息队列的基础数据结构模型就是双端队列(双向链表), 然后只要确保这个双端队列的一端由生产者推数据进去, 消费者从另一端获取数据, 就是一个简单版的消息队列了.
但是这种实现是在程序内部, 需要考虑的事比较少, 如果转移到一个公共服务, 需要考虑的问题就很多了, 比如消息堆积, 还有网络的导致消息的收发问题等等, 而解决这些问题也可以认为是消息队列的特性:

  • 1.支持重复消费(可选), 这个往往不是最重要的, 但有这个特性会比较好.
  • 2.每个消息应该确保被消费, 也就是消息不丢, 即使堆积了很多消息, 前面的消息也不能删除, 除非人为确定不需要了.
  • 3.消息按照发送的顺序消费, 消费者的消费消息顺序和发送者发送的发送顺序一致.
  • 4.支持阻塞等待拉消息, 防止消费者CPU空转.
  • 5.消息不会因为自己消息队列本身服务挂了而丢数据.
  • 6.有完善的消息队列监控.
  • 7.可以对未完成消息的处理.

2.List

Stream未出现之前, 很多新手都会使用(或者想使用)List来当做消息队列, List他的底层实现是一个双向链表, 他能从头部和尾部操作元素, 能满足先进先出, 而且操作的时间复杂度都是O(1), 这都很符合消息队列的基础模型.
如果把List当做消息队列, 那么可以通过以下语法使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Redis支持双端操作, 以下遵循左进右出

# 使用LPUSH语法, 往test_queue队列, 并放入1, 队列不存在自动创建
127.0.0.1:6379> LPUSH test_queue 1
(integer) 1
127.0.0.1:6379> LPUSH test_queue 2
(integer) 2
127.0.0.1:6379> LPUSH test_queue 3
(integer) 3
# 通过命令 查看队列可以发现 目前有三个值
127.0.0.1:6379> LRANGE test_queue 0 5
1) "3"
2) "2"
3) "1"
# 弹出最先进去的值 1
127.0.0.1:6379> RPOP test_queue
"1"
# 只剩下后面进来的2和3
127.0.0.1:6379> LRANGE test_queue 0 5
1) "3"
2) "2"
127.0.0.1:6379> RPOP test_queue
"2"
127.0.0.1:6379> RPOP test_queue
"3"
# 元素弹完了, 队列被删除
127.0.0.1:6379> LRANGE test_queue 0 5
(empty list or set)

看完上面的操作, 会发现非常容易使用也非常容易理解, 但是消费者与Redis是通过网络交互的, 且不是在同一个进程, 这时候需要考虑的问题就来了, 当队列中没有消息时
消费者在执行RPOP时, 会发现队列是空的, 为了能继续工作, 程序里需要写一个死循环, 一直等数据到来(一般来说消费者也都是在一个循环里面), 这个时候不止会让CPU空转, 还会一直发消息给Redis, 增加Redis的压力.一般情况下会考虑增加一个休眠都逻辑, 不过休眠过长, 会影响消费的及时性, 还好Redis是一个服务, 他能提供了一个机制: 当队列为空且消费者来拉数据时, 让消费者先等待, 直到有数据过来, 再返回数据给消费者, 这样就可以减少很多网络请求了. 而要使用这个机制也很简单, 只要把RPOP改为BRPOP即可, BRPOPB就是block的意思:

1
2
3
4
127.0.0.1:6379> LPUSH test_queue 1
(integer) 1
127.0.0.1:6379> BRPOP test_queue
"1"

不过需要注意的是, 客户端一定要启用保活机制或者重连机制, 防止太久没有发送数据被Redis服务判断链接失活断开.此外, Redis无法知道消费者获取消息后有没有正确处理完消息, 可能在获得消息后就丢了消息, 或者处理时突然发生异常, 就直接丢消息了.

可以发现List没办法确保消息一定能被消费是很恐怖的一件事, 除非这个系统能容忍丢一些消息, 但是使用者是无法知道List丢了那些消息, 除非发送者和接收者都有日志记录, 最后再通过日志记录去匹配.

2.Pub/Sub

List只能有一个消费者可以获得到消息, 那如果有多个消费者, 只要确保消费者收到消息后处理消息能逻辑能幂等, 那就能降低丢日志的可能性了, 虽然整个系统的性能会偏差一些.

Pub/Sub原本的意思是发布/订阅, 生成者发布一条消息, 多个订阅的消费者都能得到这条消息.

在使用Pub/Sub中, 需要启动订阅者, 之后订阅者会阻塞, 等待消息的到来,最后再启用发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 启动消费者1
127.0.0.1:6379> SUBSCRIBE test_queue
subscribe from test_queue: 1
# 启动消费者2
127.0.0.1:6379> SUBSCRIBE test_queue
subscribe from test_queue: 2
# 发送者发送消息
127.0.0.1:6379> PUBLISH test_queue a
(integer) 1

# 消费者1收到的消息
127.0.0.1:6379> SUBSCRIBE test_queue
subscribe from test_queue: 1
message from test_queue: a
# 消费者2收到的消息
127.0.0.1:6379> SUBSCRIBE test_queue
subscribe from test_queue: 2
message from test_queue: a

可以看到Pub/Sub的使用方式也很简单, 但是通过使用Pub/Sub以及消费者幂等处理消息, 虽然能解决部分丢数据的问题, 但是Pub/Sub的实现会带来一个更明显的缺点, 由于Pub/Sub没有基于任何数据类型, 只是一个通道, 把数据从一端转发到一端, 不会有任何的数据存储, 如果这时候消费者下线了, 那么这个消息就没办法再到消费者手中, 如果所有消费者都下线了, 那当前的消息就会被丢弃了, 这个直接违背了消息队列的原则.
此外, 在消息积压上面也有个致命的问题, 发布者发布消息后, Redis会发给消费者, 如果消费者处理不及时, 会把消息积压在发送给消费者的缓冲区之中, 如果这个缓冲区的数量达到配置的上限, 那么Redis就会认为这个消费者可能失去了消费能力, 与他断开连接, 所以Pub/Sub方案是不可取的.

Stream

综合上面两个类型后, 可以发现Redis并不是很胜任消息队列这个应用, 但很多人还会把RedisList当做消息队列, Redis的作者还开发了一个叫disque的项目, 他就是一个基于内存的分布式消息队列, 在Redis 5.0之后, 合并到Redis之中, 也就是Stream, Stream是一个完备的分布式内存消息队列, 如果用过其他类似的内存消息队列, 会发现别的内存消息队列有的, Stream基本都有.

首先看看Stream的一个简单应用–发布与消费
Stream之中, 通过XADD发布消息, 通过XREAD消费消息, 同时他的消息都带有自己的唯一ID, 也可以是我们自己设置的ID, 但要确保唯一, 使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# *代表redis自己生成的消息id, 格式是timestamp + 自增序号
127.0.0.1:6379> XADD test_queue * field1 value1
"1620618087884-0"
127.0.0.1:6379> XADD test_queue * field1 value1 field2 value2
"1620618092658-0"

# 代表从头获取3个消息, 由于目前只有2个, 只返回两个
127.0.0.1:6379> XREAD COUNT 3 STREAMS test_queue 0-0
1) 1) "test_queue"
2) 1) 1) "1620618087884-0"
2) 1) "field1"
2) "value1"
2) 1) "1620618092658-0"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
# 获取上个消息ID之后的消息

# 如果没有消息, 会返回空(这里是iredis客户端解析的问题)
127.0.0.1:6379> XREAD COUNT 1 STREAMS test_queue 1619024293085-0
(error) ERROR 'NoneType' object is not iterable
# 可以改用阻塞式拉取, 设置为0代表不超时, 设置其他参数代表超时时间, 单位为秒, 一般建议设置一个自己的超时时间数, 如果超时了继续发起一个等待, 防止没有开保活之类的机制导致客户端与服务端的链接断开.
127.0.0.1:6379> XREAD COUNT 1 BLOCK 0 STREAMS test_queue 1619024293085-0

看起来Stream的发送和消费逻辑都挺正常的, 消费者只要通过上个消息的ID 来请求, 就能一直获取到新的消息, 而且还能读历史数据(可以试着断开客户端重新连上, 还是一样能获得到消息的).不过上面的XREAD是独立消费的命令, 一般情况下还是用消费者组来消费会比较好. RedisStream使用了一种类似Kafka的消费组概念, 如下图, 生产者会把数据投到消息队列中, 每个消息都带有自己的消息id, 且这些id是趋势递增的, 而与消息队列接触的只有消费组, 随着组内成员的消费, 消费者组会往右移动, 消费者组中有个last_delivered_is, 用来记录当前移动到哪里了. 而消费者只能订阅到消费组, 消费组中所有的消费者共享着同一个last_deliverd_id标记, 多个消费者会共同消费到一个消费者组的消息, 他们不会读到下标一样的消息.
此外, 每个消费者还有一个叫pending_ids的变量, 用来表示已经被客户端读取到的但是还未ack的消息id, 客户端获取消息时, 就会把消息id存到这个变量, 如果收到客户端发过来该消息的id,则从变量中把该消息id移除.

redis-stream

Stream功能比较多, 以下是几个使用Stream的例子:

  • 发布/订阅模式:
    了解了Pub/Sub后, 可以知道在消息队列中使用发布/订阅模式可以做一些兜底的操作, 比如消息需要比较及时响应时, 有多个消费者在消费同一个消息, 即使有一个挂了, 其他的消费者也能正常处理消息.
    Stream中, 可以通过消费者组实现发布/订阅模式:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    # 创建消费者组1
    127.0.0.1:6379> XGROUP CREATE test_queue test_group1 0-0
    "OK"
    # 创建消费者组2
    127.0.0.1:6379> XGROUP CREATE test_queue test_group2 0-0
    "OK"
    # 删除消费者组2
    127.0.0.1:6379> XGROUP DESTROY test_queue test_group2
    (integer) 1
    # 创建消费者组2, 并设置以第二条消息的id为起点
    127.0.0.1:6379> XGROUP CREATE test_queue test_group2 1619024299524-0
    "OK"
    # 第一组消费者开始消费, 只消费一条, > 代表从最新数据开始拉取, 由于没有携带NOCAK, 命令是自动ACK掉消息的
    127.0.0.1:6379> XREADGROUP GROUP test_group1 consumer COUNT 1 STREAMS test_queue >
    1) 1) "test_queue"
    2) 1) 1) "1619024293085-0"
    2) 1) "field1"
    2) "value1"
    127.0.0.1:6379> XREADGROUP GROUP test_group1 consumer COUNT 1 STREAMS test_queue >
    1) 1) "test_queue"
    2) 1) 1) "1619024299524-0"
    2) 1) "field1"
    2) "value1"
    3) "field2"
    4) "value2"
    # 第二组由于设置了是从第二条消息之后为起点, 所以是拿不到数据的
    127.0.0.1:6379> XREADGROUP GROUP test_group2 consumer COUNT 1 STREAMS test_queue >
    (error) ERROR 'NoneType' object is not iterable

  • 消费监控
    Stream中带有一个简单的消息监控, 可以看到每个消费组有多少个消费者, 当前消费到那条信息:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    127.0.0.1:6379> xinfo GROUPS test_queue
    1) 1) "name"
    2) "test_group1"
    3) "consumers"
    4) "1"
    5) "pending"
    6) "2"
    7) "last-delivered-id"
    8) "1620618092658-0"
    2) 1) "name"
    2) "test_group2"
    3) "consumers"
    4) "0"
    5) "pending"
    6) "0"
    7) "last-delivered-id"
    8) "1620618087884-0"
  • ACK
    从消费者监控中可以看到, test_group1中还有两条pending的消息, 这些都是消费者还未ack的消息, 当消费者启动时, 需要通过命令xpending主动查看尚在pending的消息, 根据消息id, 再次消费并提交ACK.这样就可以确保所有消息都能被消费者正常消费, 不会因为消费者本身的问题造成漏消费.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    # 查看还未消费的消息和信息
    127.0.0.1:6379> xpending test_queue test_group1
    1) "2"
    2) "1620618087884-0"
    3) "1620618092658-0"
    4) 1) 1) "consumer"
    2) "2"
    # 直接使用xac膜模拟消费者已经消费消息
    127.0.0.1:6379> xack test_queue test_group1 1620618087884-0
    (integer) 1
    # 可以发现已经正常消费
    127.0.0.1:6379> xpending test_queue test_group1
    1) "1"
    2) "1620618092658-0"
    3) "1620618092658-0"
    4) 1) 1) "consumer"
    2) "1"
  • 消息堆积
    当消息队列发生消息堆积时, 一般会通过生产者限流或者丢弃旧消息两种方式, 由于Redis是一个内存型的应用, 所以空间是有限的, 不能无限增长, 同时删除消息的xdel命令也不是真正的删除消息, 而是打上一个标记位, 所以Stream很容易遇到队列满的情况, 在这种情况下, 生产者能通过调用xlen命令来查看队列的情况, 决定是否限流, 也可以使用maxlen选项, 在创建Stream的队列时, 规定队列的最长长度, 当队列里面积压的消息超过限度时, 旧消息就会被扔掉了.但是Stream是数据结构比较特殊(基于基数树), 如果经常做调整, 把消息队列限制在固定的数字, 那么会对性能造成一定的影响, 因此maxlen支持带~的命令,带上~后,Stream会基于当前的结构进行合理的调整, 比如设置为1000时, Stream会保证消息队列的长度大于1000后会进行调整, 但可能是1030, 1050之类的.
    1
    2
    3
    4
    # 限制长度为2
    127.0.0.1:6379> XADD test_queue maxlen 2 * field1 value1
    # 带上~选项
    127.0.0.1:6379> XADD test_queue maxlen ~ 2 * field1 value1

总结

通过分析, Stream拥有了很多消息队列的特性, 但由于是内存型的消息队列, 并不是非常完备的, 就像他实现了ack逻辑, 解决了消费者处理消息过程中挂了造成消息丢失的问题, 但是由于Redis的AOF/RDB机制, 如果Redis挂了, 那很有可能造成Stream本身丢失几条消息, 而且Stream的消息堆积是有限的. 在选择是否要用Stream做消息队列时, 要先清楚我们的业务可以容忍丢一两条数据吗(或者有其他兜底), 以及从0上线一个Kafka或者RabbitMQ的成本是否极高, 不如直接使用现成的Redis.

查看评论