» Go:使用Kafka构建事件驱动微服务 » 2. 生产者:Web 服务 » 2.5 事件生产者

事件生产者

事件驱动架构中,事件生产者消费者在系统内不同组件或服务之间的通信和交互中发挥着至关重要的作用。

事件生产者是系统内生成或发出事件的任何组件或服务。

事件消费者是系统内订阅并处理事件的组件或服务。

Event Queue

在这个图表中,“书店网站服务(Bookstore Web Service)”是生产者,而其他两个服务是消费者。

事件驱动架构的关键优势包括:

  • 低耦合:事件驱动架构促进了组件之间的低耦合,因为生产者和消费者通过基于事件的异步通信进行交互。
  • 易扩展:事件消费者可完全独立于事件生产者进行扩展,确保系统能够方便且有效地处理不同大小的工作负载。
  • 容错性:事件驱动架构容错性和韧性很好,因为即使一些组件或服务暂时不可用,事件仍然可以最终被可靠地处理。

使用 Apache Kafka

  1. 在机器上安装 Apache Kafka 并启动它。

使用 Docker 镜像是最方便的安装方式。

docker pull apache/kafka:3.7.0
docker run -p 9092:9092 apache/kafka:3.7.0
  1. 创建用于存储事件的 topic

事件被组织并持久存储在主题(topic)中。简单来说,一个 topic 类似于文件系统中的一个文件夹,而事件则是该文件夹中的文件。

bin/kafka-topics.sh --create --topic lr-book-searches --bootstrap-server localhost:9092

Kafka 中的 topic 是可以有多生产者和多订阅者的:一个 topic 可以有零个、一个或多个生产者向其写入事件,以及零个、一个或多个订阅者订阅这些事件。

  1. 添加 kafka 依赖:
go get -u github.com/IBM/sarama

Sarama 是基于 MIT 协议的开源 Apache Kafka Go 客户端。

  1. 修改代码。

创建 infrastructure/mq/helper.go:

package mq

// Helper sends events to the message queue.
type Helper interface {
	SendEvent(key string, value []byte) (bool, error)
}

创建 infrastructure/mq/kafka.go:

/*
Package mq does all message queue jobs.
*/
package mq

import (
	"github.com/IBM/sarama"
)

// KafkaQueue runs all Kafka operations
type KafkaQueue struct {
	producer sarama.SyncProducer
	topic    string
}

// NewKafkaQueue constructs a new KafkaQueue
func NewKafkaQueue(brokers []string, topic string) (*KafkaQueue, error) {
	// Configuration
	c := sarama.NewConfig()
	c.Producer.RequiredAcks = sarama.WaitForLocal
	c.Producer.Retry.Max = 3
	c.Producer.Return.Successes = true
	// Create producer
	producer, err := sarama.NewSyncProducer(brokers, c)
	if err != nil {
		return nil, err
	}
	return &KafkaQueue{producer, topic}, nil
}

// CreateBook creates a new book
func (k *KafkaQueue) SendEvent(key string, value []byte) (bool, error) {
	// Send a message
	message := &sarama.ProducerMessage{
		Topic: k.topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(value),
	}
	// Send message to Kafka
	_, _, err := k.producer.SendMessage(message)
	if err != nil {
		return false, err
	}
	return true, nil
}

添加消息队列配置项,infrastructure/config/config.go:

