» Node.js:使用Kafka构建事件驱动微服务 » 3. 消费者:热搜服务 » 3.3 事件消费者

事件消费者

调整 src/domain/model/trend.ts:

@@ -3,5 +3,4 @@ import { Book } from ".";
 export interface Trend {
   query: string;
   books: Book[];
-  created_at: Date | null;
 }

可以使用 kafka 事件的时间戳来进行时间追踪,此处可以移去时间字段。

调整 src/trend/domain/gateway/trend_manager.ts:

@@ -1,6 +1,13 @@
 import { Trend } from "../../../domain/model";
 
+export type ConsumeCallback = (key: Buffer, value: Buffer) => void;
+
 export interface TrendManager {
   createTrend(t: Trend): Promise<number>;
-  topTrends(offset: number): Promise<Trend[]>;
+  topTrends(pageSize: number): Promise<Trend[]>;
+}
+
+export interface TrendEventConsumer {
+  consumeEvents(callback: ConsumeCallback): Promise<void>;
+  stop(): Promise<void>;
 }

实现 TrendEventConsumer,src/trend/infrastructure/mq/kafka.ts:

import { Kafka, Consumer, EachMessagePayload } from "kafkajs";

import { ConsumeCallback, TrendEventConsumer } from "../../domain/gateway";

const GROUP_ID = "trend-svr";

/**
 * Kafka-backed implementation of TrendEventConsumer.
 *
 * Joins the consumer group named by GROUP_ID and hands every message that
 * has both a key and a value to the caller-supplied callback.
 */
export class KafkaConsumer implements TrendEventConsumer {
  private consumer: Consumer;
  private topic: string;

  /**
   * @param brokers Broker addresses passed to the Kafka client.
   * @param topic Topic this consumer subscribes to.
   */
  constructor(brokers: string[], topic: string) {
    this.topic = topic;
    this.consumer = new Kafka({ brokers }).consumer({ groupId: GROUP_ID });
  }

  /**
   * Connects, subscribes to the configured topic and starts the run loop.
   *
   * @param callback Invoked (and awaited) with the key and value of each
   *   message; messages missing either part are only logged.
   */
  async consumeEvents(callback: ConsumeCallback): Promise<void> {
    const handleMessage = async (payload: EachMessagePayload) => {
      const { topic, partition, message } = payload;
      const { key, value } = message;

      // Messages without a key or a value are reported and not forwarded.
      if (!(key && value)) {
        console.warn(`Null message from ${topic}-${partition}`);
        return;
      }
      await callback(key, value);
    };

    await this.consumer.connect();
    await this.consumer.subscribe({ topic: this.topic });
    await this.consumer.run({ eachMessage: handleMessage });
  }

  /** Disconnects the underlying consumer. */
  async stop(): Promise<void> {
    await this.consumer.disconnect();
  }
}

调整 TrendManager 实现,src/trend/infrastructure/cache/redis.ts:

@@ -19,7 +19,6 @@ export class RedisCache implements TrendManager {
       commandTimeout: c.timeout,
     };
     this.client = new Redis(options);
