» Python:使用Flask构建REST API » 2. 开发 » 2.12 权限认证

权限认证

在 API 服务器中想对特定资源或功能的访问进行控制就需要实行身份权限验证

换句话说,如果你想要将某些 API 端点或数据限制给具有特殊角色的用户(例如管理员、版主或高级用户等)使用,你就需要进行身份权限验证。

传统用户流程

更新代码

添加 User 领域实体,domain/model/user.py:

from dataclasses import dataclass
from datetime import datetime


@dataclass
class User:
    id: int
    email: str
    password: str
    salt: str
    is_admin: bool
    created_at: datetime
    updated_at: datetime

声明其业务能力,domain/gateway/user_manager.py:

from abc import ABC, abstractmethod
from typing import Optional

from ..model import User


class UserManager(ABC):
    @abstractmethod
    def create_user(self, u: User) -> int:
        pass

    @abstractmethod
    def get_user_by_email(self, email: str) -> Optional[User]:
        pass

添加其实现,infrastructure/database/mysql.py:

@@ -3,11 +3,11 @@ from typing import Any, List, Optional
 
 from books.infrastructure.config import DBConfig
 
-from ...domain.gateway import BookManager
-from ...domain.model import Book
+from ...domain.gateway import BookManager, UserManager
+from ...domain.model import Book, User
 
 
-class MySQLPersistence(BookManager):
+class MySQLPersistence(BookManager, UserManager):
     def __init__(self, c: DBConfig, page_size: int):
         self.page_size = page_size
         self.conn = mysql.connector.connect(
@@ -73,3 +73,18 @@ class MySQLPersistence(BookManager):
         self.cursor.execute(query, tuple(params))
         results: List[Any] = self.cursor.fetchall()
         return [Book(**result) for result in results]
+
+    def create_user(self, u: User) -> int:
+        self.cursor.execute('''
+            INSERT INTO users (email, password, salt, is_admin, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, %s)
+        ''', (u.email, u.password, u.salt, u.is_admin, u.created_at, u.updated_at))
+        return self.cursor.lastrowid or 0
+
+    def get_user_by_email(self, email: str) -> Optional[User]:
+        self.cursor.execute('''
+            SELECT * FROM users WHERE email=%s
+        ''', (email,))
+        result: Any = self.cursor.fetchone()
+        if result is None:
+            return None
+        return User(**result)

实现用户注册和登录功能,application/executor/user_operator.py:

from datetime import datetime
import hashlib
import random
import time
from typing import Optional

from books.application.dto import UserCredential, User
from books.domain.gateway import UserManager
from books.domain.model import User as DomainUser

SALT_LEN = 4
ERR_EMPTY_EMAIL = "empty email"
ERR_EMPTY_PASSWORD = "empty password"


class UserOperator:
    def __init__(self, user_manager: UserManager):
        self.user_manager = user_manager

    def create_user(self, uc: UserCredential) -> Optional[User]:
        if not uc.email:
            raise ValueError(ERR_EMPTY_EMAIL)
        if not uc.password:
            raise ValueError(ERR_EMPTY_PASSWORD)

        salt = random_string(SALT_LEN)
        password_hash = sha1_hash(uc.password + salt)

        now = datetime.now()
        user = DomainUser(id=0, email=uc.email,
                          password=password_hash, salt=salt, is_admin=False,
                          created_at=now, updated_at=now)
        uid = self.user_manager.create_user(user)
        return User(id=uid, email=uc.email)

    def sign_in(self, email: str, password: str) -> Optional[User]:
        if not email:
            raise ValueError(ERR_EMPTY_EMAIL)
        if not password:
            raise ValueError(ERR_EMPTY_PASSWORD)

        user = self.user_manager.get_user_by_email(email)
        if not user:
            return None
        password_hash = sha1_hash(password + user.salt)
        if user.password != password_hash:
            raise ValueError("wrong password")
        return User(id=user.id, email=user.email)


def random_string(length: int) -> str:
    source = random.Random(time.time())
    charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    result = ''.join(source.choice(charset) for _ in range(length))
    return result


def sha1_hash(input_str: str) -> str:
    hash_object = hashlib.sha1(input_str.encode())
    hash_hex = hash_object.hexdigest()
    return hash_hex

函数 random_stringsha1_hash 是用于 salt 生成和密码哈希的辅助函数。

为什么需要盐(salt)?

没有salt,攻击者可以为常见密码预先计算哈希值并将其存储在表中(称为彩虹表)。如果数据库遭到入侵,攻击者可以将哈希密码与预先计算的哈希值进行比较,从而快速识别密码。为每个密码添加 salt 确保即使两个用户使用相同的密码,由于唯一的 salt,它们的哈希密码也会不同。

即使用户选择了弱密码,例如常见的词典单词,salt 也确保了生成的哈希值是唯一的。没有 salt,相同密码的所有实例都会哈希到相同的值,使其易受攻击。

添加 DTO,application/dto/user.py:

from dataclasses import dataclass


@dataclass
class UserCredential:
    email: str
    password: str


@dataclass
class User:
    id: int
    email: str

DTO 表示数据传输对象(Data Transfer Object)。

在应用程序中,数据的内部表示通常可能与你想通过 API 公开展示的表示方式不同。数据传输对象(DTO)允许你将内部数据结构转换为更适合 API 消费者的格式。这有助于将内部表示与外部表示解耦,从而实现更好的灵活性和抽象化。

引入新依赖,application/wire_helper.py:

@@ -1,4 +1,4 @@
-from books.domain.gateway import BookManager, ReviewManager
+from books.domain.gateway import BookManager, ReviewManager, UserManager
 from books.infrastructure.cache import RedisCache, CacheHelper
 from ..infrastructure.config import Config
 from ..infrastructure.database import MySQLPersistence, MongoPersistence
@@ -25,3 +25,6 @@ class WireHelper:
 
     def cache_helper(self) -> CacheHelper:
         return self.kvStore
+
+    def user_manager(self) -> UserManager:
+        return self.sqlPersistence

最后,添加路由,adapter/router.py:

@@ -1,17 +1,19 @@
 import logging
 from flask import Flask, request, jsonify
 
-from ..application.executor import BookOperator, ReviewOperator
+from books.application.dto import UserCredential
+from ..application.executor import BookOperator, ReviewOperator, UserOperator
 from ..application import WireHelper
 from ..domain.model import Book, Review
 from .util import dataclass_from_dict
 
 
 class RestHandler:
-    def __init__(self, logger: logging.Logger, book_operator: BookOperator, review_operator: ReviewOperator):
+    def __init__(self, logger: logging.Logger, book_operator: BookOperator, review_operator: ReviewOperator, user_operator: UserOperator):
         self._logger = logger
         self.book_operator = book_operator
         self.review_operator = review_operator
+        self.user_operator = user_operator
 
     def get_books(self):
         try:
@@ -104,6 +106,26 @@ class RestHandler:
             self._logger.error(f"Failed to delete: {e}")
             return jsonify({"error": "Failed to delete"}), 404
 
+    def user_sign_up(self):
+        try:
+            u = dataclass_from_dict(UserCredential, request.json)
+            user = self.user_operator.create_user(u)
+            return jsonify(user), 201
+        except Exception as e:
+            self._logger.error(f"Failed to create: {e}")
+            return jsonify({"error": "Failed to create"}), 400
+
+    def user_sign_in(self):
+        try:
+            u = dataclass_from_dict(UserCredential, request.json)
+            user = self.user_operator.sign_in(u.email, u.password)
+            if user is None:
+                return jsonify({"error": f"No user with email {u.email}"}), 404
+            return jsonify(user), 200
+        except Exception as e:
+            self._logger.error(f"Failed to sign in: {e}")
+            return jsonify({"error": f"Failed to sign in: {e}"}), 400
+
 
 def health():
     return jsonify({"status": "ok"})
@@ -115,7 +137,8 @@ def make_router(app: Flask, wire_helper: WireHelper):
         BookOperator(
             wire_helper.book_manager(),
             wire_helper.cache_helper()),
-        ReviewOperator(wire_helper.review_manager()))
+        ReviewOperator(wire_helper.review_manager()),
+        UserOperator(wire_helper.user_manager()))
     app.add_url_rule('/', view_func=health)
     app.add_url_rule('/books', view_func=rest_handler.get_books)
     app.add_url_rule('/books/<int:id>', view_func=rest_handler.get_book)
