前记 随着业务的增长,后端技术架构会慢慢的从单体服务转向多服务或者微服务的分布式架构,此时语言级别的锁无法管理所有资源的竞争,只能采用分布式锁。而分布式锁的主体思想虽然与语言级别的锁类似,但还需要考虑到一些网络因素,使其变得复杂。
1.为何需要分布式锁 举一个栗子,比如在做聊天服务时,需要统计一个聊天会话的在线人数,它的简单示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 class Counter (object ): def __init__ (self, namespace: str ) -> None : self.namespace: int = namespace self.count: int = 0 def login (self ) -> None : self.count += 1 def logout (self ) -> None : self.count -= 1
这份代码比较简单,它是每个namespace
的全局计数器实现,每有一个用户成功登陆就会调用login
方法使计数器+1,而每次退出就会调用logout
方法使计数器-1。这个计数器看起来实现了需求,但是它也符合了最简单的线程不安全模型,意味着在多线程/进程等环境下无法得出正确的结果。
这个操作之所以线程不安全,是因为self.count+=1
的这类操作不是原子性的,它在运行之前会被编译为self.count = self.count + 1
,这是一个先更改再赋值的操作,实际在执行的时候CPU会分为下面三个步骤去执行:
1.将count的值从内存读到CPU对应的寄存器上。
2.CPU操作寄存器上的count并进行+1操作。
3.把寄存器里的指写回内存中。
这样在多线程/进程的场景下可能出现了CPU核心1和CPU核心2同时从内存读到对应值0,并放到了自己的寄存器上面,然后再对它进行+1操作,最后又把值(此时已经为1)写回到内存中,导致self.count的结果变为1而不是真正想要的值2了。
1.self.count-=1
同理。
2.线程是操作系统的最小调度单位,在多核心系统时,会出现多核心同时调用线程去进行资源争夺。
3.Python 3.11做了优化,可能进行了几百次加减处理,结果也是对的。
这就是多线程环境对同一个资源竞争从而产生数据安全性的就问题,许多语言为了解决这个问题引入了锁机制,并使用锁机制保护了多线程环境下对同一个资源竞争的数据安全性。 开发者可以非常方便的通过锁机制给一些代码块加上锁从而使这些操作变成了原子性,比如对示例代码的Counter
进行了如下修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from threading import Lockclass Counter (object ): def __init__ (self, namespace: str ) -> None : self.namespace: int = namespace self.count: int = 0 self._lock: Lock = Lock() def login (self ) -> None : with lock(): self.count += 1 def logout (self ) -> None : with lock(): self.count -= 1
代码引入了Lock
对象,并把它套在了资源冲突的self.count+=1
和self.count-=1
上面,使得线程只有持有锁的时候才能对self.count
进行操作,而拿不到锁的线程则需要等待到获取锁才能继续操作,这样一来就不会产生多个线程同时操作一份数据而导致了结果不一致的问题。
不过现在的大多数服务不再是单体应用,更多的是以多服务,微服务的形式存在,这时上述的问题就会从不同的线程/进程争夺一个资源变为不同的机器上的服务对同一个资源进行竞争。 而语言级别的锁只存在于进程中,无法跨进程,只能管理自己进程里面的资源竞争,无法解决跨服务资源竞争的问题,只能使用一个带有锁机制的中间人来协调各个服务的资源竞争的问题,而这个中间人就是分布式锁。
2.分布式锁的实现 为了解决多服务,微服务的资源竞争这问题,分布式锁诞生了,分布式锁与语言级别的锁一样都是在某块空间打上标记,然后再通过打标记是否成功来判断是否获取锁,与语言级别锁唯一不同的是分布式锁需要通过网络进行通信,而网络是复杂的,这也就导致分布式锁的实现变得复杂。
为了降低分布式锁实现的复杂度,大多数分布式锁的方案都会基于拥有存储媒介和防止资源冲突的数据库进行开发,比如关系数据库MySQL
,KV数据库Redis
和Etcd
等,它们都有一套逻辑来确保数据的一致性和可用性,同时也有一套完善的传输协议,这样就可以不去考虑网络传输的问题和数据冲突与丢失的问题,只专注分布式锁功能的实现了。 不过实际业务中需要分布式锁有较高的性能,所以大多数会分布式锁都会基于KV数据库开发,目前常用的分布式锁使用的KV数据库是Redis
。
2.1.基于Redis的分布式锁 Redis
本质上就是使用一个单进程对一块内存进行读写(只考虑基本的读写),且每个操作都是以一个协程去操作内存,这保证了客户端提交的每个操作都是拥有原子性的。同时Redis
还支持使用Lua脚本去编写复杂的操作,这两个特性组合起来意味着可以通过Redis
实现出一个高性能且功能复杂的分布式锁。
在Python
的Redis
客户端库py-redis
中,提供了一个简单的Redis
锁封装,开发者通过这个封装可以很方便的使用基于Redis
的分布式锁。如以上面计数器示例代码进行修改后的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import asynciofrom redis.asyncio import Redisclass Counter (object ): def __init__ (self, namespace: str , redis: Redis ) -> None : self._redis = redis self.namespace: int = namespace self.count: int = 0 async def login (self ) -> None : async with self._redis.lock("demo" ): self.count += 1 async def logout (self ) -> None : async with self._redis.lock("demo" ): self.count -= 1 async def main (): counter = Counter("demo" , Redis()) await counter.login() await counter.logout() asyncio.run(main())
在这个示例代码中可以发现,锁的使用方法很简单,只要通过redis.lock
方法就可以获取到分布式锁的实例,而这个锁实例的使用方法与thread.Lock
类似,不用大改代码。
不过lock方法只是提供了一个简单的调用,实际上它返回的是符合如下函数签名的对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class Lock : async def __aenter__ (self ): pass async def __aexit__ (self, exc_type, exc_value, traceback ): pass async def acquire ( self, blocking: Optional[bool ] = None , blocking_timeout: Optional[float ] = None , token: Optional[Union[str , bytes ]] = None , ): pass async def locked (self ) -> bool: pass async def owned (self ) -> bool: pass def release (self ) -> Awaitable[None ]: pass
这个对象拥有多个方法,首先是针对async with
语法提供了__aenter__
和__aexit__
方法,它们分别在进入和离开async with
语法块时被调用,它们的源码如下:
1 2 3 4 5 6 7 8 9 class Lock : ... async def __aenter__ (self ): if await self.acquire(): return self raise LockError("Unable to acquire lock within the time specified" ) async def __aexit__ (self, exc_type, exc_value, traceback ): await self.release()
通过源码可以看到它们的实现很简单,__aenter__
只调用acquire
方法,如果返回True
就允许进入代码块,如果返回False
则抛出获取锁错误,而__aexit__
则更简单,它只是调用release
方法执行分布式锁的释放。
接下来就是acquire
和release
这两个分别代表获取锁和释放锁的核心方法了,其中acquire
的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 class Lock : ... async def acquire ( self, blocking: Optional[bool ] = None , blocking_timeout: Optional[float ] = None , token: Optional[Union[str , bytes ]] = None , ): sleep = self.sleep if token is None : token = uuid.uuid1().hex .encode() else : try : encoder = self.redis.connection_pool.get_encoder() except AttributeError: encoder = self.redis.get_encoder() token = encoder.encode(token) if blocking is None : blocking = self.blocking if blocking_timeout is None : blocking_timeout = self.blocking_timeout stop_trying_at = None if blocking_timeout is not None : stop_trying_at = asyncio.get_running_loop().time() + blocking_timeout while True : if await self.do_acquire(token): self.local.token = token return True if not blocking: return False next_try_at = asyncio.get_running_loop().time() + sleep if stop_trying_at is not None and next_try_at > stop_trying_at: return False await asyncio.sleep(sleep) async def do_acquire (self, token: Union[str , bytes ] ) -> bool: if self.timeout: timeout = int (self.timeout * 1000 ) else : timeout = None if await self.redis.set (self.name, token, nx=True , px=timeout): return True return False
通过源码可以发现acquire
方法主要是做三件事:
1.初始化各种参数,其中token是采用uuid1生成的,该方法虽然会泄露主机信息,但它是能确保每个客户端生成的ID唯一且速度很快的方法,同时Redis
一般都在内网运行的,只要能确保内网安全,一般也没啥事。
2.通过do_acquire
方法去获取锁,当获取成功就会返回True
,获取失败且设置不阻塞就返回False
,而获取失败且设置阻塞就会通过循环去竞争锁。
3.获取锁的重点(do_acquire方法),这里通过Redis
的set <key> <value> nx xxx ps xxx
的方法向Redis设置了一个K-V,并返回设置是否成功。这个命令是原子性的执行三个操作,从而保证获取锁的操作要么成功要么失败。 命令中的nx是确保Key不存在时且该命令能正常写入才返回True
,而ps是设置了Key的过期时间,防止客户端假死或宕机而导致整个锁无法被释放(避免死锁的一种技术手段)。
了解完获取锁acquire
方法的执行原理后再看释放锁release
方法的源码,由于release
操作要多个操作,所以采用了Lua脚本,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class Lock : ... lua_release = None LUA_RELEASE_SCRIPT = """ local token = redis.call('get', KEYS[1]) if not token or token ~= ARGV[1] then return 0 end redis.call('del', KEYS[1]) return 1 """ ... def release (self ) -> Awaitable[None ]: expected_token = self.local.token if expected_token is None : raise LockError("Cannot release an unlocked lock" ) self.local.token = None return self.do_release(expected_token) async def do_release (self, expected_token: bytes ) -> None : if not bool ( await self.lua_release( keys=[self.name], args=[expected_token], client=self.redis ) ): raise LockNotOwnedError("Cannot release a lock that's no longer owned" )
release
也比较简单,它的整个逻辑是先通过本地线程存储中获取token,如果该值为空,则证明有可能没有执行acquire
获取锁,需要抛出锁已经被释放的异常,如果不为空则置空,再通过lua
脚本去释放锁,在释放锁时会校验token的值防止释放一个不是自己产生的锁。
此外,Lock
中的token存放在本地线程存储的原因是为了防止多线程调用同个Loc实例导致的问题,如下:
1.当A线程以timeout为30秒获取了锁。
2.B线程获取了锁,但是由于A已经获取了锁了,所以通过自旋进行等待。
3.A线程执行的逻辑超过30秒还未执行完成,而锁已经过期而被释放。
4.B线程发现锁已经被释放,开始获取锁并执行,最终在A线程执行完成之前运行完毕,并执行了释放锁的操作。
5.A线程执行了释放锁的操作,发现锁已经被释放了。
可以看到这个方法只是防止动作没执行完,但锁却过期的一种情况,它并不能真正的解决问题,如果要真正的解决这个问题,则需要引入WatchDog机制。
2.2.WatchDog实现 py-redis
的Lock
对象支持Timeout
参数,Timeout参数的作用是标记锁在被获取的n秒后被自动释放,这样加锁的程序即使崩溃了也能确保锁会在一定的时间后被释放,避免了死锁问题。 不过需要注意的是,Timeout参数就不能设置太长,如果设置太长,且程序在获取锁后崩溃而无法释放锁时,其他等待获取锁的程序会花时间进行无效的等待。 然而Timeout参数设置得太短也不行,如果程序的执行时间超过了Timeout设置的时间,那么就会出现程序还在运行着,但是锁却提前释放了,最终就会导致多个程序争夺同一个资源,也就是锁机制无效了。
由于Timeout参数设置太短太长都有问题,这意味着Timeout参数并不能完美的解决问题,这时就需要一个更好的机制–WatchDog来完善Timeout参数的不足。 WatchDog机制会在程序获取锁之后启动,在释放锁之前关闭,也就是跟随程序获取锁的行为一起运行,然后它会在程序执行期间按照一定的时间间隔帮锁自动续约(也就是增加锁的过期时间),从而防止业务代码没执行完,锁却过期的情况。
py-redis
库的Lock
追求的是简单的原则,它没有提供一套完整的WatchDog实现,但是提供了一个续约机制,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 class Lock : lua_extend = None LUA_EXTEND_SCRIPT = """ local token = redis.call('get', KEYS[1]) if not token or token ~= ARGV[1] then return 0 end local expiration = redis.call('pttl', KEYS[1]) if not expiration then expiration = 0 end if expiration < 0 then return 0 end local newttl = ARGV[2] if ARGV[3] == "0" then newttl = ARGV[2] + expiration end redis.call('pexpire', KEYS[1], newttl) return 1 """ def extend ( self, additional_time: float , replace_ttl: bool = False ) -> Awaitable[bool]: if self.local.token is None : raise LockError("Cannot extend an unlocked lock" ) if self.timeout is None : raise LockError("Cannot extend a lock with no timeout" ) return self.do_extend(additional_time, replace_ttl) async def do_extend (self, additional_time, replace_ttl ) -> bool: additional_time = int (additional_time * 1000 ) if not bool ( await self.lua_extend( keys=[self.name], args=[self.local.token, additional_time, replace_ttl and "1" or "0" ], client=self.redis, ) ): raise LockNotOwnedError("Cannot extend a lock that's no longer owned" ) return True
它的逻辑非常简单,就是先校验当前是不是自己持有锁以及锁是否还在,当所有条件都满足时才续约,不过续约有两种方案,一种是把key的过期时间设置为指定的时间,另一种是在剩余的过期时间基础上再添加指定的时间。了解了py-redis
提供的续约机制后,我们还需要考虑WatchDog剩余的逻辑,一个是什么时候开启/关闭WatchDog,另一个是如何制订WatchDog的执行周期。
前面说到WatchDog会伴随着加锁一直运行着,那么意味着WatchDog会在加锁成功后开始运行,并在释放锁之前停止,在根据之前针对Lock的代码分析可以判断,WatchDog需要在acquire
后开始运行,在do_release
之前停止运行。 至于WatchDog的间隔时间,大部分框架都是采用用户定义Timeout时间的1/3,这是考虑到网络通信的不可靠以及防止发送太多请求而权衡的结果。
现在WatchDog的原理分析完毕,可以着手实现WatchDog了,WatchDog最终的代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 import asynciofrom typing import Optional, Unionfrom redis.asyncio.lock import Lockclass MyLock (Lock ): _watch_dog: Optional[asyncio.Task] async def _watch (self ) -> None : """ 这是一个一直在循环的方法,它每次循环都会执行续约,然后休眠超时时间的1/3,然后再执行下一个循环。 """ while True : await self.extend(self.timeout) await asyncio.sleep(self.timeout / 3 ) def _cancel_watch_dog (self ) -> None : """ 取消正在运行WatchDog的协程 """ _old_watch_dog: Optional[asyncio.Future] = getattr (self, "_watch_dog" , None ) if _old_watch_dog and not _old_watch_dog.cancelled(): _old_watch_dog.cancel() async def acquire ( self, blocking: Optional[bool ] = None , blocking_timeout: Optional[float ] = None , token: Optional[Union[str , bytes ]] = None , ) -> bool: result = await super ().acquire(blocking, blocking_timeout, token) if result: self._cancel_watch_dog() self._watch_dog = asyncio.create_task(self._watch()) return result async def do_release (self, expected_token: bytes ) -> None : self._cancel_watch_dog() return await super ().do_release(expected_token) def __del__ (self ) -> None : try : self._cancel_watch_dog() except Exception: pass
接着编写一个Demo文件来验证编写的WatchDog是否有效,Demo文件代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import asyncioimport timefrom typing import Optional, Unionfrom redis.asyncio.lock import Lockfrom redis.asyncio import Redisasync def main (): _redis = Redis() s_t = time.time() async with _redis.lock("demo" , lock_class=MyLock, timeout=3 ): print("lock" ) await asyncio.sleep(5 ) print("ok" , time.time() - s_t)if __name__ == "__main__" : asyncio.run(main())
然后在终端直接运行,会看到终端有如下输出:
1 2 lock ok 5.005310297012329
通过输出可以发现,虽然锁设定的timeout参数为3秒,但是被锁住的代码能够正常的执行了5秒,也就代表WatchDog的实现是成功的。
需要注意的是,协程的创建销毁成本很低,所以使用一个协程执行一个WatchDog,如果是在线程模型下,则不能使用一个单独的线程来执行WatchDog,这样会导致频繁的开启和关闭线程,建议使用一个线程池来管理所有锁的WatchDog的运行。不过WatchDog是每隔一段时间运行的,所以也可以使用时间轮+单独的Worker来执行WatchDog。
3.总结 到目前为止,实现的分布式锁基本完备,也没有什么缺陷,同时它的性能也是非常高的。不过由于py-redis
的锁实现比较简单,导致拓展性比较低,无法兼容部分场景,同时py-redis
库并没有打算开发出包含更多功能的分布锁实现。这意味着开发者只能根据他提供的Lock对象进行重新开发,并通过redis.lock
中的lock_class
参数传递重新开发后的锁实现。