Skip to content

Security

OpenAPI provides support for basic HTTP authentication through security, but different web frameworks implement basic HTTP authentication in different ways. So Pait provides simple support for OpenAPI's security through Depends (api key, http, oauth2), which simplifies the use of security in different web frameworks.

Note

advanced authentication such as jwt will be supported through other package in the future.

1.APIKey

API Key is the simplest method in Security, and because of its simplicity, it has the most usage scenarios. Pait provides APIKey class to support the use of API Key, the usage is as follows:

docs_source_code/openapi/security/flask_with_apikey_demo.py
from flask import Flask

from pait.app.flask import pait
from pait.app.flask.security import api_key
from pait.field import Cookie, Depends, Header, Query
from pait.openapi.doc_route import AddDocRoute

token_cookie_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Cookie(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-cookie-api-key",
)
token_header_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Header(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-header-api-key",
)
token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


@pait()
def api_key_cookie_route(token: str = Depends.i(token_cookie_api_key)) -> dict:
    return {"code": 0, "msg": "", "data": token}


@pait()
def api_key_header_route(token: str = Depends.i(token_header_api_key)) -> dict:
    return {"code": 0, "msg": "", "data": token}


@pait()
def api_key_query_route(token: str = Depends.i(token_query_api_key)) -> dict:
    return {"code": 0, "msg": "", "data": token}


app = Flask("demo")
app.add_url_rule("/api/api-cookie-key", view_func=api_key_cookie_route, methods=["GET"])
app.add_url_rule("/api/api-header-key", view_func=api_key_header_route, methods=["GET"])
app.add_url_rule("/api/api-query-key", view_func=api_key_query_route, methods=["GET"])
AddDocRoute(app)


if __name__ == "__main__":
    app.run(port=8000)
docs_source_code/openapi/security/starlette_with_apikey_demo.py
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

from pait.app.starlette import pait
from pait.app.starlette.security import api_key
from pait.field import Cookie, Depends, Header, Query
from pait.openapi.doc_route import AddDocRoute

token_cookie_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Cookie(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-cookie-api-key",
)
token_header_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Header(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-header-api-key",
)
token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