@@ -134,3 +157,7 @@ def make_router(app: Flask, wire_helper: WireHelper):
                      methods=['PUT'])
     app.add_url_rule('/reviews/<id>', view_func=rest_handler.delete_review,
                      methods=['DELETE'])
+    app.add_url_rule('/users', view_func=rest_handler.user_sign_up,
+                     methods=['POST'])
+    app.add_url_rule('/users/sign-in', view_func=rest_handler.user_sign_in,
+                     methods=['POST'])

使用 curl 测试

用户注册:

curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "test-pass"}' http://localhost:5000/users

结果:

{
  "email": "test-user@example.com",
  "id": 2
}

如果你检查数据库中的记录,你将看到类似下面内容:

+----+-----------------------+------------------------------------------+------+----------+-------------------------+-------------------------+
| id | email                 | password                                 | salt | is_admin | created_at              | updated_at              |
+----+-----------------------+------------------------------------------+------+----------+-------------------------+-------------------------+
|  2 | test-user@example.com | acced10162fdabc812d830f2f1edf74a63b57a38 | 3qV4 |        0 | 2024-03-08 15:49:35.989 | 2024-03-08 15:49:35.989 |
+----+-----------------------+------------------------------------------+------+----------+-------------------------+-------------------------+

用户成功登录:

curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "test-pass"}' http://localhost:5000/users/sign-in

结果:

{
  "email": "test-user@example.com",
  "id": 2
}

用户使用错误密码登录:

curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "wrong-pass"}' http://localhost:5000/users/sign-in

结果:

{
  "error": "Failed to sign in: wrong password"
}

注意
为成功和错误响应使用统一的 json 格式是一种最佳实践。 比如:

{"code":0, "message":"ok", "data": {"id":2,"email":"test-user@example.com"}}
{"code":1001, "message":"wrong password", "data": null}

使用 JWT(JSON Web Token)

JWT(JSON Web Token)是一种紧凑、URL 安全的用于表示传输双方之间声明 claims 的方式。它经常用于 Web 应用程序和 API 中的身份验证和授权。

安装 JWT 依赖

pip3 install PyJWT

更新 requirements.txt:

pip3 freeze > requirements.txt 

此刻文件内容大致如下:

async-timeout==4.0.3
blinker==1.7.0
click==8.1.7
dnspython==2.6.1
Flask==3.0.2
importlib-metadata==7.0.1
itsdangerous==2.1.2
Jinja2==3.1.3
MarkupSafe==2.1.5
mysql-connector-python==8.3.0
PyJWT==2.8.0
pymongo==4.6.2
PyYAML==6.0.1
redis==5.0.2
Werkzeug==3.0.1
zipp==3.17.0

更新代码

添加 UserPermission 枚举,domain/model/user.py:

@@ -1,5 +1,6 @@
 from dataclasses import dataclass
 from datetime import datetime
+from enum import IntEnum
 
 
 @dataclass
@@ -11,3 +12,10 @@ class User:
     is_admin: bool
     created_at: datetime
     updated_at: datetime
+
+
+class UserPermission(IntEnum):
+    PermNone = 0
+    PermUser = 1
+    PermAuthor = 2
+    PermAdmin = 3

声明它的业务能力,domain/gateway/user_manager.py:

@@ -1,7 +1,7 @@
 from abc import ABC, abstractmethod
 from typing import Optional
 
-from ..model import User
+from ..model import User, UserPermission
 
 
 class UserManager(ABC):
@@ -12,3 +12,13 @@ class UserManager(ABC):
     @abstractmethod
     def get_user_by_email(self, email: str) -> Optional[User]:
         pass
