redis-py

本文总阅读量

前记

之前自己渣渣翻译的redi-py文档,redis-py可以使用python操作redis数据库

Redis

入门

1
2
3
4
5
6
>>> import redis
>>> r = redis.StrictRedis(host='localhost', port=6379, db=0)
>>> r.set('foo', 'bar')
True
>>> r.get('foo')
'bar'

API

Redis官方的命令文档英文,中文做了详细的讲解每个命令的的工作。redis-py公开了两个实现这些命令的类。StrictRedis类尝试遵守官方命令语法。不过有一些例外:

  • SELECT:未实现。请参阅下面的“线程安全”部分中的说明。
  • DEL:“del”是Python语法中的保留关键字。因此redis-py使用’delete’来代替。
  • CONFIG GET | SET:这些分别以config_get或config_set来实现。
  • MULTI / EXEC:这些被实现为Pipeline类的一部分。在执行时,默认情况下,管道是用MULTI和EXEC语句包装的,可以通过指定transaction = False来禁用。查看下面更多关于管道的说明。
  • SUBSCRIBE / LISTEN:与管道类似,PubSub作为独立的类来实现,因为它将底层连接置于无法执行非pubsub命令的状态。从Redis客户端调用pubsub方法将返回一个PubSub实例,您可以在其中订阅频道并侦听消息。您只能从Redis客户端调用PUBLISH( 有关详细信息,请参阅 第151期的此评论)。
  • SCAN / SSCAN / HSCAN / ZSCAN:* SCAN命令按Redis文档中存在的方式执行。另外,每个命令都有一个等价的迭代器方法。这些纯粹是为了方便,所以用户在迭代时不必跟踪光标。对此行为使用scan_iter / sscan_iter / hscan_iter / zscan_iter方法。

除了上面所做的更改外,Redis类(StrictRedis的一个子类)还覆盖了其他几个命令,以便与redis-py的旧版本向后兼容:

  • LREM:“num”和“value”参数的顺序相反,使得“num”可以提供默认值零。
  • ZADD:Redis在’value’之前指定’score’参数。人们使用它们之后才发现这些参数被意外交换。Redis类期望* args的形式为: name1,score1,name2,score2,…
  • SETEX:“时间”和“值”参数的顺序相反

连接池

redis-py使用连接池来管理与Redis服务器的连接。默认情况下,创建的每个Redis实例将依次创建自己的连接池。可以通过将已创建的连接池实例传递给Redis类的connection_pool参数来覆盖此行为并使用现有连接池。通过这样做,可以实现客户端分片或更好地控制连接的管理方式。

1
2
>>> pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
>>> r = redis.Redis(connection_pool=pool)

连接

ConnectionPools管理一组Connection实例。redis-py提供两种类型的连接。默认的Connection是一个普通的基于TCP socke的连接。UnixDomainSocketConnection允许在与相同的服务器设备上运行的客户端通过unix域套接字进行连接。要使用UnixDomainSocketConnection连接,只需将unix_socket_path参数(它是一个字符串)传递给UnixDomainSocket文件。此外,请确保redis.conf文件中定义了unixsocket参数。因为它被默认注释掉了。

1
>>> r = redis.Redis(unix_socket_path='/tmp/redis.sock')

您也可以创建自己的Connection子类。如果想在异步框架中控制套接字行为,这可能很有用。要使用自己的连接实例化客户端类,需要创建一个连接池,将类传递给connection_class参数。传递给池的其他关键字参数将传递给在初始化过程中指定的类。

1
2
>>> pool = redis.ConnectionPool(connection_class=YourConnectionClass,
your_arg='...', ...)

解析器

解析器类提供了一种方法来控制如何解析来自Redis服务器的响应。redis-py提供了两个解析器类,PythonParser和HiredisParser。默认情况下,如果已安装hiredis模块,redis-py将尝试使用HiredisParser,否则将回退到PythonParser。

