事件消费者
因为推荐服务的消费者和热搜服务的消费者有很多共通之处,故此处可以将共通部分移到更高层的目录以实现代码复用。
重组文件
- 调整 service/trend/domain/gateway/trend_manager.py:
@@ -1,10 +1,8 @@
from abc import ABC, abstractmethod
-from typing import Callable, List
+from typing import List
from ....domain.model import Trend
-ConsumeCallback = Callable[[bytes, bytes], None]
-
class TrendManager(ABC):
@abstractmethod
@@ -14,13 +12,3 @@ class TrendManager(ABC):
@abstractmethod
def top_trends(self, page_size: int) -> List[Trend]:
pass
-
-
-class TrendEventConsumer(ABC):
- @abstractmethod
- def consume_events(self, callback: ConsumeCallback):
- pass
-
- @abstractmethod
- def stop(self):
- pass
将共通的 TrendEventConsumer
和 ConsumeCallback
类型移出该文件。
- 将这些类型放入 service/domain/gateway/event_consumer.py:
from abc import ABC, abstractmethod
from typing import Callable
ConsumeCallback = Callable[[bytes, bytes], None]
class TrendEventConsumer(ABC):
@abstractmethod
def consume_events(self, callback: ConsumeCallback):
pass
@abstractmethod
def stop(self):
pass
- 移动 service/trend/infrastructure/mq/kafka.py 到 service/infrastructure/mq/kafka_consumer.py。
class KafkaConsumer(TrendEventConsumer):
def __init__(self, brokers: List[str], topic: str, group_id: str):
添加一个给消费组的新参数 group_id
。
- 添加配置项,service/trend/infrastructure/config/config.py:
@@ -8,6 +8,7 @@ import yaml
class MQConfig:
brokers: List[str]
topic: str
+ group_id: str
@dataclass
- 填入配置值,service/trend/config.yml:
@@ -7,3 +7,4 @@ mq:
brokers:
- localhost:9094
topic: lr-book-searches
+ group_id: trend-svr
- 调整 import 路径,service/trend/application/consumer/trend.py:
@@ -1,7 +1,8 @@
import json
from ....domain.model import Trend
-from ...domain.gateway import TrendManager, TrendEventConsumer
+from ...domain.gateway import TrendManager
+from ....domain.gateway import TrendEventConsumer
class TrendConsumer():
- 调整 import 路径,service/trend/application/wire_helper.py:
@@ -1,7 +1,8 @@
-from ..domain.gateway import TrendManager, TrendEventConsumer
+from ..domain.gateway import TrendManager
+from ...domain.gateway import TrendEventConsumer
from ..infrastructure.config import Config
from ..infrastructure.cache import RedisCache
-from ..infrastructure.mq import KafkaConsumer
+from ...infrastructure.mq import KafkaConsumer
class WireHelper:
- 记得创建或更新 __init__.py 文件:
- service/domain/gateway/__init__.py
- service/infrastructure/mq/__init__.py
- service/trend/domain/gateway/__init__.py
调整 web 服务中的生产者的事件
添加 user_id
参数,service/web/application/executor/book_operator.py:
@@ -23,13 +23,14 @@ class BookOperator():
book.id = id
return book
- def get_books(self, offset: int, query: str) -> List[Book]:
+ def get_books(self, offset: int, user_id: str, query: str) -> List[Book]:
books = self.book_manager.get_books(offset, query)
- # Send search query and its results
+ # Send a user's search query and its results
if query:
+ k = query + ':' + user_id
json_data = json.dumps([_convert(b)
for b in books]).encode('utf-8')
- self.mq_helper.send_event(query, json_data)
+ self.mq_helper.send_event(k, json_data)
return books
def get_trends(self, trend_url: str) -> List[Trend]:
读取 cookie 值并传递给 user_id
,service/web/adapter/router.py:
@@ -1,4 +1,7 @@
import logging
+import random
+import string
+import time
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
@@ -10,6 +13,16 @@ from ...domain.model import Book
from ..infrastructure.config.config import RemoteServiceConfig
+FIELD_UID = "uid"
+
+
+def random_string(length):
+ random.seed(time.time()) # Using time as seed
+ charset = string.ascii_uppercase + "0123456789"
+ result = [random.choice(charset) for _ in range(length)]
+ return ''.join(result)
+
+
class RestHandler:
def __init__(self, logger: logging.Logger, remote: RemoteServiceConfig, book_operator: BookOperator):
self._logger = logger
@@ -23,9 +36,9 @@ class RestHandler:
self._logger.error(f"Failed to create: {e}")
raise HTTPException(status_code=400, detail="Failed to create")
- def get_books(self, offset: int, query: str):
+ def get_books(self, offset: int, user_id: str, query: str):
try:
- books = self.book_operator.get_books(offset, query)
+ books = self.book_operator.get_books(offset, user_id, query)
return books
except Exception as e:
self._logger.error(f"Failed to get books: {e}")
@@ -44,10 +57,17 @@ def make_router(app: FastAPI, templates_dir: str, remote: RemoteServiceConfig, w
@app.get("/", response_class=HTMLResponse)
async def index_page(request: Request, q: str = ""):
- books = rest_handler.book_operator.get_books(0, q)
- trends = rest_handler.book_operator.get_trends(
- rest_handler.remote.trend_url)
- return templates.TemplateResponse(
+ user_id = request.cookies.get(FIELD_UID)
+ if not user_id:
+ user_id = random_string(5)
+ books = rest_handler.book_operator.get_books(0, user_id, q)
+ try:
+ trends = rest_handler.book_operator.get_trends(
+ rest_handler.remote.trend_url)
+ except Exception as e:
+ rest_handler._logger.warn(f"Failed to get trends: {e}")
+ trends = []
+ resp = templates.TemplateResponse(
name="index.html", context={
"request": request,
"title": "LiteRank Book Store",
@@ -56,6 +76,8 @@ def make_router(app: FastAPI, templates_dir: str, remote: RemoteServiceConfig, w
"q": q,
}
)
+ resp.set_cookie(FIELD_UID, user_id, 3600*24*30)
+ return resp
@app.post("/api/books", response_model=Book)
async def create_book(b: dto.Book):
@@ -63,4 +85,4 @@ def make_router(app: FastAPI, templates_dir: str, remote: RemoteServiceConfig, w
@app.get("/api/books")
async def get_books(o: int = 0, q: str = ""):
- return rest_handler.get_books(o, q)
+ return rest_handler.get_books(o, "", q)
调整热搜服务中的消费者
因为 key
的格式有所变化,此处热搜服务需要适配变化。
更新 service/trend/application/consumer/trend.py:
@@ -13,7 +13,9 @@ class TrendConsumer():
def start(self):
def process_event(key: bytes, data: bytes):
- t = Trend(query=key.decode('utf-8'), books=json.loads(data))
+ parts = key.decode('utf-8').split(':')
+ query = parts[0]
+ t = Trend(query=query, books=json.loads(data))
self.trend_manager.create_trend(t)
self.event_consumer.consume_events(process_event)
添加 group_id
,service/trend/application/wire_helper.py:
@@ -14,7 +14,7 @@ class WireHelper:
def new(cls, c: Config):
kv = RedisCache(c.cache.host, c.cache.port,
c.cache.password, c.cache.db)
- consumer = KafkaConsumer(c.mq.brokers, c.mq.topic)
+ consumer = KafkaConsumer(c.mq.brokers, c.mq.topic, c.mq.group_id)
return cls(kv, consumer)
def trend_manager(self) -> TrendManager:
推荐服务的消费者
添加 service/domain/model/interest.py:
from dataclasses import dataclass
@dataclass
class Interest:
user_id: str
title: str
author: str
score: float
添加 service/recommendation/domain/gateway/interest_manager.py:
from abc import ABC, abstractmethod
from typing import List
from ....domain.model import Interest
class InterestManager(ABC):
@abstractmethod
def increase_interest(self, i: Interest):
pass
@abstractmethod
def list_interests(self, user_id: str) -> List[Interest]:
pass
安装 mongo
依赖:
pip3 install pymongo
准备 mongoDB 数据库:
- 在机器上安装 mongoDB 并启动它。
- 创建一个库,名为
lr_event_rec
.- 在需要的 collections 上创建索引。
实现 InterestManager
,service/recommendation/infrastructure/database/mongo.py:
from typing import List
from pymongo import MongoClient, DESCENDING
from ...domain.gateway import InterestManager
from ....domain.model import Interest
COLL_REVIEW = "interests"
class MongoPersistence(InterestManager):
def __init__(self, uri: str, db_name: str, page_size: int):
self.client = MongoClient(uri)
self.db = self.client[db_name]
self.coll = self.db[COLL_REVIEW]
self.page_size = page_size
def increase_interest(self, i: Interest):
filter_query = {
"user_id": i.user_id,
"title": i.title,
"author": i.author,
}
update_query = {
"$inc": {"score": 1}
}
self.coll.update_one(filter_query, update_query, upsert=True)
def list_interests(self, user_id: str) -> List[Interest]:
filter_query = {"user_id": user_id}
cursor = self.coll.find(filter_query).sort(
"score", DESCENDING).limit(self.page_size)
return list(cursor)
添加 service/recommendation/infrastructure/config/config.py:
from dataclasses import dataclass
from typing import List
import yaml
@dataclass
class DBConfig:
mongo_uri: str
mongo_db_name: str
@dataclass
class MQConfig:
brokers: List[str]
topic: str
group_id: str
@dataclass
class ApplicationConfig:
page_size: int
@dataclass
class Config:
app: ApplicationConfig
db: DBConfig
mq: MQConfig
def parseConfig(filename: str) -> Config:
with open(filename, 'r') as f:
data = yaml.safe_load(f)
return Config(
ApplicationConfig(**data['app']),
DBConfig(**data['db']),
MQConfig(**data['mq'])
)
添加 service/recommendation/config.yml:
app:
page_size: 10
db:
mongo_uri: "mongodb://localhost:27017"
mongo_db_name: "lr_event_rec"
mq:
brokers:
- localhost:9094
topic: "lr-book-searches"
group_id: "rec-svr"
添加 service/recommendation/application/consumer/interest.py:
import json
from typing import Dict, List
from ....domain.model import Interest
from ...domain.gateway import InterestManager
from ....domain.gateway import TrendEventConsumer
class InterestConsumer():
def __init__(self, interest_manager: InterestManager, event_consumer: TrendEventConsumer):
self.interest_manager = interest_manager
self.event_consumer = event_consumer
def start(self):
def process_event(key: bytes, data: bytes):
parts = key.decode('utf-8').split(':')
if len(parts) == 1:
# no user_id, ignore it
return
books: List[Dict] = json.loads(data)
user_id = parts[1]
for b in books:
self.interest_manager.increase_interest(Interest(
user_id=user_id,
title=b['title'],
author=b['author'],
score=0
))
self.event_consumer.consume_events(process_event)
def get_event_consumer(self) -> TrendEventConsumer:
return self.event_consumer
添加 service/recommendation/application/wire_helper.py:
from ..domain.gateway import InterestManager
from ...domain.gateway import TrendEventConsumer
from ..infrastructure.config import Config
from ..infrastructure.database import MongoPersistence
from ...infrastructure.mq import KafkaConsumer
class WireHelper:
def __init__(self, noSQLPersistence: MongoPersistence, consumer: KafkaConsumer):
self.noSQLPersistence = noSQLPersistence
self.consumer = consumer
@classmethod
def new(cls, c: Config):
mdb = MongoPersistence(
c.db.mongo_uri, c.db.mongo_db_name, c.app.page_size)
consumer = KafkaConsumer(c.mq.brokers, c.mq.topic, c.mq.group_id)
return cls(mdb, consumer)
def interest_manager(self) -> InterestManager:
return self.noSQLPersistence
def trend_event_consumer(self) -> TrendEventConsumer:
return self.consumer
添加 service/recommendation/main.py:
import signal
from .application import WireHelper
from .application.consumer import InterestConsumer
from .infrastructure.config import parseConfig
CONFIG_FILENAME = "service/recommendation/config.yml"
c = parseConfig(CONFIG_FILENAME)
wire_helper = WireHelper.new(c)
# Run the consumer
tc = InterestConsumer(wire_helper.interest_manager(),
wire_helper.trend_event_consumer())
event_consumer = tc.get_event_consumer()
def sigterm_handler(signal, frame):
event_consumer.stop()
print("Consumer stopped. Exiting gracefully...")
signal.signal(signal.SIGTERM, sigterm_handler)
signal.signal(signal.SIGINT, sigterm_handler)
print("Started to consume events...")
tc.start()
记得处理 __init__.py 文件:
- service/domain/model/__init__.py
- service/recommendation/application/__init__.py
- service/recommendation/application/consumer/__init__.py
- service/recommendation/domain/gateway/__init__.py
- service/recommendation/infrastructure/config/__init__.py
- service/recommendation/infrastructure/database/__init__.py
最新 requirements.txt:
annotated-types==0.6.0
anyio==4.3.0
click==8.1.7
confluent-kafka==2.3.0
dnspython==2.6.1
fastapi==0.110.1
h11==0.14.0
httptools==0.6.1
idna==3.6
Jinja2==3.1.3
MarkupSafe==2.1.5
mysql-connector-python==8.3.0
pydantic==2.6.4
pydantic_core==2.16.3
pymongo==4.6.3
python-dotenv==1.0.1
PyYAML==6.0.1
redis==5.0.3
sniffio==1.3.1
starlette==0.37.2
typing_extensions==4.11.0
uvicorn==0.29.0
uvloop==0.19.0
watchfiles==0.21.0
websockets==12.0
启动推荐服务的消费者:
python3 -m service.recommendation.main
你应该能看到类似如下输出:
Started to consume events...
在另一个终端中,重启 web 服务并尝试多次搜索关键词”love“和”war“等。
然后,你将在 mongoDB 中看到如下记录:
lr_event_rec> db.interests.find();
[
{
_id: ObjectId('6617c3df9f5e3692de429083'),
title: 'The Catcher in the Rye',
author: 'J.D. Salinger',
user_id: 'C32WB',
score: 1
},
{
_id: ObjectId('6617c3ef9f5e3692de42908e'),
title: 'Pride and Prejudice',
author: 'Jane Austen',
user_id: 'C32WB',
score: 1
},
{
_id: ObjectId('6617c3ef9f5e3692de429090'),
title: 'War and Peace',
author: 'Leo Tolstoy',
user_id: 'C32WB',
score: 2
}
]
这就表示你的消费者运作正常。