+
+
+class PermissionManager(ABC):
+    @abstractmethod
+    def generate_token(self, user_id: int, email: str, perm: UserPermission) -> str:
+        pass
+
+    @abstractmethod
+    def has_permission(self, token: str, perm: UserPermission) -> bool:
+        pass

实现 2 个方法,infrastructure/token/jwt.py:

import jwt
import time
from typing import Optional

from books.domain.gateway import PermissionManager
from ...domain.model import UserPermission

ERR_INVALID_TOKEN = "invalid token"
ERR_FAIL_TO_DECODE = "failed to decode token"


class TokenKeeper(PermissionManager):
    def __init__(self, secret_key: str, expire_in_hours: int):
        self.secret_key = secret_key.encode()
        self.expire_hours = expire_in_hours

    def generate_token(self, user_id: int, email: str, perm: UserPermission) -> str:
        expiration_time = int(time.time()) + self.expire_hours * 3600
        claims = {
            "user_id": user_id,
            "user_name": email,
            "permission": perm,
            "exp": expiration_time
        }
        token_result = jwt.encode(claims, self.secret_key, algorithm='HS256')
        return token_result

    def extract_token(self, token_result: str) -> Optional[dict]:
        try:
            claims = jwt.decode(
                token_result, self.secret_key, algorithms=['HS256'])
            return claims
        except jwt.ExpiredSignatureError:
            raise ValueError(ERR_INVALID_TOKEN)
        except jwt.DecodeError:
            raise ValueError(ERR_FAIL_TO_DECODE)

    def has_permission(self, token_result: str, perm: UserPermission) -> bool:
        claims = self.extract_token(token_result)
        if not claims:
            return False
        return claims.get("permission", UserPermission.PermNone) >= perm

添加 Tokens 相关的配置项,infrastructure/config/config.py:

@@ -26,6 +26,8 @@ class CacheConfig:
 class ApplicationConfig:
     port: int
     page_size: int
+    token_secret: str
+    token_hours: int
 
 
 @dataclass

置入其值,config.yml:

@@ -1,6 +1,8 @@
 app:
   port: 5000
   page_size: 5
+  token_secret: "I_Love_LiteRank"
+  token_hours: 24
 db:
   file_name: "test.db"
   host: "127.0.0.1"

生成并返回 tokens,application/executor/user_operator.py:

@@ -4,9 +4,9 @@ import random
 import time
 from typing import Optional
 
-from books.application.dto import UserCredential, User
-from books.domain.gateway import UserManager
-from books.domain.model import User as DomainUser
+from books.application.dto import UserCredential, User, UserToken
+from books.domain.gateway import UserManager, PermissionManager
+from books.domain.model import User as DomainUser, UserPermission
 
 SALT_LEN = 4
 ERR_EMPTY_EMAIL = "empty email"
@@ -14,8 +14,9 @@ ERR_EMPTY_PASSWORD = "empty password"
 
 
 class UserOperator:
-    def __init__(self, user_manager: UserManager):
+    def __init__(self, user_manager: UserManager, perm_manager: PermissionManager):
         self.user_manager = user_manager
+        self.perm_manager = perm_manager
 
     def create_user(self, uc: UserCredential) -> Optional[User]:
         if not uc.email:
@@ -33,7 +34,7 @@ class UserOperator:
         uid = self.user_manager.create_user(user)
         return User(id=uid, email=uc.email)
 
-    def sign_in(self, email: str, password: str) -> Optional[User]:
+    def sign_in(self, email: str, password: str) -> Optional[UserToken]:
         if not email:
             raise ValueError(ERR_EMPTY_EMAIL)
         if not password:
@@ -45,7 +46,12 @@ class UserOperator:
         password_hash = sha1_hash(password + user.salt)
         if user.password != password_hash:
             raise ValueError("wrong password")
-        return User(id=user.id, email=user.email)
+        token = self.perm_manager.generate_token(
+            user.id, user.email,
+            UserPermission.PermAdmin if user.is_admin else UserPermission.PermUser)
+        return UserToken(
+            User(id=user.id, email=user.email),
+            token)
 
 
 def random_string(length: int) -> str:

添加 DTO,application/dto/user.py:

