数据库:mongoDB
如果你更青睐 NoSQL 数据库,mongoDB 绝对是你的最佳选择。
尝试 mongoDB
- 在你的机器上安装 mongoDB 并启动它。
注意:在生产项目中,记得为集合中需要查询的字段创建相应的索引。
- 添加 mongo 依赖:
pip3 install pymongo
更新 requirements.txt:
pip3 freeze > requirements.txt
- 更新代码。
使用 mongoDB 执行 CRUD 操作
添加一个新的领域实体 Review,在 domain/model/review.py:
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Review:
    """Domain entity for a reader's review of a book.

    The declaration order of the fields defines the positional order of the
    generated constructor, so it must not be rearranged.
    """

    # Identifier of the review itself, kept as a string.
    id: str
    # Identifier of the book this review belongs to.
    book_id: int
    # Name of the person who wrote the review.
    author: str
    # Short headline of the review.
    title: str
    # Full text of the review.
    content: str
    # Creation and last-modification timestamps.
    created_at: datetime
    updated_at: datetime
声明其业务能力,domain/gateway/review_manager.py:
from abc import ABC, abstractmethod
from typing import List, Optional
from ..model import Review
class ReviewManager(ABC):
    """Abstract gateway listing the storage operations available for reviews.

    Concrete persistence classes must implement every method below; the
    abstract bodies themselves do nothing and return ``None``.
    """

    @abstractmethod
    def create_review(self, r: Review) -> str:
        """Store ``r`` and return the identifier of the stored review."""

    @abstractmethod
    def update_review(self, id: str, r: Review) -> None:
        """Update the review identified by ``id`` using the data in ``r``."""

    @abstractmethod
    def delete_review(self, id: str) -> None:
        """Remove the review identified by ``id``."""

    @abstractmethod
    def get_review(self, id: str) -> Optional[Review]:
        """Return the review identified by ``id``, or ``None`` if there is none."""

    @abstractmethod
    def get_reviews_of_book(self, book_id: int) -> List[Review]:
        """Return the reviews whose ``book_id`` equals ``book_id``."""
实现这些方法,infrastructure/database/mongo.py:
from bson.objectid import ObjectId
import dataclasses
from pymongo import MongoClient
from typing import Any, Dict, List, Optional
from ...domain.gateway import ReviewManager
from ...domain.model import Review
# Name of the MongoDB collection that holds the review documents.
COLL_REVIEW = "reviews"


class MongoPersistence(ReviewManager):
    """``ReviewManager`` implementation backed by a MongoDB collection."""

    def __init__(self, uri: str, db_name: str):
        """Create a client for ``uri`` and bind the review collection.

        Args:
            uri: MongoDB connection URI passed to ``MongoClient``.
            db_name: Name of the database that contains ``COLL_REVIEW``.
        """
        self.client = MongoClient(uri)
        self.db = self.client[db_name]
        self.coll = self.db[COLL_REVIEW]

    def create_review(self, r: Review) -> str:
        """Insert ``r`` as a new document and return its ``_id`` as a string."""
        doc = dataclasses.asdict(r)
        # MongoDB generates the real identifier in ``_id``. The dataclass's
        # own ``id`` is only a placeholder at this point, so storing it would
        # leave a stale duplicate field in every document. Readers are not
        # affected: ``_polish`` always rebuilds ``id`` from ``_id``.
        doc.pop("id", None)
        result = self.coll.insert_one(doc)
        return str(result.inserted_id)

    def update_review(self, id: str, r: Review) -> None:
        """Overwrite title, content and ``updated_at`` of the review ``id``.

        Other fields of ``r`` are ignored. The result of ``update_one`` is
        not inspected, so an ``id`` that matches no document is not reported
        as an error here.
        """
        new_data = {
            "title": r.title,
            "content": r.content,
            "updated_at": r.updated_at,
        }
        self.coll.update_one({"_id": ObjectId(id)}, {"$set": new_data})

    def delete_review(self, id: str) -> None:
        """Delete the document whose ``_id`` is ``ObjectId(id)``."""
        self.coll.delete_one({"_id": ObjectId(id)})

    def get_review(self, id: str) -> Optional[Review]:
        """Return the review with the given ``id``, or ``None`` if not found."""
        review_data = self.coll.find_one({"_id": ObjectId(id)})
        if review_data is None:
            return None
        return Review(**_polish(review_data))

    def get_reviews_of_book(self, book_id: int) -> List[Review]:
        """Return every stored review whose ``book_id`` field equals ``book_id``."""
        reviews_data = self.coll.find({"book_id": book_id})
        return [Review(**_polish(r)) for r in reviews_data]
