» Python:使用Flask构建REST API » 2. 开发 » 2.8 数据库:MongoDB

数据库:mongoDB

如果你更青睐 NoSQL 数据库,mongoDB 绝对是你的最佳选择。

尝试 mongoDB

  1. 在你的机器上安装 mongoDB 并启动它。

注意:在生产项目中记得为你的集合中字段创建所需索引。

  1. 添加 mongo 依赖:
pip3 install pymongo

更新 requirements.txt:

pip3 freeze > requirements.txt
  1. 更新代码。

使用 mongoDB 执行 CRUD 操作

添加一个新的领域实体 Review,在 domain/model/review.py:

from dataclasses import dataclass
from datetime import datetime


@dataclass
class Review:
    id: str
    book_id: int
    author: str
    title: str
    content: str
    created_at: datetime
    updated_at: datetime

声明其业务能力,domain/gateway/review_manager.py:

from abc import ABC, abstractmethod
from typing import List, Optional

from ..model import Review


class ReviewManager(ABC):
    @abstractmethod
    def create_review(self, r: Review) -> str:
        pass

    @abstractmethod
    def update_review(self, id: str, r: Review) -> None:
        pass

    @abstractmethod
    def delete_review(self, id: str) -> None:
        pass

    @abstractmethod
    def get_review(self, id: str) -> Optional[Review]:
        pass

    @abstractmethod
    def get_reviews_of_book(self, book_id: int) -> List[Review]:
        pass

实现这些方法,infrastructure/database/mongo.py:

from bson.objectid import ObjectId
import dataclasses
from pymongo import MongoClient
from typing import Any, Dict, List, Optional

from ...domain.gateway import ReviewManager
from ...domain.model import Review

COLL_REVIEW = "reviews"


class MongoPersistence(ReviewManager):
    def __init__(self, uri: str, db_name: str):
        self.client = MongoClient(uri)
        self.db = self.client[db_name]
        self.coll = self.db[COLL_REVIEW]

    def create_review(self, r: Review) -> str:
        result = self.coll.insert_one(dataclasses.asdict(r))
        return str(result.inserted_id)

    def update_review(self, id: str, r: Review) -> None:
        new_data = {"title": r.title, "content": r.content,
                    "updated_at": r.updated_at}
        self.coll.update_one({"_id": ObjectId(id)}, {"$set": new_data})

    def delete_review(self, id: str) -> None:
        self.coll.delete_one({"_id": ObjectId(id)})

    def get_review(self, id: str) -> Optional[Review]:
        review_data = self.coll.find_one({"_id": ObjectId(id)})
        if review_data is None:
            return None
        return Review(**_polish(review_data))

    def get_reviews_of_book(self, book_id: int) -> List[Review]:
        reviews_data = self.coll.find({"book_id": book_id})
        return [Review(**_polish(r)) for r in reviews_data]


def _polish(r: Dict[str, Any]):
    r['id'] = str(r['_id'])
    del r['_id']
    return r

添加 mongodb 配置项,infrastructure/config/config.py:

@@ -10,6 +10,8 @@ class DBConfig:
     user: str
     password: str
     database: str
+    mongo_uri: str
+    mongo_db_name: str
 
 
 @dataclass

添加配置值,config.yml:

@@ -7,3 +7,5 @@ db:
   user: "test_user"
   password: "test_pass"
   database: "lr_book"
+  mongo_uri: "mongodb://localhost:27017"
+  mongo_db_name: "lr_book"

在应用层添加 review_operatorapplication/executor/review_operator.py:

from datetime import datetime
from typing import List, Optional

from ...domain.model import Review
from ...domain.gateway import ReviewManager


class ReviewOperator():

    def __init__(self, review_manager: ReviewManager):
        self.review_manager = review_manager

    def create_review(self, r: Review) -> Review:
        now = datetime.now()
        r.created_at = now
        r.updated_at = now
        id = self.review_manager.create_review(r)
        r.id = id
        return r

    def get_review(self, id: str) -> Optional[Review]:
        return self.review_manager.get_review(id)

    def get_reviews_of_book(self, review_id: int) -> List[Review]:
        return self.review_manager.get_reviews_of_book(review_id)

    def update_review(self, id: str, r: Review) -> Review:
        r.updated_at = datetime.now()
        self.review_manager.update_review(id, r)
        return r

    def delete_review(self, id: str) -> None:
        return self.review_manager.delete_review(id)

