Python的Dict实现

本文总阅读量

前记

Python中, Dict是一系列由键和值配对组成的元素的集合, 它是一个可变容器模型,可以存储任意类型对象. Dict的存取速度非常的快, 而这全靠他的哈希算法的功劳, 在Python3.6之前Dict是无序的, 在Python3.6中绝大部分是有序的, 在Python3.7以及之后则是绝对有序的, 而且占用内存空间更少, 对于万物基于Dict的Python,这算一个大优化, 也让我好奇Python是如何实现哈希有序的.

1.Dict的实现

Dict在查找key时非常的快, 这得益于它的使用空间换时间思路和哈希实现。的在读取和写入Key时, 都会对Key进行哈希计算(所以要求Key都是不可变类型,如果是可变类型,就无法计算出他的哈希值了), 然后根据计算的值, 与当前的数组空间长度进行取模计算, 得到的值就是当前Key在数组的下标, 最后通过下标就可以以O(1)的时间复杂度读取值. 这种实现非常棒, 也是分布式的常见做法, 但也有问题, 如果数组满了怎么办或者是不同的Key, 但是哈希结果是一样的怎么办?

针对第一个问题的解决办法是在合适的时候进行扩容, 在Python中, 当Dict中放置的数量占容量的2/3时, Dict就会开始扩容, 扩容后的总容量是扩容之前的一倍, 这是为了减少频繁扩容, 导致key的迁移次数变多;

而针对第二个问题则有两个解法:

  • 链接法: 原本数组里面存的是Key对应的值, 而链接法的数组存的是一个数组, 这个数组存了一个包含key和对应值的数组, 如下所示, 假设key1和key2的哈希结果都是0, 那就会插入到数组的0下标中, key1在0下标的数组的第一位, 而key2在插入时,发现已经存在key1了, 再用key2与key1进行对比, 发现它们的key其实是不一样的, 那就在0下标进行追加.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    array = [
    [
    # 分别为key, hash值, 数值
    ('key1', 123, 123),
    ('key2', 123, 123)
    ],
    [
    ('key3', 123, 123)
    ]
    ]
  • 开发寻址法: 开发寻址法走的是另外一个思路, 采取借用的思想, 在插入数据时, 如果遇到了冲突那就去使用当前下标的下一位, 如果下一位还是冲突, 就继续用下一位.在查找数据时则会对哈希值对应的key进行比较, 如果有值且对不上就找下一位, 直到或者空位找到为止。

上面两个的方案的实现都很简单, 对比下也很容易知道他们的优缺点:

  • 链表法的优点:
    • 删除记录方便, 直接处理数组对应下标的子数组即可.
    • 平均查找速度快, 如果冲突了, 只需要对子数组进行查询即可
  • 链表法的缺点:
    • 用到了指针, 导致了查询速度会偏慢一点, 内存占用可能会较高, 不适合序列化.
      而开放寻址法的优缺点是跟链表法反过来的, 由于Python万物基于Dict, 且都需要序列化, 所以选择了开放寻址法.

通过对比链表法和开放寻执法都可以发现, 他们都是针对哈希冲突的一个解决方案, 如果存数据的数组够大, 那么哈希冲突的可能性就会很小, 不用频繁扩容迁移数据, 但是占用的空间就会很大.所以一个好的哈希表实现初始值都不能太大, 在Python的Dict的初始值是8.
另外哈希表还需要让存数据的数组的未使用空位保持在一个范围值内波动, 这样空间的使用和哈希冲突的概率都会保持在一个最优的情况, 但由于每次扩容都会消耗很大的性能, 也不能每次更改都进行一次扩容, 所以需要确定一个值, 当未使用/使用的占比达到这个值时, 就自动扩容, 在Python的Dict中这个值是2/3. 也就是当Dict里面使用了2/3的空间后, 他就会自动扩容, 使他达到一个新的最优平衡. 同时, 为了减少每次扩容时key的迁移次数, 扩容后的总容量一定是扩容之前的总容量的一倍, 这样的话, key只需要迁移一半的数量即可.

