分布式锁(3)--基于Etcd的分布式锁实现

本文总阅读量

前记

基于Redis实现的分布式锁的性能强,功能多,但是由于Redis本身的特性决定了基于Redis实现的分布式锁可能会不唯一,所以无法应用在要求分布式锁必须绝对唯一的场景。
为此,只能基于其他服务来实现分布式锁,不过它们实现的总体思路是一致的,只是调用方式有一些区别,本文将介绍如何基于Etcd实现具有完备性的分布式锁。

1.为何基于Etcd实现分布式锁

目前常见的分布式锁实现除了Redis外还有EtcdZookeeper,由于Redis的一致性只满足了AP,所以基于Redis实现的分布式锁无法保证绝对唯一,而EtcdZookeeper的一致性都满足了CP,都是通过类似的一致性算法确保了数据的一致性,这样基于EtcdZookeeper实现的分布式锁能保证绝对唯一。
其中EtcdZookeeper的功能类似,但有一些方面中Etcd甚至超越了ZooKeeper,如Etcd采用的Raft协议就要比ZooKeeper采用的Zab协议简单、易理解,同时它的性能也比Zookeeper高,所以通常会选择Etcd作为分布式锁的实现。

官方提供的Etcd与其他同类型的组件对比如下图:
Etcd分布式锁之Etcd与其他组件的对比.png

通过图可以看到Etcd略胜于其他组件,此外它提供的WatchLeaseRevision以及Prefix机制使基于Etcd实现分布式锁会非常的方便,同时它是通过HTTP/gRPC提供对应的API,所以它的客户端实现也比较方便,当然如果当前系统依赖的是Zookepeer,那么肯定还是选择Zookerpeer,除非有性能要求。

PythonEtcd的客户端并没有官方的维护,目前维护的比较好的是aetcd,接下来将使用aetcd实现一个简单的分布式锁。

2.分布式锁的实现

前面说到,EtcdWatchLeaseRevision以及Prefix机制赋予Etcd分布式锁的能力,要实现分布式锁则需要了解它们的作用,它们的作用如下:

  • Lease机制:它类似于Redis中的过期,Etcd可以通过LeaseKey-Value设置租约时间(也就是过期时间),当租约到期时Key-Value会被删除。
  • Revision机制:Etcd会为每个Key-Value赋予一个版本,同时更新Key-Value的时候,它的版本也会发生变化。比如插入Key1时,它的版本为0,而在后续插入Key2时它的版本为1,通过这样的机制,就可以知道写操作的顺序。
  • Prefix机制:Etcd通过Prefix机制提供了对前缀相同的Key执行同一操作的能力,例如一个名为/lock的锁,当有两个客户端进行写操作时,实际上写入的Key分别为key1="/lock/UUID1"key2=/lock/UUID2"",其中UUID就是token的含义,用于确保每个锁的唯一性。
    不过与Redis中实现的分布式锁不一样的是在Etcd中这两个锁都会写入成功,但是返回的Revision是不一样的,需要应用程序通过/lock/前缀去查询数据,然后会获得到key1key2的结果和Revision,接着再根据Revision的结果进行判断哪个Key才是获取到锁。
  • Watch机制:EtcdWatch机制可以批量的监听一批Key,当被监听的Key发生变化时,客户端会收到通知。在实现分布式锁的时候,如果抢锁失败,可通过Prefix机制返回的Key列表获得Revision比自己小且相差最小的Key(称为Pre-Key),对Pre-Key进行监听,因为只有它释放锁,自己才能获得锁,如果监听到Pre-Key的删除事件,则说明Pre-Key已经释放,自己已经持有锁。

了解完这些机制,可以发现通过这些机制可以非常快的基于Etcd实现一个简单的分布式锁,如下图:
Etcd分布式锁的实现步骤.png
图中红色方块代表Etcd服务端,蓝色方块代表Etcd客户端,白色方块代表当前Etcd服务端存的数据,图中总共分为4个步骤,用虚线隔开。
首先是步骤1,client1client2会通过put命令把/demo/{uuid}推送到Etcd中。

然后是步骤2,这里通过Prefix获取/demo开头的键值对的信息。

接着就是步骤3,这个步骤会判断步骤2返回的键值对信息中Revision版本最小的是不是自己,如果是那就意味着自己获取到锁了,如果不是则会通过Watch机制监听Revision比自己小1的键值对的删除事件。

最后是步骤4,当获取到锁的client1执行完任务后删除了/demo/uuid1,这时Etcd会通知到client2客户端,它已经获取锁了,可以执行任务。

通过这些机制,可以发现基于Etcd实现的分布式锁会非常的方便,同时由于Revision的作用,实现的分布式锁默认带有了公平锁的功能,接下来开始手撸Etcd分布式锁的实现,首先是获取锁的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Lock(object):
def __init__(self, name: str, client: aetcd.Client, ttl: int) -> None:
self._prefix = name.encode()
self._name = self._prefix + b'/' + uuid1().hex.encode()
self._client = client
self._ttl = ttl
self.lease = None
self._watch_dog: Optional[asyncio.Task]= None

