前记 在项目的演变过程中,有时可能会诞生一些需要奇怪的临时需求,这些需求会涉及到所有的SQL,但开发时间上却不允许整个项目的所有SQL进行重写,比如控制不同的人访问表的权限,或者是我面对的SASS化需求,这时就需要在运行时根据对应的条件来修改SQL语句。
        
        1.缘起 最近项目在准备搞SASS化,SASS化有一个特点就是多租户,且每个租户之间的数据都要隔离,对于数据库的隔离方案常见的有数据库隔离,表隔离,字段隔离,目前我只用到表隔离和字段隔离(数据库隔离的原理也是差不多)。
1 SELECT  *  FROM  t_demo WHERE  tenant_id= 'xxx'  AND  is_del= 0 
但是为了严谨,需求上需要在执行SQL之前检查对应的表是否带上tenant_id的查询字段。
对于表隔离就麻烦了一些,他需要做到在运行的时候根据对应的租户ID来处理某个数据表,举个例子,假如有下面这样的一条SQL查询:
1 SELECT  *  FROM  t_demo WHERE  is_del= 0 
在遇到租户A时,SQL查询将变为:
1 SELECT  *  FROM  t_demo_a WHERE  is_del= 0 
在遇到租户B时,SQL查询将变为:
1 SELECT  *  FROM  t_demo_b WHERE  is_del= 0 
如果商户数量固定时,一般在代码里编写if-else来判断就可以了,但是常见的SASS化应用的商户是会一直新增的,那么对于这个SQL逻辑就会变成这样:
1 2 3 def  sql_handle (tenant_id: str  ):str  = f"t_demo_{tenant_id} " str  = f"SELECT * FROM {table_name}  WHERE is_del=0" 
但是这有几个问题,对于ORM来说,一开始只创建一个t_demo对应的表对象就可以了,现在却要根据多个商户创建多个表对象,这是不现实的,其次如果是裸写SQL,一般会使用IDE的检查,而对于这样的SQL:
1 sql: str  = f"SELECT * FROM {table_name}  WHERE is_del=0" 
IDE是没办法进行检查的,当然还有一个最为严重的问题,就是当前的项目已经非常庞大了,如果每个相关表的调用都进行适配更改的话,那工程量就非常庞大了,所以最好的方案就是在引擎库得到用户传过来的SQL语句后且还没发送到MySQL服务器之前自动的根据商户ID更改SQL, 而要达到这样的效果,就必须侵入到我们使用的MySQL的引擎库,修改里面的方法来兼容我们的需求。
不管是使用dbutils还是sqlalchemy,都可以指定一个引擎库,目前常用的引擎库是pymysql,所以下文都将以pymysql为例进行阐述。
 
