» Python:使用Kafka构建事件驱动微服务 » 2. 生产者:Web服务 » 2.5 事件生产者

事件生产者

事件驱动架构中,事件生产者消费者在系统内不同组件或服务之间的通信和交互中发挥着至关重要的作用。

事件生产者是系统内生成或发出事件的任何组件或服务。

事件消费者是系统内订阅并处理事件的组件或服务。

Event Queue

在这个图表中,“书店网站服务(Bookstore Web Service)”是生产者,而其他两个服务是消费者。

事件驱动架构的关键优势包括:

  • 低耦合:事件驱动架构促进了组件之间的低耦合,因为生产者和消费者通过基于事件的异步通信进行交互。
  • 易扩展:事件消费者可完全独立于事件生产者进行扩展,确保系统能够方便且有效地处理不同大小的工作负载。
  • 容错性:事件驱动架构容错性和韧性很好,因为即使一些组件或服务暂时不可用,事件仍然可以最终被可靠地处理。

使用 Apache Kafka

  1. 在机器上安装 Apache Kafka 并启动它。

使用 Docker 镜像是最方便的安装方式。

docker pull apache/kafka:3.7.0
docker run -p 9092:9092 apache/kafka:3.7.0
  1. 创建用于存储事件的 topic

事件被组织并持久存储在主题(topic)中。简单来说,一个 topic 类似于文件系统中的一个文件夹,而事件则是该文件夹中的文件。

bin/kafka-topics.sh --create --topic lr-book-searches --bootstrap-server localhost:9092

Kafka 中的 topic 是可以有多生产者和多订阅者的:一个 topic 可以有零个、一个或多个生产者向其写入事件,以及零个、一个或多个订阅者订阅这些事件。

  1. 添加 kafka 依赖:
pip3 install confluent-kafka

记得更新 requirements.txt

  1. 修改代码。

创建 web/infrastructure/mq/helper.py:

from abc import ABC, abstractmethod


class MQHelper(ABC):
    @abstractmethod
    def send_event(self, key: str, value: bytes) -> bool:
        pass

创建 web/infrastructure/mq/kafka.py:

from typing import List

from confluent_kafka import Producer

from .helper import MQHelper


class KafkaQueue(MQHelper):
    def __init__(self, brokers: List[str], topic: str):
        self.producer = Producer({'bootstrap.servers': ','.join(brokers)})
        self.topic = topic

    def send_event(self, key: str, value: bytes) -> bool:
        self.producer.produce(self.topic, value, key)
        return True

添加消息队列配置项,web/infrastructure/config/config.py:

@@ -1,4 +1,5 @@
 from dataclasses import dataclass
+from typing import List
 import yaml
 
 
@@ -11,6 +12,12 @@ class DBConfig:
     database: str
 
 
+@dataclass
+class MQConfig:
+    brokers: List[str]
+    topic: str
+
+
 @dataclass
 class ApplicationConfig:
     port: int
@@ -22,6 +29,7 @@ class ApplicationConfig:
 class Config:
     app: ApplicationConfig
     db: DBConfig
+    mq: MQConfig
 
 
 def parseConfig(filename: str) -> Config:
@@ -29,5 +37,6 @@ def parseConfig(filename: str) -> Config:
         data = yaml.safe_load(f)
         return Config(
             ApplicationConfig(**data['app']),
-            DBConfig(**data['db'])
+            DBConfig(**data['db']),
+            MQConfig(**data['mq'])
         )

放入配置值,web/config.yml:

@@ -7,4 +7,8 @@ db:
   port: 3306
   user: "test_user"
   password: "test_pass"
-  database: "lr_event_book"
\ No newline at end of file
+  database: "lr_event_book"
+mq:
+  brokers:
+    - localhost:9092
+  topic: "lr-book-searches"

调整 web/application/wire_helper.py:

@@ -1,16 +1,22 @@
 from ..domain.gateway import BookManager
 from ..infrastructure.config import Config
 from ..infrastructure.database import MySQLPersistence
+from ..infrastructure.mq import KafkaQueue, MQHelper
 
 
 class WireHelper:
-    def __init__(self, sqlPersistence: MySQLPersistence):
+    def __init__(self, sqlPersistence: MySQLPersistence, mq: KafkaQueue):
         self.sqlPersistence = sqlPersistence
+        self.mq = mq
 
     @classmethod
     def new(cls, c: Config):
         db = MySQLPersistence(c.db, c.app.page_size)
