Express API 服务器
添加 src/recommendation/application/executor/interest_operator.ts:
import { Interest } from "../../../domain/model";
import { InterestManager } from "../../domain/gateway";
/**
 * Application-level entry point for reading a user's interests.
 *
 * It delegates every lookup to the InterestManager gateway it was built with.
 */
export class InterestOperator {
  /**
   * @param interestManager Gateway used to list interests.
   */
  constructor(private interestManager: InterestManager) {}

  /**
   * Lists the interests recorded for one user.
   *
   * @param userId Identifier of the user to look up.
   * @returns Whatever the gateway's `listInterests` resolves to for that user.
   */
  async interestsForUser(userId: string): Promise<Interest[]> {
    const interests = await this.interestManager.listInterests(userId);
    return interests;
  }
}
添加 src/recommendation/application/executor/index.ts:
export { InterestOperator } from "./interest_operator";
添加 src/recommendation/adapter/router.ts:
import express, { Request, Response } from "express";
import { InterestOperator } from "../application/executor";
import { WireHelper } from "../application";
/**
 * HTTP handlers for the recommendation REST endpoints.
 */
class RestHandler {
  private interestOperator: InterestOperator;

  /**
   * @param interestOperator Operator used to look up a user's interests.
   */
  constructor(interestOperator: InterestOperator) {
    this.interestOperator = interestOperator;
  }

  /**
   * Responds with the interests of the user named by the `uid` query parameter.
   *
   * Sends 200 with the JSON result on success. If the lookup throws, the error
   * is logged and a 404 with a JSON error body is sent instead.
   *
   * @param req Incoming request; only `req.query.uid` is read.
   * @param res Response used to send the status code and JSON body.
   */
  public async getInterests(req: Request, res: Response): Promise<void> {
    // Query values are user-controlled and may be parsed into arrays or
    // objects (e.g. `?uid=a&uid=b` or `?uid[x]=y`). Only a plain string is a
    // valid uid; anything else is treated like a missing uid ("").
    const rawUid = req.query.uid;
    const uid = typeof rawUid === "string" ? rawUid : "";
    try {
      const interests = await this.interestOperator.interestsForUser(uid);
      res.status(200).json(interests);
    } catch (err) {
      console.error(`Failed to get interests: ${err}`);
      res.status(404).json({ error: "Failed to get interests" });
    }
  }
}
// Build the router that exposes the REST endpoints.
// The handler is created first, backed by an InterestOperator that uses the
// interest manager supplied by wireHelper; then its route is registered.
function MakeRouter(wireHelper: WireHelper): express.Router {
  const operator = new InterestOperator(wireHelper.interestManager());
  const handler = new RestHandler(operator);

  const router = express.Router();
  // GET /recommendations is served by RestHandler.getInterests.
  router.get("/recommendations", (req, res) => handler.getInterests(req, res));
  return router;
}
/**
 * Builds the Express application for the recommendation service.
 *
 * @param wireHelper Passed through to MakeRouter to wire up the handlers.
 * @returns The configured Express app; the caller is responsible for `listen`.
 */
export function InitApp(wireHelper: WireHelper): express.Express {
  const application = express();

  // Register JSON body parsing before the routes.
  application.use(express.json());

  // Attach the API router using an empty mount path.
  application.use("", MakeRouter(wireHelper));

  return application;
}
在主文件 src/recommendation/app.ts 中同时启动 Express 服务器:
@@ -1,14 +1,65 @@
+import { Worker, isMainThread, parentPort } from "worker_threads";
+
 import { WireHelper } from "./application";
+import { InitApp } from "./adapter/router";
 import { parseConfig } from "./infrastructure/config";
 import { InterestConsumer } from "./application/consumer";
 const configFilename = "src/recommendation/config.json";
+// Messages exchanged between the main thread and the consumer worker.
+const stopConsumer = "stop-consumer";
+const stopServer = "stop-svr";
 const c = parseConfig(configFilename);
 const wireHelper = new WireHelper(c);
-const tc = new InterestConsumer(
-  wireHelper.interestManager(),
-  wireHelper.trendEventConsumer()
-);
-tc.start();
+if (isMainThread) {
+  // Main thread: load this same file again as a worker (which takes the
+  // `else` branch below) and serve the HTTP API here.
+  const worker = new Worker(__filename);
+
+  const app = InitApp(wireHelper);
+  const svr = app.listen(c.app.port, () => {
+    console.log(`Running on port ${c.app.port}`);
+
+    worker.on("message", (msg) => {
+      // Close the server
+      if (msg === stopServer) {
+        svr.close(() => {
+          console.log("Server is gracefully closed");
+          process.exit(0);
+        });
+      }
+    });
+
+    const shutdown = () => {
+      console.log("Server is shutting down...");
+      // Stop the consumer
+      worker.postMessage(stopConsumer);
+    };
+
+    // Handle SIGINT (Ctrl+C) and SIGTERM signals
+    process.on("SIGINT", shutdown);
+    process.on("SIGTERM", shutdown);
+  });
+} else {
+  // Worker thread: run the event consumer.
+  const tc = new InterestConsumer(
+    wireHelper.interestManager(),
+    wireHelper.trendEventConsumer()
+  );
+  parentPort?.on("message", async (msg) => {
+    if (msg === stopConsumer) {
+      try {
+        await tc.getEventConsumer().stop();
+      } catch (err) {
+        console.error(`Failed to stop consumer: ${err}`);
+      } finally {
+        // Always tell the main thread to close the server, even when
+        // stopping the consumer failed; otherwise it is never closed gracefully.
+        parentPort?.postMessage(stopServer);
+      }
+    }
+  });
+  tc.start();
+}
我们使用 worker_threads 在独立的 worker 线程中运行消费者,这样就可以优雅地启动和停止它。
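由于 `new Worker(__filename)` 会让 worker 线程重新加载当前文件,而 Node.js 无法直接执行 .ts 文件,因此需要先用 tsc 编译,再运行编译后的 JS。修改 package.json 中的启动 script: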
@@ -6,7 +6,7 @@
"scripts": {
"dev-web": "ts-node src/web/app.ts",
"dev-trend": "tsc && node dist/trend/app.js",
- "dev-rec": "ts-node src/recommendation/app.ts",
+ "dev-rec": "tsc && node dist/recommendation/app.js",
"build": "tsc"
},
"repository": {
启动服务器:
npm run dev-rec
尝试用 curl 访问 http://localhost:3002/recommendations?uid=VI0GS 。
“VI0GS” 是本教程中 cookie uid 的值,你应该将其替换成你自己浏览器中的值。
结果如下:
[
{
"userId": "VI0GS",
"title": "War and Peace",
"author": "Leo Tolstoy",
"score": 2
},
{
"userId": "VI0GS",
"title": "Pride and Prejudice",
"author": "Jane Austen",
"score": 1
},
{
"userId": "VI0GS",
"title": "The Great Gatsby",
"author": "F. Scott Fitzgerald",
"score": 1
}
]