前记 最近在完善公司的监控系统, 发现在项目运行时经常会出现一些运行时的问题, 这些问题往往不是一个子服务引发的问题, 而可能是某个环节出现了问题, 这时候就需要引入APM系统。在收集APM数据时发现在Python
生态中针对web框架都有完善的APM中间件用于接口统计与监控, 但是第三方调用库相关的APM实现都比较少(几乎没有), 同时这些库大多数也都没提供一些钩子实现。这就需要自己去封装一些库, 为这些库实现一套调用过程的数据提供逻辑。
本文是以Python
的aiomysql
库为例,阐述如何基于Python
的探针完成调用库的调用过程统计与监控的封装。
注: 监控的形式的agent有很多种,如Prometheus
,Zabbix
, Graphite
和Opentracing
他们的数据源有很大的不同,但是他们都是基于元数据封装成自己的源数据,然后发送到对应的服务,所以本文只介绍如何提取元数据,剩下的如何发送需要自己按照特定的监控系统去实现。
注:这里以aiomysql库来做示例,提取数据的方法应该用统一的dbapi2, 本文只阐述如何简单的实现。
1.简单粗暴的方法–对mysql库进行封装 要统计一个执行过程, 有一个必要的条件就是需要知道这个执行过程的开始位置和结束位置, 所以最简单粗暴的方法就是要基于调用的方法进行封装,在框架调用MySQL
库和MySQL
库中间实现一个中间层, 并基于中间层完成耗时统计,如:
1 2 3 4 5 6 7 8 9 def my_execute (conn, sql, param ): with MyTracer(conn, sql, param): with conn.cursor as cursor: cursor.execute(sql, param) ...
这样的实现看起来非常不错, 而且更改非常方便, 但由于是在最顶层的API上进行修改, 其实是非常不灵活的, 同时cursor.execute
里面会执行一些预操作, 如把sql和param进行拼接, 以及调用nextset
清除当前游标的数据等等。 所以最后拿到的数据如时间耗时也是不准确的, 同时也没办法得到一些详细的元数据, 如错误码等等.
如果要拿到最直接有用的数据,就只能去改源代码, 然后再调用了, 但是如果每个库都需要改源代码才能统计, 那也太麻烦了, 好在Python
也提供了一些类似探针的接口, 可以通过探针把库的源码进行替换完成我们的代码.
2.Python的探针 在Python
中可以通过sys.meta_path
来实现import hook
的功能, 当执行 import 相关操作时, 会根据sys.meta_path
定义的对象对import相关库进行更改.sys.meta_path
中的对象需要实现一个find_module
方法, 这个find_module
方法返回None
或一个实现了load_module
方法的对象, 我们可以通过这个对象, 针对一些库在import时, 把相关的方法进行替换, 简单用法如下,通过hooktime.sleep
让他在sleep的时候能打印消耗的时间.github源码存储
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import importlibimport sysfrom functools import wrapsdef func_wrapper (func ): """这里通过一个装饰器来达到狸猫换太子和获取数据的效果""" @wraps(func ) def wrapper (*args, **kwargs ): start = time.time() result = func(*args, **kwargs) end = time.time() print(f"speed time:{end - start} " ) return result return wrapperclass MetaPathFinder : def find_module (self, fullname, path=None ): print(f'find module:{path} :{fullname} ' ) return MetaPathLoader()class MetaPathLoader : def load_module (self, fullname ): if fullname in sys.modules: return sys.modules[fullname] finder = sys.meta_path.pop(0 ) module = importlib.import_module(fullname) if fullname == 'time' : module.sleep = func_wrapper(module.sleep) sys.meta_path.insert(0 , finder) return module sys.meta_path.insert(0 , MetaPathFinder())if __name__ == '__main__' : import time time.sleep(1 )
3.制作探针模块 了解完了主要流程后, 可以开始制作自己的探针模块了, 由于示例只涉及到aiomysql模块, 那么在MetaPathFinder.find_module
中需要只对aiomysql模块进行处理, 其他的先忽略. 然后我们需要确定要把aiomysql的哪个功能给替换, 从业务上来说, 一般情况下我们只要cursor.execute
, cursor.fetchone
, cursor.fetchall
, cursor.executemany
这几个主要的操作,所以需要深入cursor的源码 , 看看如何去更改代码, 后者重载哪个函数.
先从cursor.execute
的源码(cursor.executemanay
也类似)入手, 发现会先调用self.nextset
的方法, 把上个请求的数据先拿完, 再合并sql语句, 最后通过self._query
进行查询:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 async def execute (self, query, args=None ): """Executes the given operation Executes the given operation substituting any markers with the given parameters. For example, getting all rows where id is 5: cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,)) :param query: ``str`` sql statement :param args: ``tuple`` or ``list`` of arguments for sql query :returns: ``int``, number of rows that has been produced of affected """ conn = self._get_db() while (await self.nextset()): pass if args is not None : query = query % self._escape_args(args, conn) await self._query(query) self._executed = query if self._echo: logger.info(query) logger.info("%r" , args) return self._rowcount
再看cursor.fetchone
的源码(cursor.fetchall
也类似), 发现其实是从缓存中获取数据, 这些数据在执行cursor.execute
中就已经获取了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def fetchone (self ): """Fetch the next row """ self._check_executed() fut = self._loop.create_future() if self._rows is None or self._rownumber >= len (self._rows): fut.set_result(None ) return fut result = self._rows[self._rownumber] self._rownumber += 1 fut = self._loop.create_future() fut.set_result(result) return fut
综合上面的分析, 可以发现只要对核心的方法self._query
进行重载即可拿到我们想要的数据, 从源码中我们可以知道, 我们能获取到传入self._query
的self
和sql
参数, 根据self
又能获取到查询的结果, 同时我们通过装饰器能获取到运行的时间, 要的数据基本都到齐了, 按照思路修改后的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 import importlibimport timeimport sysfrom functools import wrapsfrom typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKINGfrom types import ModuleTypeif TYPE_CHECKING: import aiomysqldef func_wrapper (func: Callable ): @wraps(func ) async def wrapper (*args, **kwargs ) -> Any: start: float = time.time() func_result: Any = await func(*args, **kwargs) end: float = time.time() self: aiomysql.Cursor = args[0 ] sql: str = args[1 ] db: str = self._connection.db user: str = self._connection.user host: str = self._connection.host port: str = self._connection.port execute_result: Tuple[Tuple] = self._rows print({ "sql" : sql, "db" : db, "user" : user, "host" : host, "port" : port, "result" : execute_result, "speed time" : end - start }) return func_result return cast(Callable, wrapper)class MetaPathFinder : @staticmethod def find_module (fullname: str , path: Optional[str ] = None ) -> Optional["MetaPathLoader"]: if fullname == 'aiomysql' : return MetaPathLoader() else : return None class MetaPathLoader : @staticmethod def load_module (fullname: str ): if fullname in sys.modules: return sys.modules[fullname] finder: "MetaPathFinder" = sys.meta_path.pop(0 ) module: ModuleType = importlib.import_module(fullname) module.Cursor._query = func_wrapper(module.Cursor._query) sys.meta_path.insert(0 , finder) return moduleasync def test_mysql () -> None : import aiomysql pool: aiomysql.Pool = await aiomysql.create_pool( host='127.0.0.1' , port=3306 , user='root' , password='123123' , db='mysql' ) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;" ) (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed()if __name__ == '__main__' : sys.meta_path.insert(0 , MetaPathFinder()) import asyncio asyncio.run(test_mysql())
这个例子看来很不错, 但是需要在调用的入口处显式调用该逻辑, 通常一个项目可能有几个入口, 每个入口都显示调用该逻辑会非常麻烦, 而且必须先调用我们的hook逻辑后才能import, 这样就得订好引入规范, 不然就可能出现部分地方hook不成功, 如果能把引入hook这个逻辑安排在解析器启动后马上执行, 就可以完美地解决这个问题了. 查阅了一翻资料后发现,python
解释器初始化的时候会自动import PYTHONPATH
下存在的sitecustomize
和usercustomize
模块, 我们只需要创建该模块, 并在模块里面写入我们的 替换函数即可。
具体结构如下,也可以参考github存储
1 2 3 4 5 . ├── __init__ .py ├── hook_aiomysql .py ├── sitecustomize .py └── test_auto_hook .py
hook_aiomysql.py
是我们制作探针的代码为例子, 而sitecustomize.py
存放的代码如下, 非常简单, 就是引入我们的探针代码, 并插入到sys.meta_path
:
1 2 3 4 import sysfrom hook_aiomysql import MetaPathFinder sys.meta_path.insert(0 , MetaPathFinder())
test_auto_hook.py
则是测试代码:
1 2 3 4 5 import asynciofrom hook_aiomysql import test_mysql asyncio.run(test_mysql())
接下来只要设置PYTHONPATH
并运行我们的代码即可(如果是项目的话一般交由superisor启动,则可以在配置文件中设置好PYTHONPATH
):
1 2 3 (.venv) ➜ python_hook git:(master) ✗ export PYTHONPATH=. (.venv) ➜ python_hook git:(master) ✗ python test_auto_hook.py {'sql' : 'SELECT 42;' , 'db' : 'mysql' , 'user' : 'root' , 'host' : '127.0.0.1' , 'port' : 3306, 'result' : ((42,),), 'speed time' : 0.000213623046875}
4.直接替换方法 可以看到上面的方法很好的运行了, 而且可以很方便的嵌入到我们的项目中, 但是该方法依赖了sitecustomize.py
文件, 我们很难让它抽离成一个第三方的库, 如果要抽离成第三方的库就得考虑看看有没有其他的方法。 在上面介绍MetaPathLoader
时说到了sys.module
, 在里面通过sys.modules
来减少重复引入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class MetaPathLoader : def load_module (self, fullname ): if fullname in sys.modules: return sys.modules[fullname] finder = sys.meta_path.pop(0 ) module = importlib.import_module(fullname) if fullname == 'time' : module.sleep = func_wrapper(module.sleep) sys.meta_path.insert(0 , finder) return module
这个减少重复引入的原理是, 每次引入一个模块后, 他就会存放在sys.modules, 如果是重复引入, 就会直接刷新成最新引入的模块。上面之所以会考虑到减少重复import是因为我们不会在程序运行时升级第三方库的依赖。
利用到我们可以不考虑重复引入同名不同实现的模块, 以及sys.modules会缓存引入模块的特点, 我们可以把上面的逻辑简化成引入模块->替换当前模块方法为我们修改的hook方法
。 第一次接触这个方法是从opentracing-python-instrumentation 学的, 不过他夹带着其他的封装, 所以我这里进行了简化处理github代码仓 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import timefrom functools import wrapsfrom typing import Any, Callable, Tuple, castimport aiomysqldef func_wrapper (func: Callable ): """和上面一样的封装函数, 这里简单略过""" _IS_HOOK: bool = False _query: Callable = aiomysql.Cursor._querydef install_hook () -> None : global _IS_HOOK _IS_HOOK = False if _IS_HOOK: return aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query) _IS_HOOK = True def reset_hook () -> None : aiomysql.Cursor._query = _query _IS_HOOK = False
代码简单明了,接下来跑一跑刚才的测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import asyncioimport aiomysqlfrom demo import install_hook, reset_hookasync def test_mysql () -> None : pool: aiomysql.Pool = await aiomysql.create_pool( host='127.0.0.1' , port=3306 , user='root' , password='' , db='mysql' ) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;" ) (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed() print("install hook" ) install_hook() asyncio.run(test_mysql()) print("reset hook" ) reset_hook() asyncio.run(test_mysql()) print("end" )
通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息
1 2 3 4 install hook {'sql' : 'SELECT 42;' , 'db' : 'mysql' , 'user' : 'root' , 'host' : '127.0.0.1' , 'port' : 3306, 'result' : ((42,),), 'speed time' : 0.000347137451171875} reset hook end
5.总结 得益于Python动态语言的特性, 我们可以很容易为第三方库实现钩子方法,上面说的两种方法中, 第二种方法非常简单, 但在自己项目中最好还是采用第一种方法, 因为Python
是通过一行一行代码进行扫描执行的, 第二种方法只能放在入口代码中, 并且要在被hook的对象实例化之前执行, 不然就会实现hook失败的现象, 而第一种方法除了麻烦外, 基本上能躲避所有坑。