跳转至

Security

OpenAPI通过security提供了对HTTP基本身份验证的描述,但是不同的Web框架对HTTP基本身份验证的实现方式不同, 因此Pait通过Depends对OpenAPI的security提供了简单支持(api key, http, oauth2),从而简化Security在不同的Web框架使用。

Note

JWT等高级身份验证方法将会在未来通过拓展包提供支持。

1.APIKey

API Key是Security中最简单的方法,也正因为简单,它的使用场景也是最多的。Pait提供APIKey类来支持API Key的使用,使用方法如下:

docs_source_code/openapi/security/flask_with_apikey_demo.py
from flask import Flask

from pait.app.flask import pait
from pait.app.flask.security import api_key
from pait.field import Cookie, Depends, Header, Query
from pait.openapi.doc_route import AddDocRoute

token_cookie_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Cookie(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-cookie-api-key",
)
token_header_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Header(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-header-api-key",
)
token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


@pait()
def api_key_cookie_route(token: str = Depends.i(token_cookie_api_key)) -> dict:
    return {"code": 0, "msg": "", "data": token}


@pait()
def api_key_header_route(token: str = Depends.i(token_header_api_key)) -> dict:
    return {"code": 0, "msg": "", "data": token}


@pait()
def api_key_query_route(token: str = Depends.i(token_query_api_key)) -> dict:
    return {"code": 0, "msg": "", "data": token}


app = Flask("demo")
app.add_url_rule("/api/api-cookie-key", view_func=api_key_cookie_route, methods=["GET"])
app.add_url_rule("/api/api-header-key", view_func=api_key_header_route, methods=["GET"])
app.add_url_rule("/api/api-query-key", view_func=api_key_query_route, methods=["GET"])
AddDocRoute(app)


if __name__ == "__main__":
    app.run(port=8000)
docs_source_code/openapi/security/starlette_with_apikey_demo.py
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

from pait.app.starlette import pait
from pait.app.starlette.security import api_key
from pait.field import Cookie, Depends, Header, Query
from pait.openapi.doc_route import AddDocRoute

token_cookie_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Cookie(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-cookie-api-key",
)
token_header_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Header(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-header-api-key",
)
token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