def _polish(r: Dict[str, Any]):
r['id'] = str(r['_id'])
del r['_id']
return r
添加 mongodb 配置项,infrastructure/config/config.py:
@@ -10,6 +10,8 @@ class DBConfig:
user: str
password: str
database: str
+ mongo_uri: str
+ mongo_db_name: str
@dataclass
添加配置值,config.yml:
@@ -7,3 +7,5 @@ db:
user: "test_user"
password: "test_pass"
database: "lr_book"
+ mongo_uri: "mongodb://localhost:27017"
+ mongo_db_name: "lr_book"
在应用层添加 review_operator,application/executor/review_operator.py:
from datetime import datetime, timezone
from typing import List, Optional

from ...domain.gateway import ReviewManager
from ...domain.model import Review
class ReviewOperator():
    """Application-layer service that delegates review use cases to a gateway."""

    def __init__(self, review_manager: ReviewManager):
        """Keep the ``ReviewManager`` that performs the actual storage work."""
        self.review_manager = review_manager

    def create_review(self, r: Review) -> Review:
        """Stamp ``r`` with creation timestamps, store it and return it.

        ``r`` is modified in place: ``created_at`` and ``updated_at`` are set
        to the current time and ``id`` to the identifier returned by the
        gateway.
        """
        # Use an aware UTC timestamp: a naive local time would be taken for
        # UTC by the storage and JSON layers and shift the recorded instant.
        now = datetime.now(timezone.utc)
        r.created_at = now
        r.updated_at = now
        r.id = self.review_manager.create_review(r)
        return r

    def get_review(self, id: str) -> Optional[Review]:
        """Return the review with the given ``id``, or ``None`` if the gateway finds none."""
        return self.review_manager.get_review(id)

    def get_reviews_of_book(self, review_id: int) -> List[Review]:
        """Return the reviews of one book.

        NOTE: despite its name, ``review_id`` is forwarded to the gateway as
        the *book* identifier; the parameter name is kept for compatibility
        with existing callers.
        """
        return self.review_manager.get_reviews_of_book(review_id)

    def update_review(self, id: str, r: Review) -> Review:
        """Refresh ``r.updated_at``, pass the update to the gateway and return ``r``.

        The returned object is the caller's ``r``, not a copy re-read from
        storage.
        """
        # Aware UTC timestamp, consistent with create_review.
        r.updated_at = datetime.now(timezone.utc)
        self.review_manager.update_review(id, r)
        return r

    def delete_review(self, id: str) -> None:
        """Delete the review identified by ``id`` through the gateway."""
        return self.review_manager.delete_review(id)
