» Python:使用Flask构建REST API » 2. 开发 » 2.5 4层架构

4层架构

目前,你的代码可以跑起来。但是,细观之下,不够“洁净”。

不够“洁净”

如下是代码不够洁净的几个点:

def get_db():
    db = getattr(g, '_database', None)
    if db is None:
        db = sqlite3.connect(DATABASE)
        g._database = db
  • 直接利用全局变量来存储数据库实例并在整个代码库中访问它并不可取。尽管 get_db 单例函数和 flask.g 上下文在一定程度上缓解了这个问题。
db = sqlite3.connect(DATABASE)
g._database = db
cursor = db.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS books (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    title TEXT NOT NULL,
    author TEXT NOT NULL,
    published_at TEXT NOT NULL,
    description TEXT NOT NULL,
    isbn TEXT NOT NULL,
    total_pages INTEGER NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )''')
cursor.close()
  • 业务代码中去初始化数据库不是最佳实践。数据库操作属于“基础架构”,而不是“业务”。它们属于不同的关注点,你应该将其分开到不同地方。
@app.route('/books/<int:book_id>', methods=['GET'])
def get_book(book_id):
    query = "SELECT * FROM books WHERE id = ?"
    books = execute_query(query, (book_id,))
    if not books:
        return {"error": "Record not found"}, 404
    return jsonify(books[0])
  • 不推荐在路由处理程序中直接编写 SQL 查询。如果将来决定切换到新的 ORM 或数据库框架,这可能会产生较高的迁移成本。

关注点分离

Clean Arch 来自 https://blog.cleancoder.com/uncle-bob/2012/08/13/the-clean-architecture.html

整洁架构(The Clean Architecture) 是 Robert C. Martin 在他的书《整洁架构:软件结构与设计的工匠指南》中引入的软件架构模式。 它强调了软件系统内的关注点分离,其主要目标是使系统在其生命周期内更易于维护、可扩展和可测试。

4层架构在细节上有所不同,但与整洁架构类似。它们都有相同的目标,即关注点分离。它们都通过将软件划分为层来实现这种分离。

4 layers

  1. Adapter 适配器层:负责路由和适配框架、协议和前端显示(Web、移动等)。

  2. Application 应用层:主要负责获取输入、组装上下文、参数验证,调用领域层进行业务处理。该层是开放式的,应用层可以绕过领域层并直接访问基础设施层。

  3. Domain 领域层:封装核心业务逻辑,并通过领域服务和领域实体的方法向应用层提供业务实体和业务逻辑操作。领域是应用程序的核心,不依赖于任何其他层

  4. Infrastructure 基础设施层:主要处理诸如数据库上的CRUD操作、搜索引擎、文件系统、用于分布式服务的RPC等技术细节。外部依赖项需要通过此处的网关进行转换,然后才能被上面的应用层和领域层使用。

重构

让我们基于4层架构来重构。

新的目录结构如下所示:

projects/lr_rest_books_py
├── LICENSE
├── README.md
├── books
│   ├── __init__.py
│   ├── adapter
│   │   ├── __init__.py
│   │   ├── router.py
│   │   └── util.py
│   ├── application
│   │   ├── __init__.py
│   │   ├── executor
│   │   │   ├── __init__.py
│   │   │   └── book_operator.py
│   │   └── wire_helper.py
│   ├── domain
│   │   ├── __init__.py
│   │   ├── gateway
│   │   │   ├── __init__.py
│   │   │   └── book_manager.py
│   │   └── model
│   │       ├── __init__.py
│   │       └── book.py
│   └── infrastructure
│       ├── __init__.py
│       ├── config
│       │   ├── __init__.py
│       │   └── config.py
│       └── database
│           ├── __init__.py
│           └── sqlite.py
├── main.py
├── requirements.txt
└── test.db

移动 model/book.pydomain/model/book.py:

注意: 此教程内为了方便,一律省去 books/ 路径前缀。

from dataclasses import dataclass
from datetime import datetime

@dataclass
class Book:
    id: int
    title: str
    author: str
    published_at: str
    description: str
    isbn: str
    total_pages: int
    created_at: datetime
    updated_at: datetime

domain 目录存放核心业务规则。非业务代码远离该目录。

创建 domain/gateway/book_manager.py:

from abc import ABC, abstractmethod
from typing import List, Optional

from ..model import Book


class BookManager(ABC):
    @abstractmethod
    def create_book(self, b: Book) -> int:
        pass

    @abstractmethod
    def update_book(self, id: int, b: Book) -> None:
        pass

    @abstractmethod
    def delete_book(self, id: int) -> None:
        pass

    @abstractmethod
    def get_book(self, id: int) -> Optional[Book]:
        pass

    @abstractmethod
    def get_books(self) -> List[Book]:
        pass

Python 中没有像其他编程语言那样有内置"接口"。
好在,你可以使用 abc 模块中的抽象基类(ABC)来实现类似的功能。

BookManager 抽象类定义领域实体 Book 的业务能力。 此处只是抽象表示,具体实现交由基础架构层去完成。 另外,这些方法使得应用层可以调用它执行业务逻辑。

创建 infrastructrue/database/sqlite.py:

import sqlite3
from typing import List, Optional

from ...domain.gateway import BookManager
from ...domain.model import Book


class SQLitePersistence(BookManager):

    def __init__(self, file_name: str):
        self._file_name = file_name
        self._create_table()

    def _create_table(self):
        conn, cursor = self._connect()
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS books (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            author TEXT NOT NULL,
            published_at TEXT NOT NULL,
            description TEXT NOT NULL,
            isbn TEXT NOT NULL,
            total_pages INTEGER NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
          )
       ''')
        conn.commit()

    def _connect(self):
        # You cannot use a SQLite connection object across multiple threads in Flask.
        # Flask is designed to be multithreaded for handling multiple requests simultaneously.
        conn = sqlite3.connect(self._file_name)
        return conn, conn.cursor()

    def create_book(self, b: Book) -> int:
        conn, cursor = self._connect()
        cursor.execute('''
            INSERT INTO books (title, author, published_at, description, isbn, total_pages) VALUES (?, ?, ?, ?, ?, ?)
        ''', (b.title, b.author, b.published_at, b.description, b.isbn, b.total_pages))
        conn.commit()
        return cursor.lastrowid or 0

    def update_book(self, id: int, b: Book) -> None:
        conn, cursor = self._connect()
        cursor.execute('''
            UPDATE books SET title=?, author=?, published_at=?, description=?, isbn=?, total_pages=?,updated_at=DATETIME('now') WHERE id=?
        ''', (b.title, b.author, b.published_at, b.description, b.isbn, b.total_pages, id))
        conn.commit()

    def delete_book(self, id: int) -> None:
        conn, cursor = self._connect()
        cursor.execute('''
            DELETE FROM books WHERE id=?
        ''', (id,))
        conn.commit()

    def get_book(self, id: int) -> Optional[Book]:
        _, cursor = self._connect()
        cursor.execute('''
            SELECT * FROM books WHERE id=?
        ''', (id,))
        result = cursor.fetchone()
        if result:
            return Book(*result)
        return None

    def get_books(self) -> List[Book]:
        _, cursor = self._connect()
        cursor.execute('''
            SELECT * FROM books
        ''')
        results = cursor.fetchall()
        return [Book(*result) for result in results]