Hiredis是由Redis核心团队维护的C库。Pieter Noordhuis非常友好地创建了Python绑定。使用Hiredis可以使Redis服务器的响应速度提高10倍。当检索许多数据时(例如从LRANGE或SMEMBERS操作),性能提升最为明显
可以使用pip进行安装

响应回调

客户端类使用一组回调来将Redis响应转换为适当的Python类型。在名为RESPONSE_CALLBACKS的dict中,Redis客户端类中定义了许多这样的回调。
每个实例的基础上可以使用自定义回调set_response_callback方法添加。该方法接受两个参数:命令名称和回调。以这种方式添加的回调仅在添加了回调的实例上有效。如果要定义或覆盖全局回调,则应该创建Redis客户端的子类并将其回调添加到其RESPONSE_CALLBACKS类dict中。
响应回调至少需要一个参数:来自Redis服务器的响应。关键字参数也可以被接受,以便进一步控制如何解释响应。这些关键字参数在命令调用execute_command期间被指定。ZRANGE实现通过其“withscores”参数来演示响应回调关键字参数的使用。

线程安全

Redis客户端实例可以在线程之间安全地共享。在内部,连接实例只能在命令执行期间从连接池中获取,并在之后直接返回到池中。命令执行不会修改客户端实例的状态。
但是,有一个另外:Redis的SELECT命令。SELECT命令允许切换连接当前正在使用的数据库。该数据库保持选定状态,直到选择另一个数据库或连接关闭。这会产生一个问题,连接可能会返回到连接到不同数据库的池。
因此,redis-py不会在客户端实例上实现SELECT命令。如果在同一应用程序中使用多个Redis数据库,则应为每个数据库创建一个单独的客户端实例(可能还有一个单独的连接池)。
在线程之间传递PubSub或Pipeline对象是不安全的。

管道

管道是Redis类的一个子类,它提供了在一个请求中缓存多个到服务器的命令的支持。通过减少客户端和服务器之间来回TCP数据包的数量,可以大大提高命令组的性能。

1
2
3
4
5
6
7
8
9
10
11
 >>> r = redis.Redis(...)
>>> r.set('bing', 'baz')
>>> # Use the pipeline() method to create a pipeline instance
>>> pipe = r.pipeline()
>>> # The following SET commands are buffered
>>> pipe.set('foo', 'bar')
>>> pipe.get('bing')
>>> # the EXECUTE call sends all buffered commands to the server, returning
>>> # a list of responses, one for each command.
>>> pipe.execute()
[True, 'baz']

为了便于使用,缓冲到管道中的所有命令都返回管道对象本身。因此,调用可以像链接一样
1
2
 >>> pipe.set('foo', 'bar').sadd('faz', 'baz').incr('auto_number').execute()
[True, True, 6]

此外,默认状态下,管道还可以确保缓冲的命令作为一个组自动执行。如果你想禁用管道的原子特性,但仍想缓冲命令,可以关闭事务。

1
>>> pipe = r.pipeline(transaction=False)

发布/订阅

redis-py包含一个PubSub对象,用于订阅频道并侦听新消息。创建PubSub对象如下:

1
2
>>> r = redis.StrictRedis(...)
>>> p = r.pubsub()

一旦创建了PubSub实例,可以订阅给定模式的频道。

1
2
>>> p.subscribe('my-first-channel', 'my-second-channel', ...)
>>> p.psubscribe('my-*', ...)

在PubSub的实例订阅的频道/模式后,可以通过阅读PubSub 实例的消息来查看订阅

1
2
3
4
5
6
>>> p.get_message()
{'pattern': None, 'type': 'subscribe', 'channel': 'my-second-channel', 'data': 1L}
>>> p.get_message()
{'pattern': None, 'type': 'subscribe', 'channel': 'my-first-channel', 'data': 2L}
>>> p.get_message()
{'pattern': None, 'type': 'psubscribe', 'channel': 'my-*', 'data': 3L}

