» Python:使用Kafka构建事件驱动微服务 » 4. 消费者:推荐服务 » 4.2 事件消费者

事件消费者

因为推荐服务的消费者和热搜服务的消费者有很多共通之处,故此处可以将共通部分移到更高层的目录以实现代码复用。

重组文件

  • 调整 service/trend/domain/gateway/trend_manager.py:
@@ -1,10 +1,8 @@
 from abc import ABC, abstractmethod
-from typing import Callable, List
+from typing import List
 
 from ....domain.model import Trend
 
-ConsumeCallback = Callable[[bytes, bytes], None]
-
 
 class TrendManager(ABC):
     @abstractmethod
@@ -14,13 +12,3 @@ class TrendManager(ABC):
     @abstractmethod
     def top_trends(self, page_size: int) -> List[Trend]:
         pass
-
-
-class TrendEventConsumer(ABC):
-    @abstractmethod
-    def consume_events(self, callback: ConsumeCallback):
-        pass
-
-    @abstractmethod
-    def stop(self):
-        pass

将共通的 TrendEventConsumer 和 ConsumeCallback 类型移出该文件。

  • 将这些类型放入 service/domain/gateway/event_consumer.py:
from abc import ABC, abstractmethod
from typing import Callable


# Handler type for consumed events: it receives two ``bytes`` arguments
# (used as the event key and the event payload by the consumers) and returns nothing.
ConsumeCallback = Callable[[bytes, bytes], None]


class TrendEventConsumer(ABC):
    """Abstract gateway for a source of events that are handed to a callback."""

    @abstractmethod
    def consume_events(self, callback: ConsumeCallback):
        """Consume events, passing each one to ``callback``."""

    @abstractmethod
    def stop(self):
        """Stop consuming events."""
  • 移动 service/trend/infrastructure/mq/kafka.py 到 service/infrastructure/mq/kafka_consumer.py:
class KafkaConsumer(TrendEventConsumer):
    def __init__(self, brokers: List[str], topic: str, group_id: str):

为消费者组添加一个新参数 group_id。

  • 添加配置项,service/trend/infrastructure/config/config.py:
@@ -8,6 +8,7 @@ import yaml
 class MQConfig:
     brokers: List[str]
     topic: str
+    group_id: str
 
 
 @dataclass
  • 填入配置值,service/trend/config.yml:
@@ -7,3 +7,4 @@ mq:
   brokers:
     - localhost:9094
   topic: lr-book-searches
+  group_id: trend-svr
  • 调整 import 路径,service/trend/application/consumer/trend.py:
@@ -1,7 +1,8 @@
 import json
 
 from ....domain.model import Trend
-from ...domain.gateway import TrendManager, TrendEventConsumer
+from ...domain.gateway import TrendManager
+from ....domain.gateway import TrendEventConsumer
 
 
 class TrendConsumer():
  • 调整 import 路径,service/trend/application/wire_helper.py:
@@ -1,7 +1,8 @@
-from ..domain.gateway import TrendManager, TrendEventConsumer
+from ..domain.gateway import TrendManager
+from ...domain.gateway import TrendEventConsumer
 from ..infrastructure.config import Config
 from ..infrastructure.cache import RedisCache
-from ..infrastructure.mq import KafkaConsumer
+from ...infrastructure.mq import KafkaConsumer
 
 
 class WireHelper:
  • 记得创建或更新 __init__.py 文件:
    • service/domain/gateway/__init__.py
    • service/infrastructure/mq/__init__.py
    • service/trend/domain/gateway/__init__.py

调整 web 服务中的生产者的事件

添加 user_id 参数,service/web/application/executor/book_operator.py:

@@ -23,13 +23,14 @@ class BookOperator():
         book.id = id
         return book
 
-    def get_books(self, offset: int, query: str) -> List[Book]:
+    def get_books(self, offset: int, user_id: str, query: str) -> List[Book]:
         books = self.book_manager.get_books(offset, query)
-        # Send search query and its results
+        # Send a user's search query and its results
         if query:
+            k = query + ':' + user_id
             json_data = json.dumps([_convert(b)
                                    for b in books]).encode('utf-8')
-            self.mq_helper.send_event(query, json_data)
+            self.mq_helper.send_event(k, json_data)
         return books
 
     def get_trends(self, trend_url: str) -> List[Trend]:

读取 cookie 值并传递给 user_id,service/web/adapter/router.py:

@@ -1,4 +1,7 @@
 import logging
+import random
+import string
+import time
 
 from fastapi import FastAPI, HTTPException, Request
 from fastapi.responses import HTMLResponse
@@ -10,6 +13,16 @@ from ...domain.model import Book
 from ..infrastructure.config.config import RemoteServiceConfig
 
 
+FIELD_UID = "uid"
+
+
+def random_string(length):
+    random.seed(time.time())  # Using time as seed
+    charset = string.ascii_uppercase + "0123456789"
+    result = [random.choice(charset) for _ in range(length)]
+    return ''.join(result)
+
+
 class RestHandler:
     def __init__(self, logger: logging.Logger, remote: RemoteServiceConfig, book_operator: BookOperator):
         self._logger = logger
@@ -23,9 +36,9 @@ class RestHandler:
             self._logger.error(f"Failed to create: {e}")
             raise HTTPException(status_code=400, detail="Failed to create")
 
-    def get_books(self, offset: int, query: str):
+    def get_books(self, offset: int, user_id: str, query: str):
         try:
-            books = self.book_operator.get_books(offset, query)
+            books = self.book_operator.get_books(offset, user_id, query)
             return books
         except Exception as e:
             self._logger.error(f"Failed to get books: {e}")