哈希表扩容一倍只会迁移一半的key的原因是获取key在数组的下标是通过对哈希值取模实现的, 比如一个哈希表容量为8,一个哈希值为20的key取模值为4,哈希表扩容后长度变为16, 此时取模结果还是4。而一个哈希值为11的key取模值为3, 扩容后取模值为11。可以很明显的看出,扩容后原来哈希值大于容量的key都不用做迁移, 而哈希值小于容量的都需要迁移。

但是如果按照上面是说法, 开放寻址法还是有一个问题的, 比如下面的数组:

1
arrray = [None, None, None, None, True, True, True, True]

以True代表当前数组的位置已经被占用, None代表未被占用, 可以看出当前并未满足使用了数组的2/3空间, 数组还未到扩容阶段。 此时假设要插入一个Key, 刚好落在数组下标4, 那么插入的时候就要一直查询下去, 最后发现只有数组下标0的位置的空的, 才可以真正的插入数据. 对于一个长度只有8的数组, 需要执行5次才能插入数据, 那也太浪费性能了, 所以Python要实现一个算法, 尽量让冲突的数据插在别的地方.
在源码中, Python用到了公式x = ((5*y) + 1) % 2**i来实现跳跃插入冲突数据. 式子中x为数组的下一个坐标, y为当前发生冲突的坐标,i为容量系数, 比如初始化时, i为3, 那么容量就是8了,
第一次插入数据到坐标0冲突时, y = 0, 带入公式后, 求得x 等于1, 第二次插入数据到坐标0时, y = 1, 求得x 等于 6, 通过这样算下去, 可以发现数字生成轨迹是0>1>6>7>4>5>2>3>0一直循环着, 这样跳着插数据就能完美解决上面场景的问题.

2.有序字典的原理

Python3.6之前说的差不多, 它的数组大概是长这样的, 数组中存了子数组, 第一项为hash值, 第二项为key, 第三项为值.

1
2
3
4
5
6
7
8
9
10
array = [
[],
[123456, "key1", 1],
[],
[],
[],
[234567, "key2", 2],
[],
[]
]

这种实现的空间大小在初始化时就固定了, 直到下次扩容再发送变化, 在遍历字典时, 实际上就是遍历数组, 只是把没有占用的空间进行跳过.可以看出这种遍历的生成的顺序只跟哈希结果相关, 无法跟插入顺序相关, 所以这种方法的实现是无序的(同时由于每次启动程序时, 他们的哈希计算是不一样的, 所以每次遍历的顺序也就各不相同了).

而在Python3.7之后, Python的Dict使用了新的数据结构, 使Python新Dict的内存占用也比老的Dict少了, 同时新的Dict在遍历时是跟插入顺序是一致的, 具体的实现是, 初始化时会生成两个数组, 插入值时, 在数组二追加当前的数据, 并获得当前追加数据所在的下标A, 然后对key进行哈希取模算出下标B, 最后对数组下标B的值更新为A, 简单的演示如下:

1
2
3
4
5
6
7
8
9
10
11
12
# 初始的结构
# -1代表还未插入数据
array_1 = [-1, -1, -1, -1, -1, -1, -1, -1]
array_2 = []


# 插入值后, 他就会变为:
array_1 = [-1, 0, -1, -1, -1, 1, -1, -1]
array_2 = [
[123456, "key1", 1],
[234567, "key2", 2],
]

可以看出, 数组2的容量跟当前放入的值相等的, 数组1虽然还会保持1/3的空闲标记位, 但他只保存数组二的下标, 占用空间也不多, 相比之前的方案会节省一些空间, 同时在遍历的时候可以直接遍历数组2, 这样Python的Dict就变为有序的了.
注: 为了保持操作高效, 在删除的时候, 只是把数组1的值设置为-2, 并把数组2对应的值设置为None, 而不去删除它, 在查找时会忽略掉数组1中值为-2的元素, 在遍历时会忽略掉值为None的元素.

3.有序字典的实现

