» Node.js:使用Kafka构建事件驱动微服务 » 4. 消费者:推荐服务 » 4.2 事件消费者

事件消费者

因为推荐服务的消费者和热搜服务的消费者有很多共通之处,故此处可以将共通部分移到更高层的目录以实现代码复用。

重组文件

  • 调整 src/trend/domain/gateway/trend_manager.ts:
@@ -1,13 +1,6 @@
 import { Trend } from "../../../domain/model";
 
-export type ConsumeCallback = (key: Buffer, value: Buffer) => void;
-
 export interface TrendManager {
   createTrend(t: Trend): Promise<number>;
   topTrends(pageSize: number): Promise<Trend[]>;
 }
-
-export interface TrendEventConsumer {
-  consumeEvents(callback: ConsumeCallback): Promise<void>;
-  stop(): Promise<void>;
-}

将共通的 TrendEventConsumer 和 ConsumeCallback 类型移出该文件。

  • 将这些类型放入 src/domain/gateway/event_consumer.ts:
/**
 * Callback handed to an event consumer. It receives the raw key and raw value
 * of one event as Buffers; decoding them is the callback's job.
 *
 * NOTE(review): the declared return type is `void`, but the consumers in this
 * tutorial pass async functions — confirm how the consumer implementation
 * treats the returned promise (awaited or dropped).
 */
export type ConsumeCallback = (key: Buffer, value: Buffer) => void;

/**
 * Gateway for an event consumer. It lives in the shared domain layer so that
 * more than one service can depend on the same contract.
 */
export interface TrendEventConsumer {
  /** Consumes events, passing each event's key and value to `callback`. */
  consumeEvents(callback: ConsumeCallback): Promise<void>;
  /** Stops the consumer (presumably ends consumption started by consumeEvents — verify against the implementation). */
  stop(): Promise<void>;
}
  • 移动 src/trend/infrastructure/mq/kafka.ts 到 src/infrastructure/mq/kafka_consumer.ts:
export class KafkaConsumer implements TrendEventConsumer {
  ...