每个从PubSub实例读取的消息都是一个包含以下key的dict。

  • 类型: ‘subscribe’, ‘unsubscribe’, ‘psubscribe’, ‘punsubscribe’, ‘message’, ‘pmessage’。备注:发送者(pub)发送消息,订阅者(sub)接收消息
  • 频道:订阅或消息发布到的频道
  • 模式:匹配发布消息频道的模式。如果是None时,那模式就是除“pmessage”类型外的所有情况
  • 数据:消息数据。订阅者订阅消息时,此值将是连接当前订阅的通道和模式的数量。发布者发布消息时,这个值将是实际发布消息的数量

发布消息的栗子:
publish方法返回匹配频道和订阅模式的号码。 ‘my-first-channel’同时匹配’my-first-channel’和’my-*‘订阅模式,所以这个消息将被传送到2个通道/模式

1
2
3
4
5
6
>>> r.publish('my-first-channel', 'some data')
2
>>> p.get_message()
{'channel': 'my-first-channel', 'data': 'some data', 'pattern': None, 'type': 'message'}
>>> p.get_message()
{'channel': 'my-first-channel', 'data': 'some data', 'pattern': 'my-*', 'type': 'pmessage'}

取消订阅就像订阅一样。如果没有参数传递给发送者取消订阅,则所有通道或模式都将取消订阅

1
2
3
4
5
6
7
8
>>> p.unsubscribe()
>>> p.punsubscribe('my-*')
>>> p.get_message()
{'channel': 'my-second-channel', 'data': 2L, 'pattern': None, 'type': 'unsubscribe'}
>>> p.get_message()
{'channel': 'my-first-channel', 'data': 1L, 'pattern': None, 'type': 'unsubscribe'}
>>> p.get_message()
{'channel': 'my-*', 'data': 0L, 'pattern': None, 'type': 'punsubscribe'}

redis-py也允许注册回调函数来处理发布的消息。消息处理(函数)只接受一个参数:message(像上面的例子那样是一个dict)。要使用消息处理(函数)订阅通道/模式,请将通道或模式名称作为关键字参数传递,其值为回调函数。

当消息在消息处理的通道或模式中读取时,将创建消息字典并将其传递给消息处理。在这种情况下,get_message()返回一个None值,因为消息已经被处理了。

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> def my_handler(message):
... print ('MY HANDLER: '), message['data']
>>> p.subscribe(**{'my-channel': my_handler})
#阅读订阅确认消息
>>> p.get_message()
{'pattern': None, 'type': 'subscribe', 'channel': 'my-channel', 'data': 1L}
>>> r.publish('my-channel', 'awesome data')
1
#对于消息处理程序的工作,可以通过get_message()函数告诉实例读取数据
>>> message = p.get_message()
MY HANDLER: awesome data
>>> print(message)
None

如果对订阅/取消订阅的确认消息不感兴趣,则可以通过将ignore_subscribe_messages = True传递给r.pubsub()来忽略它们 。这将导致所有订阅/取消订阅消息被读取

1
2
3
4
5
6
7
>>> p = r.pubsub(ignore_subscribe_messages=True)
>>> p.subscribe('my-channel')
>>> p.get_message() # hides the subscribe message and returns None
>>> r.publish('my-channel')
1
>>> p.get_message()
{'channel': 'my-channel', 'data': 'my data', 'pattern': None, 'type': 'message'}

get_message()使用系统的“select”模块快速查询连接的 socket。如果可以读取数据,get_message()将读取它,格式化消息并返回,或者将消息传递给消息处理程序。如果没有要读取的数据,get_message()会立即返回None。这使得将它集成到应用程序中的现有事件循环中变得容易

1
2
3
4
5
>>> while True:
>>> message = p.get_message()
>>> if message:
>>> # 处理消息
>>> time.sleep(0.001) #增加点延迟

除了使用.get_message()外还有其他的方式可以来阅读信息
1.阻塞
旧版本的redis-py只能用pubsub.listen()读取消息。listen()是一个生成器(直到有可用的消息,才解除阻塞)。如果应用程序不需要执行其他任何操作,只需接收和处理从redis接收到的消息,那么listen()是最简单容易的方法

1
2
>>> for message in p.listen():
... # 处理消息