@pait()
async def api_key_cookie_route(token: str = Depends.i(token_cookie_api_key)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": token})


@pait()
async def api_key_header_route(token: str = Depends.i(token_header_api_key)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": token})


@pait()
async def api_key_query_route(token: str = Depends.i(token_query_api_key)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": token})


app = Starlette(
    routes=[
        Route("/api/api-cookie-key", api_key_cookie_route, methods=["GET"]),
        Route("/api/api-header-key", api_key_header_route, methods=["GET"]),
        Route("/api/api-query-key", api_key_query_route, methods=["GET"]),
    ]
)
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/openapi/security/sanic_with_apikey_demo.py
from sanic import HTTPResponse, Sanic, json

from pait.app.sanic import pait
from pait.app.sanic.security import api_key
from pait.field import Cookie, Depends, Header, Query
from pait.openapi.doc_route import AddDocRoute

token_cookie_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Cookie(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-cookie-api-key",
)
token_header_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Header(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-header-api-key",
)
token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


@pait()
async def api_key_cookie_route(token: str = Depends.i(token_cookie_api_key)) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": token})


@pait()
async def api_key_header_route(token: str = Depends.i(token_header_api_key)) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": token})


@pait()
async def api_key_query_route(token: str = Depends.i(token_query_api_key)) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": token})


app = Sanic(name="demo")
app.add_route(api_key_cookie_route, "/api/api-cookie-key", methods=["GET"])
app.add_route(api_key_header_route, "/api/api-header-key", methods=["GET"])
app.add_route(api_key_query_route, "/api/api-query-key", methods=["GET"])
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/openapi/security/tornado_with_apikey_demo.py
from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler

from pait.app.tornado import pait
from pait.app.tornado.security import api_key
from pait.field import Cookie, Depends, Header, Query
from pait.openapi.doc_route import AddDocRoute

token_cookie_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Cookie(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-cookie-api-key",
)
token_header_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Header(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-header-api-key",
)
token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(openapi_include=False),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


class APIKeyCookieHandler(RequestHandler):
    @pait()
    async def get(self, token: str = Depends.i(token_cookie_api_key)) -> None:
        self.write({"code": 0, "msg": "", "data": token})


class APIKeyHeaderHandler(RequestHandler):
    @pait()
    async def get(self, token: str = Depends.i(token_header_api_key)) -> None:
        self.write({"code": 0, "msg": "", "data": token})


class APIKeyQueryHandler(RequestHandler):
    @pait()
    async def get(self, token: str = Depends.i(token_query_api_key)) -> None:
        self.write({"code": 0, "msg": "", "data": token})


app: Application = Application(
    [
        (r"/api/security/api-cookie-key", APIKeyCookieHandler),
        (r"/api/security/api-header-key", APIKeyHeaderHandler),
        (r"/api/security/api-query-key", APIKeyQueryHandler),
    ],
)
AddDocRoute(app)


if __name__ == "__main__":
    app.listen(8000)
    IOLoop.instance().start()

The first highlighting code is initialized for the different APIKey fields, which use slightly different parameters, see the table below for parameter meanings:

Parameters Description
name The name of the APIKey field
field The APIKey field corresponds to the Field class in Pait, API Key only supports Query, Header and Cookie parameters, so only field.Query, field.Header, field.Cookie are allowed
verify_api_key_callable A function that checks APIKey, Pait extracts the APIKey value from the request body and passes it to the function for processing, if the function returns True then the check passes, and vice versa then the check fails.
security_name Specify the name of the security, the name of APIKey must be different for different roles, the default value is the class name of APIKey.

Note

In order to use APIKey properly in the OpenAPI tool, the Field must be initialized with the openapi_include attribute False.

The second highlighted piece of code connects APIKey to the route function via Depend, where the argument to Depend is an instance of APIKey. When the route function receives a request Pait automatically extracts the value of APIKey from the request body and passes it to APIKey's verify_api_key_callable function for verification, if it passes the verification the value is injected into the route function for execution, and vice versa 401 is returned.

After running the code, execute the following command to see the execution effect of APIKey:

# Success Result
  curl -X 'GET' \
  'http://127.0.0.1:8000/api/api-cookie-key' \
  -H 'accept: */*' \
  -H 'Cookie: token=token'
{"code":0,"msg":"","data":"token"}

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/api-header-key' \
  -H 'accept: */*' \
  -H 'token: token'
{"code":0,"msg":"","data":"token"}

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/api-query-key?token=token' \
  -H 'accept: */*'
{"code":0,"msg":"","data":"token"}

# Fail Result
  curl -X 'GET' \
  'http://127.0.0.1:8000/api/api-cookie-key' \
  -H 'accept: */*' \
  -H 'Cookie: token='
Not authenticated

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/api-header-key' \
  -H 'accept: */*' \
  -H 'token: '
Not authenticated

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/api-query-key?token=' \
  -H 'accept: */*'
Not authenticated

Most of the parameters (e.g., Token) needed by route functions that use APIKey are obtained through other route functions. In this case, you can describe the relationship between this route function and other route functions by using Links in Field, such as the following scenario, which gets its Token from the login route function:

docs_source_code/openapi/security/flask_with_apikey_and_link_demo.py
import hashlib
from typing import Type

from flask import Flask
from pydantic import BaseModel, Field

from pait.app.flask import pait
from pait.app.flask.security import api_key
from pait.field import Depends, Json, Query
from pait.model.response import JsonResponseModel
from pait.openapi.doc_route import AddDocRoute
from pait.openapi.openapi import LinksModel


class LoginRespModel(JsonResponseModel):
    class ResponseModel(BaseModel):
        class DataModel(BaseModel):
            token: str

        code: int = Field(0, description="api code")
        msg: str = Field("success", description="api status msg")
        data: DataModel

    description: str = "login response"
    response_data: Type[BaseModel] = ResponseModel


@pait(response_model_list=[LoginRespModel])
def login_route(uid: str = Json.i(description="user id"), password: str = Json.i(description="password")) -> dict:
    return {"code": 0, "msg": "", "data": {"token": hashlib.sha256((uid + password).encode("utf-8")).hexdigest()}}


link_login_token_model: LinksModel = LinksModel(LoginRespModel, "$response.body#/data/token", desc="test links model")


token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(links=link_login_token_model),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


@pait()
def api_key_query_route(token: str = Depends.i(token_query_api_key)) -> dict:
    return {"code": 0, "msg": "", "data": token}


app = Flask("demo")
app.add_url_rule("/api/login", "login", login_route, methods=["POST"])
app.add_url_rule("/api/api-query-key", view_func=api_key_query_route, methods=["GET"])
AddDocRoute(app)


if __name__ == "__main__":
    app.run(port=8000)
docs_source_code/openapi/security/starlette_with_apikey_and_link_demo.py
import hashlib
from typing import Type

from pydantic import BaseModel, Field
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

from pait.app.starlette import pait
from pait.app.starlette.security import api_key
from pait.field import Depends, Json, Query
from pait.model.response import JsonResponseModel
from pait.openapi.doc_route import AddDocRoute
from pait.openapi.openapi import LinksModel


class LoginRespModel(JsonResponseModel):
    class ResponseModel(BaseModel):
        class DataModel(BaseModel):
            token: str

        code: int = Field(0, description="api code")
        msg: str = Field("success", description="api status msg")
        data: DataModel

    description: str = "login response"
    response_data: Type[BaseModel] = ResponseModel


@pait(response_model_list=[LoginRespModel])
async def login_route(
    uid: str = Json.i(description="user id"), password: str = Json.i(description="password")
) -> JSONResponse:
    return JSONResponse(
        {"code": 0, "msg": "", "data": {"token": hashlib.sha256((uid + password).encode("utf-8")).hexdigest()}}
    )


link_login_token_model: LinksModel = LinksModel(LoginRespModel, "$response.body#/data/token", desc="test links model")


token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(links=link_login_token_model),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


@pait()
async def api_key_query_route(token: str = Depends.i(token_query_api_key)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": token})


app = Starlette(
    routes=[
        Route("/api/login", login_route, methods=["POST"]),
        Route("/api/api-query-key", api_key_query_route, methods=["GET"]),
    ]
)
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/openapi/security/sanic_with_apikey_and_link_demo.py
import hashlib
from typing import Type

from pydantic import BaseModel, Field
from sanic import HTTPResponse, Sanic, json

from pait.app.sanic import pait
from pait.app.sanic.security import api_key
from pait.field import Depends, Json, Query
from pait.model.response import JsonResponseModel
from pait.openapi.doc_route import AddDocRoute
from pait.openapi.openapi import LinksModel


class LoginRespModel(JsonResponseModel):
    class ResponseModel(BaseModel):
        class DataModel(BaseModel):
            token: str

        code: int = Field(0, description="api code")
        msg: str = Field("success", description="api status msg")
        data: DataModel

    description: str = "login response"
    response_data: Type[BaseModel] = ResponseModel


@pait(response_model_list=[LoginRespModel])
async def login_route(
    uid: str = Json.i(description="user id"), password: str = Json.i(description="password")
) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": {"token": hashlib.sha256((uid + password).encode("utf-8")).hexdigest()}})


link_login_token_model: LinksModel = LinksModel(LoginRespModel, "$response.body#/data/token", desc="test links model")


token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(links=link_login_token_model),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


@pait()
async def api_key_query_route(token: str = Depends.i(token_query_api_key)) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": token})


app = Sanic(name="demo")
app.add_route(login_route, "/api/login", methods=["POST"])
app.add_route(api_key_query_route, "/api/api-query-key", methods=["GET"])
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/openapi/security/tornado_with_apikey_and_link_demo.py
import hashlib
from typing import Type

from pydantic import BaseModel, Field
from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler

from pait.app.tornado import pait
from pait.app.tornado.security import api_key
from pait.field import Depends, Json, Query
from pait.model.response import JsonResponseModel
from pait.openapi.doc_route import AddDocRoute
from pait.openapi.openapi import LinksModel


class LoginRespModel(JsonResponseModel):
    class ResponseModel(BaseModel):
        class DataModel(BaseModel):
            token: str

        code: int = Field(0, description="api code")
        msg: str = Field("success", description="api status msg")
        data: DataModel

    description: str = "login response"
    response_data: Type[BaseModel] = ResponseModel


class LoginHandler(RequestHandler):
    @pait(response_model_list=[LoginRespModel])
    async def post(
        self, uid: str = Json.i(description="user id"), password: str = Json.i(description="password")
    ) -> None:
        self.write(
            {"code": 0, "msg": "", "data": {"token": hashlib.sha256((uid + password).encode("utf-8")).hexdigest()}}
        )


link_login_token_model: LinksModel = LinksModel(LoginRespModel, "$response.body#/data/token", desc="test links model")


token_query_api_key: api_key.APIKey = api_key.APIKey(
    name="token",
    field=Query(links=link_login_token_model),
    verify_api_key_callable=lambda x: "token" in x,
    security_name="token-query-api-key",
)


class APIKeyQueryHandler(RequestHandler):
    @pait()
    async def get(self, token: str = Depends.i(token_query_api_key)) -> None:
        self.write({"code": 0, "msg": "", "data": token})


app: Application = Application(
    [
        (r"/api/login", LoginHandler),
        (r"/api/security/api-query-key", APIKeyQueryHandler),
    ],
)
AddDocRoute(app)


if __name__ == "__main__":
    app.listen(8000)
    IOLoop.instance().start()

The first highlighted code is from Field-Links, while the Query in the second highlighted code sets the links attribute to link_login_token_model. This way Pait will bind login_route to api_key_query_route via Link when generating OpenAPI.

Note

openapi_include=False causes Swggaer to be unable to display Link data.

2.HTTP

There are two types of HTTP basic authentication, one is HTTPBasic and the other is HTTPBearer or HTTPDIgest. The difference between the two is that HTTPBasic needs to verify the username and password parameters in the request header, and if the verification is successful, it means the authentication is successful. If the validation error will return 401 response, the browser will pop up a window to let the user enter username and password. While HTTPBearer or HTTPDIgest only need to pass token in the request header as required.

Pait encapsulates the HTTPBasic, HTTPBearer and HTTPDigest classes for each of the three methods of HTTP basic authentication. Like APIKey they need to be bound to a route function via Depend, which is used as follows:

docs_source_code/openapi/security/flask_with_http_demo.py
from typing import Optional

from flask import Flask

from pait.app.flask import pait
from pait.app.flask.security import http
from pait.field import Depends
from pait.openapi.doc_route import AddDocRoute

##############
# HTTP Basic #
##############
http_basic: http.HTTPBasic = http.HTTPBasic()


def get_user_name(credentials: Optional[http.HTTPBasicCredentials] = Depends.i(http_basic)) -> str:
    if not credentials or credentials.username != credentials.password:
        raise http_basic.not_authorization_exc
    return credentials.username


@pait()
def get_user_name_by_http_basic_credentials(user_name: str = Depends.t(get_user_name)) -> dict:
    return {"code": 0, "msg": "", "data": user_name}


#############
# HTTP Bear #
#############
http_bear: http.HTTPBearer = http.HTTPBearer(verify_callable=lambda x: "http" in x)


@pait()
def get_user_name_by_http_bearer(credentials: Optional[str] = Depends.i(http_bear)) -> dict:
    return {"code": 0, "msg": "", "data": credentials}


###############
# HTTP Digest #
###############
http_digest: http.HTTPDigest = http.HTTPDigest(verify_callable=lambda x: "http" in x)


@pait()
def get_user_name_by_http_digest(credentials: Optional[str] = Depends.i(http_digest)) -> dict:
    return {"code": 0, "msg": "", "data": credentials}


app = Flask("demo")
app.add_url_rule(
    "/api/user-name-by-http-basic-credentials",
    view_func=get_user_name_by_http_basic_credentials,
    methods=["GET"],
)
app.add_url_rule("/api/user-name-by-http-bearer", view_func=get_user_name_by_http_bearer, methods=["GET"])
app.add_url_rule("/api/user-name-by-http-digest", view_func=get_user_name_by_http_digest, methods=["GET"])
AddDocRoute(app)


if __name__ == "__main__":
    app.run(port=8000)
docs_source_code/openapi/security/starlette_with_http_demo.py
from typing import Optional

from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

from pait.app.starlette import pait
from pait.app.starlette.security import http
from pait.field import Depends
from pait.openapi.doc_route import AddDocRoute

##############
# HTTP Basic #
##############
http_basic: http.HTTPBasic = http.HTTPBasic()


def get_user_name(credentials: Optional[http.HTTPBasicCredentials] = Depends.t(http_basic)) -> str:
    if not credentials or credentials.username != credentials.password:
        raise http_basic.not_authorization_exc
    return credentials.username


@pait()
async def get_user_name_by_http_basic_credentials(user_name: str = Depends.t(get_user_name)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": user_name})


#############
# HTTP Bear #
#############
http_bear: http.HTTPBearer = http.HTTPBearer(verify_callable=lambda x: "http" in x)


@pait()
async def get_user_name_by_http_bearer(credentials: Optional[str] = Depends.t(http_bear)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": credentials})


###############
# HTTP Digest #
###############
http_digest: http.HTTPDigest = http.HTTPDigest(verify_callable=lambda x: "http" in x)


@pait()
async def get_user_name_by_http_digest(credentials: Optional[str] = Depends.t(http_digest)) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": credentials})


app = Starlette(
    routes=[
        Route("/api/user-name-by-http-basic-credentials", get_user_name_by_http_basic_credentials, methods=["GET"]),
        Route("/api/user-name-by-http-bearer", get_user_name_by_http_bearer, methods=["GET"]),
        Route("/api/user-name-by-http-digest", get_user_name_by_http_digest, methods=["GET"]),
    ]
)
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/openapi/security/sanic_with_http_demo.py
from typing import Optional

from sanic import Sanic, response

from pait.app.sanic import pait
from pait.app.sanic.security import http
from pait.field import Depends
from pait.openapi.doc_route import AddDocRoute

##############
# HTTP Basic #
##############
http_basic: http.HTTPBasic = http.HTTPBasic()


def get_user_name(credentials: Optional[http.HTTPBasicCredentials] = Depends.t(http_basic)) -> str:
    if not credentials or credentials.username != credentials.password:
        raise http_basic.not_authorization_exc
    return credentials.username


@pait()
async def get_user_name_by_http_basic_credentials(user_name: str = Depends.t(get_user_name)) -> response.HTTPResponse:
    return response.json({"code": 0, "msg": "", "data": user_name})


#############
# HTTP Bear #
#############
http_bear: http.HTTPBearer = http.HTTPBearer(verify_callable=lambda x: "http" in x)


@pait()
async def get_user_name_by_http_bearer(credentials: Optional[str] = Depends.t(http_bear)) -> response.HTTPResponse:
    return response.json({"code": 0, "msg": "", "data": credentials})


###############
# HTTP Digest #
###############
http_digest: http.HTTPDigest = http.HTTPDigest(verify_callable=lambda x: "http" in x)


@pait()
async def get_user_name_by_http_digest(credentials: Optional[str] = Depends.t(http_digest)) -> response.HTTPResponse:
    return response.json({"code": 0, "msg": "", "data": credentials})


app = Sanic(name="demo")
app.add_route(get_user_name_by_http_basic_credentials, "/api/user-name-by-http-basic-credentials", methods={"GET"})
app.add_route(get_user_name_by_http_bearer, "/api/user-name-by-http-bearer", methods={"GET"})
app.add_route(get_user_name_by_http_digest, "/api/user-name-by-http-digest", methods={"GET"})
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/openapi/security/tornado_with_http_demo.py
from typing import Optional

from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler

from pait.app.tornado import pait
from pait.app.tornado.security import http
from pait.field import Depends
from pait.openapi.doc_route import AddDocRoute

http_basic: http.HTTPBasic = http.HTTPBasic()


def get_user_name(credentials: Optional[http.HTTPBasicCredentials] = Depends.t(http_basic)) -> str:
    if not credentials or credentials.username != credentials.password:
        raise http_basic.not_authorization_exc
    return credentials.username


class UserNameByHttpBasicCredentialsHandler(RequestHandler):
    @pait()
    async def get(self, user_name: str = Depends.t(get_user_name)) -> None:
        self.write({"code": 0, "msg": "", "data": user_name})


#############
# HTTP Bear #
#############
http_bear: http.HTTPBearer = http.HTTPBearer(verify_callable=lambda x: "http" in x)


class UserNameByHttpBearerHandler(RequestHandler):
    @pait()
    async def get(self, credentials: Optional[str] = Depends.t(http_bear)) -> None:
        self.write({"code": 0, "msg": "", "data": credentials})


###############
# HTTP Digest #
###############
http_digest: http.HTTPDigest = http.HTTPDigest(verify_callable=lambda x: "http" in x)


class UserNameByHttpDigestHandler(RequestHandler):
    @pait()
    async def get(self, credentials: Optional[str] = Depends.t(http_digest)) -> None:
        self.write({"code": 0, "msg": "", "data": credentials})


app: Application = Application(
    [
        (r"/api/security/user-name-by-http-basic-credentials", UserNameByHttpBasicCredentialsHandler),
        (r"/api/security/user-name-by-http-bearer", UserNameByHttpBearerHandler),
        (r"/api/security/user-name-by-http-digest", UserNameByHttpDigestHandler),
    ],
)
AddDocRoute(app)


if __name__ == "__main__":
    app.listen(8000)
    IOLoop.instance().start()

You can see that the whole code consists of two parts, the first part initializes the corresponding basic authentication class, and the second part uses Depend in the route function to get an instance of the authentication class.

However, HTTPBasic is used a little differently than the other two, for example, HTTPBasic has different initialization parameters than the other two, which are described in the following table:

parameter description
security_model OpenAPI Description Model about HTTPBasic, a generic HTTPBasicModel has been provided by default, for customization needs visit OpenAPI's securitySchemeObject
security_name Specifies the name of the Security, the name of the basic authentication instance must be different for different roles, the default value is class name.
header_field Instance of Header Field for Pait
realm The realm parameter of HTTP basic authentication

While HTTPBearer and HTTPDigest are used in a similar way as APIKey, they need to be initialized as required and bound to the route function via Depend, their parameters are described as follows:

parameter description
security_model OpenAPI Description Model about HTTPBasic, a generic HTTPBasicModel has been provided by default, for customization needs visit OpenAPI's securitySchemeObject
security_name Specifies the name of the Security, the name of the basic authentication instance must be different for different roles, the default value is class name.
header_field Instance of Header Field for Pait
is_raise When set to True, Pait throws a standard error if parsing fails, and False returns None if parsing fails, default True.
verify_callable Accepts a checksum function, Pait extracts the value from the request body and passes it to the checksum function, if it returns True it means the checksum passes, otherwise the checksum fails.

In addition to the difference in initialization parameters, HTTPBasic is not used directly in the route function, but exists in the get_user_name function and the get_user_name function is responsible for authentication. returning the username to the route function if the authentication is successful, otherwise returning a 401 response.

After running the code, execute the curl command to see them executed as follows:

# Success Result
  curl -X 'GET' \
  'http://127.0.0.1:8000/api/user-name-by-http-basic-credentials' \
  -H 'accept: */*' \
  -H 'Authorization: Basic c28xbjpzbzFu'

{"code":0,"data":"so1n","msg":""}

   curl -X 'GET' \
  'http://127.0.0.1:8000/api/user-name-by-http-bearer' \
  -H 'accept: */*' \
  -H 'Authorization: Bearer http'

{"code":0,"data":"http","msg":""}

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/user-name-by-http-digest' \
  -H 'accept: */*' \
  -H 'Authorization: Digest http'

{"code":0,"data":"http","msg":""}

# Fail Result
  curl -X 'GET' \
  'http://127.0.0.1:8000/api/user-name-by-http-digest' \
  -H 'accept: */*' \
  -H 'Authorization: Digest '

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>403 Forbidden</title>
<h1>Forbidden</h1>
<p>Not authenticated</p>

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/user-name-by-http-bearer' \
  -H 'accept: */*' \
  -H 'Authorization: Bearer '

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>403 Forbidden</title>
<h1>Forbidden</h1>
<p>Not authenticated</p>

Note

The HTTPDigest class only provides simple HTTPDigest authentication support, which needs to be modified to suit your business logic when using it.

3.Oauth2

OAuth 2.0 is an authorization protocol that provides API clients with limited access to user data on the Web server. In addition to providing identity verification, it also supports privilege verification, which is used as follows:

docs_source_code/openapi/security/flask_with_oauth2_demo.py
import random
import string
from typing import TYPE_CHECKING, Callable, Dict, List, Optional

from flask import Flask
from pydantic import BaseModel, Field
from werkzeug.exceptions import BadRequest

from pait.app.flask import pait
from pait.app.flask.security import oauth2
from pait.field import Depends
from pait.model.response import Http400RespModel
from pait.openapi.doc_route import AddDocRoute

if TYPE_CHECKING:
    from pait.app.base.security.oauth2 import BaseOAuth2PasswordBearerProxy


class User(BaseModel):
    uid: str = Field(..., description="user id")
    name: str = Field(..., description="user name")
    age: int = Field(..., description="user age")
    sex: str = Field(..., description="user sex")
    scopes: List[str] = Field(..., description="user scopes")


temp_token_dict: Dict[str, User] = {}


@pait(
    response_model_list=[Http400RespModel, oauth2.OAuth2PasswordBearerJsonRespModel],
)
def oauth2_login(form_data: oauth2.OAuth2PasswordRequestFrom) -> dict:
    if form_data.username != form_data.password:
        raise BadRequest()
    token: str = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(10))
    temp_token_dict[token] = User(uid="123", name=form_data.username, age=23, sex="M", scopes=form_data.scope)
    return oauth2.OAuth2PasswordBearerJsonRespModel.response_data(access_token=token).dict()


oauth2_pb: oauth2.OAuth2PasswordBearer = oauth2.OAuth2PasswordBearer(
    route=oauth2_login,
    scopes={
        "user-info": "get all user info",
        "user-name": "only get user name",
    },
)


def get_current_user(_oauth2_pb: "BaseOAuth2PasswordBearerProxy") -> Callable[[str], User]:
    def _check_scope(token: str = Depends.i(_oauth2_pb)) -> User:
        user_model: Optional[User] = temp_token_dict.get(token, None)
        if not user_model:
            raise _oauth2_pb.security.not_authenticated_exc
        if not _oauth2_pb.is_allow(user_model.scopes):
            raise _oauth2_pb.security.not_authenticated_exc
        return user_model

    return _check_scope


@pait()
def oauth2_user_name(user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-name"])))) -> dict:
    return {"code": 0, "msg": "", "data": user_model.name}


@pait()
def oauth2_user_info(user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-info"])))) -> dict:
    return {"code": 0, "msg": "", "data": user_model.dict()}


app = Flask("demo")
app.add_url_rule("/api/oauth2-login", view_func=oauth2_login, methods=["POST"])
app.add_url_rule("/api/oauth2-user-name", view_func=oauth2_user_name, methods=["GET"])
app.add_url_rule("/api/oauth2-user-info", view_func=oauth2_user_info, methods=["GET"])
AddDocRoute(app)


if __name__ == "__main__":
    app.run(port=8000)
docs_source_code/openapi/security/starlette_with_oauth2_demo.py
import random
import string
from typing import TYPE_CHECKING, Callable, Dict, List, Optional

from pydantic import BaseModel, Field
from starlette.applications import Starlette
from starlette.exceptions import HTTPException
from starlette.responses import JSONResponse

from pait.app.starlette import pait
from pait.app.starlette.security import oauth2
from pait.field import Depends
from pait.model.response import Http400RespModel, TextResponseModel

if TYPE_CHECKING:
    from pait.app.base.security.oauth2 import BaseOAuth2PasswordBearerProxy


class User(BaseModel):
    uid: str = Field(..., description="user id")
    name: str = Field(..., description="user name")
    age: int = Field(..., description="user age")
    sex: str = Field(..., description="user sex")
    scopes: List[str] = Field(..., description="user scopes")


temp_token_dict: Dict[str, User] = {}


@pait(
    response_model_list=[
        oauth2.OAuth2PasswordBearerJsonRespModel,
        Http400RespModel.clone(resp_model=TextResponseModel),
    ],
)
async def oauth2_login(form_data: oauth2.OAuth2PasswordRequestFrom) -> JSONResponse:
    if form_data.username != form_data.password:
        raise HTTPException(400)
    token: str = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(10))
    temp_token_dict[token] = User(uid="123", name=form_data.username, age=23, sex="M", scopes=form_data.scope)
    return JSONResponse(oauth2.OAuth2PasswordBearerJsonRespModel.response_data(access_token=token).dict())


oauth2_pb: oauth2.OAuth2PasswordBearer = oauth2.OAuth2PasswordBearer(
    route=oauth2_login,
    scopes={
        "user-info": "get all user info",
        "user-name": "only get user name",
    },
)


def get_current_user(_oauth2_pb: "BaseOAuth2PasswordBearerProxy") -> Callable[[str], User]:
    def _check_scope(token: str = Depends.i(_oauth2_pb)) -> User:
        user_model: Optional[User] = temp_token_dict.get(token, None)
        if not user_model:
            raise _oauth2_pb.security.not_authenticated_exc
        if not _oauth2_pb.is_allow(user_model.scopes):
            raise _oauth2_pb.security.not_authenticated_exc
        return user_model

    return _check_scope


@pait()
def oauth2_user_name(
    user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-name"]))),
) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": user_model.name})


@pait()
def oauth2_user_info(
    user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-info"]))),
) -> JSONResponse:
    return JSONResponse({"code": 0, "msg": "", "data": user_model.dict()})


app = Starlette()
app.add_route("/api/oauth2-login", oauth2_login, methods=["POST"])
app.add_route("/api/oauth2-user-name", oauth2_user_name, methods=["GET"])
app.add_route("/api/oauth2-user-info", oauth2_user_info, methods=["GET"])


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/openapi/security/sanic_with_oauth2_demo.py
import random
import string
from typing import TYPE_CHECKING, Callable, Dict, List, Optional

from pydantic import BaseModel, Field
from sanic import HTTPResponse, Sanic, json
from sanic.exceptions import InvalidUsage

from pait.app.sanic import pait
from pait.app.sanic.security import oauth2
from pait.field import Depends
from pait.model.response import Http400RespModel
from pait.openapi.doc_route import AddDocRoute

if TYPE_CHECKING:
    from pait.app.base.security.oauth2 import BaseOAuth2PasswordBearerProxy


class User(BaseModel):
    uid: str = Field(..., description="user id")
    name: str = Field(..., description="user name")
    age: int = Field(..., description="user age")
    sex: str = Field(..., description="user sex")
    scopes: List[str] = Field(..., description="user scopes")


temp_token_dict: Dict[str, User] = {}


@pait(
    response_model_list=[Http400RespModel, oauth2.OAuth2PasswordBearerJsonRespModel],
)
async def oauth2_login(form_data: oauth2.OAuth2PasswordRequestFrom) -> HTTPResponse:
    if form_data.username != form_data.password:
        raise InvalidUsage("Bad Request")
    token: str = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(10))
    temp_token_dict[token] = User(uid="123", name=form_data.username, age=23, sex="M", scopes=form_data.scope)
    return json(oauth2.OAuth2PasswordBearerJsonRespModel.response_data(access_token=token).dict())


oauth2_pb: oauth2.OAuth2PasswordBearer = oauth2.OAuth2PasswordBearer(
    route=oauth2_login,
    scopes={
        "user-info": "get all user info",
        "user-name": "only get user name",
    },
)


def get_current_user(_oauth2_pb: "BaseOAuth2PasswordBearerProxy") -> Callable[[str], User]:
    def _check_scope(token: str = Depends.i(_oauth2_pb)) -> User:
        user_model: Optional[User] = temp_token_dict.get(token, None)
        if not user_model:
            raise _oauth2_pb.security.not_authenticated_exc
        if not _oauth2_pb.is_allow(user_model.scopes):
            raise _oauth2_pb.security.not_authenticated_exc
        return user_model

    return _check_scope


@pait()
def oauth2_user_name(
    user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-name"]))),
) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": user_model.name})


@pait()
async def oauth2_user_info(
    user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-info"]))),
) -> HTTPResponse:
    return json({"code": 0, "msg": "", "data": user_model.dict()})


app = Sanic(name="demo")
app.add_route(oauth2_login, "/api/oauth2-login", methods={"POST"})
app.add_route(oauth2_user_name, "/api/oauth2-user-name", methods={"GET"})
app.add_route(oauth2_user_info, "/api/oauth2-user-info", methods={"GET"})
AddDocRoute(app)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
docs_source_code/openapi/security/tornado_with_oauth2_demo.py
import random
import string
from typing import TYPE_CHECKING, Callable, Dict, List, Optional

from pydantic import BaseModel, Field
from tornado.ioloop import IOLoop
from tornado.web import Application, HTTPError, RequestHandler

from pait.app.tornado import pait
from pait.app.tornado.security import oauth2
from pait.field import Depends
from pait.model.response import Http400RespModel
from pait.openapi.doc_route import AddDocRoute

if TYPE_CHECKING:
    from pait.app.base.security.oauth2 import BaseOAuth2PasswordBearerProxy


class User(BaseModel):
    uid: str = Field(..., description="user id")
    name: str = Field(..., description="user name")
    age: int = Field(..., description="user age")
    sex: str = Field(..., description="user sex")
    scopes: List[str] = Field(..., description="user scopes")


temp_token_dict: Dict[str, User] = {}


class OAuth2LoginHandler(RequestHandler):
    @pait(
        response_model_list=[oauth2.OAuth2PasswordBearerJsonRespModel, Http400RespModel],
    )
    async def post(self, form_data: oauth2.OAuth2PasswordRequestFrom) -> None:
        if form_data.username != form_data.password:
            raise HTTPError(400)
        token: str = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(10))
        temp_token_dict[token] = User(uid="123", name=form_data.username, age=23, sex="M", scopes=form_data.scope)
        self.write(oauth2.OAuth2PasswordBearerJsonRespModel.response_data(access_token=token).dict())


oauth2_pb: oauth2.OAuth2PasswordBearer = oauth2.OAuth2PasswordBearer(
    route=OAuth2LoginHandler.post,
    scopes={
        "user-info": "get all user info",
        "user-name": "only get user name",
    },
)


def get_current_user(_oauth2_pb: "BaseOAuth2PasswordBearerProxy") -> Callable[[str], User]:
    def _check_scope(token: str = Depends.i(_oauth2_pb)) -> User:
        user_model: Optional[User] = temp_token_dict.get(token, None)
        if not user_model:
            raise _oauth2_pb.security.not_authenticated_exc
        if not _oauth2_pb.is_allow(user_model.scopes):
            raise _oauth2_pb.security.not_authenticated_exc
        return user_model

    return _check_scope


class OAuth2UserNameHandler(RequestHandler):
    @pait()
    def get(self, user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-name"])))) -> None:
        self.write({"code": 0, "msg": "", "data": user_model.name})


class OAuth2UserInfoHandler(RequestHandler):
    @pait()
    def get(self, user_model: User = Depends.t(get_current_user(oauth2_pb.get_depend(["user-info"])))) -> None:
        self.write({"code": 0, "msg": "", "data": user_model.dict()})


app: Application = Application(
    [
        (r"/api/security/oauth2-login", OAuth2LoginHandler),
        (r"/api/security/oauth2-user-name", OAuth2UserNameHandler),
        (r"/api/security/oauth2-user-info", OAuth2UserInfoHandler),
    ],
)
AddDocRoute(app)


if __name__ == "__main__":
    app.listen(8000)
    IOLoop.instance().start()

The first part is to create a Model -- User about the user's data and a temp_token_dict with key as token value as User to be used to mocker database storage.

The second part is to create a standard login route function that takes a parameter of type OAuth2PasswordRequestFrom, which is Pait's encapsulation of Oauth2's login parameters, and which has the following source code:

from pydantic import BaseModel
from pait.field import Form

class BaseOAuth2PasswordRequestFrom(BaseModel):
    username: str = Form()
    password: str = Form()
    scope: ScopeType = Form("")
    client_id: Optional[str] = Form(None)
    client_secret: Optional[str] = Form(None)


class OAuth2PasswordRequestFrom(BaseOAuth2PasswordRequestFrom):
    grant_type: Optional[str] = Form(None, regex="password")
can see that OAuth2PasswordRequestFrom inherits BaseModel and uses Form for the Field of all its parameters, which means that its parameters get data from the form in the request body. While the login route function simply checks the data after receiving it, and returns a 400 response if the check is wrong. If it passes, it generates a token and stores the token and User in temp_token_dict and returns the Oauth2 standard response via oauth2.OAuth2PasswordBearerJsonRespModel.

The third part is the creation of an instance of oauth2_pb via oauth2.OAuth2PasswordBearer as well as the creation of a function -- get-current_userto get the user. The scopes parameter to create oauth2_pb is the permission description of oauth2_pb and the route parameter is the login route function, when the route function registers with the web framework, oauth2_pb will find the URL of the route function and writes it to the tokenUrl attribute. The get_current_user function will get the current user through the Token, and then through the is_allow method to determine whether the current user has permission to access the route function, if not, then return a 403 response, if so, then return User Model. Note that the get_current_user function receives the value of the oauth2.OAuth2PasswordBearer proxy class, which already specifies only which permissions are allowed(by oauth2_pb.get_depend method). In addition, the class has two functions, one that passes the requested parameters to the function via Depend, and the other that provides the is_allow method for determining whether the user has permission to access the interface.

The fourth part is the route functions, which use the get_current_user function created in the third part. where oauth2_pb.get_depend(["user-name"]) creates an instance of a proxy that only allows access with user-name privileges, via oauth2.OAuth2PasswordBearer. oauth2_pb.get_depend(["user-info"]) will create a proxy instance that only allows access with user-info permissions via oauth2.OAuth2PasswordBearer. The only difference between them is that the scopes are different.

After running the code, run the curl command to see them executed as follows:

  curl 'http://127.0.0.1:8000/api/oauth2-login' \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  --data-raw 'grant_type=password&scope=user-info&username=so1n&password=so1n' \

{"access_token":"pomeG4jCDh","token_type":"bearer"}

  curl 'http://127.0.0.1:8000/api/oauth2-login' \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  --data-raw 'grant_type=password&scope=user-name&username=so1n1&password=so1n1' \

{"access_token":"G8ckqKGkDO","token_type":"bearer"}

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/oauth2-user-info' \
  -H 'accept: */*' \
  -H 'Authorization: Bearer pomeG4jCDh'

{"code":0,"data":{"age":23,"name":"so1n","scopes":["user-info"],"sex":"M","uid":"123"},"msg":""}

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/oauth2-user-info' \
  -H 'accept: */*' \
  -H 'Authorization: Bearer G8ckqKGkDO'

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>401 Unauthorized</title>
<h1>Unauthorized</h1>
<p>Not authenticated</p>

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/oauth2-user-name' \
  -H 'accept: */*' \
  -H 'Authorization: Bearer pomeG4jCDh'

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>401 Unauthorized</title>
<h1>Unauthorized</h1>
<p>Not authenticated</p>

  curl -X 'GET' \
  'http://127.0.0.1:8000/api/oauth2-user-name' \
  -H 'accept: */*' \
  -H 'Authorization: Bearer G8ckqKGkDO'
{"code":0,"data":"so1n1","msg":""}
The response result shows that a user with permission user-info can only access /api/oauth2-user-info, while a user with permission user-name can only access /api/oauth2-user-name.

Note

The current version does not support refresh Url yet