-    console.log("Connected to Redis");
   }
 
   async createTrend(t: Trend): Promise<number> {
@@ -32,17 +31,17 @@ export class RedisCache implements TrendManager {
     return Number(score);
   }
 
-  async topTrends(offset: number): Promise<Trend[]> {
+  async topTrends(pageSize: number): Promise<Trend[]> {
     const topItems = await this.client.zrevrange(
       trendsKey,
       0,
-      offset,
+      pageSize - 1,
       "WITHSCORES"
     );
     const trends: Trend[] = [];
     for (let i = 0; i < topItems.length; i += 2) {
       const query = topItems[i];
-      const t = { query: query, books: [], created_at: null };
+      const t = { query: query, books: [] };
       const k = queryKeyPrefix + query;
       const value = await this.client.get(k);
       if (value !== null) {

调整 src/trend/application/wire_helper.ts:

@@ -1,16 +1,23 @@
 import { Config } from "../infrastructure/config";
-import { TrendManager } from "../domain/gateway";
+import { TrendManager, TrendEventConsumer } from "../domain/gateway";
 import { RedisCache } from "../infrastructure/cache";
+import { KafkaConsumer } from "../infrastructure/mq";
 
 // WireHelper is the helper for dependency injection
 export class WireHelper {
   private kv_store: RedisCache;
+  private consumer: KafkaConsumer;
 
   constructor(c: Config) {
     this.kv_store = new RedisCache(c.cache);
+    this.consumer = new KafkaConsumer(c.mq.brokers, c.mq.topic);
   }
 
   trendManager(): TrendManager {
     return this.kv_store;
   }
+
+  trendEventConsumer(): TrendEventConsumer {
+    return this.consumer;
+  }
 }

添加配置项,src/trend/infrastructure/config/config.ts:

@@ -12,9 +12,15 @@ export interface CacheConfig {
   timeout: number; // in milliseconds
 }
 
+interface MQConfig {
+  brokers: string[];
+  topic: string;
+}
+
 export interface Config {
   app: ApplicationConfig;
   cache: CacheConfig;
+  mq: MQConfig;
 }
 
 export function parseConfig(filename: string): Config {

填入配置值,src/trend/config.json:

@@ -8,5 +8,9 @@
     "password": "test_pass",
     "db": 0,
     "timeout": 5000
+  },
+  "mq": {
+    "brokers": ["localhost:9094"],
+    "topic": "lr-book-searches"
   }
 }

如前文所述,此处使用 9094 端口是为了从外部访问跑在 docker 容器内的 kafka。
详情请阅读 bitnami/kafka 镜像文档中的 “Accessing Apache Kafka with internal and external clients” 配置一节。

添加 src/trend/application/consumer/trend.ts:

import { Trend } from "../../../domain/model";
import { TrendManager, TrendEventConsumer } from "../../domain/gateway";

/**
 * TrendConsumer turns incoming search events into stored trends.
 *
 * Each event's key is read as the search query and its value as a
 * JSON-encoded list of books; the resulting Trend is passed to the
 * TrendManager.
 */
export class TrendConsumer {
  private trendManager: TrendManager;
  private eventConsumer: TrendEventConsumer;

  /**
   * @param t Destination for the trends built from events.
   * @param e Source of the raw key/value events.
   */
  constructor(t: TrendManager, e: TrendEventConsumer) {
    this.trendManager = t;
    this.eventConsumer = e;
  }

  /**
   * Starts consuming events without blocking the caller.
   *
   * Events whose payload is not valid JSON are logged and skipped.
   * Errors raised while storing a trend are not caught here and propagate
   * to the event consumer.
   */
  start() {
    const processEvent = async (key: Buffer, data: Buffer): Promise<void> => {
      if (!key || !data) {
        return;
      }

      const query = key.toString("utf-8");

      let books: Trend["books"];
      try {
        books = JSON.parse(data.toString("utf-8"));
      } catch (err) {
        // A malformed payload can never succeed on retry; letting the
        // exception escape would reject the message handler and may stall
        // or crash consumption on this one message, so drop it instead.
        console.error(
          `Skipping event with malformed payload for "${query}":`,
          err
        );
        return;
      }

      const trend: Trend = { query, books };
      await this.trendManager.createTrend(trend);
    };

    // Fire-and-forget: failures of the consume loop itself are reported on
    // stderr rather than thrown to the caller.
    this.eventConsumer.consumeEvents(processEvent).catch((err) => {
      console.error("Consumer error:", err);
    });
  }

  /** Returns the underlying event consumer, e.g. so a caller can stop it. */
  getEventConsumer(): TrendEventConsumer {
    return this.eventConsumer;
  }
}

调整 src/trend/application/executor/trend_operator.ts:

@@ -8,11 +8,7 @@ export class TrendOperator {
     this.trendManager = t;
   }
 
-  async createTrend(t: Trend): Promise<number> {
-    return await this.trendManager.createTrend(t);
-  }
-
-  async topTrends(offset: number): Promise<Trend[]> {
-    return await this.trendManager.topTrends(offset);
+  async topTrends(pageSize: number): Promise<Trend[]> {
+    return await this.trendManager.topTrends(pageSize);
   }
 }

调整 src/trend/adapter/router.ts:

@@ -11,9 +11,9 @@ class RestHandler {
   }
 
   public async getTrends(req: Request, res: Response): Promise<void> {
-    let offset = parseInt(req.query.o as string) || 0;
+    let pageSize = parseInt(req.query.ps as string) || 0;
     try {
-      const books = await this.trendOperator.topTrends(offset);
+      const books = await this.trendOperator.topTrends(pageSize);
       res.status(200).json(books);
     } catch (err) {
       console.error(`Failed to get trends: ${err}`);

修改 src/trend/app.ts,使得 consumer 和 router 都能优雅关闭:

@@ -1,13 +1,54 @@
+import { Worker, isMainThread, parentPort } from "worker_threads";
+
 import { WireHelper } from "./application";
 import { InitApp } from "./adapter/router";
 import { parseConfig } from "./infrastructure/config";
+import { TrendConsumer } from "./application/consumer";
 
-const config_filename = "src/trend/config.json";
+const configFilename = "src/trend/config.json";
+const stopConsumer = "stop-consumer";
+const stopServer = "stop-svr";
 
-const c = parseConfig(config_filename);
+const c = parseConfig(configFilename);
 const wireHelper = new WireHelper(c);
 const app = InitApp(wireHelper);
 
-app.listen(c.app.port, () => {
-  console.log(`Running on port ${c.app.port}`);
-});
+if (isMainThread) {
+  const worker = new Worker(__filename);
+
+  const svr = app.listen(c.app.port, () => {
+    console.log(`Running on port ${c.app.port}`);
+
+    worker.on("message", (msg) => {
+      // Close the server
+      if (msg === stopServer) {
+        svr.close(() => {
+          console.log("Server is gracefully closed");
+          process.exit(0);
+        });
+      }
+    });
+
+    const shutdown = () => {
+      console.log("Server is shutting down...");
+      // Stop the consumer
+      worker.postMessage(stopConsumer);
+    };
+
+    // Handle SIGINT (Ctrl+C) and SIGTERM signals
+    process.on("SIGINT", shutdown);
+    process.on("SIGTERM", shutdown);
+  });
+} else {
+  const tc = new TrendConsumer(
+    wireHelper.trendManager(),
+    wireHelper.trendEventConsumer()
+  );
+  parentPort?.on("message", async (msg) => {
+    if (msg === stopConsumer) {
+      await tc.getEventConsumer().stop();
+      parentPort?.postMessage(stopServer);
+    }
+  });
+  tc.start();
+}

Kafka 消费者和 Express 服务器是运行在同一个进程中的 2 个线程。这里使用 worker_threads 的消息机制来优雅地关闭二者。

记得创建或修改 index.ts 文件来导出符号。

  • src/trend/domain/gateway/index.ts
  • src/trend/application/consumer/index.ts
  • src/trend/infrastructure/mq/index.ts

调整 package.json:

@@ -5,7 +5,7 @@
   "main": "app.js",
   "scripts": {
     "dev-web": "ts-node src/web/app.ts",
-    "dev-trend": "ts-node src/trend/app.ts",
+    "dev-trend": "tsc && node dist/trend/app.js",
     "build": "tsc"
   },
   "repository": {

ts-node 与 worker_threads 不太协调。我们可以编译后直接用原生 node 运行程序。

启动 Web 服务和热搜服务。尝试在 Web 服务的首页 http://localhost:3000/ 中搜索关键词,如“love”、“war” 和 “wealth”。

然后你应该能够从热搜服务的网址 (http://localhost:3001/trends) 中看到类似以下的一些结果:

[
  {
    "query": "love",
    "books": [
      {
        "id": 4,
        "title": "Pride and Prejudice",
        "author": "Jane Austen",
        "published_at": "1813-01-28",
        "description": "A classic novel exploring the themes of love, reputation, and social class in Georgian England.",
        "created_at": "2024-04-02T21:02:59.314+08:00"
      },
      {
        "id": 10,
        "title": "War and Peace",
        "author": "Leo Tolstoy",
        "published_at": "1869-01-01",
        "description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
        "created_at": "2024-04-02T21:02:59.42+08:00"
      }
    ]
  },
  {
    "query": "war",
    "books": [
      {
        "id": 10,
        "title": "War and Peace",
        "author": "Leo Tolstoy",
        "published_at": "1869-01-01",
        "description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
        "created_at": "2024-04-02T21:02:59.42+08:00"
      },
      {
        "id": 12,
        "title": "The Odyssey",
        "author": "Homer",
        "published_at": "8th Century BC",
        "description": "An ancient Greek epic poem attributed to Homer, detailing the journey of Odysseus after the Trojan War.",
        "created_at": "2024-04-02T21:02:59.455+08:00"
      }
    ]
  },
  {
    "query": "wealth",
    "books": [
      {
        "id": 1,
        "title": "The Great Gatsby",
        "author": "F. Scott Fitzgerald",
        "published_at": "1925-04-10",
        "description": "A novel depicting the opulent lives of wealthy Long Island residents during the Jazz Age.",
        "created_at": "2024-04-02T20:36:01.015+08:00"
      }
    ]
  }
]
上页下页