2.非阻塞
pubsub.run_in_thread()创建一个新线程并启动事件循环,线程对象返回给run_in_thread()的调用者。调用者可以使用thread.stop()方法关闭事件循环和线程。
run_in_thread(),其实只是get_message()的一个封装,使它在一个单独的线程中运行时,本质上创建了一个非常小的非阻塞事件循环。 run_in_thread()有一个可选的sleep_time参数。如果指定参数,则事件循环将调用每次迭代时time.sleep()的值。

注意:由于在一个单独的线程中运行,因此无法处理那些不能自动处理和注册小心的函数。因此,如果订阅了没有附加消息处理函数的模式/通道,redis-py将阻止调用run_in_thread()。

1
2
3
4
5
>>> p.subscribe(**{'my-channel': my_handler})
>>> thread = p.run_in_thread(sleep_time=0.001)
# 事件循环现在正在后台处理消息
# 中,直到它被关闭的时...
>>> thread.stop()

一个PubSub对象的编码与创建它的客户端实例相同。在发送到Redis之前,任何使用unicode的通道/模式都将使用客户端上指定的字符串编码。如果客户端的decode_responses设置为False(缺省值),则消息字典中的’channel’,’pattern’和’data’值将是(Python 2中的str,Python 3中的bit)。如果客户端的 decode_responses为True,那么’channel’,’pattern’和’data’值将被自动解码为使用客户端字的 unicode字符串编码。

PubSub对象记住它订阅的频道和模式。如果发生网络错误或超时等断开事件,之后又重新连接时,PubSub对象将重新订阅之前的所有通道和模式。不过客户端断开连接时发布的消息则无法传送。所以,当使用完PubSub对象时,要调用 .close()方法关闭连接。

1
2
3
>>> p = r.pubsub()
>>> ...
>>> p.close()

PubSub对象还支持PUBSUB子命令CHANNELS,NUMSUB和NUMPAT的设置

1
2
3
4
5
6
7
8
>>> r.pubsub_channels()
['foo', 'bar']
>>> r.pubsub_numsub('foo', 'bar')
[('foo', 9001), ('bar', 42)]
>>> r.pubsub_numsub('baz')
[('baz', 0)]
>>> r.pubsub_numpat()
1204

提供Sentinel支持

关于Sentinel(哨兵)
redis-py可以与Redis Sentinel一起用于发现Redis节点。不过需要运行至少一个Sentinel守护进程才能使用redis-py的Sentinel支持。

将redis-py连接到Sentinel实例很容易。可以使用Sentinel连接来发现主从之间的网络地址:
ps:主设备(用于写操作),从设备(用于只读操作)

1
2
3
4
5
6
>>> from redis.sentinel import Sentinel
>>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
>>> sentinel.discover_master('mymaster')
('127.0.0.1', 6379)
>>> sentinel.discover_slaves('mymaster')
[('127.0.0.1', 6380)]

也可以从Sentinel实例创建Redis客户端连接主从设备。

1
2
3
4
5
>>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
>>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
>>> master.set('foo', 'bar')
>>> slave.get('foo')
'bar'

主对象和从属对象是正常的StrictRedis实例,其连接池绑定到Sentinel实例。当支持Sentinel的客户端尝试建立连接时,它首先查询Sentinel服务器以确定要连接的适当主机。如果找不到服务器,则会引发MasterNotFoundError或SlaveNotFoundError,这两个类都是ConnectionError的子类。
当尝试连接到从属客户端时,Sentinel连接池将遍历从属列表,直到找到可以连接的从属客户端。如果没有从属客户端可以连接,则与主站建立连接。

扫描迭代器

redis-py可以通过自带的can_iter,hscan_iter, sscan_iter和zscan_iter方法方便的使用Python迭代器

1
2
3
4
5
6
7
>>> for key, value in (('A', '1'), ('B', '2'), ('C', '3')):
... r.set(key, value)
>>> for key in r.scan_iter():
... print(key, r.get(key))
A 1
B 2
C 3

操作文档

文档

查看评论