2.侵入库 由于必须侵入到我们使用的引擎库,所以我们应该先判断我们需要修改引擎库的哪个方法,在经过源码阅读后,我判定只要更改pymysql.cursors.Cursor的mogrify方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 def  mogrify (self, query, args=None  ):"""     Returns the exact string that is sent to the database by calling the     execute() method.     This method follows the extension to the DB API 2.0 followed by Psycopg.     """ if  args is  not  None :return  query
这个方法的作用就是把用户传过来的SQL和参数进行整合,生成一个最终的SQL,刚好符合我们的需求,于是可以通过继承的思路来创建一个新的属于我们自己的Cursor类:
1 2 3 4 5 6 7 8 9 10 11 12 13 import  pymysqlclass  Cursor (pymysql.cursors.Cursor ):def  mogrify (self, query: str , args: Union[None , list , dict , tuple ] = None  ) -> str:str  = super ().mogrify(query, args)return  mogrify_sqlclass  DictCursor (pymysql.cursors.DictCursorMixin, Cursor ):"""A cursor which returns results as a dictionary""" 
创建好了Cursor类后,就需要考虑如何在pymysql中应用我们自定义的Cursor类了,一般的Mysql连接库都支持我们传入自定义的Cursor类,比如pymysql:
1 2 3 4 5 6 7 8 9 10 11 import  pymysql.cursors'localhost' ,'user' ,'passwd' ,'db' ,'utf8mb4' ,
我们可以通过cursorclass来指定我们的Cursor类,如果使用的库不支持或者是其它原因则需要使用猴子补丁的方法,具体的使用方法见基于Python探针完成调用库的数据提取 。
3.获取商户ID 现在我们已经搞定了在何处修改SQL的问题了,接下来就要思考如何在mogrify方法获取到商户ID以及那些表要进行替换,一般我们在进行一段代码调用时,有两种传参数的方法, 一种是传数组类型的参数:
1 2 with  conn.cursor() as  cursor:"SELECT * FROM t_demo WHERE is_del=%s" , (0 , ))
一种是传字典类型的参数:
1 2 with  conn.cursor() as  cursor:"SELECT * FROM t_demo WHERE is_del=%(is_del)s" , {"is_del" : 0 })
目前大多数的项目都存在这两种类型的编写习惯,而引擎库在执行execute时会经过处理后才把参数sql和args传给了mogrify,如果我们是使用字典类型的参数,那么可以在里面嵌入我们需要的参数,并在mogrify里面提取出来,但是使用了数组类型的参数或者是ORM库的话就比较难传递参数给mogrify方法了,这时可以通过context隐式的把参数传给mogrify方法,具体的分析和原理可见:如何使用contextvars模块和源码分析 。
context的使用方法很简单, 首先是创建一个context封装的类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 from  contextvars import  ContextVar, Tokenfrom  typing import  Any, Dict, Optional, Setstr , Any]] = ContextVar("context" , default={})class  Context (object """基础的context调用,支持Type Hints检查""" str str ]def  __getattr__ (self, key: str  ) -> Any:return  valuedef  __setattr__ (self, key: str , value: Any ) -> None :class  WithContext (Context ):"""简单的处理reset token逻辑,和context管理,只用在业务代码""" def  __init__ (self ) -> None :None def  __enter__ (self ) -> "WithContext":set ({})return  selfdef  __exit__ (self, exc_type: Any, exc_val: Any, exc_tb: Any ) -> None :if  self._token:None 
接下来在业务代码中,通过context传入当前业务对应的参数:
1 2 3 4 5 with  WithContext as  context:"xxx" "t_demo" }with  conn.cursor() as  cursor:"SELECT * FROM t_demo WHERE is_del=%s" , (0 , ))
然后在mogrify中通过调用context即可获得对应的参数了:
1 2 3 4 5 6 7 8 9 10 import  pymysqlclass  Cursor (pymysql.cursors.Cursor ):def  mogrify (self, query: str , args: Union[None , list , dict , tuple ] = None  ) -> str:str  = context.tenant_idstr ] = context.replace_table_setstr  = super ().mogrify(query, args)return  mogrify_sql
4.修改SQL 现在,万事俱备,只剩下修改SQL的逻辑,之前在做别的项目的时候,建的表都是十分的规范,它们是以t_xxx的格式给表命名,这样一来替换表名十分方便,只要进行两次替换就可以兼容大多数情况了,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import  pymysqlclass  Cursor (pymysql.cursors.Cursor ):def  mogrify (self, query: str , args: Union[None , list , dict , tuple ] = None  ) -> str:str  = context.tenant_idstr ] = context.replace_table_setfor  replace_table in  replace_table_set:if  replace_table in  query:f" {replace_table}  " , f" {replace_table} _{tenant_id}  " )f" {replace_table} ." , f" {replace_table} _{tenant_id} ." )str  = super ().mogrify(query, args)return  mogrify_sql
但是现在项目的SQL规范并不是很好,有些表名还是MySQL的关键字,所以靠简单的替换是行不通的,同时这个需求中,一些表只需要字段隔离,需要确保有带上对应的字段查询,这就意味着必须有一个库可以来解析SQL,并返回一些数据使我们可以比较方便的知道SQL中哪些是表名,哪些是查询字段了。
目前在Python中有一个比较知名的SQL解析库–sqlparse ,它可以通过解析引擎把SQL解析成一个Python对象,之后我们就可以通过一些语法来判断哪些是SQL关键字, 哪些是表名,哪些是查询条件等等。但是这个库只实现一些底层的API,我们需要对他和SQL比较了解之后才能实现一些比较完备的功能,比如下面3种常见的SQL:
1 2 3 SELECT  *  FROM  t_demoSELECT  *  FROM  t_demo as  demoSELECT  *  FROM  t_other as  other LEFT  JOIN  t_demo demo on  demo.xxx= = other.xxx
如果我们要通过sqlparse来提取表名的话就需要处理这3种情况,而我们如果要每一个情况都编写出来的话,那将会非常费心费力,同时也可能存在遗漏的情况,这时就需要用到另外一个库–sql_metadata ,这个库是基于sqlparse和正则的解析库,同时提供了大量的常见使用方法的封装,我们通过直接调用对应的函数就能知道SQL中有哪些表名,查询字段是什么了。
目前已知这个库有一个缺陷,就是会自动去掉字段的符号, 比如表名为关键字时,我们需要使用`符号把它包起来:
但在经过sql_metadata解析后得到的表名是case而不是`case`,需要人为的处理,但是我并不觉得这是一个BUG,自己不按规范创建表,能怪谁呢。
 
接下来就可以通过sql_metadata的方法来实现我需要的功能了,在根据需求修改后,代码长这样(说明见注释):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 from  typing import  Dict, Set, Tuple, Unionimport  pymysqlimport  sql_metadataclass  Cursor (pymysql.cursors.Cursor ):def  mogrify (self, query: str , args: Union[None , list , dict , tuple ] = None  ) -> str:str  = context.tenant_idFalse  str ] = context.where_table_setfor  table_name in  sql_parse.tables:if  table_name in  where_table_set:if  sql_parse.columns_dict:for  where_column in  sql_parse.columns_dict.get("where" , []):if  "tenant_id"  in  where_column.lower().split("." ):True break if  not  check_flag:raise  RuntimeError()str ] = context.replace_table_setstr  = queryfor  table_name in  sql_parse.tables:if  table_name in  replace_table_set:"" for  token in  sql_parse.tokens:if  token.is_potential_table_name:str  = token.stringified_token.strip()if  parse_table_name in  replace_table_set:str  = f" {parse_table_name} _{tenant_id} " if  token.next_token.normalized != "AS" :f" AS {parse_table_name} " continue str  = super ().mogrify(new_query, args)return  mogrify_sql
这份代码十分简单,它只做简单介绍,事实上这段逻辑会应用到所有的SQL查询中,我们应该要保证这段代码是没问题的,同时不要有太多的性能浪费,所以在使用的时候要考虑到代码拆分和优化。SQL转换和检查都是在父类的Cursor.mogrify之前进行的,这就意味着不管我们代码逻辑里cursor.execute传的参数是什么,对于同一个代码逻辑来说,传过来的query值是保持不变的,比如下面的代码:
1 2 3 4 def  get_user_info (uid: str  ) -> Dict[str, Any]:with  conn.cursor() as  cursor:"SELECT * FROM t_user WHERE uid=%(uid)s" , {"uid" : uid})return  cursor.fetchone() or  {}
这段代码中传到Cursor.mogrify的query永远为SELECT * FROM t_user WHERE uid=%(uid)s,有变化的只是args中uid的不同。query的校验结果和转换结果缓存下来,减少每次都需要解析SQL再校验造成的性能浪费。至于如何实现缓存则需要根据自己的项目来决定,比如项目中只有几百个SQL执行,那么直接用Python的dict来存放就可以了,如果项目中执行的SQL很多,同时有些执行的频率非常的高,有些执行的频率非常的低,那么可以考虑使用LRU来缓存。