-        return cls(db)
+        mq = KafkaQueue(c.mq.brokers, c.mq.topic)
+        return cls(db, mq)
 
     def book_manager(self) -> BookManager:
         return self.sqlPersistence
+
+    def message_queue_helper(self) -> MQHelper:
+        return self.mq

调整 web/application/executor/book_operator.py:

@@ -1,15 +1,19 @@
+from dataclasses import asdict
 from datetime import datetime
-from typing import List
+import json
+from typing import Dict, List
 
 from .. import dto
 from ...domain.model import Book
 from ...domain.gateway import BookManager
+from ...infrastructure.mq import MQHelper
 
 
 class BookOperator():
 
-    def __init__(self, book_manager: BookManager):
+    def __init__(self, book_manager: BookManager, mq_helper: MQHelper):
         self.book_manager = book_manager
+        self.mq_helper = mq_helper
 
     def create_book(self, b: dto.Book) -> Book:
         book = Book(id=0, created_at=datetime.now(), **b.__dict__)
@@ -18,4 +22,16 @@ class BookOperator():
         return book
 
     def get_books(self, offset: int, query: str) -> List[Book]:
-        return self.book_manager.get_books(offset, query)
+        books = self.book_manager.get_books(offset, query)
+        # Send search query and its results
+        if query:
+            json_data = json.dumps([_convert(b)
+                                   for b in books]).encode('utf-8')
+            self.mq_helper.send_event(query, json_data)
+        return books
+
+
+def _convert(b: Book) -> Dict:
+    d = asdict(b)
+    d['created_at'] = d['created_at'].strftime('%Y-%m-%d %H:%M:%S')
+    return d

调整 web/adapter/router.py:

@@ -33,7 +33,8 @@ class RestHandler:
 def make_router(app: FastAPI, templates_dir: str, wire_helper: WireHelper):
     rest_handler = RestHandler(
         logging.getLogger("lr-event"),
-        BookOperator(wire_helper.book_manager())
+        BookOperator(wire_helper.book_manager(),
+                     wire_helper.message_queue_helper())
     )
 
     templates = Jinja2Templates(directory=templates_dir)

重启 server,尝试进行一些搜索:

uvicorn web.main:app --reload

你会看到类似如下 kafka 日志:

[2024-04-10 07:01:51,147] INFO Sent auto-creation request for Set(lr-book-searches) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
[2024-04-10 07:01:51,164] INFO [QuorumController id=0] CreateTopics result(s): CreatableTopic(name='lr-book-searches', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-10 07:01:51,165] INFO [QuorumController id=0] Replayed TopicRecord for topic lr-book-searches with topic ID cyQQePRVSjOHCbT5zKthQw. (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-10 07:01:51,165] INFO [QuorumController id=0] Replayed PartitionRecord for new partition lr-book-searches-0 with topic ID cyQQePRVSjOHCbT5zKthQw and PartitionRegistration(replicas=[0], directories=[JNQSQwG010NmYpIsyAYTzw], isr=[0], removingReplicas=[], addingReplicas=[], elr=[], lastKnownElr=[], leader=0, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-10 07:01:51,196] INFO [Broker id=0] Transitioning 1 partition(s) to local leaders. (state.change.logger)
[2024-04-10 07:01:51,198] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(lr-book-searches-0) (kafka.server.ReplicaFetcherManager)
[2024-04-10 07:01:51,200] INFO [Broker id=0] Creating new partition lr-book-searches-0 with topic id cyQQePRVSjOHCbT5zKthQw. (state.change.logger)
[2024-04-10 07:01:51,217] INFO [LogLoader partition=lr-book-searches-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2024-04-10 07:01:51,220] INFO Created log for partition lr-book-searches-0 in /bitnami/kafka/data/lr-book-searches-0 with properties {} (kafka.log.LogManager)
[2024-04-10 07:01:51,222] INFO [Partition lr-book-searches-0 broker=0] No checkpointed highwatermark is found for partition lr-book-searches-0 (kafka.cluster.Partition)
[2024-04-10 07:01:51,224] INFO [Partition lr-book-searches-0 broker=0] Log loaded for partition lr-book-searches-0 with initial high watermark 0 (kafka.cluster.Partition)
[2024-04-10 07:01:51,227] INFO [Broker id=0] Leader lr-book-searches-0 with topic id Some(cyQQePRVSjOHCbT5zKthQw) starts at leader epoch 0 from offset 0 with partition epoch 0, high watermark 0, ISR [0], adding replicas [] and removing replicas [] . Previous leader None and previous leader epoch was -1. (state.change.logger)