@@ -11,3 +11,9 @@ class UserCredential:
 class User:
     id: int
     email: str
+
+
+@dataclass
+class UserToken:
+    user: User
+    token: str

微调 application/wire_helper.py:

@@ -1,25 +1,35 @@
-from books.domain.gateway import BookManager, ReviewManager, UserManager
+from books.domain.gateway import BookManager, ReviewManager, UserManager, PermissionManager
 from books.infrastructure.cache import RedisCache, CacheHelper
+from books.infrastructure.token import TokenKeeper
 from ..infrastructure.config import Config
 from ..infrastructure.database import MySQLPersistence, MongoPersistence
 
 
 class WireHelper:
-    def __init__(self, sqlPersistence: MySQLPersistence, noSQLPersistence: MongoPersistence, kvStore: RedisCache):
+    def __init__(self,
+                 sqlPersistence: MySQLPersistence,
+                 noSQLPersistence: MongoPersistence,
+                 kvStore: RedisCache,
+                 tokenKeeper: TokenKeeper):
         self.sqlPersistence = sqlPersistence
         self.noSQLPersistence = noSQLPersistence
         self.kvStore = kvStore
+        self.tokenKeeper = tokenKeeper
 
     @classmethod
     def new(cls, c: Config):
         db = MySQLPersistence(c.db, c.app.page_size)
         mdb = MongoPersistence(c.db.mongo_uri, c.db.mongo_db_name)
         kv = RedisCache(c.cache)
-        return cls(db, mdb, kv)
+        tk = TokenKeeper(c.app.token_secret, c.app.token_hours)
+        return cls(db, mdb, kv, tk)
 
     def book_manager(self) -> BookManager:
         return self.sqlPersistence
 
+    def perm_manager(self) -> PermissionManager:
+        return self.tokenKeeper
+
     def review_manager(self) -> ReviewManager:
         return self.noSQLPersistence

添加 perm_check 中间件到创建、更新、删除路由中 adapter/router.py:

@@ -1,7 +1,9 @@
 import logging
 from flask import Flask, request, jsonify
+from books.adapter.middleware import perm_check
 
 from books.application.dto import UserCredential
+from books.domain.model.user import UserPermission
 from ..application.executor import BookOperator, ReviewOperator, UserOperator
 from ..application import WireHelper
 from ..domain.model import Book, Review
@@ -138,15 +140,15 @@ def make_router(app: Flask, wire_helper: WireHelper):
             wire_helper.book_manager(),
             wire_helper.cache_helper()),
         ReviewOperator(wire_helper.review_manager()),
-        UserOperator(wire_helper.user_manager()))
+        UserOperator(wire_helper.user_manager(), wire_helper.perm_manager()))
     app.add_url_rule('/', view_func=health)
     app.add_url_rule('/books', view_func=rest_handler.get_books)
     app.add_url_rule('/books/<int:id>', view_func=rest_handler.get_book)
