权限认证
在 API 服务器中想对特定资源或功能的访问进行控制就需要实行身份权限验证。
换句话说,如果你想要将某些 API 端点或数据限制给具有特殊角色的用户(例如管理员、版主或高级用户等)使用,你就需要进行身份权限验证。
传统用户流程
更新代码
添加 User
领域实体,domain/model/user.py:
from dataclasses import dataclass
from datetime import datetime
@dataclass
class User:
    """Domain entity describing a registered user as it is persisted."""
    id: int
    email: str
    # Stores the hash of the password, not the plain-text password.
    password: str
    # Random string that is appended to the password before hashing.
    salt: str
    # Decides whether sign-in grants the admin permission level.
    is_admin: bool
    created_at: datetime
    updated_at: datetime
声明其业务能力,domain/gateway/user_manager.py:
from abc import ABC, abstractmethod
from typing import Optional
from ..model import User
class UserManager(ABC):
    """Gateway interface for the user storage operations of the domain."""

    @abstractmethod
    def create_user(self, u: User) -> int:
        """Persist ``u`` and return an integer identifier for it."""

    @abstractmethod
    def get_user_by_email(self, email: str) -> Optional[User]:
        """Return the user registered under ``email``, or None if absent."""
添加其实现,infrastructure/database/mysql.py:
@@ -3,11 +3,11 @@ from typing import Any, List, Optional
from books.infrastructure.config import DBConfig
-from ...domain.gateway import BookManager
-from ...domain.model import Book
+from ...domain.gateway import BookManager, UserManager
+from ...domain.model import Book, User
-class MySQLPersistence(BookManager):
+class MySQLPersistence(BookManager, UserManager):
def __init__(self, c: DBConfig, page_size: int):
self.page_size = page_size
self.conn = mysql.connector.connect(
@@ -73,3 +73,18 @@ class MySQLPersistence(BookManager):
self.cursor.execute(query, tuple(params))
results: List[Any] = self.cursor.fetchall()
return [Book(**result) for result in results]
+
+ def create_user(self, u: User) -> int:
+ self.cursor.execute('''
+ INSERT INTO users (email, password, salt, is_admin, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, %s)
+ ''', (u.email, u.password, u.salt, u.is_admin, u.created_at, u.updated_at))
+ return self.cursor.lastrowid or 0
+
+ def get_user_by_email(self, email: str) -> Optional[User]:
+ self.cursor.execute('''
+ SELECT * FROM users WHERE email=%s
+ ''', (email,))
+ result: Any = self.cursor.fetchone()
+ if result is None:
+ return None
+ return User(**result)
实现用户注册和登录功能,application/executor/user_operator.py:
from datetime import datetime
import hashlib
import random
import time
from typing import Optional
from books.application.dto import UserCredential, User
from books.domain.gateway import UserManager
from books.domain.model import User as DomainUser
SALT_LEN = 4
ERR_EMPTY_EMAIL = "empty email"
ERR_EMPTY_PASSWORD = "empty password"
class UserOperator:
    """Application-level use cases for user sign-up and sign-in."""

    def __init__(self, user_manager: UserManager):
        self.user_manager = user_manager

    def create_user(self, uc: UserCredential) -> Optional[User]:
        """Register a new non-admin user from the given credential.

        Raises:
            ValueError: if the email or the password is empty.
        """
        if not uc.email:
            raise ValueError(ERR_EMPTY_EMAIL)
        if not uc.password:
            raise ValueError(ERR_EMPTY_PASSWORD)

        # Each user gets a fresh salt; only the salted hash is stored.
        fresh_salt = random_string(SALT_LEN)
        digest = sha1_hash(uc.password + fresh_salt)
        timestamp = datetime.now()
        record = DomainUser(
            id=0,
            email=uc.email,
            password=digest,
            salt=fresh_salt,
            is_admin=False,
            created_at=timestamp,
            updated_at=timestamp,
        )
        new_id = self.user_manager.create_user(record)
        return User(id=new_id, email=uc.email)

    def sign_in(self, email: str, password: str) -> Optional[User]:
        """Check the credential and return the matching user DTO.

        Returns None when no user is registered under ``email``.

        Raises:
            ValueError: if an argument is empty or the password is wrong.
        """
        if not email:
            raise ValueError(ERR_EMPTY_EMAIL)
        if not password:
            raise ValueError(ERR_EMPTY_PASSWORD)

        found = self.user_manager.get_user_by_email(email)
        if not found:
            return None

        # Re-hash the candidate password with the stored salt and compare.
        if sha1_hash(password + found.salt) != found.password:
            raise ValueError("wrong password")
        return User(id=found.id, email=found.email)
def random_string(length: int) -> str:
    """Return a random alphanumeric string with ``length`` characters.

    The result is used as a password salt, so characters are drawn from the
    operating system's secure random source via ``secrets`` rather than from
    a PRNG seeded with the current time, whose output is predictable.

    Args:
        length: Number of characters to generate; zero or a negative value
            yields an empty string.

    Returns:
        A string made of ASCII letters and digits.
    """
    # Local import so this helper does not depend on the file's import block.
    import secrets

    charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    return ''.join(secrets.choice(charset) for _ in range(length))
def sha1_hash(input_str: str) -> str:
    """Return the hexadecimal SHA-1 digest of ``input_str``.

    The string is encoded with ``str.encode()``'s default encoding before
    being hashed.
    """
    return hashlib.sha1(input_str.encode()).hexdigest()
函数 random_string
和 sha1_hash
是用于 salt 生成和密码哈希的辅助函数。
为什么需要盐(salt)?
如果没有 salt,攻击者可以为常见密码预先计算哈希值并将其存储在表中(称为彩虹表)。一旦数据库遭到入侵,攻击者只需将泄露的密码哈希与预先计算的哈希值进行比较,就能快速识别出密码。为每个密码添加唯一的 salt,可以确保即使两个用户使用相同的密码,它们的密码哈希也不相同。
即使用户选择了弱密码,例如常见的词典单词,salt 也确保了生成的哈希值是唯一的。没有 salt,相同密码的所有实例都会哈希到相同的值,使其易受攻击。
添加 DTO,application/dto/user.py:
from dataclasses import dataclass
@dataclass
class UserCredential:
    """Email and password pair supplied by a client to sign up or sign in."""
    email: str
    # Plain-text password as sent by the client.
    password: str
@dataclass
class User:
    """User DTO carrying only the id and the email of a user."""
    id: int
    email: str
DTO 表示数据传输对象(Data Transfer Object)。
在应用程序中,数据的内部表示通常可能与你想通过 API 公开展示的表示方式不同。数据传输对象(DTO)允许你将内部数据结构转换为更适合 API 消费者的格式。这有助于将内部表示与外部表示解耦,从而实现更好的灵活性和抽象化。
引入新依赖,application/wire_helper.py:
@@ -1,4 +1,4 @@
-from books.domain.gateway import BookManager, ReviewManager
+from books.domain.gateway import BookManager, ReviewManager, UserManager
from books.infrastructure.cache import RedisCache, CacheHelper
from ..infrastructure.config import Config
from ..infrastructure.database import MySQLPersistence, MongoPersistence
@@ -25,3 +25,6 @@ class WireHelper:
def cache_helper(self) -> CacheHelper:
return self.kvStore
+
+ def user_manager(self) -> UserManager:
+ return self.sqlPersistence
最后,添加路由,adapter/router.py:
@@ -1,17 +1,19 @@
import logging
from flask import Flask, request, jsonify
-from ..application.executor import BookOperator, ReviewOperator
+from books.application.dto import UserCredential
+from ..application.executor import BookOperator, ReviewOperator, UserOperator
from ..application import WireHelper
from ..domain.model import Book, Review
from .util import dataclass_from_dict
class RestHandler:
- def __init__(self, logger: logging.Logger, book_operator: BookOperator, review_operator: ReviewOperator):
+ def __init__(self, logger: logging.Logger, book_operator: BookOperator, review_operator: ReviewOperator, user_operator: UserOperator):
self._logger = logger
self.book_operator = book_operator
self.review_operator = review_operator
+ self.user_operator = user_operator
def get_books(self):
try:
@@ -104,6 +106,26 @@ class RestHandler:
self._logger.error(f"Failed to delete: {e}")
return jsonify({"error": "Failed to delete"}), 404
+ def user_sign_up(self):
+ try:
+ u = dataclass_from_dict(UserCredential, request.json)
+ user = self.user_operator.create_user(u)
+ return jsonify(user), 201
+ except Exception as e:
+ self._logger.error(f"Failed to create: {e}")
+ return jsonify({"error": "Failed to create"}), 400
+
+ def user_sign_in(self):
+ try:
+ u = dataclass_from_dict(UserCredential, request.json)
+ user = self.user_operator.sign_in(u.email, u.password)
+ if user is None:
+ return jsonify({"error": f"No user with email {u.email}"}), 404
+ return jsonify(user), 200
+ except Exception as e:
+ self._logger.error(f"Failed to sign in: {e}")
+ return jsonify({"error": f"Failed to sign in: {e}"}), 400
+
def health():
return jsonify({"status": "ok"})
@@ -115,7 +137,8 @@ def make_router(app: Flask, wire_helper: WireHelper):
BookOperator(
wire_helper.book_manager(),
wire_helper.cache_helper()),
- ReviewOperator(wire_helper.review_manager()))
+ ReviewOperator(wire_helper.review_manager()),
+ UserOperator(wire_helper.user_manager()))
app.add_url_rule('/', view_func=health)
app.add_url_rule('/books', view_func=rest_handler.get_books)
app.add_url_rule('/books/<int:id>', view_func=rest_handler.get_book)
@@ -134,3 +157,7 @@ def make_router(app: Flask, wire_helper: WireHelper):
methods=['PUT'])
app.add_url_rule('/reviews/<id>', view_func=rest_handler.delete_review,
methods=['DELETE'])
+ app.add_url_rule('/users', view_func=rest_handler.user_sign_up,
+ methods=['POST'])
+ app.add_url_rule('/users/sign-in', view_func=rest_handler.user_sign_in,
+ methods=['POST'])
使用 curl 测试
用户注册:
curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "test-pass"}' http://localhost:5000/users
结果:
{
"email": "test-user@example.com",
"id": 2
}
如果你检查数据库中的记录,你将看到类似下面的内容:
+----+-----------------------+------------------------------------------+------+----------+-------------------------+-------------------------+
| id | email | password | salt | is_admin | created_at | updated_at |
+----+-----------------------+------------------------------------------+------+----------+-------------------------+-------------------------+
| 2 | test-user@example.com | acced10162fdabc812d830f2f1edf74a63b57a38 | 3qV4 | 0 | 2024-03-08 15:49:35.989 | 2024-03-08 15:49:35.989 |
+----+-----------------------+------------------------------------------+------+----------+-------------------------+-------------------------+
用户成功登录:
curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "test-pass"}' http://localhost:5000/users/sign-in
结果:
{
"email": "test-user@example.com",
"id": 2
}
用户使用错误密码登录:
curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "wrong-pass"}' http://localhost:5000/users/sign-in
结果:
{
"error": "Failed to sign in: wrong password"
}
注意:
为成功响应和错误响应使用统一的 JSON 格式是一种最佳实践。比如,成功时返回:{"code":0, "message":"ok", "data": {"id":2,"email":"test-user@example.com"}};失败时返回:{"code":1001, "message":"wrong password", "data": null}
使用 JWT(JSON Web Token)
JWT(JSON Web Token)是一种紧凑且 URL 安全的方式,用于表示在通信双方之间传递的声明(claims)。它经常用于 Web 应用程序和 API 中的身份验证和授权。
安装 JWT 依赖
pip3 install PyJWT
更新 requirements.txt:
pip3 freeze > requirements.txt
此刻文件内容大致如下:
async-timeout==4.0.3
blinker==1.7.0
click==8.1.7
dnspython==2.6.1
Flask==3.0.2
importlib-metadata==7.0.1
itsdangerous==2.1.2
Jinja2==3.1.3
MarkupSafe==2.1.5
mysql-connector-python==8.3.0
PyJWT==2.8.0
pymongo==4.6.2
PyYAML==6.0.1
redis==5.0.2
Werkzeug==3.0.1
zipp==3.17.0
更新代码
添加 UserPermission
枚举,domain/model/user.py:
@@ -1,5 +1,6 @@
from dataclasses import dataclass
from datetime import datetime
+from enum import IntEnum
@dataclass
@@ -11,3 +12,10 @@ class User:
is_admin: bool
created_at: datetime
updated_at: datetime
+
+
+class UserPermission(IntEnum):
+ PermNone = 0
+ PermUser = 1
+ PermAuthor = 2
+ PermAdmin = 3
声明它的业务能力,domain/gateway/user_manager.py:
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import Optional
-from ..model import User
+from ..model import User, UserPermission
class UserManager(ABC):
@@ -12,3 +12,13 @@ class UserManager(ABC):
@abstractmethod
def get_user_by_email(self, email: str) -> Optional[User]:
pass
+
+
+class PermissionManager(ABC):
+ @abstractmethod
+ def generate_token(self, user_id: int, email: str, perm: UserPermission) -> str:
+ pass
+
+ @abstractmethod
+ def has_permission(self, token: str, perm: UserPermission) -> bool:
+ pass
实现 2 个方法,infrastructure/token/jwt.py:
import jwt
import time
from typing import Optional
from books.domain.gateway import PermissionManager
from ...domain.model import UserPermission
ERR_INVALID_TOKEN = "invalid token"
ERR_FAIL_TO_DECODE = "failed to decode token"
class TokenKeeper(PermissionManager):
    """PermissionManager implementation that issues and checks HS256 JWTs."""

    def __init__(self, secret_key: str, expire_in_hours: int):
        """Keep the signing secret (as bytes) and the token lifetime in hours."""
        self.secret_key = secret_key.encode()
        self.expire_hours = expire_in_hours

    def generate_token(self, user_id: int, email: str, perm: UserPermission) -> str:
        """Create a signed token carrying the user id, email and permission.

        The ``exp`` claim is set to the current time plus the configured
        number of hours.
        """
        expiration_time = int(time.time()) + self.expire_hours * 3600
        claims = {
            "user_id": user_id,
            "user_name": email,
            "permission": perm,
            "exp": expiration_time
        }
        return jwt.encode(claims, self.secret_key, algorithm='HS256')

    def extract_token(self, token_result: str) -> Optional[dict]:
        """Verify ``token_result`` and return its claims.

        Raises:
            ValueError: with ERR_INVALID_TOKEN when the token has expired or
                fails any other validation, and with ERR_FAIL_TO_DECODE when
                it cannot be decoded.
        """
        try:
            return jwt.decode(
                token_result, self.secret_key, algorithms=['HS256'])
        except jwt.ExpiredSignatureError as err:
            raise ValueError(ERR_INVALID_TOKEN) from err
        except jwt.DecodeError as err:
            raise ValueError(ERR_FAIL_TO_DECODE) from err
        except jwt.InvalidTokenError as err:
            # Base class of the remaining PyJWT validation errors; without
            # this clause they would escape as raw library exceptions.
            raise ValueError(ERR_INVALID_TOKEN) from err

    def has_permission(self, token_result: str, perm: UserPermission) -> bool:
        """Tell whether the token's permission level is at least ``perm``.

        A token without a ``permission`` claim is treated as PermNone.

        Raises:
            ValueError: propagated from extract_token for unusable tokens.
        """
        claims = self.extract_token(token_result)
        if not claims:
            return False
        return claims.get("permission", UserPermission.PermNone) >= perm
添加 Tokens 相关的配置项,infrastructure/config/config.py:
@@ -26,6 +26,8 @@ class CacheConfig:
class ApplicationConfig:
port: int
page_size: int
+ token_secret: str
+ token_hours: int
@dataclass
置入其值,config.yml:
@@ -1,6 +1,8 @@
app:
port: 5000
page_size: 5
+ token_secret: "I_Love_LiteRank"
+ token_hours: 24
db:
file_name: "test.db"
host: "127.0.0.1"
生成并返回 tokens,application/executor/user_operator.py:
@@ -4,9 +4,9 @@ import random
import time
from typing import Optional
-from books.application.dto import UserCredential, User
-from books.domain.gateway import UserManager
-from books.domain.model import User as DomainUser
+from books.application.dto import UserCredential, User, UserToken
+from books.domain.gateway import UserManager, PermissionManager
+from books.domain.model import User as DomainUser, UserPermission
SALT_LEN = 4
ERR_EMPTY_EMAIL = "empty email"
@@ -14,8 +14,9 @@ ERR_EMPTY_PASSWORD = "empty password"
class UserOperator:
- def __init__(self, user_manager: UserManager):
+ def __init__(self, user_manager: UserManager, perm_manager: PermissionManager):
self.user_manager = user_manager
+ self.perm_manager = perm_manager
def create_user(self, uc: UserCredential) -> Optional[User]:
if not uc.email:
@@ -33,7 +34,7 @@ class UserOperator:
uid = self.user_manager.create_user(user)
return User(id=uid, email=uc.email)
- def sign_in(self, email: str, password: str) -> Optional[User]:
+ def sign_in(self, email: str, password: str) -> Optional[UserToken]:
if not email:
raise ValueError(ERR_EMPTY_EMAIL)
if not password:
@@ -45,7 +46,12 @@ class UserOperator:
password_hash = sha1_hash(password + user.salt)
if user.password != password_hash:
raise ValueError("wrong password")
- return User(id=user.id, email=user.email)
+ token = self.perm_manager.generate_token(
+ user.id, user.email,
+ UserPermission.PermAdmin if user.is_admin else UserPermission.PermUser)
+ return UserToken(
+ User(id=user.id, email=user.email),
+ token)
def random_string(length: int) -> str:
添加 DTO,application/dto/user.py:
@@ -11,3 +11,9 @@ class UserCredential:
class User:
id: int
email: str
+
+
+@dataclass
+class UserToken:
+ user: User
+ token: str
微调 application/wire_helper.py:
@@ -1,25 +1,35 @@
-from books.domain.gateway import BookManager, ReviewManager, UserManager
+from books.domain.gateway import BookManager, ReviewManager, UserManager, PermissionManager
from books.infrastructure.cache import RedisCache, CacheHelper
+from books.infrastructure.token import TokenKeeper
from ..infrastructure.config import Config
from ..infrastructure.database import MySQLPersistence, MongoPersistence
class WireHelper:
- def __init__(self, sqlPersistence: MySQLPersistence, noSQLPersistence: MongoPersistence, kvStore: RedisCache):
+ def __init__(self,
+ sqlPersistence: MySQLPersistence,
+ noSQLPersistence: MongoPersistence,
+ kvStore: RedisCache,
+ tokenKeeper: TokenKeeper):
self.sqlPersistence = sqlPersistence
self.noSQLPersistence = noSQLPersistence
self.kvStore = kvStore
+ self.tokenKeeper = tokenKeeper
@classmethod
def new(cls, c: Config):
db = MySQLPersistence(c.db, c.app.page_size)
mdb = MongoPersistence(c.db.mongo_uri, c.db.mongo_db_name)
kv = RedisCache(c.cache)
- return cls(db, mdb, kv)
+ tk = TokenKeeper(c.app.token_secret, c.app.token_hours)
+ return cls(db, mdb, kv, tk)
def book_manager(self) -> BookManager:
return self.sqlPersistence
+ def perm_manager(self) -> PermissionManager:
+ return self.tokenKeeper
+
def review_manager(self) -> ReviewManager:
return self.noSQLPersistence
添加 perm_check
中间件到创建、更新、删除路由中 adapter/router.py:
@@ -1,7 +1,9 @@
import logging
from flask import Flask, request, jsonify
+from books.adapter.middleware import perm_check
from books.application.dto import UserCredential
+from books.domain.model.user import UserPermission
from ..application.executor import BookOperator, ReviewOperator, UserOperator
from ..application import WireHelper
from ..domain.model import Book, Review
@@ -138,15 +140,15 @@ def make_router(app: Flask, wire_helper: WireHelper):
wire_helper.book_manager(),
wire_helper.cache_helper()),
ReviewOperator(wire_helper.review_manager()),
- UserOperator(wire_helper.user_manager()))
+ UserOperator(wire_helper.user_manager(), wire_helper.perm_manager()))
app.add_url_rule('/', view_func=health)
app.add_url_rule('/books', view_func=rest_handler.get_books)
app.add_url_rule('/books/<int:id>', view_func=rest_handler.get_book)
- app.add_url_rule('/books', view_func=rest_handler.create_book,
+ app.add_url_rule('/books', view_func=perm_check(wire_helper.perm_manager(), UserPermission.PermAuthor)(rest_handler.create_book),
methods=['POST'])
- app.add_url_rule('/books/<int:id>', view_func=rest_handler.update_book,
+ app.add_url_rule('/books/<int:id>', view_func=perm_check(wire_helper.perm_manager(), UserPermission.PermAuthor)(rest_handler.update_book),
methods=['PUT'])
- app.add_url_rule('/books/<int:id>', view_func=rest_handler.delete_book,
+ app.add_url_rule('/books/<int:id>', view_func=perm_check(wire_helper.perm_manager(), UserPermission.PermAuthor)(rest_handler.delete_book),
methods=['DELETE'])
app.add_url_rule('/books/<int:book_id>/reviews',
view_func=rest_handler.get_reviews_of_book)
perm_check
的实现,adapter/middleware.py:
from functools import wraps
from flask import request, jsonify
from books.domain.gateway import PermissionManager
from books.domain.model.user import UserPermission
TOKEN_PREFIX = "Bearer "
def perm_check(perm_manager: PermissionManager, allow_perm: UserPermission):
    """Build a decorator that guards a Flask view with a permission check.

    Args:
        perm_manager: Used to validate the token and compare permissions.
        allow_perm: Minimum permission level required to call the view.

    Returns:
        A decorator. The wrapped view answers 401 when the Authorization
        header is missing or the token is rejected, 403 when the token is
        valid but its permission is insufficient, and otherwise delegates
        to the original view.
    """
    def middleware(view_func):
        @wraps(view_func)
        def wrapped_view(*args, **kwargs):
            auth_header = request.headers.get("Authorization")
            if not auth_header:
                return jsonify({"error": "Token is required"}), 401
            # Strip the scheme only when it really is a prefix, so "Bearer "
            # occurring elsewhere in the header value is left alone.
            if auth_header.startswith(TOKEN_PREFIX):
                token = auth_header[len(TOKEN_PREFIX):]
            else:
                token = auth_header
            try:
                has_perm = perm_manager.has_permission(token, allow_perm)
            except Exception as e:
                # Any validation failure means the caller is not authenticated.
                return jsonify({"error": f"{e}"}), 401
            if not has_perm:
                # Authenticated but not allowed: 403 rather than 401.
                return jsonify({"error": "Unauthorized"}), 403
            return view_func(*args, **kwargs)
        return wrapped_view
    return middleware
以上就是 API 中使用 JWT 的所需更改。让我们再次测试一下服务器功能!
使用 curl 测试
用户登录获得 Token
curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "test-pass"}' http://localhost:5000/users/sign-in
结果:
{
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoyLCJ1c2VyX25hbWUiOiJ0ZXN0LXVzZXJAZXhhbXBsZS5jb20iLCJwZXJtaXNzaW9uIjozLCJleHAiOjE3MDk5ODk1NTB9.aUTu4YI2_9aUWEvffe7pRQbAuivIq73NsPy6IXYypCk",
"user": {
"email": "test-user@example.com",
"id": 2
}
}
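将 token 放到 https://jwt.io/ 调试器中,可查看 token 的负载和签名是否正常。注意:此示例 token 的负载中 permission 为 3(即 PermAdmin),说明该用户的 is_admin 已在数据库中被改为 1(前面注册时的默认值为 0);普通用户登录得到的 permission 为 1(PermUser),不足以调用下面需要 PermAuthor 权限的接口。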
创建新的图书,携带合法正常的 Token
curl -X POST \
http://localhost:5000/books \
-H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoyLCJ1c2VyX25hbWUiOiJ0ZXN0LXVzZXJAZXhhbXBsZS5jb20iLCJwZXJtaXNzaW9uIjozLCJleHAiOjE3MDk5ODk1NTB9.aUTu4YI2_9aUWEvffe7pRQbAuivIq73NsPy6IXYypCk' \
-H 'Content-Type: application/json' \
-d '{
"title": "Test Book",
"author": "John Doe",
"published_at": "2003-01-01",
"description": "A sample book description",
"isbn": "1234567890",
"total_pages": 100
}'
成功结果:
{
"author": "John Doe",
"created_at": null,
"description": "A sample book description",
"id": 18,
"isbn": "1234567890",
"published_at": "2003-01-01",
"title": "Test Book",
"total_pages": 100,
"updated_at": null
}
创建新的图书,携带合法 Token,但是权限不够
curl -X POST \
http://localhost:5000/books \
-H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo2LCJ1c2VyX25hbWUiOiJ0ZXN0LW5vZGVAZXhhbXBsZS5jb20iLCJwZXJtaXNzaW9uIjoxLCJleHAiOjE3MDk5ODk5MjN9.KhNCNRFYVAe67pyrFPH_bCFMbn83y55AsCtlDa3v0Sk' \
-H 'Content-Type: application/json' \
-d '{
"title": "Test Book",
"author": "John Doe",
"published_at": "2003-01-01",
"description": "A sample book description",
"isbn": "1234567890",
"total_pages": 100
}'
结果:
{
"error": "Unauthorized"
}
创建新的图书,不带 Token
curl -X POST \
http://localhost:5000/books \
-H 'Content-Type: application/json' \
-d '{
"title": "Test Book",
"author": "John Doe",
"published_at": "2003-01-01",
"description": "A sample book description",
"isbn": "1234567890",
"total_pages": 100
}'
结果:
{
"error": "Token is required"
}
创建新的图书,带一个假 Token
curl -X POST \
http://localhost:5000/books \
-H 'Authorization: Bearer FAKE_TOKEN' \
-H 'Content-Type: application/json' \
-d '{
"title": "Test Book",
"author": "John Doe",
"published_at": "2003-01-01",
"description": "A sample book description",
"isbn": "1234567890",
"total_pages": 100
}'
结果:
{
"error": "failed to decode token"
}
完美! 🎉
现在,你的部分端点已经被权限认证机制保护起来啦。