前记 在项目的演变过程中,有时可能会诞生一些需要奇怪的临时需求,这些需求会涉及到所有的SQL,但开发时间上却不允许整个项目的所有SQL进行重写,比如控制不同的人访问表的权限,或者是我面对的SASS化需求,这时就需要在运行时根据对应的条件来修改SQL语句。
1.缘起 最近项目在准备搞SASS化,SASS化有一个特点就是多租户,且每个租户之间的数据都要隔离,对于数据库的隔离方案常见的有数据库隔离,表隔离,字段隔离,目前我只用到表隔离和字段隔离(数据库隔离的原理也是差不多)。 对于字段隔离比较简单,就是查询条件不同而已,比如像下面的SQL查询:
1 SELECT * FROM t_demo WHERE tenant_id= 'xxx' AND is_del= 0
但是为了严谨,需求上需要在执行SQL之前检查对应的表是否带上tenant_id
的查询字段。
对于表隔离就麻烦了一些,他需要做到在运行的时候根据对应的租户ID来处理某个数据表,举个例子,假如有下面这样的一条SQL查询:
1 SELECT * FROM t_demo WHERE is_del= 0
在遇到租户A时,SQL查询将变为:
1 SELECT * FROM t_demo_a WHERE is_del= 0
在遇到租户B时,SQL查询将变为:
1 SELECT * FROM t_demo_b WHERE is_del= 0
如果商户数量固定时,一般在代码里编写if-else
来判断就可以了,但是常见的SASS化应用的商户是会一直新增的,那么对于这个SQL逻辑就会变成这样:
1 2 3 def sql_handle (tenant_id: str ): table_name: str = f"t_demo_{tenant_id} " sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"
但是这有几个问题,对于ORM来说,一开始只创建一个t_demo
对应的表对象就可以了,现在却要根据多个商户创建多个表对象,这是不现实的,其次如果是裸写SQL,一般会使用IDE的检查,而对于这样的SQL:
1 sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"
IDE是没办法进行检查的,当然还有一个最为严重的问题,就是当前的项目已经非常庞大了,如果每个相关表的调用都进行适配更改的话,那工程量就非常庞大了,所以最好的方案就是在引擎库得到用户传过来的SQL语句后且还没发送到MySQL
服务器之前自动的根据商户ID更改SQL, 而要达到这样的效果,就必须侵入到我们使用的MySQL
的引擎库,修改里面的方法来兼容我们的需求。
不管是使用dbutils
还是sqlalchemy
,都可以指定一个引擎库,目前常用的引擎库是pymysql
,所以下文都将以pymysql
为例进行阐述。
2.侵入库 由于必须侵入到我们使用的引擎库,所以我们应该先判断我们需要修改引擎库的哪个方法,在经过源码阅读后,我判定只要更改pymysql.cursors.Cursor
的mogrify
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 def mogrify (self, query, args=None ): """ Returns the exact string that is sent to the database by calling the execute() method. This method follows the extension to the DB API 2.0 followed by Psycopg. """ conn = self._get_db() if args is not None : query = query % self._escape_args(args, conn) return query
这个方法的作用就是把用户传过来的SQL和参数进行整合,生成一个最终的SQL,刚好符合我们的需求,于是可以通过继承的思路来创建一个新的属于我们自己的Cursor
类:
1 2 3 4 5 6 7 8 9 10 11 12 13 import pymysqlclass Cursor (pymysql.cursors.Cursor ): def mogrify (self, query: str , args: Union[None , list , dict , tuple ] = None ) -> str: mogrify_sql: str = super ().mogrify(query, args) return mogrify_sqlclass DictCursor (pymysql.cursors.DictCursorMixin, Cursor ): """A cursor which returns results as a dictionary"""
创建好了Cursor
类后,就需要考虑如何在pymysql
中应用我们自定义的Cursor
类了,一般的Mysql
连接库都支持我们传入自定义的Cursor
类,比如pymysql
:
1 2 3 4 5 6 7 8 9 10 11 import pymysql.cursors connection = pymysql.connect( host='localhost' , user='user' , password='passwd' , database='db' , charset='utf8mb4' , cursorclass=pymysql.cursors.DictCursor )
我们可以通过cursorclass
来指定我们的Cursor
类,如果使用的库不支持或者是其它原因则需要使用猴子补丁的方法,具体的使用方法见基于Python探针完成调用库的数据提取 。
3.获取商户ID 现在我们已经搞定了在何处修改SQL的问题了,接下来就要思考如何在mogrify
方法获取到商户ID以及那些表要进行替换,一般我们在进行一段代码调用时,有两种传参数的方法, 一种是传数组类型的参数:
1 2 with conn.cursor() as cursor: cursor.execute("SELECT * FROM t_demo WHERE is_del=%s" , (0 , ))
一种是传字典类型的参数:
1 2 with conn.cursor() as cursor: cursor.execute("SELECT * FROM t_demo WHERE is_del=%(is_del)s" , {"is_del" : 0 })
目前大多数的项目都存在这两种类型的编写习惯,而引擎库在执行execute
时会经过处理后才把参数sql
和args
传给了mogrify
,如果我们是使用字典类型的参数,那么可以在里面嵌入我们需要的参数,并在mogrify
里面提取出来,但是使用了数组类型的参数或者是ORM库的话就比较难传递参数给mogrify
方法了,这时可以通过context
隐式的把参数传给mogrify
方法,具体的分析和原理可见:如何使用contextvars模块和源码分析 。
context
的使用方法很简单, 首先是创建一个context
封装的类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 from contextvars import ContextVar, Tokenfrom typing import Any, Dict, Optional, Set context: ContextVar[Dict[str , Any]] = ContextVar("context" , default={})class Context (object ): """基础的context调用,支持Type Hints检查""" tenant_id: str replace_table_set: Set[str ] def __getattr__ (self, key: str ) -> Any: value: Any = context.get().get(key) return value def __setattr__ (self, key: str , value: Any ) -> None : context.get()[key] = valueclass WithContext (Context ): """简单的处理reset token逻辑,和context管理,只用在业务代码""" def __init__ (self ) -> None : self._token: Optional[Token] = None def __enter__ (self ) -> "WithContext": self._token = context.set ({}) return self def __exit__ (self, exc_type: Any, exc_val: Any, exc_tb: Any ) -> None : if self._token: context.reset(self._token) self._token = None
接下来在业务代码中,通过context传入当前业务对应的参数:
1 2 3 4 5 with WithContext as context: context.tenant_id = "xxx" context.replace_table_set = {"t_demo" } with conn.cursor() as cursor: cursor.execute("SELECT * FROM t_demo WHERE is_del=%s" , (0 , ))
然后在mogrify
中通过调用context
即可获得对应的参数了:
1 2 3 4 5 6 7 8 9 10 import pymysqlclass Cursor (pymysql.cursors.Cursor ): def mogrify (self, query: str , args: Union[None , list , dict , tuple ] = None ) -> str: tenant_id: str = context.tenant_id replace_table_set: Set[str ] = context.replace_table_set mogrify_sql: str = super ().mogrify(query, args) return mogrify_sql
4.修改SQL 现在,万事俱备,只剩下修改SQL的逻辑,之前在做别的项目的时候,建的表都是十分的规范,它们是以t_xxx
的格式给表命名,这样一来替换表名十分方便,只要进行两次替换就可以兼容大多数情况了,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import pymysqlclass Cursor (pymysql.cursors.Cursor ): def mogrify (self, query: str , args: Union[None , list , dict , tuple ] = None ) -> str: tenant_id: str = context.tenant_id replace_table_set: Set[str ] = context.replace_table_set for replace_table in replace_table_set: if replace_table in query: query = query.replace(f" {replace_table} " , f" {replace_table} _{tenant_id} " ) query = query.replace(f" {replace_table} ." , f" {replace_table} _{tenant_id} ." ) mogrify_sql: str = super ().mogrify(query, args) return mogrify_sql
但是现在项目的SQL规范并不是很好,有些表名还是MySQL
的关键字,所以靠简单的替换是行不通的,同时这个需求中,一些表只需要字段隔离,需要确保有带上对应的字段查询,这就意味着必须有一个库可以来解析SQL
,并返回一些数据使我们可以比较方便的知道SQL
中哪些是表名,哪些是查询字段了。
目前在Python中有一个比较知名的SQL
解析库–sqlparse ,它可以通过解析引擎把SQL解析成一个Python对象
,之后我们就可以通过一些语法来判断哪些是SQL
关键字, 哪些是表名,哪些是查询条件等等。但是这个库只实现一些底层的API,我们需要对他和SQL比较了解之后才能实现一些比较完备的功能,比如下面3种常见的SQL:
1 2 3 SELECT * FROM t_demoSELECT * FROM t_demo as demoSELECT * FROM t_other as other LEFT JOIN t_demo demo on demo.xxx= = other.xxx
如果我们要通过sqlparse
来提取表名的话就需要处理这3种情况,而我们如果要每一个情况都编写出来的话,那将会非常费心费力,同时也可能存在遗漏的情况,这时就需要用到另外一个库–sql_metadata ,这个库是基于sqlparse
和正则的解析库,同时提供了大量的常见使用方法的封装,我们通过直接调用对应的函数就能知道SQL
中有哪些表名,查询字段是什么了。
目前已知这个库有一个缺陷,就是会自动去掉字段的符号, 比如表名为关键字时,我们需要使用`符号把它包起来:
但在经过sql_metadata
解析后得到的表名是case
而不是`case`,需要人为的处理,但是我并不觉得这是一个BUG,自己不按规范创建表,能怪谁呢。
接下来就可以通过sql_metadata
的方法来实现我需要的功能了,在根据需求修改后,代码长这样(说明见注释):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 from typing import Dict, Set, Tuple, Unionimport pymysqlimport sql_metadataclass Cursor (pymysql.cursors.Cursor ): def mogrify (self, query: str , args: Union[None , list , dict , tuple ] = None ) -> str: tenant_id: str = context.tenant_id sql_parse: sql_metadata.Parser = sql_metadata.Parser(query) check_flag = False where_table_set: Set[str ] = context.where_table_set for table_name in sql_parse.tables: if table_name in where_table_set: if sql_parse.columns_dict: for where_column in sql_parse.columns_dict.get("where" , []): if "tenant_id" in where_column.lower().split("." ): check_flag = True break if not check_flag: raise RuntimeError() replace_table_set: Set[str ] = context.replace_table_set new_query: str = query for table_name in sql_parse.tables: if table_name in replace_table_set: new_query = "" for token in sql_parse.tokens: if token.is_potential_table_name: parse_table_name: str = token.stringified_token.strip() if parse_table_name in replace_table_set: new_table_name: str = f" {parse_table_name} _{tenant_id} " if token.next_token.normalized != "AS" : new_table_name += f" AS {parse_table_name} " query += new_table_name continue new_query += token.stringified_token mogrify_sql: str = super ().mogrify(new_query, args) return mogrify_sql
这份代码十分简单,它只做简单介绍,事实上这段逻辑会应用到所有的SQL
查询中,我们应该要保证这段代码是没问题的,同时不要有太多的性能浪费,所以在使用的时候要考虑到代码拆分和优化。 比如在使用的过程中可以发现,我们的SQL
转换和检查都是在父类的Cursor.mogrify
之前进行的,这就意味着不管我们代码逻辑里cursor.execute
传的参数是什么,对于同一个代码逻辑来说,传过来的query
值是保持不变的,比如下面的代码:
1 2 3 4 def get_user_info (uid: str ) -> Dict[str, Any]: with conn.cursor() as cursor: cursor.execute("SELECT * FROM t_user WHERE uid=%(uid)s" , {"uid" : uid}) return cursor.fetchone() or {}
这段代码中传到Cursor.mogrify
的query永远为SELECT * FROM t_user WHERE uid=%(uid)s
,有变化的只是args中uid的不同。 有了这样的一个前提条件,那么我们就可以把query
的校验结果和转换结果缓存下来,减少每次都需要解析SQL
再校验造成的性能浪费。至于如何实现缓存则需要根据自己的项目来决定,比如项目中只有几百个SQL
执行,那么直接用Python
的dict
来存放就可以了,如果项目中执行的SQL
很多,同时有些执行的频率非常的高,有些执行的频率非常的低,那么可以考虑使用LRU
来缓存。