Caching: Redis
A big query in MySQL or a big aggregation in MongoDB can take seconds or even minutes to complete. You definitely don't want to trigger such operations frequently.
Caching the query or aggregation result in memory is a great way to mitigate this. If your API server runs on a single machine or node, simply putting those results into an in-memory HashMap or Dictionary does the job.
But if you have multiple machines or nodes running the API server and they need to share a common cache, Redis is your best choice.
Try Redis
- Install Redis on your machine and start it.
- Add the redis dependency:
pip3 install redis
Update requirements.txt:
pip3 freeze > requirements.txt
- Update the code.
Add infrastructure/cache/helper.py:
from abc import ABC, abstractmethod
from typing import Optional


class CacheHelper(ABC):
    @abstractmethod
    def save(self, key: str, value: str) -> None:
        pass

    @abstractmethod
    def load(self, key: str) -> Optional[str]:
        pass
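For the single-machine case mentioned at the top, a minimal in-memory implementation of this interface could look like the sketch below. It is purely illustrative and not part of the project; the class name InMemoryCache is an assumption.
from typing import Dict, Optional

from .helper import CacheHelper


class InMemoryCache(CacheHelper):
    """Illustrative single-node cache backed by a plain dict (no TTL, no eviction)."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}

    def save(self, key: str, value: str) -> None:
        self._store[key] = value

    def load(self, key: str) -> Optional[str]:
        # dict.get returns None on a miss, matching the interface contract
        return self._store.get(key)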
Use redis in infrastructure/cache/redis.py:
from typing import Any, Optional

from redis import Redis

from .helper import CacheHelper
from ..config import CacheConfig

DEFAULT_TTL = 3600


class RedisCache(CacheHelper):
    def __init__(self, c: CacheConfig):
        self.client = Redis(
            host=c.host,
            port=c.port,
            password=c.password,
            db=c.db,
        )

    def save(self, key: str, value: str) -> None:
        # ex sets the key's time-to-live in seconds
        self.client.set(key, value, ex=DEFAULT_TTL)

    def load(self, key: str) -> Optional[str]:
        value: Any = self.client.get(key)
        if value is None:
            return None
        # redis-py returns bytes by default
        return value.decode("utf-8")
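A quick smoke test of this class, as a sketch: it assumes a local Redis with the placeholder credentials used later in config.yml, and import paths matching the books package layout shown in the diffs below.
from books.infrastructure.cache import RedisCache
from books.infrastructure.config import CacheConfig

cache = RedisCache(CacheConfig(host="localhost", port=6379, password="test_pass", db=0))
cache.save("greeting", "hello")
print(cache.load("greeting"))  # "hello"
print(cache.load("missing"))   # None: absent keys come back as a cache miss
As a side note, redis-py also accepts decode_responses=True on the Redis constructor, which makes get return str directly and would let load drop the manual decode.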
Add the related configuration items in infrastructure/config/config.py:
@@ -14,6 +14,14 @@ class DBConfig:
     mongo_db_name: str


+@dataclass
+class CacheConfig:
+    host: str
+    port: int
+    password: str
+    db: int
+
+
 @dataclass
 class ApplicationConfig:
     port: int
@@ -22,6 +30,7 @@ class ApplicationConfig:
 @dataclass
 class Config:
     app: ApplicationConfig
+    cache: CacheConfig
     db: DBConfig
@@ -30,5 +39,6 @@ def parseConfig(filename: str) -> Config:
         data = yaml.safe_load(f)
         return Config(
             ApplicationConfig(**data['app']),
+            CacheConfig(**data['cache']),
             DBConfig(**data['db'])
         )
Set the configuration values in config.yml:
@@ -9,3 +9,8 @@ db:
   database: "lr_book"
   mongo_uri: "mongodb://localhost:27017"
   mongo_db_name: "lr_book"
+cache:
+  host: "localhost"
+  port: 6379
+  password: "test_pass"
+  db: 0
Wire in the redis connection in application/wire_helper.py:
@@ -1,21 +1,27 @@
 from books.domain.gateway import BookManager, ReviewManager
+from books.infrastructure.cache import RedisCache, CacheHelper
 from ..infrastructure.config import Config
 from ..infrastructure.database import MySQLPersistence, MongoPersistence


 class WireHelper:
-    def __init__(self, sqlPersistence: MySQLPersistence, noSQLPersistence: MongoPersistence):
+    def __init__(self, sqlPersistence: MySQLPersistence, noSQLPersistence: MongoPersistence, kvStore: RedisCache):
         self.sqlPersistence = sqlPersistence
         self.noSQLPersistence = noSQLPersistence
+        self.kvStore = kvStore

     @classmethod
     def new(cls, c: Config):
         db = MySQLPersistence(c.db)
         mdb = MongoPersistence(c.db.mongo_uri, c.db.mongo_db_name)
-        return cls(db, mdb)
+        kv = RedisCache(c.cache)
+        return cls(db, mdb, kv)

     def book_manager(self) -> BookManager:
         return self.sqlPersistence

     def review_manager(self) -> ReviewManager:
         return self.noSQLPersistence
+
+    def cache_helper(self) -> CacheHelper:
+        return self.kvStore
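With that in place, the whole dependency graph assembles in one call. A sketch, assuming config.yml sits in the working directory and the import paths match the project layout implied by the diffs:
from books.application.wire_helper import WireHelper
from books.infrastructure.config import parseConfig

wire_helper = WireHelper.new(parseConfig("config.yml"))
cache = wire_helper.cache_helper()  # a CacheHelper backed by RedisCache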
Suppose that listing all books triggers a big query against the database. You want to store the query result in Redis so that subsequent requests can be served quickly.
Change application/executor/book_operator.py:
@@ -1,12 +1,19 @@
-from typing import List, Optional
+from dataclasses import asdict
+import json
+from typing import Any, Dict, List, Optional
+
+from books.infrastructure.cache.helper import CacheHelper

 from ...domain.model import Book
 from ...domain.gateway import BookManager

+BOOKS_KEY = "lr-books"
+

 class BookOperator():
-    def __init__(self, book_manager: BookManager):
+    def __init__(self, book_manager: BookManager, cache_helper: CacheHelper):
         self.book_manager = book_manager
+        self.cache_helper = cache_helper

     def create_book(self, b: Book) -> Book:
         id = self.book_manager.create_book(b)
@@ -17,7 +24,13 @@ class BookOperator():
         return self.book_manager.get_book(id)

     def get_books(self) -> List[Book]:
-        return self.book_manager.get_books()
+        v = self.cache_helper.load(BOOKS_KEY)
+        if v:
+            return json.loads(v)
+        books = self.book_manager.get_books()
+        self.cache_helper.save(
+            BOOKS_KEY, json.dumps([_convert(b) for b in books]))
+        return books

     def update_book(self, id: int, b: Book) -> Book:
         self.book_manager.update_book(id, b)
@@ -25,3 +38,10 @@ class BookOperator():

     def delete_book(self, id: int) -> None:
         return self.book_manager.delete_book(id)
+
+
+def _convert(b: Book) -> Dict[str, Any]:
+    new_b = asdict(b)
+    new_b['created_at'] = b.created_at.isoformat()
+    new_b['updated_at'] = b.updated_at.isoformat()
+    return new_b
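Two things worth noting about this read path. First, on a cache hit get_books returns the parsed JSON (a list of dicts) rather than Book instances, which is fine as long as the handler only serializes the result back to JSON. Second, this is cache-aside on reads only: create_book, update_book, and delete_book never touch the cached list, so /books can serve data up to DEFAULT_TTL (one hour) old. If that staleness matters, here is a minimal invalidation sketch, assuming CacheHelper gains a hypothetical delete method (mapped to client.delete(key) in RedisCache):
    def delete_book(self, id: int) -> None:
        self.book_manager.delete_book(id)
        # Hypothetical: evict the cached list so the next get_books call
        # repopulates it from the database instead of serving stale data.
        self.cache_helper.delete(BOOKS_KEY)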
Tweak adapter/router.py:
@@ -108,7 +108,11 @@ def health():

 def make_router(app: Flask, wire_helper: WireHelper):
     rest_handler = RestHandler(
-        app.logger, BookOperator(wire_helper.book_manager()), ReviewOperator(wire_helper.review_manager()))
+        app.logger,
+        BookOperator(
+            wire_helper.book_manager(),
+            wire_helper.cache_helper()),
+        ReviewOperator(wire_helper.review_manager()))
     app.add_url_rule('/', view_func=health)
     app.add_url_rule('/books', view_func=rest_handler.get_books)
     app.add_url_rule('/books/<int:id>', view_func=rest_handler.get_book)
Those are all the adjustments needed to introduce redis. Now let's try out the new cache-powered endpoint.
Test with curl
List all books:
curl -X GET -w "Total time: %{time_total}s\n" http://localhost:5000/books
The result is similar to before, but performance improves significantly. You can see the evidence in curl's timing output:
Total time: 0.012821s
Total time: 0.008976s
Total time: 0.008859s
Total time: 0.008658s
Use redis-cli
View the values stored in Redis:
redis-cli
Inspect these key-value pairs in the redis client shell:
127.0.0.1:6379> keys *
1) "lr-books"
127.0.0.1:6379> get lr-books
"[{\"id\":1,\"title\":\"Great Book II\",\"author\":\"Carl Smith\",\"published_at\":\"2022-01-01T08:00:00+08:00\",\"description\":\"Another sample book description\",\"isbn\":\"8334567890\",\"total_pages\":3880,\"created_at\":\"2024-02-25T16:29:31.353+08:00\",\"updated_at\":\"2024-02-25T16:29:31.353+08:00\"}]"
127.0.0.1:6379> del lr-books
(integer) 1
Awesome! Redis is now at your service! 💐