@pait()
async def api_key_cookie_route(token: str = Depends.i(token_cookie_api_key)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": token})


@pait()
async def api_key_header_route(token: str = Depends.i(token_header_api_key)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": token})


@pait()
async def api_key_query_route(token: str = Depends.i(token_query_api_key)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": token})


app = Starlette(
    routes=[
        Route("/api/api-cookie-key", api_key_cookie_route, methods=["GET"]),
        Route("/api/api-header-key", api_key_header_route, methods=["GET"]),
        Route("/api/api-query-key", api_key_query_route, methods=["GET"]),
    ]
)
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/openapi/security/sanic_with_apikey_demo.py
from sanic import HTTPResponse, Sanic, json

from pait.app.sanic import pait
from pait.app.sanic.security import api_key
from pait.field import Cookie, Depends, Header, Query
from pait.openapi.doc_route import AddDocRoute

token_cookie_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Cookie(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-cookie-api-key",
)
token_header_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Header(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-header-api-key",
)
token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


@pait()
async def api_key_cookie_route(token: str = Depends.i(token_cookie_api_key)) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": token})


@pait()
async def api_key_header_route(token: str = Depends.i(token_header_api_key)) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": token})


@pait()
async def api_key_query_route(token: str = Depends.i(token_query_api_key)) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": token})


app = Sanic(name="demo")
app.add_route(api_key_cookie_route, "/api/api-cookie-key", methods=["GET"])
app.add_route(api_key_header_route, "/api/api-header-key", methods=["GET"])
app.add_route(api_key_query_route, "/api/api-query-key", methods=["GET"])
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/openapi/security/tornado_with_apikey_demo.py
from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler

from pait.app.tornado import pait
from pait.app.tornado.security import api_key
from pait.field import Cookie, Depends, Header, Query
from pait.openapi.doc_route import AddDocRoute

token_cookie_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Cookie(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-cookie-api-key",
)
token_header_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Header(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-header-api-key",
)
token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


class APIKeyCookieHandler(RequestHandler):
    @pait()
    async def get(self, token: str = Depends.i(token_cookie_api_key)) -> None:
        self.write({"code": 0, "msg": "", "data": token})


class APIKeyHeaderHandler(RequestHandler):
    @pait()
    async def get(self, token: str = Depends.i(token_header_api_key)) -> None:
        self.write({"code": 0, "msg": "", "data": token})


class APIKeyQueryHandler(RequestHandler):
    @pait()
    async def get(self, token: str = Depends.i(token_query_api_key)) -> None:
        self.write({"code": 0, "msg": "", "data": token})


app: Application = Application(
    [
        (r"/api/security/api-cookie-key", APIKeyCookieHandler),
        (r"/api/security/api-header-key", APIKeyHeaderHandler),
        (r"/api/security/api-query-key", APIKeyQueryHandler),
    ],
)
AddDocRoute(app)


if __name__ == "__main__":
    app.listen(8000)
    IOLoop.instance().start()

代码中第一段高亮代码是针对不同的APIKey字段进行初始化,它们使用的参数略有不同,具体的参数说明如下:

参数 描述
name APIKey字段的名字
field APIKey字段对应Pait中的Field类,API Key只支持Query,Header和Cookie的参数,所以只允许使用field.Queryfield.Headerfield.Cookie
verify_api_key_callable 接受一个校验APIKey的函数,Pait从请求体中提取APIKey值后会传递给函数,交由函数处理,如果函数返回True则代表校验通过,反之则校验不通过。
security_name 指定Security的名称,不同作用的APIKey的名称必须是不同的,默认值为APIKey的类名。

Note

为了能在OpenAPI工具中正常使用APIKey,传递的Field在初始化时必须指定openapi_include为False。

第二段高亮的代码则是通过Depend连接了APIKey和路由函数,其中Depend的参数为APIKey的实例。

当路由函数收到请求时Pait会自动从请求体中提取APIKey的值,然后交由APIKeyverify_api_key_callable函数进行校验,如果校验通过则把值通过Depend传递给路由函数执行,反之则返回401

在运行代码后,运行如下命令,可以看到APIKey的执行效果:

# Success Result
  curl -X 'GET' \
  'http://127.0.0.1:8000/api/api-cookie-key' \
  -H 'accept: */*' \
  -H 'Cookie: token=token'
{"code":0,"msg":"","data":"token"}

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/api-header-key' \
  -H 'accept: */*' \
  -H 'token: token'
{"code":0,"msg":"","data":"token"}

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/api-query-key?token=token' \
  -H 'accept: */*'
{"code":0,"msg":"","data":"token"}

# Fail Result
  curl -X 'GET' \
  'http://127.0.0.1:8000/api/api-cookie-key' \
  -H 'accept: */*' \
  -H 'Cookie: token='
Not authenticated

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/api-header-key' \
  -H 'accept: */*' \
  -H 'token: '
Not authenticated

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/api-query-key?token=' \
  -H 'accept: */*'
Not authenticated

大部分使用APIKey的路由函数所需要的参数(如Token)都是通过其他路由函数获取的, 此时可以通过使用Field中的Links来描述该路由函数与其他路由函数之间的关系,比如下面的这个场景,它的Token是通过登陆接口中获取的:

docs_source_code/openapi/security/flask_with_apikey_and_link_demo.py
import hashlib
from typing import Type

from flask import Flask
from pydantic import BaseModel, Field

from pait.app.flask import pait
from pait.app.flask.security import api_key
from pait.field import Depends, Json, Query
from pait.model.response import JsonResponseModel
from pait.openapi.doc_route import AddDocRoute
from pait.openapi.openapi import LinksModel


class LoginRespModel(JsonResponseModel):
    class ResponseModel(BaseModel):
        class DataModel(BaseModel):
            token: str

        code: int = Field(0, description="api code")
        msg: str = Field("success", description="api status msg")
        data: DataModel

    description: str = "login response"
    response_data: Type[BaseModel] = ResponseModel


@pait(response_model_list=[LoginRespModel])
def login_route(uid: str = Json.i(description="user id"), password: str = Json.i(description="password")) -> dict:
    return {"code": 0, "msg": "", "data": {"token": hashlib.sha256((uid + password).encode("utf-8")).hexdigest()}}


link_login_token_model: LinksModel = LinksModel(LoginRespModel, "$response.body#/data/token", desc="test links model")


token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(links=link_login_token_model),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


@pait()
def api_key_query_route(token: str = Depends.i(token_query_api_key)) -> dict:
    return {"code": 0, "msg": "", "data": token}


app = Flask("demo")
app.add_url_rule("/api/login", "login", login_route, methods=["POST"])
app.add_url_rule("/api/api-query-key", view_func=api_key_query_route, methods=["GET"])
AddDocRoute(app)


if __name__ == "__main__":
    app.run(port=8000)
docs_source_code/openapi/security/starlette_with_apikey_and_link_demo.py
import hashlib
from typing import Type

from pydantic import BaseModel, Field
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

from pait.app.starlette import pait
from pait.app.starlette.security import api_key
from pait.field import Depends, Json, Query
from pait.model.response import JsonResponseModel
from pait.openapi.doc_route import AddDocRoute
from pait.openapi.openapi import LinksModel


class LoginRespModel(JsonResponseModel):
    class ResponseModel(BaseModel):
        class DataModel(BaseModel):
            token: str

        code: int = Field(0, description="api code")
        msg: str = Field("success", description="api status msg")
        data: DataModel

    description: str = "login response"
    response_data: Type[BaseModel] = ResponseModel


@pait(response_model_list=[LoginRespModel])
async def login_route(
    uid: str = Json.i(description="user id"), password: str = Json.i(description="password")
) -> JSONResponse:
    return JSONResponse(
        {"code": 0, "msg": "", "data": {"token": hashlib.sha256((uid + password).encode("utf-8")).hexdigest()}}
    )


link_login_token_model: LinksModel = LinksModel(LoginRespModel, "$response.body#/data/token", desc="test links model")


token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(links=link_login_token_model),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


@pait()
async def api_key_query_route(token: str = Depends.i(token_query_api_key)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": token})


app = Starlette(
    routes=[
        Route("/api/login", login_route, methods=["POST"]),
        Route("/api/api-query-key", api_key_query_route, methods=["GET"]),
    ]
)
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/openapi/security/sanic_with_apikey_and_link_demo.py
import hashlib
from typing import Type

from pydantic import BaseModel, Field
from sanic import HTTPResponse, Sanic, json

from pait.app.sanic import pait
from pait.app.sanic.security import api_key
from pait.field import Depends, Json, Query
from pait.model.response import JsonResponseModel
from pait.openapi.doc_route import AddDocRoute
from pait.openapi.openapi import LinksModel


class LoginRespModel(JsonResponseModel):
    class ResponseModel(BaseModel):
        class DataModel(BaseModel):
            token: str

        code: int = Field(0, description="api code")
        msg: str = Field("success", description="api status msg")
        data: DataModel

    description: str = "login response"
    response_data: Type[BaseModel] = ResponseModel


@pait(response_model_list=[LoginRespModel])
async def login_route(
    uid: str = Json.i(description="user id"), password: str = Json.i(description="password")
) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": {"token": hashlib.sha256((uid + password).encode("utf-8")).hexdigest()}})


link_login_token_model: LinksModel = LinksModel(LoginRespModel, "$response.body#/data/token", desc="test links model")


token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(links=link_login_token_model),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


@pait()
async def api_key_query_route(token: str = Depends.i(token_query_api_key)) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": token})


app = Sanic(name="demo")
app.add_route(login_route, "/api/login", methods=["POST"])
app.add_route(api_key_query_route, "/api/api-query-key", methods=["GET"])
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/openapi/security/tornado_with_apikey_and_link_demo.py
import hashlib
from typing import Type

from pydantic import BaseModel, Field
from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler

from pait.app.tornado import pait
from pait.app.tornado.security import api_key
from pait.field import Depends, Json, Query
from pait.model.response import JsonResponseModel
from pait.openapi.doc_route import AddDocRoute
from pait.openapi.openapi import LinksModel


class LoginRespModel(JsonResponseModel):
    class ResponseModel(BaseModel):
        class DataModel(BaseModel):
            token: str

        code: int = Field(0, description="api code")
        msg: str = Field("success", description="api status msg")
        data: DataModel

    description: str = "login response"
    response_data: Type[BaseModel] = ResponseModel


class LoginHandler(RequestHandler):
    @pait(response_model_list=[LoginRespModel])
    async def post(
        self, uid: str = Json.i(description="user id"), password: str = Json.i(description="password")
    ) -> None:
        self.write(
            {"code": 0, "msg": "", "data": {"token": hashlib.sha256((uid + password).encode("utf-8")).hexdigest()}}
        )


link_login_token_model: LinksModel = LinksModel(LoginRespModel, "$response.body#/data/token", desc="test links model")


token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(links=link_login_token_model),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


class APIKeyQueryHandler(RequestHandler):
    @pait()
    async def get(self, token: str = Depends.i(token_query_api_key)) -> None:
        self.write({"code": 0, "msg": "", "data": token})


app: Application = Application(
    [
        (r"/api/login", LoginHandler),
        (r"/api/security/api-query-key", APIKeyQueryHandler),
    ],
)
AddDocRoute(app)


if __name__ == "__main__":
    app.listen(8000)
    IOLoop.instance().start()

第一段高亮代码来自于Field-Links,而第二段高亮代码中的Query设置了links属性为link_login_token_model,这样一来Pait生成OpenAPI时会把login_routeapi_key_query_route通过Link绑定在一起。

Note

  • Links的使用方法详见Field-Links
  • 使用openapi_include=False会导致Swggaer无法展示Link的数据

2.HTTP

HTTP基本身份校验分为两种,一种是HTTPBasic,另一种是HTTPBearerHTTPDIgest, 两者的区别在于HTTPBasic需要在请求头中传递usernamepassword进行校验,如果校验成功则代表验证成功, 如果校验错误会返回401响应,浏览器在收到401响应后会弹出一个窗口让用户输入usernamepassword, 而HTTPBearerHTTPDIgest则只需要在请求头中按要求传递token

Pait为HTTP基本校验的三个方式分别封装了HTTPBasicHTTPBearerHTTPDigest三个类, 同样的,它们也需要通过Depend与路由函数绑定,使用方法如下:

docs_source_code/docs_source_code/openapi/security/flask_with_http_demo.py
from typing import Optional

from flask import Flask

from pait.app.flask import pait
from pait.app.flask.security import http
from pait.field import Depends
from pait.openapi.doc_route import AddDocRoute

##############
# HTTP Basic #
##############
http_basic: http.HTTPBasic = http.HTTPBasic()


def get_user_name(credentials: Optional[http.HTTPBasicCredentials] = Depends.i(http_basic)) -> str:
    if not credentials or credentials.username != credentials.password:
        raise http_basic.not_authorization_exc
    return credentials.username


@pait()
def get_user_name_by_http_basic_credentials(user_name: str = Depends.t(get_user_name)) -> dict:
    return {"code": 0, "msg": "", "data": user_name}


#############
# HTTP Bear #
#############
http_bear: http.HTTPBearer = http.HTTPBearer(verify_callable=lambda x: "http" in x)


@pait()
def get_user_name_by_http_bearer(credentials: Optional[str] = Depends.i(http_bear)) -> dict:
    return {"code": 0, "msg": "", "data": credentials}


###############
# HTTP Digest #
###############
http_digest: http.HTTPDigest = http.HTTPDigest(verify_callable=lambda x: "http" in x)


@pait()
def get_user_name_by_http_digest(credentials: Optional[str] = Depends.i(http_digest)) -> dict:
    return {"code": 0, "msg": "", "data": credentials}


app = Flask("demo")
app.add_url_rule(
    "/api/user-name-by-http-basic-credentials",
    view_func=get_user_name_by_http_basic_credentials,
    methods=["GET"],
)
app.add_url_rule("/api/user-name-by-http-bearer", view_func=get_user_name_by_http_bearer, methods=["GET"])
app.add_url_rule("/api/user-name-by-http-digest", view_func=get_user_name_by_http_digest, methods=["GET"])
AddDocRoute(app)


if __name__ == "__main__":
    app.run(port=8000)
docs_source_code/docs_source_code/openapi/security/starlette_with_http_demo.py
from typing import Optional

from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

from pait.app.starlette import pait
from pait.app.starlette.security import http
from pait.field import Depends
from pait.openapi.doc_route import AddDocRoute

##############
# HTTP Basic #
##############
http_basic: http.HTTPBasic = http.HTTPBasic()


def get_user_name(credentials: Optional[http.HTTPBasicCredentials] = Depends.t(http_basic)) -> str:
    if not credentials or credentials.username != credentials.password:
        raise http_basic.not_authorization_exc
    return credentials.username


@pait()
async def get_user_name_by_http_basic_credentials(user_name: str = Depends.t(get_user_name)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": user_name})


#############
# HTTP Bear #
#############
http_bear: http.HTTPBearer = http.HTTPBearer(verify_callable=lambda x: "http" in x)


@pait()
async def get_user_name_by_http_bearer(credentials: Optional[str] = Depends.t(http_bear)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": credentials})


###############
# HTTP Digest #
###############
http_digest: http.HTTPDigest = http.HTTPDigest(verify_callable=lambda x: "http" in x)


@pait()
async def get_user_name_by_http_digest(credentials: Optional[str] = Depends.t(http_digest)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": credentials})


app = Starlette(
    routes=[
        Route("/api/user-name-by-http-basic-credentials", get_user_name_by_http_basic_credentials, methods=["GET"]),
        Route("/api/user-name-by-http-bearer", get_user_name_by_http_bearer, methods=["GET"]),
        Route("/api/user-name-by-http-digest", get_user_name_by_http_digest, methods=["GET"]),
    ]
)
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/docs_source_code/openapi/security/sanic_with_http_demo.py
from typing import Optional

from sanic import Sanic, response

from pait.app.sanic import pait
from pait.app.sanic.security import http
from pait.field import Depends
from pait.openapi.doc_route import AddDocRoute

##############
# HTTP Basic #
##############
http_basic: http.HTTPBasic = http.HTTPBasic()


def get_user_name(credentials: Optional[http.HTTPBasicCredentials] = Depends.t(http_basic)) -> str:
    if not credentials or credentials.username != credentials.password:
        raise http_basic.not_authorization_exc
    return credentials.username


@pait()
async def get_user_name_by_http_basic_credentials(user_name: str = Depends.t(get_user_name)) -> response.HTTPResponse:
    return response.json({"code": 0, "msg": "", "data": user_name})


#############
# HTTP Bear #
#############
http_bear: http.HTTPBearer = http.HTTPBearer(verify_callable=lambda x: "http" in x)


@pait()
async def get_user_name_by_http_bearer(credentials: Optional[str] = Depends.t(http_bear)) -> response.HTTPResponse:
    return response.json({"code": 0, "msg": "", "data": credentials})


###############
# HTTP Digest #
###############
http_digest: http.HTTPDigest = http.HTTPDigest(verify_callable=lambda x: "http" in x)


@pait()
async def get_user_name_by_http_digest(credentials: Optional[str] = Depends.t(http_digest)) -> response.HTTPResponse:
    return response.json({"code": 0, "msg": "", "data": credentials})


app = Sanic(name="demo")
app.add_route(get_user_name_by_http_basic_credentials, "/api/user-name-by-http-basic-credentials", methods={"GET"})
app.add_route(get_user_name_by_http_bearer, "/api/user-name-by-http-bearer", methods={"GET"})
app.add_route(get_user_name_by_http_digest, "/api/user-name-by-http-digest", methods={"GET"})
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/docs_source_code/openapi/security/tornado_with_http_demo.py
from typing import Optional

from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler

from pait.app.tornado import pait
from pait.app.tornado.security import http
from pait.field import Depends
from pait.openapi.doc_route import AddDocRoute

http_basic: http.HTTPBasic = http.HTTPBasic()


def get_user_name(credentials: Optional[http.HTTPBasicCredentials] = Depends.t(http_basic)) -> str:
    if not credentials or credentials.username != credentials.password:
        raise http_basic.not_authorization_exc
    return credentials.username


class UserNameByHttpBasicCredentialsHandler(RequestHandler):
    @pait()
    async def get(self, user_name: str = Depends.t(get_user_name)) -> None:
        self.write({"code": 0, "msg": "", "data": user_name})


#############
# HTTP Bear #
#############
http_bear: http.HTTPBearer = http.HTTPBearer(verify_callable=lambda x: "http" in x)


class UserNameByHttpBearerHandler(RequestHandler):
    @pait()
    async def get(self, credentials: Optional[str] = Depends.t(http_bear)) -> None:
        self.write({"code": 0, "msg": "", "data": credentials})


###############
# HTTP Digest #
###############
http_digest: http.HTTPDigest = http.HTTPDigest(verify_callable=lambda x: "http" in x)


class UserNameByHttpDigestHandler(RequestHandler):
    @pait()
    async def get(self, credentials: Optional[str] = Depends.t(http_digest)) -> None:
        self.write({"code": 0, "msg": "", "data": credentials})


app: Application = Application(
    [
        (r"/api/security/user-name-by-http-basic-credentials", UserNameByHttpBasicCredentialsHandler),
        (r"/api/security/user-name-by-http-bearer", UserNameByHttpBearerHandler),
        (r"/api/security/user-name-by-http-digest", UserNameByHttpDigestHandler),
    ],
)
AddDocRoute(app)


if __name__ == "__main__":
    app.listen(8000)
    IOLoop.instance().start()

可以看到整个代码由两部分组成,第一部分初始化对应的基本身份验证类,第二部分是在路由函数中使用Depend获取身份验证类的实例。

不过HTTPBasic的使用方法与其他两个有点不同,比如HTTPBasic的初始化参数与其他两个不一样,它的初始化参数说明见下表:

参数 描述
security_model 关于HTTPBasic的OpenAPI描述Model,默认情况下已经提供了一个通用的HTTPBasicModel,如有定制需求请访问OpenAPI的securitySchemeObject了解
security_name 指定Security的名称,不同作用的基本身份校验实例的名称必须是不同的,默认值为类名。
header_field Pait的Header Field实例
realm HTTP基本身份校验的realm参数

HTTPBearerHTTPDigest使用方法与APIKey类似,需要按要求初始化,并通过Depend与路由函数绑定即可,它们的参数说明如下:

参数 描述
security_model 关于基本身份校验的OpenAPI描述Model,默认情况下已经提供了一个通用的HTTPBasicModel,如有定制需求请访问OpenAPI的securitySchemeObject了解
security_name 指定Security的名称,不同作用的基本身份校验实例的名称必须是不同的,默认值为类名。
header_field Pait的Header Field实例
is_raise 当设置为True时,Pait在解析失败后抛出标准的错误,为False时在解析失败后悔返回None, 默认值为True
verify_callable 接受一个校验函数,Pait从请求体中提取值后会交由校验函数处理,如果返回True则代表校验通过,反之则校验不通过。

除了初始化参数不同外,HTTPBasic不会直接用于路由函数中,而是存在于get_user_name函数中,get_user_name函数负责进行身份校验,如果身份校验成功则返回用户名到路由函数中,否则返回401响应。

在运行代码后,运行如下命令,可以看到它们的执行效果如下:

# Success Result
  curl -X 'GET' \
  'http://127.0.0.1:8000/api/user-name-by-http-basic-credentials' \
  -H 'accept: */*' \
  -H 'Authorization: Basic c28xbjpzbzFu'

{"code":0,"data":"so1n","msg":""}

   curl -X 'GET' \
  'http://127.0.0.1:8000/api/user-name-by-http-bearer' \
  -H 'accept: */*' \
  -H 'Authorization: Bearer http'

{"code":0,"data":"http","msg":""}

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/user-name-by-http-digest' \
  -H 'accept: */*' \
  -H 'Authorization: Digest http'

{"code":0,"data":"http","msg":""}

# Fail Result
  curl -X 'GET' \
  'http://127.0.0.1:8000/api/user-name-by-http-digest' \
  -H 'accept: */*' \
  -H 'Authorization: Digest '

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>403 Forbidden</title>
<h1>Forbidden</h1>
<p>Not authenticated</p>

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/user-name-by-http-bearer' \
  -H 'accept: */*' \
  -H 'Authorization: Bearer '

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>403 Forbidden</title>
<h1>Forbidden</h1>
<p>Not authenticated</p>

Note

HTTPDigest类只提供简单的HTTPDigest身份校验支持,在使用时需要根据自己的业务逻辑进行修改。

3.Oauth2

OAuth 2.0是一种授权协议,为 API 客户端提供对 Web 服务器上的用户数据的有限访问权限,它除了提供身份校验的功能外,还支持权限校验。 PaitOauth2使用方法如下:

docs_source_code/docs_source_code/openapi/security/flask_with_oauth2_demo.py
import random
import string
from typing import TYPE_CHECKING, Callable, Dict, List, Optional

from flask import Flask
from pydantic import BaseModel, Field
from werkzeug.exceptions import BadRequest

from pait.app.flask import pait
from pait.app.flask.security import oauth2
from pait.field import Depends
from pait.model.response import Http400RespModel
from pait.openapi.doc_route import AddDocRoute

if TYPE_CHECKING:
    from pait.app.base.security.oauth2 import BaseOAuth2PasswordBearerProxy


class User(BaseModel):
    uid: str = Field(..., description="user id")
    name: str = Field(..., description="user name")
    age: int = Field(..., description="user age")
    sex: str = Field(..., description="user sex")
    scopes: List[str] = Field(..., description="user scopes")


temp_token_dict: Dict[str, User] = {}


@pait(
    response_model_list=[Http400RespModel, oauth2.OAuth2PasswordBearerJsonRespModel],
)
def oauth2_login(form_data: oauth2.OAuth2PasswordRequestFrom) -> dict:
    if form_data.username != form_data.password:
        raise BadRequest()
    token: str = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(10))
    temp_token_dict[token] = User(uid="123", name=form_data.username, age=23, sex="M", scopes=form_data.scope)
    return oauth2.OAuth2PasswordBearerJsonRespModel.response_data(access_token=token).dict()


oauth2_pb: oauth2.OAuth2PasswordBearer = oauth2.OAuth2PasswordBearer(
    route=oauth2_login,
    scopes={
        "user-info": "get all user info",
        "user-name": "only get user name",
    },
)


def get_current_user(_oauth2_pb: "BaseOAuth2PasswordBearerProxy") -> Callable[[str], User]:
    def _check_scope(token: str = Depends.i(_oauth2_pb)) -> User:
        user_model: Optional[User] = temp_token_dict.get(token, None)
        if not user_model:
            raise _oauth2_pb.security.not_authenticated_exc
        if not _oauth2_pb.is_allow(user_model.scopes):
            raise _oauth2_pb.security.not_authenticated_exc
        return user_model

    return _check_scope


@pait()
def oauth2_user_name(user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-name"])))) -> dict:
    return {"code": 0, "msg": "", "data": user_model.name}


@pait()
def oauth2_user_info(user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-info"])))) -> dict:
    return {"code": 0, "msg": "", "data": user_model.dict()}


app = Flask("demo")
app.add_url_rule("/api/oauth2-login", view_func=oauth2_login, methods=["POST"])
app.add_url_rule("/api/oauth2-user-name", view_func=oauth2_user_name, methods=["GET"])
app.add_url_rule("/api/oauth2-user-info", view_func=oauth2_user_info, methods=["GET"])
AddDocRoute(app)


if __name__ == "__main__":
    app.run(port=8000)
docs_source_code/docs_source_code/openapi/security/starlette_with_oauth2_demo.py
import random
import string
from typing import TYPE_CHECKING, Callable, Dict, List, Optional

from pydantic import BaseModel, Field
from starlette.applications import Starlette
from starlette.exceptions import HTTPException
from starlette.responses import JSONResponse

from pait.app.starlette import pait
from pait.app.starlette.security import oauth2
from pait.field import Depends
from pait.model.response import Http400RespModel, TextResponseModel

if TYPE_CHECKING:
    from pait.app.base.security.oauth2 import BaseOAuth2PasswordBearerProxy


class User(BaseModel):
    uid: str = Field(..., description="user id")
    name: str = Field(..., description="user name")
    age: int = Field(..., description="user age")
    sex: str = Field(..., description="user sex")
    scopes: List[str] = Field(..., description="user scopes")


temp_token_dict: Dict[str, User] = {}


@pait(
    response_model_list=[
        oauth2.OAuth2PasswordBearerJsonRespModel,
        Http400RespModel.clone(resp_model=TextResponseModel),
    ],
)
async def oauth2_login(form_data: oauth2.OAuth2PasswordRequestFrom) -> JSONResponse:
    if form_data.username != form_data.password:
        raise HTTPException(400)
    token: str = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(10))
    temp_token_dict[token] = User(uid="123", name=form_data.username, age=23, sex="M", scopes=form_data.scope)
    return JSONResponse(oauth2.OAuth2PasswordBearerJsonRespModel.response_data(access_token=token).dict())


oauth2_pb: oauth2.OAuth2PasswordBearer = oauth2.OAuth2PasswordBearer(
    route=oauth2_login,
    scopes={
        "user-info": "get all user info",
        "user-name": "only get user name",
    },
)


def get_current_user(_oauth2_pb: "BaseOAuth2PasswordBearerProxy") -> Callable[[str], User]:
    def _check_scope(token: str = Depends.i(_oauth2_pb)) -> User:
        user_model: Optional[User] = temp_token_dict.get(token, None)
        if not user_model:
            raise _oauth2_pb.security.not_authenticated_exc
        if not _oauth2_pb.is_allow(user_model.scopes):
            raise _oauth2_pb.security.not_authenticated_exc
        return user_model

    return _check_scope


@pait()
def oauth2_user_name(
    user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-name"]))),
) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": user_model.name})


@pait()
def oauth2_user_info(
    user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-info"]))),
) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": user_model.dict()})


app = Starlette()
app.add_route("/api/oauth2-login", oauth2_login, methods=["POST"])
app.add_route("/api/oauth2-user-name", oauth2_user_name, methods=["GET"])
app.add_route("/api/oauth2-user-info", oauth2_user_info, methods=["GET"])


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/docs_source_code/openapi/security/sanic_with_oauth2_demo.py
import random
import string
from typing import TYPE_CHECKING, Callable, Dict, List, Optional

from pydantic import BaseModel, Field
from sanic import HTTPResponse, Sanic, json
from sanic.exceptions import InvalidUsage

from pait.app.sanic import pait
from pait.app.sanic.security import oauth2
from pait.field import Depends
from pait.model.response import Http400RespModel
from pait.openapi.doc_route import AddDocRoute

if TYPE_CHECKING:
    from pait.app.base.security.oauth2 import BaseOAuth2PasswordBearerProxy


class User(BaseModel):
    uid: str = Field(..., description="user id")
    name: str = Field(..., description="user name")
    age: int = Field(..., description="user age")
    sex: str = Field(..., description="user sex")
    scopes: List[str] = Field(..., description="user scopes")


temp_token_dict: Dict[str, User] = {}


@pait(
    response_model_list=[Http400RespModel, oauth2.OAuth2PasswordBearerJsonRespModel],
)
async def oauth2_login(form_data: oauth2.OAuth2PasswordRequestFrom) -> HTTPResponse:
    if form_data.username != form_data.password:
        raise InvalidUsage("Bad Request")
    token: str = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(10))
    temp_token_dict[token] = User(uid="123", name=form_data.username, age=23, sex="M", scopes=form_data.scope)
    return json(oauth2.OAuth2PasswordBearerJsonRespModel.response_data(access_token=token).dict())


oauth2_pb: oauth2.OAuth2PasswordBearer = oauth2.OAuth2PasswordBearer(
    route=oauth2_login,
    scopes={
        "user-info": "get all user info",
        "user-name": "only get user name",
    },
)


def get_current_user(_oauth2_pb: "BaseOAuth2PasswordBearerProxy") -> Callable[[str], User]:
    def _check_scope(token: str = Depends.i(_oauth2_pb)) -> User:
        user_model: Optional[User] = temp_token_dict.get(token, None)
        if not user_model:
            raise _oauth2_pb.security.not_authenticated_exc
        if not _oauth2_pb.is_allow(user_model.scopes):
            raise _oauth2_pb.security.not_authenticated_exc
        return user_model

    return _check_scope


@pait()
def oauth2_user_name(
    user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-name"]))),
) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": user_model.name})


@pait()
async def oauth2_user_info(
    user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-info"]))),
) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": user_model.dict()})


app = Sanic(name="demo")
app.add_route(oauth2_login, "/api/oauth2-login", methods={"POST"})
app.add_route(oauth2_user_name, "/api/oauth2-user-name", methods={"GET"})
app.add_route(oauth2_user_info, "/api/oauth2-user-info", methods={"GET"})
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/docs_source_code/openapi/security/tornado_with_oauth2_demo.py
import random
import string
from typing import TYPE_CHECKING, Callable, Dict, List, Optional

from pydantic import BaseModel, Field
from tornado.ioloop import IOLoop
from tornado.web import Application, HTTPError, RequestHandler

from pait.app.tornado import pait
from pait.app.tornado.security import oauth2
from pait.field import Depends
from pait.model.response import Http400RespModel
from pait.openapi.doc_route import AddDocRoute

if TYPE_CHECKING:
    from pait.app.base.security.oauth2 import BaseOAuth2PasswordBearerProxy


class User(BaseModel):
    uid: str = Field(..., description="user id")
    name: str = Field(..., description="user name")
    age: int = Field(..., description="user age")
    sex: str = Field(..., description="user sex")
    scopes: List[str] = Field(..., description="user scopes")


temp_token_dict: Dict[str, User] = {}


class OAuth2LoginHandler(RequestHandler):
    @pait(
        response_model_list=[oauth2.OAuth2PasswordBearerJsonRespModel, Http400RespModel],
    )
    async def post(self, form_data: oauth2.OAuth2PasswordRequestFrom) -> None:
        if form_data.username != form_data.password:
            raise HTTPError(400)
        token: str = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(10))
        temp_token_dict[token] = User(uid="123", name=form_data.username, age=23, sex="M", scopes=form_data.scope)
        self.write(oauth2.OAuth2PasswordBearerJsonRespModel.response_data(access_token=token).dict())


oauth2_pb: oauth2.OAuth2PasswordBearer = oauth2.OAuth2PasswordBearer(
    route=OAuth2LoginHandler.post,
    scopes={
        "user-info": "get all user info",
        "user-name": "only get user name",
    },
)


def get_current_user(_oauth2_pb: "BaseOAuth2PasswordBearerProxy") -> Callable[[str], User]:
    def _check_scope(token: str = Depends.i(_oauth2_pb)) -> User:
        user_model: Optional[User] = temp_token_dict.get(token, None)
        if not user_model:
            raise _oauth2_pb.security.not_authenticated_exc
        if not _oauth2_pb.is_allow(user_model.scopes):
            raise _oauth2_pb.security.not_authenticated_exc
        return user_model

    return _check_scope


class OAuth2UserNameHandler(RequestHandler):
    @pait()
    def get(self, user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-name"])))) -> None:
        self.write({"code": 0, "msg": "", "data": user_model.name})


class OAuth2UserInfoHandler(RequestHandler):
    @pait()
    def get(self, user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-info"])))) -> None:
        self.write({"code": 0, "msg": "", "data": user_model.dict()})


app: Application = Application(
    [
        (r"/api/security/oauth2-login", OAuth2LoginHandler),
        (r"/api/security/oauth2-user-name", OAuth2UserNameHandler),
        (r"/api/security/oauth2-user-info", OAuth2UserInfoHandler),
    ],
)
AddDocRoute(app)


if __name__ == "__main__":
    app.listen(8000)
    IOLoop.instance().start()

代码中第一部分是创建一个关于用户数据的Model--User,和一个key为token,valueUsertemp_token_dict用于模拟数据库存储。

第二部分是创建一个标准的登陆路由函数,它接收的参数类型是OAuth2PasswordRequestFrom,这是Pait针对Oauth2的登陆参数封装的,它的源码如下:

from pydantic import BaseModel
from pait.field import Form

class BaseOAuth2PasswordRequestFrom(BaseModel):
    username: str = Form()
    password: str = Form()
    scope: ScopeType = Form("")
    client_id: Optional[str] = Form(None)
    client_secret: Optional[str] = Form(None)


class OAuth2PasswordRequestFrom(BaseOAuth2PasswordRequestFrom):
    grant_type: Optional[str] = Form(None, regex="password")
可以看到OAuth2PasswordRequestFrom继承了BaseModel,并且所有参数的Field都使用Form,这意味着它的参数是从请求体中的表单获取数据。 而登陆路由函数在接受到数据后会对数据进行简单的校验,如果校验错误则返回400响应, 如果校验通过则生成一个token并将tokenUser存储到temp_token_dict中并通过oauth2.OAuth2PasswordBearerJsonRespModel返回Oauth2标准的响应。

第三部分是通过oauth2.OAuth2PasswordBearer创建oauth2_pb实例以及创建一个获取用户的函数--get-current_user。 创建oauth2_pbscopes参数为oauth2_pb的权限描述,route参数为登陆路由函数,当路由函数注册到Web框架时oauth2_pb会发现路由函数的URL并写入到tokenUrl属性中。 而get_current_user函数会通过Token获取到当前的用户,然后再通过is_allow方法判断当前用户是否有权限访问该接口,如果没有则返回403响应,如果有则返回User Model。 需要注意点睡,get_current_user函数接收的值为oauth2.OAuth2PasswordBearer的代理类,这个代理类已经明确了只允许了哪些权限。同时 该类有两个功能,一个是通过Depend把请求的参数传递给函数,另外一个是提供is_allow方法用于判断用户是否有权限访问该接口。

第四部分则是路由函数,它们用到了第三部分创建的get_current_user函数, 其中oauth2_pb.get_depend(["user-name"])会通过oauth2.OAuth2PasswordBearer创建一个仅允许user-name权限访问的代理实例, oauth2_pb.get_depend(["user-info"])会通过oauth2.OAuth2PasswordBearer创建一个仅允许user-info权限访问的代理实例, 它们的区别只有scopes是不同的。

在运行代码后,运行如下命令,可以看到它们的执行效果如下:

  curl 'http://127.0.0.1:8000/api/oauth2-login' \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  --data-raw 'grant_type=password&scope=user-info&username=so1n&password=so1n' \

{"access_token":"pomeG4jCDh","token_type":"bearer"}

  curl 'http://127.0.0.1:8000/api/oauth2-login' \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  --data-raw 'grant_type=password&scope=user-name&username=so1n1&password=so1n1' \

{"access_token":"G8ckqKGkDO","token_type":"bearer"}

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/oauth2-user-info' \
  -H 'accept: */*' \
  -H 'Authorization: Bearer pomeG4jCDh'

{"code":0,"data":{"age":23,"name":"so1n","scopes":["user-info"],"sex":"M","uid":"123"},"msg":""}

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/oauth2-user-info' \
  -H 'accept: */*' \
  -H 'Authorization: Bearer G8ckqKGkDO'

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>401 Unauthorized</title>
<h1>Unauthorized</h1>
<p>Not authenticated</p>

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/oauth2-user-name' \
  -H 'accept: */*' \
  -H 'Authorization: Bearer pomeG4jCDh'

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>401 Unauthorized</title>
<h1>Unauthorized</h1>
<p>Not authenticated</p>

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/oauth2-user-name' \
  -H 'accept: */*' \
  -H 'Authorization: Bearer G8ckqKGkDO'
{"code":0,"data":"so1n1","msg":""}
通过响应结果可以看到权限为user-info的用户只能访问/api/oauth2-user-info接口,而权限为user-name的用户只能访问/api/oauth2-user-name接口。

Note

当前版本尚未支持refreshUrl