调整 application/wire_helper.py 以引入 mongodb 连接:
@@ -1,16 +1,21 @@
-from ..domain.gateway import BookManager
+from books.domain.gateway import BookManager, ReviewManager
from ..infrastructure.config import Config
-from ..infrastructure.database import MySQLPersistence
+from ..infrastructure.database import MySQLPersistence, MongoPersistence
class WireHelper:
- def __init__(self, persistence: MySQLPersistence):
- self.persistence = persistence
+ def __init__(self, sqlPersistence: MySQLPersistence, noSQLPersistence: MongoPersistence):
+ self.sqlPersistence = sqlPersistence
+ self.noSQLPersistence = noSQLPersistence
@classmethod
def new(cls, c: Config):
db = MySQLPersistence(c.db)
- return cls(db)
+ mdb = MongoPersistence(c.db.mongo_uri, c.db.mongo_db_name)
+ return cls(db, mdb)
def book_manager(self) -> BookManager:
- return self.persistence
+ return self.sqlPersistence
+
+ def review_manager(self) -> ReviewManager:
+ return self.noSQLPersistence
添加 review 相关路由,adaptor/router.py:
@@ -1,16 +1,17 @@
import logging
from flask import Flask, request, jsonify
-from ..application.executor import BookOperator
+from ..application.executor import BookOperator, ReviewOperator
from ..application import WireHelper
-from ..domain.model import Book
+from ..domain.model import Book, Review
from .util import dataclass_from_dict
class RestHandler:
- def __init__(self, logger: logging.Logger, book_operator: BookOperator):
+ def __init__(self, logger: logging.Logger, book_operator: BookOperator, review_operator: ReviewOperator):
self._logger = logger
self.book_operator = book_operator
+ self.review_operator = review_operator
def get_books(self):
try:
@@ -56,6 +57,50 @@ class RestHandler:
self._logger.error(f"Failed to delete: {e}")
return jsonify({"error": "Failed to delete"}), 404
+ def get_reviews_of_book(self, book_id: int):
+ try:
+ reviews = self.review_operator.get_reviews_of_book(book_id)
+ return jsonify(reviews), 200
+ except Exception as e:
+ self._logger.error(f"Failed to get reviews of book: {e}")
+ return jsonify({"error": "Failed to get reviews of book"}), 404
+
+ def get_review(self, id: str):
+ try:
+ review = self.review_operator.get_review(id)
+ if not review:
+ return jsonify({"error": f"The review with id {id} does not exist"}), 404
+ return jsonify(review), 200
+ except Exception as e:
+ self._logger.error(f"Failed to get the review with {id}: {e}")
+ return jsonify({"error": "Failed to get the review"}), 404
+
+ def create_review(self):
+ try:
+ b = dataclass_from_dict(Review, request.json)
+ review = self.review_operator.create_review(b)
+ return jsonify(review), 201
+ except Exception as e:
+ self._logger.error(f"Failed to create: {e}")
+ return jsonify({"error": "Failed to create"}), 400
+
+ def update_review(self, id: str):
+ try:
+ b = dataclass_from_dict(Review, request.json)
+ review = self.review_operator.update_review(id, b)
+ return jsonify(review), 200
+ except Exception as e:
+ self._logger.error(f"Failed to update: {e}")
+ return jsonify({"error": "Failed to update"}), 404
+
+ def delete_review(self, id: str):
+ try:
+ self.review_operator.delete_review(id)
+ return "", 204
+ except Exception as e:
+ self._logger.error(f"Failed to delete: {e}")
+ return jsonify({"error": "Failed to delete"}), 404
+
def health():
return jsonify({"status": "ok"})
@@ -63,7 +108,7 @@ def health():
def make_router(app: Flask, wire_helper: WireHelper):
rest_handler = RestHandler(
- app.logger, BookOperator(wire_helper.book_manager()))
+ app.logger, BookOperator(wire_helper.book_manager()), ReviewOperator(wire_helper.review_manager()))
app.add_url_rule('/', view_func=health)
app.add_url_rule('/books', view_func=rest_handler.get_books)
app.add_url_rule('/books/<int:id>', view_func=rest_handler.get_book)
@@ -73,3 +118,12 @@ def make_router(app: Flask, wire_helper: WireHelper):
methods=['PUT'])
app.add_url_rule('/books/<int:id>', view_func=rest_handler.delete_book,
methods=['DELETE'])
+ app.add_url_rule('/books/<int:book_id>/reviews',
+ view_func=rest_handler.get_reviews_of_book)
+ app.add_url_rule('/reviews/<id>', view_func=rest_handler.get_review)
+ app.add_url_rule('/reviews', view_func=rest_handler.create_review,
+ methods=['POST'])
+ app.add_url_rule('/reviews/<id>', view_func=rest_handler.update_review,
+ methods=['PUT'])
+ app.add_url_rule('/reviews/<id>', view_func=rest_handler.delete_review,
+ methods=['DELETE'])
所有更改已合入。让我们用 curl 来试下效果。
curl 测试
创建一个新的书评:
curl -X POST \
-H "Content-Type: application/json" \
-d '{
"book_id": 1,
"author": "John Doe",
"title": "Great Book",
"content": "This is a great book!"
}' \
http://localhost:5000/reviews
响应如下:
{
"author": "John Doe",
"book_id": 1,
"content": "This is a great book!",
"created_at": "Thu, 07 Mar 2024 23:49:04 GMT",
"id": "65e9e1f020162bf22e98b4a1",
"title": "Great Book",
"updated_at": "Thu, 07 Mar 2024 23:49:04 GMT"
}
根据 ID 获取单个书评:
curl -X GET http://localhost:5000/reviews/65e9e1f020162bf22e98b4a1
结果:
{
"author": "John Doe",
"book_id": 1,
"content": "This is a great book!",
"created_at": "Thu, 07 Mar 2024 23:49:04 GMT",
"id": "65e9e1f020162bf22e98b4a1",
"title": "Great Book",
"updated_at": "Thu, 07 Mar 2024 23:49:04 GMT"
}
列出某本书的所有书评:
curl -X GET http://localhost:5000/books/1/reviews
结果列表:
[
{
"author": "Carl Smith",
"book_id": 1,
"content": "This is a great book!",
"created_at": "Fri, 01 Mar 2024 15:18:24 GMT",
"id": "65e1f1c0f1c5f50b36b2ce61",
"title": "Best best Book",
"updated_at": "Fri, 01 Mar 2024 15:18:24 GMT"
},
{
"author": "John Doe",
"book_id": 1,
"content": "This is a great book!",
"created_at": "Thu, 07 Mar 2024 23:49:04 GMT",
"id": "65e9e1f020162bf22e98b4a1",
"title": "Great Book",
"updated_at": "Thu, 07 Mar 2024 23:49:04 GMT"
}
]
更新已有书评:
curl -X PUT \
-H "Content-Type: application/json" \
-d '{
"content": "I prefer Robert Smith new book",
"title": "Not that good"
}' \
http://localhost:5000/reviews/65e9e1f020162bf22e98b4a1
结果(响应只回显本次请求提交的字段,未提交的字段显示为 null;数据库中只有 title、content 和 updated_at 会被更新):
{
"author": null,
"book_id": null,
"content": "I prefer Robert Smith new book",
"created_at": null,
"id": null,
"title": "Not that good",
"updated_at": "Fri, 08 Mar 2024 00:05:12 GMT"
}
删除已有书评:
curl -X DELETE http://localhost:5000/reviews/65e9e1f020162bf22e98b4a1
返回状态码 204 表示删除成功。
瞧!你的 API 服务器把 mongoDB 也用上啦。