» Node.js:使用Kafka构建事件驱动微服务 » 2. 生产者:Web服务 » 2.5 事件生产者

事件生产者

事件驱动架构中,事件生产者消费者在系统内不同组件或服务之间的通信和交互中发挥着至关重要的作用。

事件生产者是系统内生成或发出事件的任何组件或服务。

事件消费者是系统内订阅并处理事件的组件或服务。

Event Queue

在这个图表中,“书店网站服务(Bookstore Web Service)”是生产者,而其他两个服务是消费者。

事件驱动架构的关键优势包括:

  • 低耦合:事件驱动架构促进了组件之间的低耦合,因为生产者和消费者通过基于事件的异步通信进行交互。
  • 易扩展:事件消费者可完全独立于事件生产者进行扩展,确保系统能够方便且有效地处理不同大小的工作负载。
  • 容错性:事件驱动架构容错性和韧性很好,因为即使一些组件或服务暂时不可用,事件仍然可以最终被可靠地处理。

使用 Apache Kafka

  1. 在机器上安装 Apache Kafka 并启动它。

使用 Docker 镜像是最方便的安装方式。

docker pull apache/kafka:3.7.0
docker run -p 9092:9092 apache/kafka:3.7.0
  1. 创建用于存储事件的 topic

事件被组织并持久存储在主题(topic)中。简单来说,一个 topic 类似于文件系统中的一个文件夹,而事件则是该文件夹中的文件。

bin/kafka-topics.sh --create --topic lr-book-searches --bootstrap-server localhost:9092

Kafka 中的 topic 是可以有多生产者和多订阅者的:一个 topic 可以有零个、一个或多个生产者向其写入事件,以及零个、一个或多个订阅者订阅这些事件。

  1. 添加 kafka 依赖:
npm i kafkajs

KafkaJS 是一个现代的 Node.js Apache Kafka 客户端。

  1. 更新代码。

创建 src/infrastructure/mq/index.ts:

export { KafkaQueue } from "./kafka";

export interface MQHelper {
  sendEvent(key: string, value: Buffer): Promise<boolean>;
}

创建 src/infrastructure/mq/kafka.ts:

import { Kafka, Producer } from "kafkajs";

import { MQHelper } from ".";

export class KafkaQueue implements MQHelper {
  private producer: Producer;
  private topic: string;
  private _connected: boolean = false;

  constructor(brokers: string[], topic: string) {
    this.topic = topic;
    const k = new Kafka({
      clientId: "web-svr",
      brokers,
    });
    this.producer = k.producer({ allowAutoTopicCreation: true });
  }

  async sendEvent(key: string, value: Buffer): Promise<boolean> {
    if (!this._connected) {
      await this.connect();
    }
    await this.producer.send({
      topic: this.topic,
      messages: [
        {
          key,
          value,
        },
      ],
    });
    return true;
  }

  async connect(): Promise<void> {
    await this.producer.connect();
    this._connected = true;
  }

  async close(): Promise<void> {
    await this.producer.disconnect();
  }
}

添加消息队列配置项,src/infrastructure/config/config.ts:

@@ -10,9 +10,15 @@ interface ApplicationConfig {
   templates_dir: string;
 }
 
+interface MQConfig {
+  brokers: string[];
+  topic: string;
+}
+
 export interface Config {
   app: ApplicationConfig;
   db: DBConfig;
+  mq: MQConfig;
 }
 
 export function parseConfig(filename: string): Config {

放入配置值,config.json:

@@ -6,5 +6,9 @@
   },
   "db": {
     "dsn": "mysql://test_user:test_pass@127.0.0.1:3306/lr_event_book?charset=utf8mb4"
+  },
+  "mq": {
+    "brokers": ["localhost:9094"],
+    "topic": "lr-book-searches"
   }
 }

此处使用 9094 端口是为了从外部访问跑在 docker 容器内的 kafka。
详情阅读 “Accessing Apache Kafka with internal and external clients“配置一节,bitnami/kafka 镜像。

调整 src/application/wire_helper.ts:

@@ -1,16 +1,23 @@
 import { Config } from "../infrastructure/config";
 import { BookManager } from "../domain/gateway";
 import { MySQLPersistence } from "../infrastructure/database";