调整 application/wire_helper.py 以引入 mongodb 连接:

@@ -1,16 +1,21 @@
-from ..domain.gateway import BookManager
+from books.domain.gateway import BookManager, ReviewManager
 from ..infrastructure.config import Config
-from ..infrastructure.database import MySQLPersistence
+from ..infrastructure.database import MySQLPersistence, MongoPersistence
 
 
 class WireHelper:
-    def __init__(self, persistence: MySQLPersistence):
-        self.persistence = persistence
+    def __init__(self, sqlPersistence: MySQLPersistence, noSQLPersistence: MongoPersistence):
+        self.sqlPersistence = sqlPersistence
+        self.noSQLPersistence = noSQLPersistence
 
     @classmethod
     def new(cls, c: Config):
         db = MySQLPersistence(c.db)
-        return cls(db)
+        mdb = MongoPersistence(c.db.mongo_uri, c.db.mongo_db_name)
+        return cls(db, mdb)
 
     def book_manager(self) -> BookManager:
-        return self.persistence
+        return self.sqlPersistence
+
+    def review_manager(self) -> ReviewManager:
+        return self.noSQLPersistence

添加 review 相关路由,adaptor/router.py:

@@ -1,16 +1,17 @@
 import logging
 from flask import Flask, request, jsonify
 
-from ..application.executor import BookOperator
+from ..application.executor import BookOperator, ReviewOperator
 from ..application import WireHelper
-from ..domain.model import Book
+from ..domain.model import Book, Review
 from .util import dataclass_from_dict
 
 
 class RestHandler:
-    def __init__(self, logger: logging.Logger, book_operator: BookOperator):
+    def __init__(self, logger: logging.Logger, book_operator: BookOperator, review_operator: ReviewOperator):
         self._logger = logger
         self.book_operator = book_operator
+        self.review_operator = review_operator
 
     def get_books(self):
         try:
@@ -56,6 +57,50 @@ class RestHandler:
             self._logger.error(f"Failed to delete: {e}")
             return jsonify({"error": "Failed to delete"}), 404
 
+    def get_reviews_of_book(self, book_id: int):
+        try:
+            reviews = self.review_operator.get_reviews_of_book(book_id)
+            return jsonify(reviews), 200
+        except Exception as e:
+            self._logger.error(f"Failed to get reviews of book: {e}")
+            return jsonify({"error": "Failed to get reviews of book"}), 404
+
+    def get_review(self, id: str):
+        try:
+            review = self.review_operator.get_review(id)
+            if not review:
+                return jsonify({"error": f"The review with id {id} does not exist"}), 404
+            return jsonify(review), 200
+        except Exception as e:
+            self._logger.error(f"Failed to get the review with {id}: {e}")
+            return jsonify({"error": "Failed to get the review"}), 404
+
+    def create_review(self):
+        try:
+            b = dataclass_from_dict(Review, request.json)
+            review = self.review_operator.create_review(b)
+            return jsonify(review), 201
+        except Exception as e:
+            self._logger.error(f"Failed to create: {e}")
+            return jsonify({"error": "Failed to create"}), 400
+
+    def update_review(self, id: str):
+        try:
+            b = dataclass_from_dict(Review, request.json)
+            review = self.review_operator.update_review(id, b)
+            return jsonify(review), 200
+        except Exception as e:
+            self._logger.error(f"Failed to update: {e}")
+            return jsonify({"error": "Failed to update"}), 404
+
+    def delete_review(self, id: str):
+        try:
+            self.review_operator.delete_review(id)
+            return "", 204
+        except Exception as e:
+            self._logger.error(f"Failed to delete: {e}")
+            return jsonify({"error": "Failed to delete"}), 404
+
 
 def health():
     return jsonify({"status": "ok"})
