FastAPI API 服务器
创建 service/domain/model/trend.py:
from datetime import datetime
from dataclasses import dataclass
from typing import List
from .book import Book
@dataclass
class Trend:
query: str
books: List[Book]
created_at: datetime | None
创建 service/trend/domain/gateway/trend_manager.py:
from abc import ABC, abstractmethod
from typing import List
from ....domain.model import Trend
class TrendManager(ABC):
@abstractmethod
def create_trend(self, t: Trend) -> int:
pass
@abstractmethod
def top_trends(self, offset: int) -> List[Trend]:
pass
在 Redis 中,ZSET(有序集合)是一种结合了集合和有序列表特性的数据结构。 它类似于普通集合,但它还可根据每个成员关联的分数维护元素排序顺序。这个分数使得成员可以按升序或降序排序。
因此,我们可以使用 ZSET 来存储热搜数据,用其分数来表示”热度“。
安装 redis 依赖:
pip3 install redis
记得更新 requirements.txt:
annotated-types==0.6.0
anyio==4.3.0
click==8.1.7
confluent-kafka==2.3.0
fastapi==0.110.1
h11==0.14.0
httptools==0.6.1
idna==3.6
Jinja2==3.1.3
MarkupSafe==2.1.5
mysql-connector-python==8.3.0
pydantic==2.6.4
pydantic_core==2.16.3
python-dotenv==1.0.1
PyYAML==6.0.1
redis==5.0.3
sniffio==1.3.1
starlette==0.37.2
typing_extensions==4.11.0
uvicorn==0.29.0
uvloop==0.19.0
watchfiles==0.21.0
websockets==12.0
创建 service/trend/infrastructure/cache/redis.py:
import json
from typing import Any, List
import redis
from ...domain.gateway import TrendManager
from ....domain.model import Trend
trends_key = "trends"
query_key_prefix = "q-"
class RedisCache(TrendManager):
def __init__(self, host: str, port: int, password: str, db: int):
self.c = redis.Redis(
host=host,
port=port,
db=db,
password=password,
decode_responses=True
)
def create_trend(self, t: Trend) -> int:
member = t.query
score: Any = self.c.zincrby(trends_key, 1, member)
k = query_key_prefix + t.query
results = json.dumps(t.books)
self.c.set(k, results)
return score
def top_trends(self, offset: int) -> List[Trend]:
top_items: Any = self.c.zrevrange(
trends_key, 0, offset, withscores=True)
trends = []
for item in top_items:
query = item[0]
t = Trend(query=query, books=[], created_at=None)
k = query_key_prefix + query
value: Any = self.c.get(k)
if value is not None:
t.books = json.loads(value)
trends.append(t)
return trends
创建 service/trend/infrastructure/config/config.py:
from dataclasses import dataclass
import yaml
@dataclass
class CacheConfig:
host: str
port: int
password: str
db: int
@dataclass
class Config:
cache: CacheConfig
def parseConfig(filename: str) -> Config:
with open(filename, 'r') as f:
data = yaml.safe_load(f)
return Config(
CacheConfig(**data['cache'])
)
创建 service/trend/config.yml:
cache:
host: 127.0.0.1
port: 6379
password: test_pass
db: 0
创建 service/trend/application/wire_helper.py:
from ..domain.gateway import TrendManager
from ..infrastructure.config import Config
from ..infrastructure.cache import RedisCache
class WireHelper:
def __init__(self, kvStore: RedisCache):
self.kvStore = kvStore
@classmethod
def new(cls, c: Config):
kv = RedisCache(c.cache.host, c.cache.port,
c.cache.password, c.cache.db)
return cls(kv)
def trend_manager(self) -> TrendManager:
return self.kvStore
创建 service/trend/application/executor/trend_operator.py:
from typing import List
from ....domain.model import Trend
from ...domain.gateway import TrendManager
class TrendOperator():
def __init__(self, trend_manager: TrendManager):
self.trend_manager = trend_manager
def create_trend(self, t: Trend) -> int:
return self.trend_manager.create_trend(t)
def top_trends(self, offset: int) -> List[Trend]:
return self.trend_manager.top_trends(offset)
创建 service/trend/adapter/router.py:
import logging
from fastapi import FastAPI, HTTPException
from ..application.executor import TrendOperator
from ..application.wire_helper import WireHelper
class RestHandler:
def __init__(self, logger: logging.Logger, trend_operator: TrendOperator):
self._logger = logger
self.trend_operator = trend_operator
def get_trends(self, offset: int):
try:
return self.trend_operator.top_trends(offset)
except Exception as e:
self._logger.error(f"Failed to get trends: {e}")
raise HTTPException(status_code=404, detail="Failed to get trends")
def make_router(app: FastAPI, wire_helper: WireHelper):
rest_handler = RestHandler(
logging.getLogger("lr-event"),
TrendOperator(wire_helper.trend_manager())
)
@app.get("/trends")
async def get_trends(o: int = 0):
return rest_handler.get_trends(o)
最后,添加 service/trend/main.py:
from fastapi import FastAPI
from .adapter.router import make_router
from .application import WireHelper
from .infrastructure.config import parseConfig
CONFIG_FILENAME = "service/trend/config.yml"
c = parseConfig(CONFIG_FILENAME)
wire_helper = WireHelper.new(c)
app = FastAPI()
make_router(app, wire_helper)
启动服务器:
uvicorn service.trend.main:app --port 8001
尝试用 curl 访问 http://localhost:8001/trends 。
Loading...
> 此处输出代码运行结果