  constructor(brokers: string[], topic: string, groupId: string) {

为消费组添加一个新的构造参数 groupId。

  • 添加配置项,src/trend/infrastructure/config/config.ts:
@@ -15,6 +15,7 @@ export interface CacheConfig {
 interface MQConfig {
   brokers: string[];
   topic: string;
+  groupId: string;
 }
 
 export interface Config {
  • 填入配置值,src/trend/config.json:
@@ -11,6 +11,7 @@
   },
   "mq": {
     "brokers": ["localhost:9094"],
-    "topic": "lr-book-searches"
+    "topic": "lr-book-searches",
+    "groupId": "trend-svr"
   }
 }
  • 调整 import 路径,src/trend/application/consumer/trend.ts:
@@ -1,5 +1,6 @@
 import { Trend } from "../../../domain/model";
-import { TrendManager, TrendEventConsumer } from "../../domain/gateway";
+import { TrendManager } from "../../domain/gateway";
+import { TrendEventConsumer } from "../../../domain/gateway";
 
 export class TrendConsumer {
   private trendManager: TrendManager;
  • 调整 import 路径,src/trend/application/wire_helper.ts:
@@ -1,7 +1,8 @@
 import { Config } from "../infrastructure/config";
-import { TrendManager, TrendEventConsumer } from "../domain/gateway";
+import { TrendManager } from "../domain/gateway";
+import { TrendEventConsumer } from "../../domain/gateway";
 import { RedisCache } from "../infrastructure/cache";
-import { KafkaConsumer } from "../infrastructure/mq";
+import { KafkaConsumer } from "../../infrastructure/mq";
 
 // WireHelper is the helper for dependency injection
 export class WireHelper {
@@ -10,7 +11,7 @@ export class WireHelper {
 
   constructor(c: Config) {
     this.kv_store = new RedisCache(c.cache);
-    this.consumer = new KafkaConsumer(c.mq.brokers, c.mq.topic);
+    this.consumer = new KafkaConsumer(c.mq.brokers, c.mq.topic, c.mq.groupId);
   }
 
   trendManager(): TrendManager {
  • 记得创建和更新 index.ts 文件:
    • src/trend/domain/gateway/index.ts
    • src/infrastructure/mq/index.ts
    • src/domain/gateway/index.ts

调整 web 服务中的生产者的事件

添加 userId 参数,src/web/application/executor/book_operator.ts:

@@ -17,11 +17,16 @@ export class BookOperator {
     return b;
   }
 
-  async getBooks(offset: number, query: string): Promise<Book[]> {
+  async getBooks(
+    offset: number,
+    userId: string,
+    query: string
+  ): Promise<Book[]> {
     const books = await this.bookManager.getBooks(offset, query);
     if (query) {
+      const k = query + ":" + userId;
       const jsonData = JSON.stringify(books);
-      await this.mqHelper.sendEvent(query, Buffer.from(jsonData, "utf8"));
+      await this.mqHelper.sendEvent(k, Buffer.from(jsonData, "utf8"));
     }
     return books;
   }

安装 cookie-parser

npm i cookie-parser

以及其类型:

npm i -D @types/cookie-parser

读取 cookie 值并将其作为 user id 传递,src/web/adapter/router.ts:

@@ -1,4 +1,5 @@
 import express, { Request, Response } from "express";
+import cookieParser from "cookie-parser";
 import { engine } from "express-handlebars";
 
 import { Book, Trend } from "../../domain/model";
@@ -6,6 +7,8 @@ import { BookOperator } from "../application/executor";
 import { WireHelper } from "../application";
 import { RemoteServiceConfig } from "../infrastructure/config";
 
+const FIELD_UID = "uid";
+
 class RestHandler {
   private bookOperator: BookOperator;
   private remote: RemoteServiceConfig;
@@ -16,10 +19,15 @@ class RestHandler {
   }
 
   public async indexPage(req: Request, res: Response): Promise<void> {
+    let user_id = req.cookies.uid;
+    if (!user_id) {
+      user_id = randomString(5);
+      res.cookie(FIELD_UID, user_id, { maxAge: 1000 * 3600 * 24 * 30 });
+    }
     let books: Book[];
     const q = req.query.q as string;
     try {
-      books = await this.bookOperator.getBooks(0, q);
+      books = await this.bookOperator.getBooks(0, user_id, q);
     } catch (err) {
       console.warn(`Failed to get books: ${err}`);
       books = [];
@@ -50,6 +58,7 @@ class RestHandler {
     try {
       const books = await this.bookOperator.getBooks(
         offset,
+        "",
         req.query.q as string
       );
       res.status(200).json(books);
@@ -98,6 +107,9 @@ export function InitApp(
   // Middleware to parse JSON bodies
   app.use(express.json());
 
+  // Use cookie parser middleware
+  app.use(cookieParser());
+
   // Set Handlebars as the template engine
   app.engine("handlebars", engine());
   app.set("view engine", "handlebars");
@@ -108,3 +120,12 @@ export function InitApp(
   app.use("", r);
   return app;
 }
+
+function randomString(length: number): string {
+  const charset: string = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+  let result: string = "";
+  for (let i = 0; i < length; i++) {
+    result += charset.charAt(Math.floor(Math.random() * charset.length));
+  }
+  return result;
+}

调整热搜服务中的消费者

因为 key 的格式有所变化,此处热搜服务需要适配变化。

更新 src/trend/application/consumer/trend.ts:

   start() {
     const processEvent = async (key: Buffer, data: Buffer): Promise<void> => {
       if (key && data) {
-        const query = key.toString("utf-8");
+        const parts = key.toString("utf-8").split(":");
+        const query = parts[0];
         const books: Book[] = JSON.parse(data.toString("utf-8"));

推荐服务的消费者

添加 src/domain/model/interest.ts:

/** A user's interest in one book, identified by user, title and author. */
export interface Interest {
  /** ID of the user this interest belongs to. */
  userId: string;
  /** Title of the book. */
  title: string;
  /** Author of the book. */
  author: string;
  /** Numeric interest score (presumably higher means stronger interest — verify against the storage implementation). */
  score: number;
}

添加 src/recommendation/domain/gateway/interest_manager.ts:

import { Interest } from "../../../domain/model";

/** Gateway for storing and querying users' interests in books. */
export interface InterestManager {
  /**
   * Records one more occurrence of interest `i`.
   * NOTE(review): how `i.score` is used is up to the implementation — confirm
   * whether it is an increment amount or ignored.
   */
  increaseInterest(i: Interest): Promise<void>;
  /** Returns the interests stored for the user identified by `userId`. */
  listInterests(userId: string): Promise<Interest[]>;
}

安装 mongo 依赖:

npm i mongodb

准备 mongoDB 数据库:

  • 在机器上安装 mongoDB 并启动它。
  • 创建一个库,名为 lr_event_rec.
  • 在需要的 collections 上创建索引。

实现 InterestManager,src/recommendation/infrastructure/database/mongo.ts:

import { MongoClient, Db, Collection } from "mongodb";

import { Interest } from "../../../domain/model";
import { InterestManager } from "../../domain/gateway";

// Name of the collection holding one document per (user, title, author).
const COLL_INTERESTS = "interests";

/**
 * MongoDB-backed implementation of InterestManager.
 *
 * Documents are stored with the fields `user_id`, `title`, `author` and
 * `score`; `user_id` is mapped to/from `Interest.userId`.
 */
export class MongoPersistence implements InterestManager {
  private readonly db: Db;
  private readonly coll: Collection;
  private readonly pageSize: number;
  // Settles once the initial connection attempt has finished. Every operation
  // awaits it so that no query races the connection set-up.
  private readonly ready: Promise<void>;

  /**
   * @param uri      MongoDB connection string.
   * @param dbName   Name of the database to use.
   * @param pageSize Maximum number of interests returned by listInterests.
   */
  constructor(uri: string, dbName: string, pageSize: number) {
    this.pageSize = pageSize;
    const client = new MongoClient(uri);
    // Db/Collection handles do not need a live connection, so create them
    // right away instead of inside the connect callback. This guarantees the
    // fields are never undefined when a method is called early.
    this.db = client.db(dbName);
    this.coll = this.db.collection(COLL_INTERESTS);
    this.ready = client.connect().then(() => undefined);
    // Log connection failures here so they are visible immediately and do not
    // surface as an unhandled rejection; callers still see the error because
    // each operation awaits `ready`.
    this.ready.catch((err: unknown) => {
      console.error("Failed to connect to MongoDB:", err);
    });
  }

  /**
   * Increments the stored score of the given interest by 1, creating the
   * document if it does not exist yet. `interest.score` is not used.
   *
   * @throws if the connection could not be established or the update fails.
   */
  async increaseInterest(interest: Interest): Promise<void> {
    await this.ready;
    const filterQuery = {
      user_id: interest.userId,
      title: interest.title,
      author: interest.author,
    };
    const updateQuery = {
      $inc: { score: 1 },
    };
    await this.coll.updateOne(filterQuery, updateQuery, { upsert: true });
  }

  /**
   * Lists a user's interests, highest score first, limited to `pageSize`.
   *
   * @throws if the connection could not be established or the query fails.
   */
  async listInterests(userId: string): Promise<Interest[]> {
    await this.ready;
    const filterQuery = { user_id: userId };
    const interestDocs = await this.coll
      .find(filterQuery)
      .sort({ score: -1 })
      .limit(this.pageSize)
      .toArray();
    return interestDocs.map((doc) => ({
      userId: doc.user_id,
      title: doc.title,
      author: doc.author,
      score: doc.score,
    }));
  }
}

添加 src/recommendation/infrastructure/config/config.ts:

import { readFileSync } from "fs";

/** Application-level settings (the `app` section of the config file). */
interface ApplicationConfig {
  // NOTE(review): presumably the port the service listens on — confirm where it is read.
  port: number;
  // Page size for list queries (presumably the cap on returned interests — verify against the caller).
  pageSize: number;
}

/** Database settings (the `db` section of the config file). */
export interface DatabaseConfig {
  // Connection URI of the database server.
  uri: string;
  // Name of the database to use.
  dbName: string;
}

/** Message-queue settings (the `mq` section of the config file). */
interface MQConfig {
  // Addresses of the message brokers.
  brokers: string[];
  // Topic to consume events from.
  topic: string;
  // Consumer group ID used by this service.
  groupId: string;
}

/** Root shape of the service's JSON config file. */
export interface Config {
  app: ApplicationConfig;
  db: DatabaseConfig;
  mq: MQConfig;
}

/**
 * Reads `filename` as UTF-8 text and parses it as JSON.
 *
 * The parsed value is returned as-is; it is not validated against Config.
 *
 * @param filename Path of the JSON config file.
 * @returns The parsed config object.
 * @throws If the file cannot be read or its content is not valid JSON.
 */
export function parseConfig(filename: string): Config {
  const rawText = readFileSync(filename, "utf-8");
  const parsed: Config = JSON.parse(rawText);
  return parsed;
}

添加 src/recommendation/config.json:

{
  "app": {
    "port": 3002,
    "pageSize": 10
  },
  "db": {
    "uri": "mongodb://localhost:27017",
    "dbName": "lr_event_rec"
  },
  "mq": {
    "brokers": ["localhost:9094"],
    "topic": "lr-book-searches",
    "groupId": "rec-svr"
  }
}

添加 src/recommendation/application/consumer/interest.ts:

import { Book } from "../../../domain/model";
import { InterestManager } from "../../domain/gateway";
import { TrendEventConsumer } from "../../../domain/gateway";

/**
 * Consumes book-search events and records, for the user who searched, one
 * interest per book contained in the event.
 *
 * Event format: the key is `<query>:<userId>` and the value is a JSON array
 * of books (each with `title` and `author`).
 */
export class InterestConsumer {
  private interestManager: InterestManager;
  private eventConsumer: TrendEventConsumer;

  /**
   * @param i Store used to increase interests.
   * @param e Source of events.
   */
  constructor(i: InterestManager, e: TrendEventConsumer) {
    this.interestManager = i;
    this.eventConsumer = e;
  }

  /**
   * Starts consuming events. Returns immediately; errors from the consumer
   * itself are logged rather than thrown.
   */
  start() {
    const processEvent = async (key: Buffer, data: Buffer): Promise<void> => {
      if (!key || !data) {
        return;
      }

      // The user id is the last ":"-separated segment, so a query that itself
      // contains ":" does not shift it.
      const rawKey = key.toString("utf-8");
      const sep = rawKey.lastIndexOf(":");
      const userId = sep === -1 ? "" : rawKey.slice(sep + 1);
      if (!userId) {
        // No user id (either no separator or an empty id): ignore the event.
        return;
      }

      let books: Book[];
      try {
        books = JSON.parse(data.toString("utf-8"));
      } catch (err) {
        // One malformed event must not take down the consumer.
        console.warn(`Skipping event with malformed payload: ${err}`);
        return;
      }
      if (!Array.isArray(books)) {
        console.warn("Skipping event whose payload is not an array of books");
        return;
      }

      // Issue all updates concurrently, but wait for them and report failures
      // instead of leaving rejected promises unhandled.
      const results = await Promise.allSettled(
        books.map(async (book) =>
          this.interestManager.increaseInterest({
            userId,
            title: book.title,
            author: book.author,
            score: 0,
          })
        )
      );
      for (const r of results) {
        if (r.status === "rejected") {
          console.warn(`Failed to increase interest: ${r.reason}`);
        }
      }
    };

    this.eventConsumer.consumeEvents(processEvent).catch((err) => {
      console.log("Consumer error:", err);
    });
  }

  /** Returns the underlying event consumer (e.g. so the caller can stop it). */
  getEventConsumer(): TrendEventConsumer {
    return this.eventConsumer;
  }
}

添加 src/recommendation/application/wire_helper.ts:

import { Config } from "../infrastructure/config";
import { InterestManager } from "../domain/gateway";
import { TrendEventConsumer } from "../../domain/gateway";
import { MongoPersistence } from "../infrastructure/database";
import { KafkaConsumer } from "../../infrastructure/mq";

// WireHelper is the helper for dependency injection
/**
 * Dependency-injection helper: builds the concrete infrastructure objects
 * from the config and exposes them through their gateway interfaces.
 */
export class WireHelper {
  private noSQLPersistence: MongoPersistence;
  private consumer: KafkaConsumer;

  /** Creates the persistence layer first, then the event consumer. */
  constructor(c: Config) {
    const { db, app, mq } = c;
    this.noSQLPersistence = new MongoPersistence(
      db.uri,
      db.dbName,
      app.pageSize
    );
    this.consumer = new KafkaConsumer(mq.brokers, mq.topic, mq.groupId);
  }

  /** The interest store, typed as its gateway interface. */
  interestManager(): InterestManager {
    return this.noSQLPersistence;
  }

  /** The event consumer, typed as its gateway interface. */
  trendEventConsumer(): TrendEventConsumer {
    return this.consumer;
  }
}

添加 src/recommendation/app.ts:

import { WireHelper } from "./application";
import { parseConfig } from "./infrastructure/config";
import { InterestConsumer } from "./application/consumer";

// Path of the recommendation service's config file.
const CONFIG_PATH = "src/recommendation/config.json";

// Load the config and build the dependencies from it.
const helper = new WireHelper(parseConfig(CONFIG_PATH));

// Wire the interest store and the event source into the consumer, then run it.
const interestConsumer = new InterestConsumer(
  helper.interestManager(),
  helper.trendEventConsumer()
);
interestConsumer.start();

添加启动 script,package.json:

@@ -6,6 +6,7 @@
   "scripts": {
     "dev-web": "ts-node src/web/app.ts",
     "dev-trend": "tsc && node dist/trend/app.js",
+    "dev-rec": "ts-node src/recommendation/app.ts",
     "build": "tsc"
   },
   "repository": {
@@ -29,6 +30,7 @@
     "express-handlebars": "^7.1.2",
     "ioredis": "^5.3.2",
     "kafkajs": "^2.2.4",
+    "mongodb": "^6.5.0",
     "mysql2": "^3.9.4"
   },
   "devDependencies": {

记得创建和更新 index.ts 文件:

  • src/domain/model/index.ts
  • src/recommendation/application/consumer/index.ts
  • src/recommendation/application/index.ts
  • src/recommendation/domain/gateway/index.ts
  • src/recommendation/infrastructure/config/index.ts
  • src/recommendation/infrastructure/database/index.ts

启动推荐服务的消费者:

npm run dev-rec

你应该能看到类似如下输出:

{"level":"INFO","timestamp":"2024-04-17T06:55:37.291Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"rec-svr"}
{"level":"INFO","timestamp":"2024-04-17T06:55:40.320Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"rec-svr","memberId":"kafkajs-946c4ada-9ce8-4596-913e-977667990920","leaderId":"kafkajs-946c4ada-9ce8-4596-913e-977667990920","isLeader":true,"memberAssignment":{"lr-book-searches":[0]},"groupProtocol":"RoundRobinAssigner","duration":3027}

在另一个终端中,重启 web 服务并尝试多次搜索关键词“love”和“peace”等。

然后,你将在 mongoDB 中看到如下记录:

lr_event_rec> db.interests.find();
[
  {
    _id: ObjectId('661f7029f2f9d14bb96731a4'),
    title: 'War and Peace',
    user_id: 'VI0GS',
    author: 'Leo Tolstoy',
    score: 2
  },
  {
    _id: ObjectId('661f7029f2f9d14bb96731a5'),
    title: 'Pride and Prejudice',
    user_id: 'VI0GS',
    author: 'Jane Austen',
    score: 1
  },
  {
    _id: ObjectId('661f7030f2f9d14bb96731ac'),
    title: 'The Great Gatsby',
    user_id: 'VI0GS',
    author: 'F. Scott Fitzgerald',
    score: 1
  }
]

这就表示你的消费者运行正常。

上页下页