基于Python探针完成mysql调用库的统计与监控

本文总阅读量

最近在完善系统监控, 针对web框架都有完善的APM middleware用于接口统计与监控, 但是针对Mysql相关的比较少(几乎没有), 每次Mysql业务出现问题时, 都无从下手, 如果是从云服务商买的服务, 还可以去查查是否有什么慢日志, 但也很难纳入自己的数据监控中心做集体监控, 所以就开始搞一套Python Mysql的统计与监控

注: 目前用的Python Mysql库是aiomysql

1.简单粗暴的方法–对mysql库进行封装

这是一个最简单的方法, 在框架调用Mysql库和Mysql库中间实现一个中间层, 在中间层完成耗时统计,如:

1
2
3
4
5
6
7
8
9
# 伪代码

def my_execute(conn, sql, param):
# 针对MySql库的统计封装组件
with MyTracer(conn, sql, param):
# 以下为正常使用MySql库的代码
with conn.cursor as cursor:
cursor.execute(sql, param)
...

看样子实现起来非常不错, 而且更改非常方便, 但由于是在最顶层的API上进行修改, 其实是非常不灵活的, 而且在cursor.execute里会进行一些预操作, 如把sql和param进行拼接, 调用nextset清除当前游标的数据等等, 拿到的数据如时间耗时也是不准确的, 同时也没办法得到一些详细的错误码.

看样子只能去改源代码, 然后再调用源代码了, 但是如果每个库都需要改源代码才能统计, 那也太麻烦了, 好在Python也提供了一些类似探针的接口, 可以通过探针把库的源码进行替换完成我们的代码.

2.Python的探针

在Python中可以通过sys.meta_path来实现import hook的功能, 当执行 import 相关操作时, 会根据sys.meta_path定义的对象对import相关库进行更改.
sys.meta_path中的对象需要实现一个find_module方法, 这个find_module方法返回None或一个实现了load_module方法的对象, 我们可以通过这个对象, 针对一些库在import时, 把相关的方法进行替换, 简单用法如下,通过hooktime.sleep让他在sleep的时候能打印消耗的时间.
github源码存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import importlib
import sys
from functools import wraps


