Express API 服务器
创建 src/domain/model/trend.ts:
import { Book } from ".";
/**
 * A search query together with the books associated with it.
 */
export interface Trend {
  /** The search text that identifies this trend. */
  query: string;
  /** Books attached to the query. */
  books: Array<Book>;
  /** Creation time of the trend; `null` when it is not known. */
  created_at: null | Date;
}
创建 src/trend/domain/gateway/trend_manager.ts:
import { Trend } from "../../../domain/model";
/**
 * Storage gateway for trending search queries.
 */
export interface TrendManager {
  /**
   * Records one occurrence of the trend.
   * @param t The trend to record.
   * @returns A promise resolving to a number reported by the store.
   */
  createTrend(t: Trend): Promise<number>;

  /**
   * Lists the top trends.
   * @param offset Bound handed to the store when selecting the top entries.
   * @returns A promise resolving to the selected trends.
   */
  topTrends(offset: number): Promise<Trend[]>;
}
在 Redis 中，ZSET（有序集合）是一种结合了集合和有序列表特性的数据结构。它类似于普通集合，但还会根据每个成员关联的分数（score）来维护元素的排序。借助这个分数，成员可以按升序或降序排列。
因此，我们可以使用 ZSET 来存储热搜数据，用其分数来表示“热度”。
安装 redis 依赖:
npm i ioredis
创建 src/trend/infrastructure/cache/redis.ts:
import Redis, { RedisOptions } from "ioredis";
import { CacheConfig } from "../config/config";
import { TrendManager } from "../../domain/gateway";
import { Trend } from "../../../domain/model";
/** Key of the sorted set whose members are search queries and whose scores count how often each was recorded. */
const trendsKey = "trends";
/** Prefix of the string keys holding the JSON-encoded books of the most recently recorded trend for a query. */
const queryKeyPrefix = "q-";

/**
 * Redis-backed implementation of TrendManager.
 *
 * Popularity is tracked in a single sorted set (`trendsKey`); the books that
 * belong to a query are stored as JSON under `queryKeyPrefix + query`.
 */
export class RedisCache implements TrendManager {
  private readonly client: Redis;

  /**
   * Creates the Redis client from the given cache configuration.
   * @param c Connection settings (host, port, password, db, command timeout).
   */
  constructor(c: CacheConfig) {
    const options: RedisOptions = {
      host: c.host,
      port: c.port,
      password: c.password,
      db: c.db,
      commandTimeout: c.timeout,
    };
    this.client = new Redis(options);
    // The client connects in the background, so only report success once the
    // connection has actually been established.
    this.client.on("connect", () => {
      console.log("Connected to Redis");
    });
    // Surface connection problems instead of leaving the event unhandled.
    this.client.on("error", (err: Error) => {
      console.error(`Redis error: ${err.message}`);
    });
  }

  /**
   * Bumps the score of `t.query` by one and stores `t.books` as JSON,
   * overwriting whatever was stored for that query before.
   * @param t The trend to record.
   * @returns The query's score after the increment.
   */
  async createTrend(t: Trend): Promise<number> {
    const score = await this.client.zincrby(trendsKey, 1, t.query);
    await this.client.set(queryKeyPrefix + t.query, JSON.stringify(t.books));
    return Number(score);
  }

  /**
   * Returns the highest-scored queries together with their stored books.
   * @param offset Stop index passed to ZREVRANGE (start index is 0).
   * @returns Trends ordered from highest to lowest score; `books` is empty
   *          when nothing is stored for a query and `created_at` is always null.
   */
  async topTrends(offset: number): Promise<Trend[]> {
    // Scores are not part of the result, so only the members are requested.
    const queries = await this.client.zrevrange(trendsKey, 0, offset);
    // Issue all lookups at once instead of awaiting them one by one;
    // Promise.all keeps the results in ranking order.
    return Promise.all(
      queries.map(async (query): Promise<Trend> => {
        const raw = await this.client.get(queryKeyPrefix + query);
        return {
          query,
          books: raw === null ? [] : JSON.parse(raw),
          created_at: null,
        };
      })
    );
  }
}
创建 src/trend/infrastructure/config/config.ts:
import { readFileSync } from "fs";
/** Settings of the HTTP application itself. */
interface ApplicationConfig {
  /** Port number of the application. */
  port: number;
}
/** Connection settings for the cache server. */
export interface CacheConfig {
  /** Host name of the cache server. */
  host: string;
  /** Port of the cache server. */
  port: number;
  /** Password used for the connection. */
  password: string;
  /** Database number. */
  db: number;
  /** Timeout, in milliseconds. */
  timeout: number;
}
/** Root of the service configuration. */
export interface Config {
  /** Application settings. */
  app: ApplicationConfig;
  /** Cache connection settings. */
  cache: CacheConfig;
}
/**
 * Reads a JSON configuration file synchronously and returns its parsed content.
 *
 * The parsed value is returned as-is; no structural validation is performed.
 *
 * @param filename Path of the JSON file to read (decoded as UTF-8).
 * @returns The parsed configuration.
 * @throws If the file cannot be read or does not contain valid JSON.
 */
export function parseConfig(filename: string): Config {
  const rawText = readFileSync(filename, { encoding: "utf-8" });
  const parsed: Config = JSON.parse(rawText);
  return parsed;
}
创建 src/trend/config.json:
{
"app": {
"port": 3001
},
"cache": {
"host": "localhost",
"port": 6379,
"password": "test_pass",
"db": 0,
"timeout": 5000
}
}
创建 src/trend/application/wire_helper.ts:
import { Config } from "../infrastructure/config";
import { TrendManager } from "../domain/gateway";
import { RedisCache } from "../infrastructure/cache";
/**
 * WireHelper is the helper for dependency injection: it builds the concrete
 * infrastructure objects once and hands them out behind their gateway
 * interfaces.
 */
