使用Redis做消息队列
前记
Redis
拥有很多数据结构, 是一个非常灵活的内存数据库, 基本上很多后端业务都会用到Redis
, 在网络社区中, 经常听到可以使用Redis
来当做XXX的问题, 在第一次接触消息队列时, 会有一个疑问, 可以使用Redis
当做消息队列吗? Redis
容易部署与维护, 而且一般都业务已经有一套Redis
在跑了, 开发人员也拥有了一些使用经验, 为什么一定要重新去用Kafka
或者RabbitMQ
呢? 如果选用Redis
做消息队列, 哪用哪种数据结构会比较好呢? 在Redis
中, 可以使用Stream
当做消息队列, 但是由于Redis
是一个内存型数据结构, 所以他并不是一个完备的消息队列, 还是有丢日志的可能, 如果要确保一个日志都不丢, 那还是得上硬盘型的消息队列.
1.消息队列:
在内存中实现一个消息队列很简单, 消息队列的基础数据结构模型就是双端队列(双向链表), 然后只要确保这个双端队列的一端由生产者推数据进去, 消费者从另一端获取数据, 就是一个简单版的消息队列了.
但是这种实现是在程序内部, 需要考虑的事比较少, 如果转移到一个公共服务, 需要考虑的问题就很多了, 比如消息堆积, 还有网络的导致消息的收发问题等等, 而解决这些问题也可以认为是消息队列的特性:
- 1.支持重复消费(可选), 这个往往不是最重要的, 但有这个特性会比较好.
- 2.每个消息应该确保被消费, 也就是消息不丢, 即使堆积了很多消息, 前面的消息也不能删除, 除非人为确定不需要了.
- 3.消息按照发送的顺序消费, 消费者的消费消息顺序和发送者发送的发送顺序一致.
- 4.支持阻塞等待拉消息, 防止消费者CPU空转.
- 5.消息不会因为自己消息队列本身服务挂了而丢数据.
- 6.有完善的消息队列监控.
- 7.可以对未完成消息的处理.
2.List
在Stream
未出现之前, 很多新手都会使用(或者想使用)List
来当做消息队列, List
他的底层实现是一个双向链表, 他能从头部和尾部操作元素, 能满足先进先出, 而且操作的时间复杂度都是O(1), 这都很符合消息队列的基础模型.
如果把List
当做消息队列, 那么可以通过以下语法使用:
1 |
|
看完上面的操作, 会发现非常容易使用也非常容易理解, 但是消费者与Redis
是通过网络交互的, 且不是在同一个进程, 这时候需要考虑的问题就来了, 当队列中没有消息时
消费者在执行RPOP
时, 会发现队列是空的, 为了能继续工作, 程序里需要写一个死循环, 一直等数据到来(一般来说消费者也都是在一个循环里面), 这个时候不止会让CPU空转, 还会一直发消息给Redis
, 增加Redis
的压力.一般情况下会考虑增加一个休眠都逻辑, 不过休眠过长, 会影响消费的及时性, 还好Redis
是一个服务, 他能提供了一个机制: 当队列为空且消费者来拉数据时, 让消费者先等待, 直到有数据过来, 再返回数据给消费者, 这样就可以减少很多网络请求了. 而要使用这个机制也很简单, 只要把RPOP
改为BRPOP
即可, BRPOP
的B
就是block的意思:
1 |
|
不过需要注意的是, 客户端一定要启用保活机制或者重连机制, 防止太久没有发送数据被Redis
服务判断链接失活断开.此外, Redis
无法知道消费者获取消息后有没有正确处理完消息, 可能在获得消息后就丢了消息, 或者处理时突然发生异常, 就直接丢消息了.
可以发现List
没办法确保消息一定能被消费是很恐怖的一件事, 除非这个系统能容忍丢一些消息, 但是使用者是无法知道List
丢了那些消息, 除非发送者和接收者都有日志记录, 最后再通过日志记录去匹配.
2.Pub/Sub
List
只能有一个消费者可以获得到消息, 那如果有多个消费者, 只要确保消费者收到消息后处理消息能逻辑能幂等, 那就能降低丢日志的可能性了, 虽然整个系统的性能会偏差一些.
Pub/Sub
原本的意思是发布/订阅, 生成者发布一条消息, 多个订阅的消费者都能得到这条消息.
在使用Pub/Sub
中, 需要启动订阅者, 之后订阅者会阻塞, 等待消息的到来,最后再启用发送:
1 |
|
可以看到Pub/Sub
的使用方式也很简单, 但是通过使用Pub/Sub
以及消费者幂等处理消息, 虽然能解决部分丢数据的问题, 但是Pub/Sub
的实现会带来一个更明显的缺点, 由于Pub/Sub
没有基于任何数据类型, 只是一个通道, 把数据从一端转发到一端, 不会有任何的数据存储, 如果这时候消费者下线了, 那么这个消息就没办法再到消费者手中, 如果所有消费者都下线了, 那当前的消息就会被丢弃了, 这个直接违背了消息队列的原则.
此外, 在消息积压上面也有个致命的问题, 发布者发布消息后, Redis
会发给消费者, 如果消费者处理不及时, 会把消息积压在发送给消费者的缓冲区之中, 如果这个缓冲区的数量达到配置的上限, 那么Redis
就会认为这个消费者可能失去了消费能力, 与他断开连接, 所以Pub/Sub
方案是不可取的.
Stream
综合上面两个类型后, 可以发现Redis
并不是很胜任消息队列这个应用, 但很多人还会把Redis
的List
当做消息队列, Redis
的作者还开发了一个叫disque
的项目, 他就是一个基于内存的分布式消息队列, 在Redis
5.0之后, 合并到Redis
之中, 也就是Stream
, Stream
是一个完备的分布式内存消息队列, 如果用过其他类似的内存消息队列, 会发现别的内存消息队列有的, Stream
基本都有.
首先看看Stream
的一个简单应用–发布与消费
在Stream
之中, 通过XADD
发布消息, 通过XREAD
消费消息, 同时他的消息都带有自己的唯一ID, 也可以是我们自己设置的ID, 但要确保唯一, 使用如下:
1 |
|
看起来Stream
的发送和消费逻辑都挺正常的, 消费者只要通过上个消息的ID 来请求, 就能一直获取到新的消息, 而且还能读历史数据(可以试着断开客户端重新连上, 还是一样能获得到消息的).不过上面的XREAD
是独立消费的命令, 一般情况下还是用消费者组来消费会比较好. Redis
的Stream
使用了一种类似Kafka
的消费组概念, 如下图, 生产者会把数据投到消息队列中, 每个消息都带有自己的消息id, 且这些id是趋势递增的, 而与消息队列接触的只有消费组, 随着组内成员的消费, 消费者组会往右移动, 消费者组中有个last_delivered_is, 用来记录当前移动到哪里了. 而消费者只能订阅到消费组, 消费组中所有的消费者共享着同一个last_deliverd_id标记, 多个消费者会共同消费到一个消费者组的消息, 他们不会读到下标一样的消息.
此外, 每个消费者还有一个叫pending_ids的变量, 用来表示已经被客户端读取到的但是还未ack的消息id, 客户端获取消息时, 就会把消息id存到这个变量, 如果收到客户端发过来该消息的id,则从变量中把该消息id移除.
Stream
功能比较多, 以下是几个使用Stream
的例子:
- 发布/订阅模式:
了解了Pub/Sub
后, 可以知道在消息队列中使用发布/订阅模式可以做一些兜底的操作, 比如消息需要比较及时响应时, 有多个消费者在消费同一个消息, 即使有一个挂了, 其他的消费者也能正常处理消息.
在Stream
中, 可以通过消费者组实现发布/订阅模式:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29# 创建消费者组1
127.0.0.1:6379> XGROUP CREATE test_queue test_group1 0-0
"OK"
# 创建消费者组2
127.0.0.1:6379> XGROUP CREATE test_queue test_group2 0-0
"OK"
# 删除消费者组2
127.0.0.1:6379> XGROUP DESTROY test_queue test_group2
(integer) 1
# 创建消费者组2, 并设置以第二条消息的id为起点
127.0.0.1:6379> XGROUP CREATE test_queue test_group2 1619024299524-0
"OK"
# 第一组消费者开始消费, 只消费一条, > 代表从最新数据开始拉取, 由于没有携带NOCAK, 命令是自动ACK掉消息的
127.0.0.1:6379> XREADGROUP GROUP test_group1 consumer COUNT 1 STREAMS test_queue >
1) 1) "test_queue"
2) 1) 1) "1619024293085-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XREADGROUP GROUP test_group1 consumer COUNT 1 STREAMS test_queue >
1) 1) "test_queue"
2) 1) 1) "1619024299524-0"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
# 第二组由于设置了是从第二条消息之后为起点, 所以是拿不到数据的
127.0.0.1:6379> XREADGROUP GROUP test_group2 consumer COUNT 1 STREAMS test_queue >
(error) ERROR 'NoneType' object is not iterable - 消费监控
在Stream
中带有一个简单的消息监控, 可以看到每个消费组有多少个消费者, 当前消费到那条信息:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17127.0.0.1:6379> xinfo GROUPS test_queue
1) 1) "name"
2) "test_group1"
3) "consumers"
4) "1"
5) "pending"
6) "2"
7) "last-delivered-id"
8) "1620618092658-0"
2) 1) "name"
2) "test_group2"
3) "consumers"
4) "0"
5) "pending"
6) "0"
7) "last-delivered-id"
8) "1620618087884-0" - ACK
从消费者监控中可以看到, test_group1中还有两条pending的消息, 这些都是消费者还未ack的消息, 当消费者启动时, 需要通过命令xpending
主动查看尚在pending的消息, 根据消息id, 再次消费并提交ACK.这样就可以确保所有消息都能被消费者正常消费, 不会因为消费者本身的问题造成漏消费.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17# 查看还未消费的消息和信息
127.0.0.1:6379> xpending test_queue test_group1
1) "2"
2) "1620618087884-0"
3) "1620618092658-0"
4) 1) 1) "consumer"
2) "2"
# 直接使用xac膜模拟消费者已经消费消息
127.0.0.1:6379> xack test_queue test_group1 1620618087884-0
(integer) 1
# 可以发现已经正常消费
127.0.0.1:6379> xpending test_queue test_group1
1) "1"
2) "1620618092658-0"
3) "1620618092658-0"
4) 1) 1) "consumer"
2) "1" - 消息堆积
当消息队列发生消息堆积时, 一般会通过生产者限流或者丢弃旧消息两种方式, 由于Redis
是一个内存型的应用, 所以空间是有限的, 不能无限增长, 同时删除消息的xdel
命令也不是真正的删除消息, 而是打上一个标记位, 所以Stream
很容易遇到队列满的情况, 在这种情况下, 生产者能通过调用xlen命令来查看队列的情况, 决定是否限流, 也可以使用maxlen选项, 在创建Stream
的队列时, 规定队列的最长长度, 当队列里面积压的消息超过限度时, 旧消息就会被扔掉了.但是Stream
是数据结构比较特殊(基于基数树), 如果经常做调整, 把消息队列限制在固定的数字, 那么会对性能造成一定的影响, 因此maxlen支持带~
的命令,带上~
后,Stream
会基于当前的结构进行合理的调整, 比如设置为1000时,Stream
会保证消息队列的长度大于1000后会进行调整, 但可能是1030, 1050之类的.1
2
3
4# 限制长度为2
127.0.0.1:6379> XADD test_queue maxlen 2 * field1 value1
# 带上~选项
127.0.0.1:6379> XADD test_queue maxlen ~ 2 * field1 value1
总结
通过分析, Stream
拥有了很多消息队列的特性, 但由于是内存型的消息队列, 并不是非常完备的, 就像他实现了ack逻辑, 解决了消费者处理消息过程中挂了造成消息丢失的问题, 但是由于Redis的AOF/RDB机制, 如果Redis挂了, 那很有可能造成Stream
本身丢失几条消息, 而且Stream
的消息堆积是有限的. 在选择是否要用Stream
做消息队列时, 要先清楚我们的业务可以容忍丢一两条数据吗(或者有其他兜底), 以及从0上线一个Kafka
或者RabbitMQ
的成本是否极高, 不如直接使用现成的Redis
.
- 本文作者:So1n
- 本文链接:http://so1n.me/2021/04/23/%E4%BD%BF%E7%94%A8Redis%E5%BD%93%E5%81%9A%E9%98%9F%E5%88%97/index.html
- 版权声明:本博客所有文章均采用 BY-NC-SA 许可协议,转载请注明出处!