def func_wrapper(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
print(f"speed time:{end - start}")
return result
return wrapper


class MetaPathFinder:

def find_module(self, fullname, path=None):
print(f'find module:{path}:{fullname}')
return MetaPathLoader()


class MetaPathLoader:

def load_module(self, fullname):
if fullname in sys.modules:
return sys.modules[fullname]
# 防止递归调用
finder = sys.meta_path.pop(0)
# 导入 module
module = importlib.import_module(fullname)
if fullname == 'time':
module.sleep = func_wrapper(module.sleep)
sys.meta_path.insert(0, finder)
return module


sys.meta_path.insert(0, MetaPathFinder())


if __name__ == '__main__':
import time
time.sleep(1)


# 输出示例:
# find module:datetime
# find module:time
# load module:time
# find module:math
# find module:_datetime
# speed time:1.00073385238647468

上面的例子看来很不错, 但是需要在调用的入口处显式调用该逻辑, 通常一个项目可能有几个入口, 每个入口都显示调用该逻辑会非常麻烦, 如果能根据解析器启动而自动hook会更好.
查阅了一翻资料后发现, python 解释器初始化的时候会自动import PYTHONPATH下存在的sitecustomizeusercustomize模块, 我们只要创建该模块, 并在模块里面写入我们的sys.meta_path逻辑即可.

3.制作探针模块

一切了解完毕, 开始制作探针模块, 由于只涉及到aiomysql模块, 那么在MetaPathFinder.find_module中需要只对aiomysql模块进行处理, 其他的先忽略.
然后我们要确定我们要把aiomysql的哪个功能给替换, 从业务上来说, 我们只要cursor.execute, cursor.fetchone, cursor.fetchall, cursor.executemany这几个主要的操作,所以需要深入cursor的源码, 看看如何去更改代码, 后者重载哪个函数.

先查看cursor.execute的源码(cursor.executemanay也类似), 会先调用self.nextset的方法, 把上个请求的数据先拿完, 再合并sql语句, 最后通过self._query进行查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
async def execute(self, query, args=None):
"""Executes the given operation
Executes the given operation substituting any markers with
the given parameters.
For example, getting all rows where id is 5:
cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))
:param query: ``str`` sql statement
:param args: ``tuple`` or ``list`` of arguments for sql query
:returns: ``int``, number of rows that has been produced of affected
"""
conn = self._get_db()

while (await self.nextset()):
pass

if args is not None:
query = query % self._escape_args(args, conn)

await self._query(query)
self._executed = query
if self._echo:
logger.info(query)
logger.info("%r", args)
return self._rowcount

再看cursor.fetchone的源码(cursor.fetchall也类似), 发现其实是从缓存中获取数据, 这些数据在执行cursor.execute中就已经获取了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def fetchone(self):
"""Fetch the next row """
self._check_executed()
fut = self._loop.create_future()

if self._rows is None or self._rownumber >= len(self._rows):
fut.set_result(None)
return fut
result = self._rows[self._rownumber]
self._rownumber += 1

fut = self._loop.create_future()
fut.set_result(result)
return fut

综合上面的分析, 我们只要对核心的方法self._query进行重载即可拿到我们要的数据, 从源码中我们可以知道, 我们能获取到传入self._queryselfsql参数, 根据self又能获取到查询的结果, 同时我们通过装饰器能获取到运行的时间, 要的数据基本都到齐了, 按照思路修改后的代码如下(github存储):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import importlib
import time
import sys
from functools import wraps

from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING
from types import ModuleType
if TYPE_CHECKING:
import aiomysql


def func_wrapper(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs) -> Any:
start: float = time.time()
func_result: Any = await func(*args, **kwargs)
end: float = time.time()

# 根据_query可以知道, 第一格参数是self, 第二个参数是sql
self: aiomysql.Cursor = args[0]
sql: str = args[1]
# 通过self,我们可以拿到其他的数据
db: str = self._connection.db
user: str = self._connection.user
host: str = self._connection.host
port: str = self._connection.port
execute_result: Tuple[Tuple] = self._rows
# 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了,
# 这里只是打印一部分数据出来
print({
"sql": sql,
"db": db,
"user": user,
"host": host,
"port": port,
"result": execute_result,
"speed time": end - start
})
return func_result
return cast(Callable, wrapper)


class MetaPathFinder:

@staticmethod
def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]:
if fullname == 'aiomysql':
# 只有aiomysql才进行hook
return MetaPathLoader()
else:
return None


class MetaPathLoader:

@staticmethod
def load_module(fullname: str):
if fullname in sys.modules:
return sys.modules[fullname]
# 防止递归调用
finder: "MetaPathFinder" = sys.meta_path.pop(0)
# 导入 module
module: ModuleType = importlib.import_module(fullname)
# 针对_query进行hook
module.Cursor._query = func_wrapper(module.Cursor._query)
sys.meta_path.insert(0, finder)
return module


sys.meta_path.insert(0, MetaPathFinder())


if __name__ == '__main__':
import aiomysql
import asyncio


async def main():
"""aiomysql的官方例子"""
pool: aiomysql.Pool = await aiomysql.create_pool(
host='127.0.0.1', port=3306, user='root', password='', db='mysql'
)
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 42;")
(r,) = await cur.fetchone()
assert r == 42
pool.close()
await pool.wait_closed()
asyncio.run(main())

# 输出示例:
# 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间
# {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}

4.总结

基于Python探针的方法为我们引入的库制作钩子非常简单,并不是很复杂, 得益于Python的动态语言特性, 也能获取到很多数据, 对库的源代码侵入性为0, 而且这个方法理论上适用于任何库, 可以很方便的为httpx,aioredis等库制作钩子.

虽然本文带有监控几个字…但是每个监控平台都有不一样的风格, 所以有了原数据, 就可以根据不同的监控平台制订不同的方法(学术点的说法就是制作apm-agent), 例如我的是把得到的数据写到日志, 被日志收集器收集和解析后发送到Es(除了用于监控, 平时也能上去看看数据是怎么样的, 有没有优化空间等等), 然后由Prometheus对数据拉取和监控.

5.附录

Python探针灵感来自于:https://mozillazg.com/2016/04/apm-python-agent-principle.html

查看评论