Redis缓存穿透,缓存击穿,缓存雪崩

本文总阅读量

前记

最近使用Redis越来越多了,使用Redis可以提升接口性能,同时也可以基于Redis实现分布式锁以及使用Redis限流等等.不过在使用Redis与Mysql结合时会遇到一些常见的单机缓存相关问题(基本是标配了)缓存穿透、击穿、雪崩.

缓存穿透,击穿,雪崩很像,如果不是为了面试准备,我也可能随时就忘了(逃)

  • 击穿 缓存和数据库都没有数据的时候
  • 穿透 同一个key,缓存没数据,数据库有数据的时候
  • 雪崩 很多个key在同一时刻,缓存没数据,数据库有数据的时候 这类问题十分常见,所以我又是多水了一文啦,不过要确保在引入解决方案时,能尽可能的适应自己的业务,而不是为了引进解决方案而引进解决方案.

注:示例代码做了更新,使用的redis为aioredis,使用的mysql为aiomysql,同时已经把代码更新到Python3.6+可用

1.业务接口开发中的缓存实现方式

为了简单说明,这里使用的db为mysql,缓存为Redis,为了保证缓存能及时跟db同步更新数据,而更新数据一般分为同步刷新或者定时刷新,以下是更新缓存数据的方式说明:

  • 1.1在应用层同时更新缓存和db
    写数据时先把数据写入到db,写成功后再把数据写入缓存.而读数据时先检查缓存中是否有数据,如果有则返回数据,如果没有则从db中获取数据,再把数据写入缓存,最后返回数据.
    这个方式的最简单,最容易理解的,适用范围也是最大的,同时也是坑最多的.
    • 1.先更新db,再更新缓存造成的脏数据
      假设有两个并发,当出现后抵达的操作先更新缓存数据,之后在把先操作的数据写入到缓存数据时就会造成缓存了脏数据,最好的解决办法就是当数据更新时,设置一个时间戳,之后再写入缓存时比较时间戳,如果写入的时间戳低于已缓存的时间戳,那就不进行缓存
    • 2.先删除缓存,在删除db的数据
      假设一个请求正在删除缓存数据后,准备删db数据时,另一个请求刚好在查缓存数据,发现查不到了,就从数据库拉数据,并把数据更新到缓存,操作完成后,第一个请求才把db的数据进行删除.造成了缓存有数据,而db没数据的数据不一致问题.比较好的解决方案是,先把数据库数据进行删除,再把缓存的key设置一个适当(结合业务)的过期时间,让其自动过期.(先删除数据库再删除缓存的数据理论上也是有坑的,但实际触发频率不高)
  • 1.2把db和缓存当成一个整体,由应用层调用
    从上面的方法可以看出虽然灵活,但每次都要在应用层维护一次同步更新缓存和db会觉得很繁琐,所以可以使用一些封装,把mysql和redis当成一个整体,内部实现上面的方法,然后把写数据和读数据封装成一个接口,让应用层调用.该方式对于外部调用的人会觉得十分简单,但内部实现会十分复杂.
  • 1.3先更新缓存,再定时把数据写入db
    这种方式会带来极大的性能提升,但由于数据一致性的问题局限性比较大,一般用于像微博的点赞/取消这类的业务.因为这类业务更新快,而且对于用户来说不太关心具体数值的大小,只要知道大概的范围即可.
    写数据都会先把数据写入到缓存,然后再使用定时脚本等把缓存的数据写入到db.读数据时都是从缓存中读数据,如果缓存没有数据,则从db中拉取.

2.缓存问题

介绍完上面几种业务接口开发中的缓存实现方式后,就可以说说缓存实现时容易遇到的常见的问题了

2.1缓存穿透

如果db中没有数据,那么缓存中也是没有数据的,会造成大量的请求实际上还是请求到了db里,这样子缓存就没有意义了.

通用的解决方法是在缓存中把db返回None的key在缓存中设置一个标识位,其他请求过来时看到这个标识位就知道没有数据了,不必再向db拿数据,直接返回没有数据的响应.然后当插入数据到db时,再通过数据写入到缓存.

还有一个更简单的方法是使用布隆过滤器(只要这个key不存在,他就会告诉你这个key不存在)来检查key是否存在于redis,如果不存在就返回自己设定好的响应.

2.2缓存击穿

在上面实现的方式中,都会出现缓存穿透的问题,假设有段数据,在查的时候需要花费1秒钟的时间,而此事的并发量很高,当这段数据不存在缓存中时,就会导致大量的并发请求在这1秒种内全部请求到db,使db的读写性能变差.

解决这个问题的方式可以先使用分布式锁,确保只有一个进程可以从db中获取到数据,其余进程可以等待第一个进程获取到数据后从缓存中拉取数据.而在进程内部,使用多线程或者协程都是共享进程内存的,所以可以使用一个类似于通知的机制,其他没从db拉取数据的线程/协程都可以等待数据拉完后,再从内存拿数据再返回响应.

2.2.1缓存击穿解决方案

