事件消费者
因为推荐服务的消费者和热搜服务的消费者有很多共通之处,故此处可以将共通部分移到更高层的目录以实现代码复用。
重组文件
- 调整 src/trend/domain/gateway/trend_manager.ts:
@@ -1,13 +1,6 @@
import { Trend } from "../../../domain/model";
-export type ConsumeCallback = (key: Buffer, value: Buffer) => void;
-
export interface TrendManager {
createTrend(t: Trend): Promise<number>;
topTrends(pageSize: number): Promise<Trend[]>;
}
-
-export interface TrendEventConsumer {
- consumeEvents(callback: ConsumeCallback): Promise<void>;
- stop(): Promise<void>;
-}
将共通的 TrendEventConsumer
和 ConsumeCallback
类型移出该文件。
- 将这些类型放入 src/domain/gateway/event_consumer.ts:
export type ConsumeCallback = (key: Buffer, value: Buffer) => void;
export interface TrendEventConsumer {
consumeEvents(callback: ConsumeCallback): Promise<void>;
stop(): Promise<void>;
}
- 移动 src/trend/infrastructure/mq/kafka.ts 到 src/infrastructure/mq/kafka_consumer.ts。
export class KafkaConsumer implements TrendEventConsumer {
...
constructor(brokers: string[], topic: string, groupId: string) {
添加一个给消费组的新参数 groupId
。
- 添加配置项,src/trend/infrastructure/config/config.ts:
@@ -15,6 +15,7 @@ export interface CacheConfig {
interface MQConfig {
brokers: string[];
topic: string;
+ groupId: string;
}
export interface Config {
- 填入配置值,src/trend/config.json:
@@ -11,6 +11,7 @@
},
"mq": {
"brokers": ["localhost:9094"],
- "topic": "lr-book-searches"
+ "topic": "lr-book-searches",
+ "groupId": "trend-svr"
}
}
- 调整 import 路径,src/trend/application/consumer/trend.ts:
@@ -1,5 +1,6 @@
import { Trend } from "../../../domain/model";
-import { TrendManager, TrendEventConsumer } from "../../domain/gateway";
+import { TrendManager } from "../../domain/gateway";
+import { TrendEventConsumer } from "../../../domain/gateway";
export class TrendConsumer {
private trendManager: TrendManager;
- 调整 import 路径,src/trend/application/wire_helper.ts:
@@ -1,7 +1,8 @@
import { Config } from "../infrastructure/config";
-import { TrendManager, TrendEventConsumer } from "../domain/gateway";
+import { TrendManager } from "../domain/gateway";
+import { TrendEventConsumer } from "../../domain/gateway";
import { RedisCache } from "../infrastructure/cache";
-import { KafkaConsumer } from "../infrastructure/mq";
+import { KafkaConsumer } from "../../infrastructure/mq";
// WireHelper is the helper for dependency injection
export class WireHelper {
@@ -10,7 +11,7 @@ export class WireHelper {
constructor(c: Config) {
this.kv_store = new RedisCache(c.cache);
- this.consumer = new KafkaConsumer(c.mq.brokers, c.mq.topic);
+ this.consumer = new KafkaConsumer(c.mq.brokers, c.mq.topic, c.mq.groupId);
}
trendManager(): TrendManager {
- 记得创建和更新 index.ts 文件:
- src/trend/domain/gateway/index.ts
- src/infrastructure/mq/index.ts
- src/domain/gateway/index.ts
调整 web 服务中的生产者的事件
添加 userId
参数,src/web/application/executor/book_operator.ts:
@@ -17,11 +17,16 @@ export class BookOperator {
return b;
}
- async getBooks(offset: number, query: string): Promise<Book[]> {
+ async getBooks(
+ offset: number,
+ userId: string,
+ query: string
+ ): Promise<Book[]> {
const books = await this.bookManager.getBooks(offset, query);
if (query) {
+ const k = query + ":" + userId;
const jsonData = JSON.stringify(books);
- await this.mqHelper.sendEvent(query, Buffer.from(jsonData, "utf8"));
+ await this.mqHelper.sendEvent(k, Buffer.from(jsonData, "utf8"));
}
return books;
}
安装 cookie-parser
:
npm i cookie-parser
以及其类型:
npm i -D @types/cookie-parser
读取 cookie 值并传递给 user id,src/web/adapter/router.ts:
@@ -1,4 +1,5 @@
import express, { Request, Response } from "express";
+import cookieParser from "cookie-parser";
import { engine } from "express-handlebars";
import { Book, Trend } from "../../domain/model";
@@ -6,6 +7,8 @@ import { BookOperator } from "../application/executor";
import { WireHelper } from "../application";
import { RemoteServiceConfig } from "../infrastructure/config";
+const FIELD_UID = "uid";
+
class RestHandler {
private bookOperator: BookOperator;
private remote: RemoteServiceConfig;
@@ -16,10 +19,15 @@ class RestHandler {
}
public async indexPage(req: Request, res: Response): Promise<void> {
+ let user_id = req.cookies.uid;
+ if (!user_id) {
+ user_id = randomString(5);
+ res.cookie(FIELD_UID, user_id, { maxAge: 1000 * 3600 * 24 * 30 });
+ }
let books: Book[];
const q = req.query.q as string;
try {
- books = await this.bookOperator.getBooks(0, q);
+ books = await this.bookOperator.getBooks(0, user_id, q);
} catch (err) {
console.warn(`Failed to get books: ${err}`);
books = [];
@@ -50,6 +58,7 @@ class RestHandler {
try {
const books = await this.bookOperator.getBooks(
offset,
+ "",
req.query.q as string
);
res.status(200).json(books);
@@ -98,6 +107,9 @@ export function InitApp(
// Middleware to parse JSON bodies
app.use(express.json());
+ // Use cookie parser middleware
+ app.use(cookieParser());
+
// Set Handlebars as the template engine
app.engine("handlebars", engine());
app.set("view engine", "handlebars");
@@ -108,3 +120,12 @@ export function InitApp(
app.use("", r);
return app;
}
+
+function randomString(length: number): string {
+ const charset: string = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+ let result: string = "";
+ for (let i = 0; i < length; i++) {
+ result += charset.charAt(Math.floor(Math.random() * charset.length));
+ }
+ return result;
+}
调整热搜服务中的消费者
因为 key
的格式有所变化,此处热搜服务需要适配变化。
更新 src/trend/application/consumer/trend.ts:
start() {
const processEvent = async (key: Buffer, data: Buffer): Promise<void> => {
if (key && data) {
- const query = key.toString("utf-8");
+ const parts = key.toString("utf-8").split(":");
+ const query = parts[0];
const books: Book[] = JSON.parse(data.toString("utf-8"));
推荐服务的消费者
添加 src/domain/model/interest.ts:
export interface Interest {
userId: string;
title: string;
author: string;
score: number;
}
添加 src/recommendation/domain/gateway/interest_manager.ts:
import { Interest } from "../../../domain/model";
export interface InterestManager {
increaseInterest(i: Interest): Promise<void>;
listInterests(userId: string): Promise<Interest[]>;
}
安装 mongo
依赖:
npm i mongodb
准备 mongoDB 数据库:
- 在机器上安装 mongoDB 并启动它。
- 创建一个库,名为
lr_event_rec
.- 在需要的 collections 上创建索引。
实现 InterestManager
,src/recommendation/infrastructure/database/mongo.ts:
import { MongoClient, Db, Collection } from "mongodb";
import { Interest } from "../../../domain/model";
import { InterestManager } from "../../domain/gateway";
const COLL_INTERESTS = "interests";
export class MongoPersistence implements InterestManager {
private db!: Db;
private coll!: Collection;
private pageSize: number;
constructor(uri: string, dbName: string, pageSize: number) {
this.pageSize = pageSize;
const client = new MongoClient(uri);
client.connect().then(() => {
this.db = client.db(dbName);
this.coll = this.db.collection(COLL_INTERESTS);
});
}
async increaseInterest(interest: Interest): Promise<void> {
const filterQuery = {
user_id: interest.userId,
title: interest.title,
author: interest.author,
};
const updateQuery = {
$inc: { score: 1 },
};
await this.coll.updateOne(filterQuery, updateQuery, { upsert: true });
}
async listInterests(userId: string): Promise<Interest[]> {
const filterQuery = { user_id: userId };
const cursor = this.coll
.find(filterQuery)
.sort({ score: -1 })
.limit(this.pageSize);
const interestDocs = await cursor.toArray();
return interestDocs.map((doc) => ({
userId: doc.user_id,
title: doc.title,
author: doc.author,
score: doc.score,
}));
}
}
添加 src/recommendation/infrastructure/config/config.ts:
import { readFileSync } from "fs";
interface ApplicationConfig {
port: number;
pageSize: number;
}
export interface DatabaseConfig {
uri: string;
dbName: string;
}
interface MQConfig {
brokers: string[];
topic: string;
groupId: string;
}
export interface Config {
app: ApplicationConfig;
db: DatabaseConfig;
mq: MQConfig;
}
export function parseConfig(filename: string): Config {
return JSON.parse(readFileSync(filename, "utf-8"));
}
添加 src/recommendation/config.json:
{
"app": {
"port": 3002,
"pageSize": 10
},
"db": {
"uri": "mongodb://localhost:27017",
"dbName": "lr_event_rec"
},
"mq": {
"brokers": ["localhost:9094"],
"topic": "lr-book-searches",
"groupId": "rec-svr"
}
}
添加 src/recommendation/application/consumer/interest.ts:
import { Book } from "../../../domain/model";
import { InterestManager } from "../../domain/gateway";
import { TrendEventConsumer } from "../../../domain/gateway";
export class InterestConsumer {
private interestManager: InterestManager;
private eventConsumer: TrendEventConsumer;
constructor(i: InterestManager, e: TrendEventConsumer) {
this.interestManager = i;
this.eventConsumer = e;
}
start() {
const processEvent = async (key: Buffer, data: Buffer): Promise<void> => {
if (key && data) {
const parts = key.toString("utf-8").split(":");
if (parts.length === 1) {
// no user id, ignore it
return;
}
const query = parts[0];
const books: Book[] = JSON.parse(data.toString("utf-8"));
const userId = parts[1];
for (let book of books) {
this.interestManager.increaseInterest({
userId,
title: book.title,
author: book.author,
score: 0,
});
}
}
};
this.eventConsumer.consumeEvents(processEvent).catch((err) => {
console.log("Consumer error:", err);
});
}
getEventConsumer(): TrendEventConsumer {
return this.eventConsumer;
}
}
添加 src/recommendation/application/wire_helper.ts:
import { Config } from "../infrastructure/config";
import { InterestManager } from "../domain/gateway";
import { TrendEventConsumer } from "../../domain/gateway";
import { MongoPersistence } from "../infrastructure/database";
import { KafkaConsumer } from "../../infrastructure/mq";
// WireHelper is the helper for dependency injection
export class WireHelper {
private noSQLPersistence: MongoPersistence;
private consumer: KafkaConsumer;
constructor(c: Config) {
this.noSQLPersistence = new MongoPersistence(
c.db.uri,
c.db.dbName,
c.app.pageSize
);
this.consumer = new KafkaConsumer(c.mq.brokers, c.mq.topic, c.mq.groupId);
}
interestManager(): InterestManager {
return this.noSQLPersistence;
}
trendEventConsumer(): TrendEventConsumer {
return this.consumer;
}
}
添加 src/recommendation/app.ts:
import { WireHelper } from "./application";
import { parseConfig } from "./infrastructure/config";
import { InterestConsumer } from "./application/consumer";
const configFilename = "src/recommendation/config.json";
const c = parseConfig(configFilename);
const wireHelper = new WireHelper(c);
const tc = new InterestConsumer(
wireHelper.interestManager(),
wireHelper.trendEventConsumer()
);
tc.start();
添加启动 script,package.json:
@@ -6,6 +6,7 @@
"scripts": {
"dev-web": "ts-node src/web/app.ts",
"dev-trend": "tsc && node dist/trend/app.js",
+ "dev-rec": "ts-node src/recommendation/app.ts",
"build": "tsc"
},
"repository": {
@@ -29,6 +30,7 @@
"express-handlebars": "^7.1.2",
"ioredis": "^5.3.2",
"kafkajs": "^2.2.4",
+ "mongodb": "^6.5.0",
"mysql2": "^3.9.4"
},
"devDependencies": {
记得创建和更新 index.ts 文件:
- src/domain/model/index.ts
- src/recommendation/application/consumer/index.ts
- src/recommendation/application/index.ts
- src/recommendation/domain/gateway/index.ts
- src/recommendation/infrastructure/config/index.ts
- src/recommendation/infrastructure/database/index.ts
启动推荐服务的消费者:
npm run dev-rec
你应该能看到类似如下输出:
{"level":"INFO","timestamp":"2024-04-17T06:55:37.291Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"rec-svr"}
{"level":"INFO","timestamp":"2024-04-17T06:55:40.320Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"rec-svr","memberId":"kafkajs-946c4ada-9ce8-4596-913e-977667990920","leaderId":"kafkajs-946c4ada-9ce8-4596-913e-977667990920","isLeader":true,"memberAssignment":{"lr-book-searches":[0]},"groupProtocol":"RoundRobinAssigner","duration":3027}
在另一个终端中,重启 web 服务并尝试多次搜索关键词”love“和”peace“等。
然后,你将在 mongoDB 中看到如下记录:
lr_event_rec> db.interests.find();
[
{
_id: ObjectId('661f7029f2f9d14bb96731a4'),
title: 'War and Peace',
user_id: 'VI0GS',
author: 'Leo Tolstoy',
score: 2
},
{
_id: ObjectId('661f7029f2f9d14bb96731a5'),
title: 'Pride and Prejudice',
user_id: 'VI0GS',
author: 'Jane Austen',
score: 1
},
{
_id: ObjectId('661f7030f2f9d14bb96731ac'),
title: 'The Great Gatsby',
user_id: 'VI0GS',
author: 'F. Scott Fitzgerald',
score: 1
}
]
这就表示你的消费者运行正常。