通过上面的了解后, 可以使用Python来写一个新版Dict的实现, 具体说明见注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from typing import Any, Iterable, List, Optional, Tuple


class CustomerDict(object):

def __init__(self):
self._init_seed: int = 3 # 容量因子
self._init_length: int = 2 ** self._init_seed # 初始化数组大小
self._load_factor: float = 2 / 3 # 扩容因子
self._index_array: List[int] = [-1 for _ in range(self._init_length)] # 存放下标的数组
self._data_array: List[Optional[Tuple[int, Any, Any]]] = [] # 存放数据的数组
self._used_count: int = 0 # 目前用的量
self._delete_count: int = 0 # 被标记删除的量

def _create_new(self):
"""扩容函数"""
self._init_seed += 1 # 增加容量因子
self._init_length = 2 ** self._init_seed
old_data_array: List[Tuple[int, Any, Any]] = self._data_array
self._index_array: List[int] = [-1 for _ in range(self._init_length)]
self._data_array: List[Tuple[int, Any, Any]] = []
self._used_count = 0
self._delete_count = 0

# 这里只是简单实现, 实际上只需要搬运一半的数据
for item in old_data_array:
if item is not None:
self.__setitem__(item[1], item[2])

def _get_next(self, index: int):
"""计算冲突的下一跳,如果下标对应的值冲突了, 需要计算下一跳的下标"""
return ((5*index) + 1) % self._init_length

def _core(self, key: Any, default_value: Optional[Any] = None) -> Tuple[int, Any, int]:
"""获取数据或者得到可以放新数据的方法, 返回值是index_array的索引, 数据, data_array的索引"""
index: int = hash(key) % (self._init_length - 1)
while True:
data_index: int = self._index_array[index]
# 如果是-1则代表没有数据
if data_index == -1:
break
# 如果是-2则代表之前有数据则不过被删除了
elif data_index == -2:
index = self._get_next(index)
continue

_, new_key, default_value = self._data_array[data_index]
# 判断是不是对应的key
if key != new_key:
index = self._get_next(index)
else:
break
return index, default_value, data_index

def __getitem__(self, key: Any) -> Any:
_, value, data_index = self._core(key)
if data_index == -1:
raise KeyError(key)
return value

def __setitem__(self, key: Any, value: Any) -> None:
if (self._used_count / self._init_length) > self._load_factor:
self._create_new()
index, _, _ = self._core(key)

self._index_array[index] = self._used_count
self._data_array.append((hash(key), key, value))
self._used_count += 1

def __delitem__(self, key: Any) -> None:
index, _, data_index = self._core(key)
if data_index == -1:
raise KeyError(key)
self._index_array[index] = -2
self._data_array[data_index] = None
self._delete_count += 1

def __len__(self) -> int:
return self._used_count - self._delete_count

def __iter__(self) -> Iterable:
return iter(self._data_array)

def __str__(self) -> str:
return str({item[1]: item[2] for item in self._data_array if item is not None})

def keys(self) -> List[Any]:
"""模拟实现keys方法"""
return [item[1] for item in self._data_array if item is not None]

def values(self) -> List[Any]:
"""模拟实现values方法"""
return [item[2] for item in self._data_array if item is not None]

def items(self) -> List[Tuple[Any, Any]]:
"""模拟实现items方法"""
return [(item[1], item[2]) for item in self._data_array if item is not None]


if __name__ == '__main__':
customer_dict: CustomerDict = CustomerDict()
customer_dict["demo_1"] = "a"
customer_dict["demo_2"] = "b"
assert len(customer_dict) == 2

del customer_dict["demo_1"]
del customer_dict["demo_2"]
assert len(customer_dict) == 0

for i in range(30):
customer_dict[i] = i
assert len(customer_dict) == 30

customer_dict_value_list: List[Any] = customer_dict.values()
for i in range(30):
assert i == customer_dict[i]

for i in range(30):
assert customer_dict[i] == i
del customer_dict[i]
assert len(customer_dict) == 0
查看评论