-    app.add_url_rule('/books', view_func=rest_handler.create_book,
+    app.add_url_rule('/books', view_func=perm_check(wire_helper.perm_manager(), UserPermission.PermAuthor)(rest_handler.create_book),
                      methods=['POST'])
-    app.add_url_rule('/books/<int:id>', view_func=rest_handler.update_book,
+    app.add_url_rule('/books/<int:id>', view_func=perm_check(wire_helper.perm_manager(), UserPermission.PermAuthor)(rest_handler.update_book),
                      methods=['PUT'])
-    app.add_url_rule('/books/<int:id>', view_func=rest_handler.delete_book,
+    app.add_url_rule('/books/<int:id>', view_func=perm_check(wire_helper.perm_manager(), UserPermission.PermAuthor)(rest_handler.delete_book),
                      methods=['DELETE'])
     app.add_url_rule('/books/<int:book_id>/reviews',
                      view_func=rest_handler.get_reviews_of_book)

perm_check 的实现,adapter/middleware.py:

from functools import wraps
from flask import request, jsonify
from books.domain.gateway import PermissionManager
from books.domain.model.user import UserPermission

TOKEN_PREFIX = "Bearer "


def perm_check(perm_manager: PermissionManager, allow_perm: UserPermission):
    def middleware(view_func):
        @wraps(view_func)
        def wrapped_view(*args, **kwargs):
            auth_header = request.headers.get("Authorization")
            if not auth_header:
                return jsonify({"error": "Token is required"}), 401

            token = auth_header.replace(TOKEN_PREFIX, "", 1)
            try:
                has_perm = perm_manager.has_permission(token, allow_perm)
            except Exception as e:
                return jsonify({"error": f"{e}"}), 401
            if not has_perm:
                return jsonify({"error": "Unauthorized"}), 401
            return view_func(*args, **kwargs)
        return wrapped_view
    return middleware

以上就是 API 中使用 JWT 的所需更改。让我们再次测试一下服务器功能!

使用 curl 测试

用户登录获得 Token

curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "test-pass"}' http://localhost:5000/users/sign-in

结果:

{
  "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoyLCJ1c2VyX25hbWUiOiJ0ZXN0LXVzZXJAZXhhbXBsZS5jb20iLCJwZXJtaXNzaW9uIjozLCJleHAiOjE3MDk5ODk1NTB9.aUTu4YI2_9aUWEvffe7pRQbAuivIq73NsPy6IXYypCk",
  "user": {
    "email": "test-user@example.com",
    "id": 2
  }
}

将 token 放到 https://jwt.io/ 调试器中,可查看 token 的负载和签名是否正常。

JWT Debugger

创建新的图书,携带合法正常的 Token

curl -X POST \
  http://localhost:5000/books \
  -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoyLCJ1c2VyX25hbWUiOiJ0ZXN0LXVzZXJAZXhhbXBsZS5jb20iLCJwZXJtaXNzaW9uIjozLCJleHAiOjE3MDk5ODk1NTB9.aUTu4YI2_9aUWEvffe7pRQbAuivIq73NsPy6IXYypCk' \
  -H 'Content-Type: application/json' \
  -d '{
    "title": "Test Book",
    "author": "John Doe",
    "published_at": "2003-01-01",
    "description": "A sample book description",
    "isbn": "1234567890",
    "total_pages": 100
}'

成功结果:

{
  "author": "John Doe",
  "created_at": null,
  "description": "A sample book description",
  "id": 18,
  "isbn": "1234567890",
  "published_at": "2003-01-01",
  "title": "Test Book",
  "total_pages": 100,
  "updated_at": null
}

创建新的图书,携带合法 Token,但是权限不够

curl -X POST \
  http://localhost:5000/books \
  -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo2LCJ1c2VyX25hbWUiOiJ0ZXN0LW5vZGVAZXhhbXBsZS5jb20iLCJwZXJtaXNzaW9uIjoxLCJleHAiOjE3MDk5ODk5MjN9.KhNCNRFYVAe67pyrFPH_bCFMbn83y55AsCtlDa3v0Sk' \
  -H 'Content-Type: application/json' \
  -d '{
    "title": "Test Book",
    "author": "John Doe",
    "published_at": "2003-01-01",
    "description": "A sample book description",
    "isbn": "1234567890",
    "total_pages": 100
}'

结果:

{
  "error": "Unauthorized"
}

创建新的图书,不带 Token

curl -X POST \
  http://localhost:5000/books \
  -H 'Content-Type: application/json' \
  -d '{
    "title": "Test Book",
    "author": "John Doe",
    "published_at": "2003-01-01",
    "description": "A sample book description",
    "isbn": "1234567890",
    "total_pages": 100
}'

结果:

{
  "error": "Token is required"
}

创建新的图书,带一个假 Token

curl -X POST \
  http://localhost:5000/books \
  -H 'Authorization: Bearer FAKE_TOKEN' \
  -H 'Content-Type: application/json' \
  -d '{
    "title": "Test Book",
    "author": "John Doe",
    "published_at": "2003-01-01",
    "description": "A sample book description",
    "isbn": "1234567890",
    "total_pages": 100
}'

结果:

{
  "error": "failed to decode token"
}

完美! 🎉

现在,你的部分端点已经被权限认证机制保护起来啦。

上页下页