async def acquire(self) -> bool:
# 1.创建续约
self.lease = await self._client.lease(self._ttl)
# 2.写入数据
create_result = await self._client.put(self._name, b"", lease=self.lease)
self_revision = create_result.header.revision
# 3.查询数据
range_result = await self._client.get_prefix(self._prefix, sort_target="create")
create_revision = range_result.kvs[0].create_revision
if create_revision == self_revision:
return True

# 4.查询需要监听的key
watch_key = b""
for index, item in enumerate(range_result.kvs):
if item.create_revision == self_revision:
watch_key = range_result.kvs[index - 1].key

# 5.监听Key
watch = await self._client.watch(
watch_key,
kind=aetcd.rtypes.EventKind.DELETE,
)
async for event in watch:
return True

示例代码中会先进行初始化,其中传递进来的name参数会取名为prefix,而锁内的nameprefixtoken的拼接,这样会方便通过Prefix机制去查找相同锁的内容,也可以让Watch机制单独的去监听对应锁的状态,以及防止发生别的锁释放到自己的锁的情况。

在初始化之后就是获取锁的实现了,获取锁的第一步是创建续约的相关方法,Etcd的续约机制与Redis的过期时间不同的是,Etcd是需要先创建一个租约,然后再用租约去绑定对应的Key,如果这个租约过期了,那么租约对应的Key都会同时过期,所有要先创建一个租约,然后在put方法中与Key绑定。
接着是与图中的步骤一样,会先推送数据到Etcd中,然后根据get_prefix获取所有锁的数据,需要注意的是这里需要定义sort_targetcreate,这样返回的结果集会按照Key创建的版本号排序。使后续能快速的判断自己获取锁是否成功,也容易推断出上一个Key的值是什么。
最后就是通过watch方法,监听上一个Key的删除事件,如果收到删除事件那么意味获取到锁。

可以看到由于EtcdRevisionWatch机制以及它的一致性,基于Etcd实现的获取锁不需要通过While循环去循环获取锁,也不需要考虑到网络原因导致客户端和服务端的数据不同步的因素,因此实现的代码 非常的简单。同时这个简单还不止提现在获取锁的方法中,释放锁和WatchDog的实现也非常简单,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Lock(object):
...
async def __aenter__(self) -> "Lock":
self._watch_dog = asyncio.create_task(self.watch_dog())
await self.acquire()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
if self._watch_dog and not self._watch_dog.done():
self._watch_dog.cancel()
await self.release()

async def watch_dog(self):
while True:
if self.lease is not None:
try:
await self.refresh()
except ValueError:
pass
await asyncio.sleep(self._ttl / 3)
async def refresh(self):
"""Refresh the time to live on this lock."""
if self.lease is not None:
return await self.lease.refresh()

raise ValueError(f'no lease associated with this lock: {self._name!r}')

async def release(self) -> None:
await self._client.delete(self._name)

通过代码可以看到,释放锁的操作非常简单只需要调用delete方法即可,而WatchDog与之前Redis分布式锁的机制类似,只是核心的锁续约交给了Lease机制去实现,Lease机制的refresh方法会重置当前续约的时间为一开始定义的ttl。

接下来运行测试代码,这个代码会按顺序执行相同的任务,它们的锁超时时间为1秒,但是执行的时间为2秒,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def my_print(msg: str):
print(f"Timestamp:{time.time()} Task:{id(asyncio.current_task())}, {msg}")


async def sub(client: aetcd.Client, cnt: int) -> None:
my_print(f"cnt:{cnt} wait")
async with Lock("demo", client, 1):
my_print(f"cnt:{cnt} run")
await asyncio.sleep(2)
my_print(f"cnt:{cnt} done")


async def main() -> None:
client = aetcd.Client()
await client.delete_prefix(b"demo")
tasks = []
tasks.append(asyncio.create_task(sub(client, 1)))
await asyncio.sleep(0.1)
tasks.append(asyncio.create_task(sub(client, 2)))
await asyncio.sleep(0.1)
tasks.append(asyncio.create_task(sub(client, 3)))
await asyncio.gather(*tasks)


if __name__ == "__main__":
asyncio.run(main())

在运行测试代码后可以看到如下输出:

1
2
3
4
5
6
7
8
9
Timestamp:1694361604.7456405 Task:140007816152864, cnt:1 wait
Timestamp:1694361604.7522662 Task:140007816152864, cnt:1 run
Timestamp:1694361604.846592 Task:140007816153184, cnt:2 wait
Timestamp:1694361604.947299 Task:140007816154624, cnt:3 wait
Timestamp:1694361606.7529566 Task:140007816152864, cnt:1 done
Timestamp:1694361606.7571979 Task:140007816153184, cnt:2 run
Timestamp:1694361608.7577982 Task:140007816153184, cnt:2 done
Timestamp:1694361608.7609887 Task:140007816154624, cnt:3 run
Timestamp:1694361610.7621803 Task:140007816154624, cnt:3 done

通过输出可以发现,每个任务的执行时间都在2秒左右,而且任务2和任务3虽然在同一时间内等待执行,但是任务1执行完毕后,只有任务2会执行,任务3需要等待任务2执行完毕后才能执行。

3.总结

通过实现过程了解到由于Etcd本身拥有的各种机制,基于Etcd实现的分布式锁非常简单,同时由于EtcdRedis一样都是Key-Value数据库,它也能通过数据结构拓展锁的功能。

查看评论