@@ -63,7 +108,7 @@ def health():
 
 def make_router(app: Flask, wire_helper: WireHelper):
     rest_handler = RestHandler(
-        app.logger, BookOperator(wire_helper.book_manager()))
+        app.logger, BookOperator(wire_helper.book_manager()), ReviewOperator(wire_helper.review_manager()))
     app.add_url_rule('/', view_func=health)
     app.add_url_rule('/books', view_func=rest_handler.get_books)
     app.add_url_rule('/books/<int:id>', view_func=rest_handler.get_book)
@@ -73,3 +118,12 @@ def make_router(app: Flask, wire_helper: WireHelper):
                      methods=['PUT'])
     app.add_url_rule('/books/<int:id>', view_func=rest_handler.delete_book,
                      methods=['DELETE'])
+    app.add_url_rule('/books/<int:book_id>/reviews',
+                     view_func=rest_handler.get_reviews_of_book)
+    app.add_url_rule('/reviews/<id>', view_func=rest_handler.get_review)
+    app.add_url_rule('/reviews', view_func=rest_handler.create_review,
+                     methods=['POST'])
+    app.add_url_rule('/reviews/<id>', view_func=rest_handler.update_review,
+                     methods=['PUT'])
+    app.add_url_rule('/reviews/<id>', view_func=rest_handler.delete_review,
+                     methods=['DELETE'])

所有更改已合入。让我们用 curl 来试下效果。

curl 测试

创建一个新的书评:

curl -X POST \
  -H "Content-Type: application/json" \
  -d '{
        "book_id": 1,
        "author": "John Doe",
        "title": "Great Book",
        "content": "This is a great book!"
      }' \
  http://localhost:5000/reviews

响应如下:

{
  "author": "John Doe",
  "book_id": 1,
  "content": "This is a great book!",
  "created_at": "Thu, 07 Mar 2024 23:49:04 GMT",
  "id": "65e9e1f020162bf22e98b4a1",
  "title": "Great Book",
  "updated_at": "Thu, 07 Mar 2024 23:49:04 GMT"
}

根据 ID 获取单个书评:

curl -X GET http://localhost:5000/reviews/65e9e1f020162bf22e98b4a1

结果:

{
  "author": "John Doe",
  "book_id": 1,
  "content": "This is a great book!",
  "created_at": "Thu, 07 Mar 2024 23:49:04 GMT",
  "id": "65e9e1f020162bf22e98b4a1",
  "title": "Great Book",
  "updated_at": "Thu, 07 Mar 2024 23:49:04 GMT"
}

列出某本书的所有书评:

curl -X GET http://localhost:5000/books/1/reviews

结果列表:

[
  {
    "author": "Carl Smith",
    "book_id": 1,
    "content": "This is a great book!",
    "created_at": "Fri, 01 Mar 2024 15:18:24 GMT",
    "id": "65e1f1c0f1c5f50b36b2ce61",
    "title": "Best best Book",
    "updated_at": "Fri, 01 Mar 2024 15:18:24 GMT"
  },
  {
    "author": "John Doe",
    "book_id": 1,
    "content": "This is a great book!",
    "created_at": "Thu, 07 Mar 2024 23:49:04 GMT",
    "id": "65e9e1f020162bf22e98b4a1",
    "title": "Great Book",
    "updated_at": "Thu, 07 Mar 2024 23:49:04 GMT"
  }
]

更新已有书评:

curl -X PUT \
  -H "Content-Type: application/json" \
  -d '{
        "content": "I prefer Robert Smith new book",
        "title": "Not that good"
      }' \
  http://localhost:5000/reviews/65e9e1f020162bf22e98b4a1

结果:

{
  "author": null,
  "book_id": null,
  "content": "I prefer Robert Smith new book",
  "created_at": null,
  "id": null,
  "title": "Not that good",
  "updated_at": "Fri, 08 Mar 2024 00:05:12 GMT"
}

删除已有书评:

curl -X DELETE http://localhost:5000/reviews/65e9e1f020162bf22e98b4a1

其返回 code 204 表示一次成功删除。

瞧!你的 API 服务器把 mongoDB 也用上啦。