FastAPI API 服务器
添加 service/recommendation/application/executor/interest_operator.py:
from typing import List
from ....domain.model import Interest
from ...domain.gateway import InterestManager
class InterestOperator:
    """Application-layer entry point for reading a user's interests.

    Holds an ``InterestManager`` and forwards interest queries to it.
    """

    def __init__(self, interest_manager: InterestManager):
        """Store the manager that will answer interest queries."""
        self.interest_manager = interest_manager

    def interests_for_user(self, user_id: str) -> List[Interest]:
        """Return whatever the manager's ``list_interests`` yields for ``user_id``."""
        manager = self.interest_manager
        return manager.list_interests(user_id)
添加 service/recommendation/application/executor/__init__.py:
from .interest_operator import InterestOperator
添加 service/recommendation/adapter/router.py:
import logging
from fastapi import FastAPI, HTTPException
from ..application.executor import InterestOperator
from ..application.wire_helper import WireHelper
class RestHandler:
    """HTTP-facing handler that serves a user's interests.

    Delegates the lookup to an ``InterestOperator`` and converts any failure
    into an ``HTTPException`` so the web framework can render an error
    response.
    """

    def __init__(self, logger: logging.Logger, interest_operator: InterestOperator):
        """Keep the logger and the operator used to answer requests."""
        self._logger = logger
        self.interest_operator = interest_operator

    def get_interests(self, user_id: str):
        """Return the interests of ``user_id``.

        Raises:
            HTTPException: with status 404 when the lookup raises any
                exception; the original exception is attached as the cause.
        """
        try:
            return self.interest_operator.interests_for_user(user_id)
        except Exception as e:
            # logger.exception logs at ERROR level and appends the traceback,
            # which a plain error() call with str(e) would drop.
            self._logger.exception(
                "Failed to get interests for %s: %s", user_id, e)
            # NOTE(review): every failure, including backend errors, is
            # reported as 404 here; confirm whether 500 would be more apt.
            raise HTTPException(
                status_code=404,
                detail=f"Failed to get interests for {user_id}") from e
def make_router(app: FastAPI, wire_helper: WireHelper):
    """Register the recommendation routes on ``app``.

    Builds a ``RestHandler`` from the interest manager supplied by
    ``wire_helper`` and exposes it at ``GET /recommendations``.

    Args:
        app: The FastAPI application to attach the route to.
        wire_helper: Provides the interest manager used by the handler.
    """
    rest_handler = RestHandler(
        logging.getLogger("lr-event"),
        InterestOperator(wire_helper.interest_manager()),
    )

    # Declared with plain ``def`` rather than ``async def``: the handler calls
    # synchronous code all the way down, and FastAPI runs ``def`` path
    # operations in a threadpool, so a slow lookup no longer blocks the
    # event loop.
    @app.get("/recommendations")
    def get_interests(uid: str = ""):
        return rest_handler.get_interests(uid)
主文件中也启动 FastAPI server,service/recommendation/main.py:
@@ -1,5 +1,9 @@
-import signal
+from contextlib import asynccontextmanager
+import threading
+from fastapi import FastAPI
+
+from .adapter.router import make_router
from .application import WireHelper
from .application.consumer import InterestConsumer
from .infrastructure.config import parseConfig
@@ -13,15 +17,20 @@ wire_helper = WireHelper.new(c)
tc = InterestConsumer(wire_helper.interest_manager(),
wire_helper.trend_event_consumer())
event_consumer = tc.get_event_consumer()
+consumer_thread = threading.Thread(target=tc.start)
-def sigterm_handler(signal, frame):
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ # Run at startup
+ print("Started to consume events...")
+ consumer_thread.start()
+ yield
+ # Run on shutdown
event_consumer.stop()
+ consumer_thread.join()
print("Consumer stopped. Exiting gracefully...")
-
-signal.signal(signal.SIGTERM, sigterm_handler)
-signal.signal(signal.SIGINT, sigterm_handler)
-
-print("Started to consume events...")
-tc.start()
+# Run the FastAPI server
+app = FastAPI(lifespan=lifespan)
+make_router(app, wire_helper)
这里再次使用了 lifespan 函数来优雅地启动、停止消费者。
修复错误:MongoDB 返回的文档带有 _id(ObjectId)字段,无法直接序列化为 JSON,因此需要在查询结果中将其排除。修改 service/recommendation/infrastructure/database/mongo.py:
@@ -28,6 +28,8 @@ class MongoPersistence(InterestManager):
def list_interests(self, user_id: str) -> List[Interest]:
filter_query = {"user_id": user_id}
- cursor = self.coll.find(filter_query).sort(
+ # Exclude the _id field from the result
+ projection = {"_id": 0}
+ cursor = self.coll.find(filter_query, projection).sort(
"score", DESCENDING).limit(self.page_size)
return list(cursor)
启动服务器:
uvicorn service.recommendation.main:app --port 8002
尝试用 curl 访问 http://localhost:8002/recommendations?uid=C32WB 。
“C32WB” 是本教程中 cookie uid 的值,你应该将其替换成你自己浏览器中的值。
结果如下:
[
{
"title": "War and Peace",
"author": "Leo Tolstoy",
"user_id": "C32WB",
"score": 2
},
{
"title": "The Catcher in the Rye",
"author": "J.D. Salinger",
"user_id": "C32WB",
"score": 1
},
{
"title": "Pride and Prejudice",
"author": "Jane Austen",
"user_id": "C32WB",
"score": 1
}
]