» Node.js:使用Kafka构建事件驱动微服务 » 4. 消费者:推荐服务 » 4.3 Express API 服务器

Express API 服务器

添加 src/recommendation/application/executor/interest_operator.ts:

import { Interest } from "../../../domain/model";
import { InterestManager } from "../../domain/gateway";

/**
 * Application-level entry point for reading a user's interests.
 *
 * A thin wrapper that forwards every call to the injected InterestManager
 * gateway.
 */
export class InterestOperator {
  constructor(private interestManager: InterestManager) {}

  /**
   * Lists the interests recorded for a user.
   *
   * @param userId - Identifier handed unchanged to the gateway.
   * @returns The list that `InterestManager.listInterests` resolves to.
   */
  async interestsForUser(userId: string): Promise<Interest[]> {
    const interests = await this.interestManager.listInterests(userId);
    return interests;
  }
}

添加 src/recommendation/application/executor/index.ts:

export { InterestOperator } from "./interest_operator";

添加 src/recommendation/adapter/router.ts:

import express, { Request, Response } from "express";

import { InterestOperator } from "../application/executor";
import { WireHelper } from "../application";

/**
 * HTTP handlers for the recommendation REST API.
 *
 * Delegates the lookup to an InterestOperator and writes the outcome to the
 * response as JSON.
 */
class RestHandler {
  private interestOperator: InterestOperator;

  constructor(interestOperator: InterestOperator) {
    this.interestOperator = interestOperator;
  }

  /**
   * Responds with the interests of the user named by the `uid` query parameter.
   *
   * A missing or non-string `uid` is treated as the empty string.
   *
   * @param req - Incoming request; only `req.query.uid` is read.
   * @param res - Receives 200 with the interest list, or 500 with an error
   *   object when the lookup throws.
   */
  public async getInterests(req: Request, res: Response): Promise<void> {
    // Express parses repeated or bracketed parameters (`?uid=a&uid=b`,
    // `?uid[x]=1`) into arrays/objects, so narrow at runtime instead of
    // asserting the value is a string.
    const rawUid: unknown = req.query.uid;
    const uid = typeof rawUid === "string" ? rawUid : "";
    try {
      const interests = await this.interestOperator.interestsForUser(uid);
      res.status(200).json(interests);
    } catch (err) {
      console.error(`Failed to get interests: ${err}`);
      // A failed lookup is a server-side fault, not a missing resource.
      res.status(500).json({ error: "Failed to get interests" });
    }
  }
}

/**
 * Builds the router that exposes the recommendation endpoints.
 *
 * @param wireHelper - Supplies the InterestManager that backs the handler.
 * @returns A router serving `GET /recommendations`.
 */
function MakeRouter(wireHelper: WireHelper): express.Router {
  const operator = new InterestOperator(wireHelper.interestManager());
  const handler = new RestHandler(operator);

  const router = express.Router();
  // Call through the instance so `this` is still the handler when Express
  // invokes the route callback.
  router.get("/recommendations", (req: Request, res: Response) =>
    handler.getInterests(req, res)
  );
  return router;
}

/**
 * Creates the Express application for the recommendation service.
 *
 * @param wireHelper - Passed on to MakeRouter to build the API routes.
 * @returns The configured application; the caller is responsible for
 *   calling `listen` on it.
 */
export function InitApp(wireHelper: WireHelper): express.Express {
  const application = express();

  // Register JSON body parsing ahead of the routes.
  application.use(express.json());

  // Mount the API router at the application root.
  const apiRouter = MakeRouter(wireHelper);
  application.use("", apiRouter);

  return application;
}

在主文件 src/recommendation/app.ts 中,除了消费者之外,同时启动 Express 服务器:

@@ -1,14 +1,54 @@
+import { Worker, isMainThread, parentPort } from "worker_threads";
+
 import { WireHelper } from "./application";
+import { InitApp } from "./adapter/router";
 import { parseConfig } from "./infrastructure/config";
 import { InterestConsumer } from "./application/consumer";
 
 const configFilename = "src/recommendation/config.json";
+const stopConsumer = "stop-consumer";
+const stopServer = "stop-svr";
 
 const c = parseConfig(configFilename);
 const wireHelper = new WireHelper(c);
 
-const tc = new InterestConsumer(
-  wireHelper.interestManager(),
-  wireHelper.trendEventConsumer()
-);
-tc.start();
+if (isMainThread) { // Main thread: serves HTTP and spawns the worker
+  const worker = new Worker(__filename); // The worker re-runs this file and takes the else branch below
+
+  const app = InitApp(wireHelper);
+  const svr = app.listen(c.app.port, () => {
+    console.log(`Running on port ${c.app.port}`);
+
+    worker.on("message", (msg) => {
+      // The worker posts stopServer once its consumer has stopped: close the HTTP server, then exit
+      if (msg === stopServer) {
+        svr.close(() => {
+          console.log("Server is gracefully closed");
+          process.exit(0);
+        });
+      }
+    });
+
+    const shutdown = () => {
+      console.log("Server is shutting down...");
+      // Ask the worker to stop the consumer first; the server is closed when it replies
+      worker.postMessage(stopConsumer);
+    };
+
+    // On SIGINT (Ctrl+C) or SIGTERM, begin the shutdown sequence above
+    process.on("SIGINT", shutdown);
+    process.on("SIGTERM", shutdown);
+  });
+} else { // Worker thread: runs the consumer
+  const tc = new InterestConsumer(
+    wireHelper.interestManager(),
+    wireHelper.trendEventConsumer()
+  );
+  parentPort?.on("message", async (msg) => { // Commands posted by the main thread
+    if (msg === stopConsumer) {
+      await tc.getEventConsumer().stop();
+      parentPort?.postMessage(stopServer); // Consumer stopped; tell the main thread to close the server
+    }
+  });
+  tc.start();
+}

我们使用 worker_threads 把消费者放到工作线程中运行,主线程则运行 Express 服务器。收到 SIGINT 或 SIGTERM 信号时,主线程先通知工作线程停止消费者,待其停止后再关闭服务器并退出,从而优雅地启动、停止消费者。

修改 package.json 中的启动脚本。由于工作线程通过 `new Worker(__filename)` 重新加载当前文件,这里改为先用 tsc 编译,再用 node 运行编译后的 JavaScript:

@@ -6,7 +6,7 @@
   "scripts": {
     "dev-web": "ts-node src/web/app.ts",
     "dev-trend": "tsc && node dist/trend/app.js",
-    "dev-rec": "ts-node src/recommendation/app.ts",
+    "dev-rec": "tsc && node dist/recommendation/app.js",
     "build": "tsc"
   },
   "repository": {

启动服务器:

npm run dev-rec

尝试用 curl 访问 http://localhost:3002/recommendations?uid=VI0GS

“VI0GS” 是本教程中 cookie uid 的值,你应该替换成你自己浏览器中的值。

结果如下:

[
  {
    "userId": "VI0GS",
    "title": "War and Peace",
    "author": "Leo Tolstoy",
    "score": 2
  },
  {
    "userId": "VI0GS",
    "title": "Pride and Prejudice",
    "author": "Jane Austen",
    "score": 1
  },
  {
    "userId": "VI0GS",
    "title": "The Great Gatsby",
    "author": "F. Scott Fitzgerald",
    "score": 1
  }
]
上页下页