» Python:使用Kafka构建事件驱动微服务 » 4. 消费者:推荐服务 » 4.3 FastAPI API 服务器

FastAPI API 服务器

添加 service/recommendation/application/executor/interest_operator.py:

from typing import List

from ....domain.model import Interest
from ...domain.gateway import InterestManager


class InterestOperator:
    """Application-layer entry point for reading a user's interests.

    Every lookup is delegated to the injected ``InterestManager``.
    """

    def __init__(self, interest_manager: InterestManager):
        # Keep the gateway on the instance; all queries go through it.
        self.interest_manager = interest_manager

    def interests_for_user(self, user_id: str) -> List[Interest]:
        """Return what the manager's ``list_interests`` yields for *user_id*."""
        manager = self.interest_manager
        interests = manager.list_interests(user_id)
        return interests

添加 service/recommendation/application/executor/__init__.py:

from .interest_operator import InterestOperator

添加 service/recommendation/adapter/router.py:

import logging

from fastapi import FastAPI, HTTPException

from ..application.executor import InterestOperator
from ..application.wire_helper import WireHelper


class RestHandler:
    """HTTP-facing handler that serves a user's interests.

    Wraps an ``InterestOperator`` and converts any failure of the lookup
    into an ``HTTPException`` so the web framework can render an error
    response.
    """

    def __init__(self, logger: logging.Logger, interest_operator: InterestOperator):
        self._logger = logger
        self.interest_operator = interest_operator

    def get_interests(self, user_id: str):
        """Return the interests of *user_id*.

        Args:
            user_id: Identifier passed through to
                ``InterestOperator.interests_for_user``.

        Returns:
            Whatever ``interests_for_user`` returns for the user.

        Raises:
            HTTPException: With status 404 when the lookup raises. The
                original exception is attached as ``__cause__``.
        """
        try:
            return self.interest_operator.interests_for_user(user_id)
        except Exception as e:
            # Boundary handler: ``exception`` records the traceback alongside
            # the same message text, and the log arguments are formatted lazily.
            self._logger.exception(
                "Failed to get interests for %s: %s", user_id, e)
            # NOTE(review): every failure (not only "not found") maps to 404;
            # kept as-is so existing clients see the same status code.
            raise HTTPException(
                status_code=404,
                detail=f"Failed to get interests for {user_id}") from e


def make_router(app: FastAPI, wire_helper: WireHelper):
    """Register the recommendation endpoint on *app*.

    Builds a ``RestHandler`` around the interest manager supplied by
    *wire_helper* and exposes it as ``GET /recommendations``; the ``uid``
    query parameter (default ``""``) selects the user.
    """
    rest_handler = RestHandler(
        logging.getLogger("lr-event"),
        InterestOperator(wire_helper.interest_manager())
    )

    # Declared with plain ``def`` rather than ``async def``: the handler only
    # makes a synchronous call, so FastAPI runs it in its threadpool instead
    # of blocking the event loop while the lookup is in progress.
    @app.get("/recommendations")
    def get_interests(uid: str = ""):
        return rest_handler.get_interests(uid)

在主文件中同时启动 FastAPI 服务器，service/recommendation/main.py:

@@ -1,5 +1,9 @@
-import signal
+from contextlib import asynccontextmanager
+import threading
 
+from fastapi import FastAPI
+
+from .adapter.router import make_router
 from .application import WireHelper
 from .application.consumer import InterestConsumer
 from .infrastructure.config import parseConfig
@@ -13,15 +17,20 @@ wire_helper = WireHelper.new(c)
 tc = InterestConsumer(wire_helper.interest_manager(),
                       wire_helper.trend_event_consumer())
 event_consumer = tc.get_event_consumer()
+consumer_thread = threading.Thread(target=tc.start)
 
 
-def sigterm_handler(signal, frame):
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # Run at startup
+    print("Started to consume events...")
+    consumer_thread.start()
+    yield
+    # Run on shutdown
     event_consumer.stop()
+    consumer_thread.join()
     print("Consumer stopped. Exiting gracefully...")
 
-
-signal.signal(signal.SIGTERM, sigterm_handler)
-signal.signal(signal.SIGINT, sigterm_handler)
-
-print("Started to consume events...")
-tc.start()
+# Run the FastAPI server
+app = FastAPI(lifespan=lifespan)
+make_router(app, wire_helper)

这里再次使用了 lifespan 函数来优雅地启动、停止消费者。

修复错误：查询时排除 _id 字段（其类型为 ObjectId，无法直接序列化为 JSON），service/recommendation/infrastructure/database/mongo.py:

@@ -28,6 +28,8 @@ class MongoPersistence(InterestManager):
 
     def list_interests(self, user_id: str) -> List[Interest]:
         filter_query = {"user_id": user_id}
-        cursor = self.coll.find(filter_query).sort(
+        # Exclude the _id field from the result
+        projection = {"_id": 0}
+        cursor = self.coll.find(filter_query, projection).sort(
             "score", DESCENDING).limit(self.page_size)
         return list(cursor)

启动服务器:

uvicorn service.recommendation.main:app --port 8002

尝试用 curl 访问 http://localhost:8002/recommendations?uid=C32WB

“C32WB” 是本教程中 cookie uid 的值，你应该将其替换为自己浏览器中的值。

结果如下:

[
  {
    "title": "War and Peace",
    "author": "Leo Tolstoy",
    "user_id": "C32WB",
    "score": 2
  },
  {
    "title": "The Catcher in the Rye",
    "author": "J.D. Salinger",
    "user_id": "C32WB",
    "score": 1
  },
  {
    "title": "Pride and Prejudice",
    "author": "Jane Austen",
    "user_id": "C32WB",
    "score": 1
  }
]
上页下页