首先,就是设置一个锁,通过锁解决多个进程同时执行一个相同的操作,由于是单机方案,所以使用set…ex…nx即可,通过ex设置超时时间,防止程序挂了等造成死锁,再通过nx会让除了set成功返回1的原理来实现获取锁,如果是集群最好使用RedLock.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class RDS():
def __init__():
# 注:__init__逻辑里面并不能使用await,该行代码无法正常运行,这里只是简要说明初始化连接池
self.conn_pool = await aioredis.create_pool()

async def execute(
self, command: str, *args: Any, **kwargs: Any
) -> Optional[Any]:
try:
async with self.conn_pool.get() as conn:
return await conn.execute(command, *args, **kwargs)
except Exception as e:
pass # 抛错自定义,与演示无关,先pass
return None

@asynccontextmanager
async def lock(self, name: str, timeout: int = 1 * 60) -> bool:
# 这里锁粒度是天,可以根据自己业务定义
today_string: str = datetime.datetime.now().strftime("%Y-%m-%d")
key: str = f"project:lock:{name}:{today_string}"
try:
lock = await self.execute('set', key, '1', 'ex', timeout, 'nx')
if lock == 1:
lock = True
else:
lock = False
yield lock
finally:
await self.execute('del', key)

之后每个程序就可以像如下逻辑去调用:

1
2
3
4
5
6
7
8
rds = RDS()

async def api_demo():
while:
async with rds.lock as lock:
if lock:
pass # 获得锁的逻辑
await async.sleep(0.01)

这样可以解决了多个进程同时执行一个相同的操作,但是还没解决同个进程内重复获取的问题.
在Python的协程中,可以使用协程的Future对象来实现通知的机制,下面通过一个叫Share的类来实现,如果跟随web框架启动,需要把它绑定在全局变量里面,这里做了省略,代码和对应解释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 清理现场逻辑最好还是用计数器来保证所有调用调用结束后才进行清理现场,不过代码量会变多,所以这里简单的使用延迟清理逻辑来代替.
# 同时为了逻辑清晰,并没有全部使用asyncio.Future的功能
import asyncio
from functools import partial


class Share():
def __init__(self, delay_clean_time=1):
self._future_dict = dict()
self._result_dict = dict()
self._error_dict = dict()
self._loop = asyncio.get_event_loop()
self._delay_clean_time = delay_clean_time # 代表执行完毕后多少秒清理现场

async def clean(self, key):
await asyncio.sleep(self._delay_clean_time)
# 清理现场
if key in self._future_dict:
del self._future_dict[key]
if key in self._result_dict:
del self._result_dict[key]
if key in self._error_dict:
del self._error_dict[key]

async def do(self, key, fn):
if key not in self._future_dict:
# 第一个执行的协程,进入执行逻辑
self._future_dict[key] = asyncio.Future() # 设置一个Future,其他协程可以等待这个Future直到他被执行set_result(True)
try:
self._result_dict[key] = await fn() # 执行函数(如果有同步和异步函数,需要做判断,这里只示范异步)
except Exception as e:
self._error_dict[key] = e # 记录异常
finally:
self._future_dict[key].set_result(True) # 代表执行完毕

# 执行延迟清理现场逻辑
asyncio.ensure_future(self.clean(key))
else:
# 非第一个执行的协程进入等待逻辑
await self._future_dict[key]

# 有异常则抛异常
if key in self._error_dict:
raise self._error_dict[key]
# 返回数据
return self._result_dict[key]

通过Share类后可以把demo代码改为如下就可以解决缓存击穿(如果用集群的话,记得把lock更换为RedLock):

1
2
3
4
5
6
7
8
9
10
11
12
13
rds = RDS()
share = Share()
share_key = 'so1n'

async def get_data_demo():
while:
async with rds.lock as lock:
if lock:
pass # 获得锁的逻辑
await async.sleep(0.01)

async def api_demo():
return share.do(share_key, get_data_demo)

2.3缓存雪崩

缓存雪崩是指缓存不可用或者大量缓存由于超时时间相同在同一时间段失效,大量请求直接访问数据库,数据库压力过大导致系统雪崩。
为了解决这个问题:

  • 需要有良好的过期时间设计,可以在设置过期时间时添加jitter值,比如设置jitter为10,那每次给key设置过期时间时,他的过期时间会在 +-10内波动.
  • 如果是时点性数据,比如昨天活动有10%的奖励,今天是20%的奖励,那么jitter可能不好用,这时候需要在业务层让程序短暂的睡几毫秒或者秒,给更新热点key分散压力.
  • 要做好监控,基于redis的info数据做监控,看那些时间段过期的key比较多,是否跟自己的设定值有关,如果有就要赶紧修改设定值.
  • 还有一个办法就是使用多级缓存,一级缓存更新快,过期时间也快,二级缓存更新频率不高,能在一级缓存过期时,作为一个兜底的作用.但在使用这个方法时,要确保二级缓存的更新频率是否符合自己的要求.
查看评论