如你所见,类 SQLitePersistence 实现了抽象类 BookManager 中所有方法。

创建 infrastructrue/config/config.py:

from dataclasses import dataclass


@dataclass
class DBConfig:
    file_name: str


@dataclass
class ApplicationConfig:
    port: int


@dataclass
class Config:
    app: ApplicationConfig
    db: DBConfig

数据类 Config 将硬编码的配置项拎出。一般而言,配置“拎出来”都是好的工程实践。

创建 application/executor/book_operator.py:

from typing import List, Optional
from ...domain.model import Book
from ...domain.gateway import BookManager


class BookOperator():

    def __init__(self, book_manager: BookManager):
        self.book_manager = book_manager

    def create_book(self, b: Book) -> Book:
        id = self.book_manager.create_book(b)
        b.id = id
        return b

    def get_book(self, id: int) -> Optional[Book]:
        return self.book_manager.get_book(id)

    def get_books(self) -> List[Book]:
        return self.book_manager.get_books()

    def update_book(self, id: int, b: Book) -> Book:
        self.book_manager.update_book(id, b)
        return b

    def delete_book(self, id: int) -> None:
        return self.book_manager.delete_book(id)

应用层封装所有底层的逻辑,并提供接口给顶层的适配层

创建 application/wire_helper.py:

from ..domain.gateway import BookManager
from ..infrastructure.config import Config
from ..infrastructure.database import SQLitePersistence


class WireHelper:
    def __init__(self, persistence: SQLitePersistence):
        self.persistence = persistence

    @classmethod
    def new(cls, c: Config):
        db = SQLitePersistence(c.db.file_name)
        return cls(db)

    def book_manager(self) -> BookManager:
        return self.persistence

WireHelper 帮助实现 DI(依赖注入)。 如果想要更全面的 DI 支持,考虑使用 injector

有了这些准备之后,router 终于可以“整洁地”完成工作了。 它不用感知底层数据库的存在。此刻,关注点被分离了。

创建 adapter/router.py:

import logging
from flask import Flask, request, jsonify

from ..application.executor import BookOperator
from ..application import WireHelper
from ..domain.model import Book
from .util import dataclass_from_dict


class RestHandler:
    def __init__(self, logger: logging.Logger, book_operator: BookOperator):
        self._logger = logger
        self.book_operator = book_operator

    def get_books(self):
        try:
            books = self.book_operator.get_books()
            return jsonify(books), 200
        except Exception as e:
            self._logger.error(f"Failed to get books: {e}")
            return jsonify({"error": "Failed to get books"}), 404

    def get_book(self, id):
        try:
            book = self.book_operator.get_book(id)
            if not book:
                return jsonify({"error": f"The book with id {id} does not exist"}), 404
            return jsonify(book), 200
        except Exception as e:
            self._logger.error(f"Failed to get the book with {id}: {e}")
            return jsonify({"error": "Failed to get the book"}), 404

    def create_book(self):
        try:
            b = dataclass_from_dict(Book, request.json)
            book = self.book_operator.create_book(b)
            return jsonify(book), 201
        except Exception as e:
            self._logger.error(f"Failed to create: {e}")
            return jsonify({"error": "Failed to create"}), 404

    def update_book(self, id):
        try:
            b = dataclass_from_dict(Book, request.json)
            book = self.book_operator.update_book(id, b)
            return jsonify(book), 200
        except Exception as e:
            self._logger.error(f"Failed to update: {e}")
            return jsonify({"error": "Failed to update"}), 404

    def delete_book(self, id):
        try:
            self.book_operator.delete_book(id)
            return "", 204
        except Exception as e:
            self._logger.error(f"Failed to delete: {e}")
            return jsonify({"error": "Failed to delete"}), 404


def health():
    return jsonify({"status": "ok"})


def make_router(app: Flask, wire_helper: WireHelper):
    rest_handler = RestHandler(
        app.logger, BookOperator(wire_helper.book_manager()))
    app.add_url_rule('/', view_func=health)
    app.add_url_rule('/books', view_func=rest_handler.get_books)
    app.add_url_rule('/books/<int:id>', view_func=rest_handler.get_book)
    app.add_url_rule('/books', view_func=rest_handler.create_book,
                     methods=['POST'])
    app.add_url_rule('/books/<int:id>', view_func=rest_handler.update_book,
                     methods=['PUT'])
    app.add_url_rule('/books/<int:id>', view_func=rest_handler.delete_book,
                     methods=['DELETE'])

route() 装饰器是携带 view_func 参数调用 app.add_url_rule 方法的语法捷径。

如下二者等价:

@app.route("/")
def index():
    ...
def index():
    ...

app.add_url_rule("/", view_func=index)

adapter/util.py 中创建辅助函数:

import dataclasses


def dataclass_from_dict(klass, d):
    class_d = {field.name: field.default if field.default !=
               field.default_factory else None for field in dataclasses.fields(klass)}
    return klass(**{**class_d, **d})

替换 main.py 中内容使其变得“洁净”:

from flask import Flask

from books.adapter.router import make_router
from books.application import WireHelper
from books.infrastructure.config import Config, ApplicationConfig, DBConfig

c = Config(
    ApplicationConfig(
        8080
    ),
    DBConfig(
        "test.db"
    )
)
wire_helper = WireHelper.new(c)
app = Flask(__name__)
make_router(app, wire_helper)

重新启动服务并测试各个端点:

flask --app main run --debug

一切正常!重构完成。🎉赞!