@@ -44,10 +57,17 @@ def make_router(app: FastAPI, templates_dir: str, remote: RemoteServiceConfig, w
 
     @app.get("/", response_class=HTMLResponse)
     async def index_page(request: Request, q: str = ""):
-        books = rest_handler.book_operator.get_books(0, q)
-        trends = rest_handler.book_operator.get_trends(
-            rest_handler.remote.trend_url)
-        return templates.TemplateResponse(
+        user_id = request.cookies.get(FIELD_UID)
+        if not user_id:
+            user_id = random_string(5)
+        books = rest_handler.book_operator.get_books(0, user_id, q)
+        try:
+            trends = rest_handler.book_operator.get_trends(
+                rest_handler.remote.trend_url)
+        except Exception as e:
+            rest_handler._logger.warn(f"Failed to get trends: {e}")
+            trends = []
+        resp = templates.TemplateResponse(
             name="index.html", context={
                 "request": request,
                 "title": "LiteRank Book Store",
@@ -56,6 +76,8 @@ def make_router(app: FastAPI, templates_dir: str, remote: RemoteServiceConfig, w
                 "q": q,
             }
         )
+        resp.set_cookie(FIELD_UID, user_id, 3600*24*30)
+        return resp
 
     @app.post("/api/books", response_model=Book)
     async def create_book(b: dto.Book):
@@ -63,4 +85,4 @@ def make_router(app: FastAPI, templates_dir: str, remote: RemoteServiceConfig, w
 
     @app.get("/api/books")
     async def get_books(o: int = 0, q: str = ""):
-        return rest_handler.get_books(o, q)
+        return rest_handler.get_books(o, "", q)

调整热搜服务中的消费者

因为 key 的格式有所变化,此处热搜服务需要适配变化。

更新 service/trend/application/consumer/trend.py:

@@ -13,7 +13,9 @@ class TrendConsumer():
 
     def start(self):
         def process_event(key: bytes, data: bytes):
-            t = Trend(query=key.decode('utf-8'), books=json.loads(data))
+            parts = key.decode('utf-8').split(':')
+            query = parts[0]
+            t = Trend(query=query, books=json.loads(data))
             self.trend_manager.create_trend(t)
 
         self.event_consumer.consume_events(process_event)

添加 group_id,service/trend/application/wire_helper.py:

@@ -14,7 +14,7 @@ class WireHelper:
     def new(cls, c: Config):
         kv = RedisCache(c.cache.host, c.cache.port,
                         c.cache.password, c.cache.db)
-        consumer = KafkaConsumer(c.mq.brokers, c.mq.topic)
+        consumer = KafkaConsumer(c.mq.brokers, c.mq.topic, c.mq.group_id)
         return cls(kv, consumer)
 
     def trend_manager(self) -> TrendManager:

推荐服务的消费者

添加 service/domain/model/interest.py:

from dataclasses import dataclass


@dataclass
class Interest:
    """A user's interest in a single book, together with its score."""

    user_id: str  # identifier of the user the interest belongs to
    title: str  # title of the book
    author: str  # author of the book
    score: float  # weight of the interest

添加 service/recommendation/domain/gateway/interest_manager.py:

from abc import ABC, abstractmethod
from typing import List

from ....domain.model import Interest


class InterestManager(ABC):
    """Abstract gateway for storing and querying user interests."""

    @abstractmethod
    def increase_interest(self, i: Interest):
        """Raise the stored interest for the entry described by ``i``."""

    @abstractmethod
    def list_interests(self, user_id: str) -> List[Interest]:
        """Return the interests recorded for ``user_id``."""

安装 mongo 依赖:

pip3 install pymongo

准备 mongoDB 数据库:

  • 在机器上安装 mongoDB 并启动它。
  • 创建一个库,名为 lr_event_rec.
  • 在需要的 collections 上创建索引(例如:本服务会按 user_id 查询 interests 集合,可为该字段创建索引)。

实现 InterestManager,service/recommendation/infrastructure/database/mongo.py:

from typing import List

from pymongo import MongoClient, DESCENDING

from ...domain.gateway import InterestManager
from ....domain.model import Interest

COLL_REVIEW = "interests"


class MongoPersistence(InterestManager):
    """MongoDB-backed implementation of ``InterestManager``."""

    def __init__(self, uri: str, db_name: str, page_size: int):
        """Create a client for ``uri`` and bind to the interests collection.

        Args:
            uri: MongoDB connection URI.
            db_name: Name of the database holding the collection.
            page_size: Maximum number of interests ``list_interests`` returns.
        """
        self.client = MongoClient(uri)
        self.db = self.client[db_name]
        self.coll = self.db[COLL_REVIEW]
        self.page_size = page_size

    def increase_interest(self, i: Interest):
        """Add 1 to the score of the (user_id, title, author) document.

        The document is inserted if it does not exist yet (``upsert=True``).
        ``i.score`` is not read; every call increments by exactly 1.
        """
        filter_query = {
            "user_id": i.user_id,
            "title": i.title,
            "author": i.author,
        }
        update_query = {
            "$inc": {"score": 1}
        }
        self.coll.update_one(filter_query, update_query, upsert=True)

    def list_interests(self, user_id: str) -> List[Interest]:
        """Return the interests of ``user_id``, highest score first.

        At most ``page_size`` entries are returned.
        """
        filter_query = {"user_id": user_id}
        cursor = self.coll.find(filter_query).sort(
            "score", DESCENDING).limit(self.page_size)
        # Map the raw documents to the domain model so the result matches the
        # declared return type and MongoDB's ``_id`` field is not leaked.
        return [
            Interest(
                user_id=doc["user_id"],
                title=doc["title"],
                author=doc["author"],
                score=doc["score"],
            )
            for doc in cursor
        ]

添加 service/recommendation/infrastructure/config/config.py:

from dataclasses import dataclass
from typing import List
import yaml


@dataclass
class DBConfig:
    """MongoDB settings, built from the ``db`` section of the config file."""

    mongo_uri: str  # connection URI passed to the Mongo persistence layer
    mongo_db_name: str  # name of the database to use


@dataclass
class MQConfig:
    """Message-queue settings, built from the ``mq`` section of the config file."""

    brokers: List[str]  # broker addresses passed to the Kafka consumer
    topic: str  # topic the consumer reads from
    group_id: str  # consumer group id passed to the Kafka consumer


@dataclass
class ApplicationConfig:
    """Application settings, built from the ``app`` section of the config file."""

    page_size: int  # passed to the persistence layer as its result limit


@dataclass
class Config:
    """Top-level configuration grouping the app, db and mq sections."""

    app: ApplicationConfig  # application-level settings
    db: DBConfig  # database settings
    mq: MQConfig  # message-queue settings


def parseConfig(filename: str) -> Config:
    """Load the YAML file at ``filename`` into a ``Config``.

    The file must contain top-level ``app``, ``db`` and ``mq`` mappings whose
    keys match the fields of the corresponding config dataclasses.

    Raises:
        KeyError: If one of the ``app``, ``db`` or ``mq`` sections is missing.
    """
    # Read as UTF-8 explicitly; otherwise the platform's locale encoding is
    # used, which can mis-decode non-ASCII values on some systems.
    with open(filename, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    # The file is no longer needed once parsed, so build the result outside
    # the ``with`` block.
    return Config(
        app=ApplicationConfig(**data['app']),
        db=DBConfig(**data['db']),
        mq=MQConfig(**data['mq']),
    )

添加 service/recommendation/config.yml:

app:
  page_size: 10
db:
  mongo_uri: "mongodb://localhost:27017"
  mongo_db_name: "lr_event_rec"
mq:
  brokers:
    - localhost:9094
  topic: "lr-book-searches"
  group_id: "rec-svr"

添加 service/recommendation/application/consumer/interest.py:

import json
from typing import Dict, List

from ....domain.model import Interest
from ...domain.gateway import InterestManager
from ....domain.gateway import TrendEventConsumer


class InterestConsumer():
    """Turns consumed search events into per-user interest increments."""

    def __init__(self, interest_manager: InterestManager, event_consumer: TrendEventConsumer):
        """Store the interest gateway and the event source to read from."""
        self.interest_manager = interest_manager
        self.event_consumer = event_consumer

    def start(self):
        """Consume events and increase the user's interest in every book of each event.

        Event keys have the form ``"<query>:<user_id>"`` and the payload is a
        JSON list of book objects with ``title`` and ``author`` keys. Events
        without a user id are skipped.
        """
        def process_event(key: bytes, data: bytes):
            # The user id is appended after the query, so split on the LAST
            # colon: a query that itself contains ':' must not be mistaken
            # for the user id.
            _, sep, user_id = key.decode('utf-8').rpartition(':')
            if not sep or not user_id:
                # no user_id (key without ':' or anonymous "<query>:"), ignore it
                return
            books: List[Dict] = json.loads(data)
            for b in books:
                self.interest_manager.increase_interest(Interest(
                    user_id=user_id,
                    title=b['title'],
                    author=b['author'],
                    score=0
                ))
        self.event_consumer.consume_events(process_event)

    def get_event_consumer(self) -> TrendEventConsumer:
        """Return the event consumer this object reads from."""
        return self.event_consumer

添加 service/recommendation/application/wire_helper.py:

from ..domain.gateway import InterestManager
from ...domain.gateway import TrendEventConsumer
from ..infrastructure.config import Config
from ..infrastructure.database import MongoPersistence
from ...infrastructure.mq import KafkaConsumer


class WireHelper:
    """Builds the recommendation service's dependencies and hands them out."""

    def __init__(self, noSQLPersistence: MongoPersistence, consumer: KafkaConsumer):
        """Keep the already-built persistence layer and Kafka consumer."""
        self.consumer = consumer
        self.noSQLPersistence = noSQLPersistence

    @classmethod
    def new(cls, c: Config):
        """Create a helper whose dependencies are built from configuration ``c``."""
        db_cfg = c.db
        persistence = MongoPersistence(
            db_cfg.mongo_uri, db_cfg.mongo_db_name, c.app.page_size)
        mq_cfg = c.mq
        kafka_consumer = KafkaConsumer(
            mq_cfg.brokers, mq_cfg.topic, mq_cfg.group_id)
        return cls(persistence, kafka_consumer)

    def interest_manager(self) -> InterestManager:
        """Expose the persistence layer through the ``InterestManager`` interface."""
        return self.noSQLPersistence

    def trend_event_consumer(self) -> TrendEventConsumer:
        """Expose the Kafka consumer through the ``TrendEventConsumer`` interface."""
        return self.consumer

添加 service/recommendation/main.py:

import signal

from .application import WireHelper
from .application.consumer import InterestConsumer
from .infrastructure.config import parseConfig

# Entry script of the recommendation consumer: load the configuration, wire
# the dependencies, register signal handlers, then start consuming events.

# Relative path; presumably resolved against the directory the module is
# launched from (e.g. the repository root) - confirm for other launch setups.
CONFIG_FILENAME = "service/recommendation/config.yml"

c = parseConfig(CONFIG_FILENAME)
wire_helper = WireHelper.new(c)

# Run the consumer
tc = InterestConsumer(wire_helper.interest_manager(),
                      wire_helper.trend_event_consumer())
# Keep a module-level handle so the signal handler below can stop it.
event_consumer = tc.get_event_consumer()


# Signal handler: asks the event consumer to stop and reports it.
# NOTE: the ``signal`` parameter shadows the ``signal`` module inside this
# function; neither parameter is used in the body.
def sigterm_handler(signal, frame):
    event_consumer.stop()
    print("Consumer stopped. Exiting gracefully...")


# Register the handler for both SIGTERM and SIGINT (Ctrl+C) before the
# consumer is started.
signal.signal(signal.SIGTERM, sigterm_handler)
signal.signal(signal.SIGINT, sigterm_handler)

print("Started to consume events...")
# Hands the processing callback to the event consumer; presumably this call
# blocks until the consumer is stopped - confirm against KafkaConsumer.
tc.start()

记得处理 __init__.py 文件:

  • service/domain/model/__init__.py
  • service/recommendation/application/__init__.py
  • service/recommendation/application/consumer/__init__.py
  • service/recommendation/domain/gateway/__init__.py
  • service/recommendation/infrastructure/config/__init__.py
  • service/recommendation/infrastructure/database/__init__.py

最新 requirements.txt:

annotated-types==0.6.0
anyio==4.3.0
click==8.1.7
confluent-kafka==2.3.0
dnspython==2.6.1
fastapi==0.110.1
h11==0.14.0
httptools==0.6.1
idna==3.6
Jinja2==3.1.3
MarkupSafe==2.1.5
mysql-connector-python==8.3.0
pydantic==2.6.4
pydantic_core==2.16.3
pymongo==4.6.3
python-dotenv==1.0.1
PyYAML==6.0.1
redis==5.0.3
sniffio==1.3.1
starlette==0.37.2
typing_extensions==4.11.0
uvicorn==0.29.0
uvloop==0.19.0
watchfiles==0.21.0
websockets==12.0

启动推荐服务的消费者:

python3 -m service.recommendation.main

你应该能看到类似如下输出:

Started to consume events...

在另一个终端中,重启 web 服务并尝试多次搜索关键词“love”和“war”等。

然后,你将在 mongoDB 中看到如下记录:

lr_event_rec> db.interests.find();
[
  {
    _id: ObjectId('6617c3df9f5e3692de429083'),
    title: 'The Catcher in the Rye',
    author: 'J.D. Salinger',
    user_id: 'C32WB',
    score: 1
  },
  {
    _id: ObjectId('6617c3ef9f5e3692de42908e'),
    title: 'Pride and Prejudice',
    author: 'Jane Austen',
    user_id: 'C32WB',
    score: 1
  },
  {
    _id: ObjectId('6617c3ef9f5e3692de429090'),
    title: 'War and Peace',
    author: 'Leo Tolstoy',
    user_id: 'C32WB',
    score: 2
  }
]

这就表示你的消费者运作正常。