+import { KafkaQueue, MQHelper } from "../infrastructure/mq";
 
 // WireHelper is the helper for dependency injection
 export class WireHelper {
   private sql_persistence: MySQLPersistence;
+  private mq: KafkaQueue;
 
   constructor(c: Config) {
     this.sql_persistence = new MySQLPersistence(c.db.dsn, c.app.page_size);
+    this.mq = new KafkaQueue(c.mq.brokers, c.mq.topic);
   }
 
   bookManager(): BookManager {
     return this.sql_persistence;
   }
+
+  messageQueueHelper(): MQHelper {
+    return this.mq;
+  }
 }

调整 src/application/executor/book_operator.ts:

@@ -1,11 +1,14 @@
 import { BookManager } from "../../domain/gateway";
 import { Book } from "../../domain/model";
+import { MQHelper } from "../../infrastructure/mq";
 
 export class BookOperator {
   private bookManager: BookManager;
+  private mqHelper: MQHelper;
 
-  constructor(b: BookManager) {
+  constructor(b: BookManager, m: MQHelper) {
     this.bookManager = b;
+    this.mqHelper = m;
   }
 
   async createBook(b: Book): Promise<Book> {
@@ -15,6 +18,11 @@ export class BookOperator {
   }
 
   async getBooks(offset: number, query: string): Promise<Book[]> {
-    return await this.bookManager.getBooks(offset, query);
+    const books = await this.bookManager.getBooks(offset, query);
+    if (query) {
+      const jsonData = JSON.stringify(books);
+      await this.mqHelper.sendEvent(query, Buffer.from(jsonData, "utf8"));
+    }
+    return books;
   }
 }

调整 src/adapter/router.ts:

@@ -63,7 +63,7 @@ class RestHandler {
 // Create router
 function MakeRouter(wireHelper: WireHelper): express.Router {
   const restHandler = new RestHandler(
-    new BookOperator(wireHelper.bookManager())
+    new BookOperator(wireHelper.bookManager(), wireHelper.messageQueueHelper())
   );
 
   const router = express.Router();

重启 server,尝试进行一些搜索。你会看到类似如下 kafka 日志:

[2024-04-14 18:00:27,716] INFO Sent auto-creation request for Set(lr-book-searches) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
[2024-04-14 18:00:27,737] INFO [QuorumController id=0] CreateTopics result(s): CreatableTopic(name='lr-book-searches', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-14 18:00:27,737] INFO [QuorumController id=0] Replayed TopicRecord for topic lr-book-searches with topic ID T02I1daRSEKcB3YZLrC3hg. (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-14 18:00:27,738] INFO [QuorumController id=0] Replayed PartitionRecord for new partition lr-book-searches-0 with topic ID T02I1daRSEKcB3YZLrC3hg and PartitionRegistration(replicas=[0], directories=[9pTFQhR6i-GUUBD0tbOFlg], isr=[0], removingReplicas=[], addingReplicas=[], elr=[], lastKnownElr=[], leader=0, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-14 18:00:27,770] INFO [Broker id=0] Transitioning 1 partition(s) to local leaders. (state.change.logger)
[2024-04-14 18:00:27,771] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(lr-book-searches-0) (kafka.server.ReplicaFetcherManager)
[2024-04-14 18:00:27,774] INFO [Broker id=0] Creating new partition lr-book-searches-0 with topic id T02I1daRSEKcB3YZLrC3hg. (state.change.logger)
[2024-04-14 18:00:27,794] INFO [LogLoader partition=lr-book-searches-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2024-04-14 18:00:27,798] INFO Created log for partition lr-book-searches-0 in /bitnami/kafka/data/lr-book-searches-0 with properties {} (kafka.log.LogManager)
[2024-04-14 18:00:27,800] INFO [Partition lr-book-searches-0 broker=0] No checkpointed highwatermark is found for partition lr-book-searches-0 (kafka.cluster.Partition)
[2024-04-14 18:00:27,802] INFO [Partition lr-book-searches-0 broker=0] Log loaded for partition lr-book-searches-0 with initial high watermark 0 (kafka.cluster.Partition)
[2024-04-14 18:00:27,805] INFO [Broker id=0] Leader lr-book-searches-0 with topic id Some(T02I1daRSEKcB3YZLrC3hg) starts at leader epoch 0 from offset 0 with partition epoch 0, high watermark 0, ISR [0], adding replicas [] and removing replicas [] . Previous leader None and previous leader epoch was -1. (state.change.logger)
上页下页