» Python:使用Flask构建REST API » 2. 开发 » 2.6 数据库:MySQL

数据库:MySQL

SQLite 只是为了效果演示。如果你需要一个生产环境可用的数据库,可尝试 MySQLmongoDB

切换到 MySQL

  1. 在你机器上安装 MySQL 并启动。

注意:在实际生产项目中,记得给表中字段创建所需的索引。

  1. 添加 mysql 依赖:
pip3 install mysql-connector-python

该依赖是官方的 MySQL 驱动 Python 版本。

更新 requirements.txt:

pip3 freeze > requirements.txt
  1. 更新代码。

添加 infrastructure/database/mysql.py:

import mysql.connector
from typing import Any, List, Optional

from books.infrastructure.config import DBConfig

from ...domain.gateway import BookManager
from ...domain.model import Book


class MySQLPersistence(BookManager):
    def __init__(self, c: DBConfig):
        self.conn = mysql.connector.connect(
            host=c.host,
            port=c.port,
            user=c.user,
            password=c.password,
            database=c.database,
            autocommit=True
        )
        self.cursor = self.conn.cursor(dictionary=True)
        self._create_table()

    def _create_table(self):
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS books (
            id INT AUTO_INCREMENT PRIMARY KEY,
            title VARCHAR(255) NOT NULL,
            author VARCHAR(255) NOT NULL,
            published_at DATE NOT NULL,
            description TEXT NOT NULL,
            isbn VARCHAR(15) NOT NULL,
            total_pages INT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
        );
        ''')

    def create_book(self, b: Book) -> int:
        self.cursor.execute('''
            INSERT INTO books (title, author, published_at, description, isbn, total_pages) VALUES (%s, %s, %s, %s, %s, %s)
        ''', (b.title, b.author, b.published_at, b.description, b.isbn, b.total_pages))
        return self.cursor.lastrowid or 0

    def update_book(self, id: int, b: Book) -> None:
        self.cursor.execute('''
            UPDATE books SET title=%s, author=%s, published_at=%s, description=%s, isbn=%s, total_pages=%s WHERE id=%s
        ''', (b.title, b.author, b.published_at, b.description, b.isbn, b.total_pages, id))

    def delete_book(self, id: int) -> None:
        self.cursor.execute('''
            DELETE FROM books WHERE id=%s
        ''', (id,))

    def get_book(self, id: int) -> Optional[Book]:
        self.cursor.execute('''
            SELECT * FROM books WHERE id=%s
        ''', (id,))
        result: Any = self.cursor.fetchone()
        if result is None:
            return None
        return Book(**result)

    def get_books(self) -> List[Book]:
        self.cursor.execute('''
            SELECT * FROM books
        ''')
        results: List[Any] = self.cursor.fetchall()
        return [Book(**result) for result in results]

注意与之前的区别,此处使用 mysql.connector 代替 sqlite3

调整 infrastructure/database/init.py:

@@ -1 +1,2 @@
 from .sqlite import SQLitePersistence
+from .mysql import MySQLPersistence

DBConfig 类添加 mysql 连接配置项,在 infrastructure/config/config.py 中:

@@ -4,6 +4,11 @@ from dataclasses import dataclass
 @dataclass
 class DBConfig:
     file_name: str
+    host: str
+    port: int
+    user: str
+    password: str
+    database: str

更新 WireHelper 以切换依赖,application/wire_helper.py:

@@ -1,15 +1,15 @@
 from ..domain.gateway import BookManager
 from ..infrastructure.config import Config
-from ..infrastructure.database import SQLitePersistence
+from ..infrastructure.database import MySQLPersistence
 
 
 class WireHelper:
-    def __init__(self, persistence: SQLitePersistence):
+    def __init__(self, persistence: MySQLPersistence):
         self.persistence = persistence
 
     @classmethod
     def new(cls, c: Config):
-        db = SQLitePersistence(c.db.file_name)
+        db = MySQLPersistence(c.db)
         return cls(db)
 
     def book_manager(self) -> BookManager:

放入连接参数,main.py:

@@ -9,7 +9,12 @@ c = Config(
         8080
     ),
     DBConfig(
-        "test.db"
+        "test.db",
+        "127.0.0.1",
+        3306,
+        "test_user",
+        "test_pass",
+        "lr_book"
     )
 )
 wire_helper = WireHelper.new(c)

瞧!你的 API 服务器现在由 MySQL 驱动啦!

上页下页