@@ -14,6 +14,7 @@ import (
 type Config struct {
        App ApplicationConfig `json:"app" yaml:"app"`
        DB  DBConfig          `json:"db" yaml:"db"`
+       MQ  MQConfig          `json:"mq" yaml:"mq"`
 }
 
 // DBConfig is the configuration of databases.
@@ -28,6 +29,12 @@ type ApplicationConfig struct {
        TemplatesPattern string `json:"templates_pattern" yaml:"templates_pattern"`
 }
 
+// MQConfig is the configuration of message queues.
+type MQConfig struct {
+       Brokers []string `json:"brokers" yaml:"brokers"`
+       Topic   string   `json:"topic" yaml:"topic"`
+}
+
 // Parse parses config file and returns a Config.
 func Parse(filename string) (*Config, error) {
        buf, err := os.ReadFile(filename)

放入配置值,config.yml:

@@ -3,4 +3,8 @@ app:
   page_size: 5
   templates_pattern: "adapter/templates/*.html"
 db:
-  dsn: "test_user:test_pass@tcp(127.0.0.1:3306)/lr_event_book?charset=utf8mb4&parseTime=True&loc=Local"
\ No newline at end of file
+  dsn: "test_user:test_pass@tcp(127.0.0.1:3306)/lr_event_book?charset=utf8mb4&parseTime=True&loc=Local"
+mq:
+  brokers:
+    - localhost:9092
+  topic: "lr-book-searches"

调整 application/wire_helper.go:

@@ -7,11 +7,13 @@ import (
        "literank.com/event-books/domain/gateway"
        "literank.com/event-books/infrastructure/config"
        "literank.com/event-books/infrastructure/database"
+       "literank.com/event-books/infrastructure/mq"
 )
 
 // WireHelper is the helper for dependency injection
 type WireHelper struct {
        sqlPersistence *database.MySQLPersistence
+       mq             *mq.KafkaQueue
 }
 
 // NewWireHelper constructs a new WireHelper
@@ -20,12 +22,22 @@ func NewWireHelper(c *config.Config) (*WireHelper, error) {
        if err != nil {
                return nil, err
        }
+       mq, err := mq.NewKafkaQueue(c.MQ.Brokers, c.MQ.Topic)
+       if err != nil {
+               return nil, err
+       }
 
        return &WireHelper{
-               sqlPersistence: db}, nil
+               sqlPersistence: db, mq: mq,
+       }, nil
 }
 
 // BookManager returns an instance of BookManager
 func (w *WireHelper) BookManager() gateway.BookManager {
        return w.sqlPersistence
 }
+
+// MessageQueueHelper returns an instance of mq helper
+func (w *WireHelper) MessageQueueHelper() mq.Helper {
+       return w.mq
+}

调整 application/executor/book_operator.go:

@@ -5,19 +5,23 @@ package executor
 
 import (
        "context"
+       "encoding/json"
+       "fmt"
 
        "literank.com/event-books/domain/gateway"
        "literank.com/event-books/domain/model"
+       "literank.com/event-books/infrastructure/mq"
 )
 
 // BookOperator handles book input/output and proxies operations to the book manager.
 type BookOperator struct {
        bookManager gateway.BookManager
+       mqHelper    mq.Helper
 }
 
 // NewBookOperator constructs a new BookOperator
-func NewBookOperator(b gateway.BookManager) *BookOperator {
-       return &BookOperator{bookManager: b}
+func NewBookOperator(b gateway.BookManager, m mq.Helper) *BookOperator {
+       return &BookOperator{bookManager: b, mqHelper: m}
 }
 
 // CreateBook creates a new book
@@ -32,5 +36,17 @@ func (o *BookOperator) CreateBook(ctx context.Context, b *model.Book) (*model.Bo
 
 // GetBooks gets a list of books by offset and keyword, and caches its result if needed
 func (o *BookOperator) GetBooks(ctx context.Context, offset int, query string) ([]*model.Book, error) {
-       return o.bookManager.GetBooks(ctx, offset, query)
+       books, err := o.bookManager.GetBooks(ctx, offset, query)
+       if err != nil {
+               return nil, err
+       }
+       // Send search query and its results
+       if query != "" {
+               jsonData, err := json.Marshal(books)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to send event due to %w", err)
+               }
+               o.mqHelper.SendEvent(query, jsonData)
+       }
+       return books, nil
 }

调整 adapter/router.go:

@@ -27,7 +27,7 @@ type RestHandler struct {
 
 func newRestHandler(wireHelper *application.WireHelper) *RestHandler {
        return &RestHandler{
-               bookOperator: executor.NewBookOperator(wireHelper.BookManager()),
+               bookOperator: executor.NewBookOperator(wireHelper.BookManager(), wireHelper.MessageQueueHelper()),
        }
 }

重启 server,尝试进行一些搜索,你会看到类似如下 kafka 日志:

[2024-04-03 05:48:16,240] INFO [QuorumController id=0] CreateTopics result(s): CreatableTopic(name='lr-book-searches', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-03 05:48:16,241] INFO [QuorumController id=0] Replayed TopicRecord for topic lr-book-searches with topic ID UVB4UXbPT6SjcFYtElt56A. (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-03 05:48:16,241] INFO [QuorumController id=0] Replayed PartitionRecord for new partition lr-book-searches-0 with topic ID UVB4UXbPT6SjcFYtElt56A and PartitionRegistration(replicas=[0], directories=[2_8o-ARVFEZ2Ue7fnPRGrg], isr=[0], removingReplicas=[], addingReplicas=[], elr=[], lastKnownElr=[], leader=0, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-03 05:48:16,271] INFO [Broker id=0] Transitioning 1 partition(s) to local leaders. (state.change.logger)
上页下页