export class WireHelper {
  private readonly cache: RedisCache;

  /**
   * @param c Parsed configuration; its `cache` section is used to build the Redis-backed store.
   */
  constructor(c: Config) {
    this.cache = new RedisCache(c.cache);
  }

  /** Returns the store built in the constructor as a TrendManager. */
  trendManager(): TrendManager {
    return this.cache;
  }
}
创建 src/trend/application/executor/trend_operator.ts:
import { Trend } from "../../../domain/model";
import { TrendManager } from "../../domain/gateway";
/**
 * Application-level entry point for trend use cases; every call is forwarded
 * to the injected TrendManager.
 */
export class TrendOperator {
  private readonly trendManager: TrendManager;

  /**
   * @param t Gateway that performs the actual storage operations.
   */
  constructor(t: TrendManager) {
    this.trendManager = t;
  }

  /**
   * Records a trend through the gateway.
   * @param t The trend to record.
   * @returns The number returned by the gateway.
   */
  async createTrend(t: Trend): Promise<number> {
    const result = await this.trendManager.createTrend(t);
    return result;
  }

  /**
   * Fetches the top trends through the gateway.
   * @param offset Value forwarded unchanged to the gateway.
   * @returns The trends returned by the gateway.
   */
  async topTrends(offset: number): Promise<Trend[]> {
    const trends = await this.trendManager.topTrends(offset);
    return trends;
  }
}
创建 src/trend/adapter/router.ts:
import express, { Request, Response } from "express";
import { TrendOperator } from "../application/executor";
import { WireHelper } from "../application";
/**
 * HTTP handlers of the trend service.
 */
class RestHandler {
  private trendOperator: TrendOperator;

  /**
   * @param trendOperator Use-case object the handlers delegate to.
   */
  constructor(trendOperator: TrendOperator) {
    this.trendOperator = trendOperator;
  }

  /**
   * Handles `GET /trends`.
   *
   * Reads the optional `o` query parameter, passes it to
   * `TrendOperator.topTrends` and responds with the resulting trends as JSON.
   * A missing, non-numeric or negative `o` is treated as 0.
   *
   * @param req Incoming request.
   * @param res Response; receives 200 with the trends, or 500 with an error
   *            object when the lookup fails.
   */
  public async getTrends(req: Request, res: Response): Promise<void> {
    const rawOffset = req.query.o;
    const parsed =
      typeof rawOffset === "string" ? Number.parseInt(rawOffset, 10) : NaN;
    // Negative values must not reach the store: they are interpreted as
    // "count from the end", so -1 would return every trend.
    const offset = Number.isNaN(parsed) || parsed < 0 ? 0 : parsed;
    try {
      const trends = await this.trendOperator.topTrends(offset);
      res.status(200).json(trends);
    } catch (err) {
      console.error(`Failed to get trends: ${err}`);
      // A failing backend is a server-side error, not a missing resource.
      res.status(500).json({ error: "Failed to get trends" });
    }
  }
}
/**
 * Builds the router of the trend service.
 *
 * @param wireHelper Source of the TrendManager the handlers operate on.
 * @returns A router that serves `GET /trends`.
 */
function MakeRouter(wireHelper: WireHelper): express.Router {
  const operator = new TrendOperator(wireHelper.trendManager());
  const handler = new RestHandler(operator);

  const routes = express.Router();
  // Bind so that `this` inside getTrends still refers to the handler instance.
  const listTrends = handler.getTrends.bind(handler);
  routes.get("/trends", listTrends);
  return routes;
}
/**
 * Creates the Express application: installs the JSON body parser and mounts
 * the router produced by MakeRouter.
 *
 * @param wireHelper Dependency container handed on to MakeRouter.
 * @returns The configured application; the caller is responsible for calling `listen`.
 */
export function InitApp(wireHelper: WireHelper): express.Express {
  const server = express();

  // Middleware to parse JSON bodies
  server.use(express.json());

  server.use("", MakeRouter(wireHelper));
  return server;
}
最后,添加 src/trend/app.ts:
import { WireHelper } from "./application";
import { InitApp } from "./adapter/router";
import { parseConfig } from "./infrastructure/config";
// Entry point: load the configuration, wire the dependencies and start the HTTP server.
const configPath = "src/trend/config.json";
const config = parseConfig(configPath);
const app = InitApp(new WireHelper(config));

const { port } = config.app;
app.listen(port, () => {
  console.log(`Running on port ${port}`);
});
记得为上面的子目录添加 index.ts 文件。
在 package.json 中添加一个 script：
@@ -5,6 +5,7 @@
"main": "app.js",
"scripts": {
"dev-web": "ts-node src/web/app.ts",
+ "dev-trend": "ts-node src/trend/app.ts",
"build": "tsc"
},
"repository": {
@@ -25,6 +26,7 @@
"dependencies": {
"express": "^4.19.2",
"express-handlebars": "^7.1.2",
+ "ioredis": "^5.3.2",
"kafkajs": "^2.2.4",
"mysql2": "^3.9.4"
},
启动服务器:
npm run dev-trend
使用 curl 访问 http://localhost:3001/trends 进行测试。