» Go:使用Kafka构建事件驱动微服务 » 4. 消费者:推荐服务 » 4.2 事件消费者

事件消费者

因为推荐服务的消费者和热搜服务的消费者有很多共通之处,故此处可以将共通部分移到更高层的目录以实现代码复用。

重组文件

  • 重命名 infrastructure/mq/kafka.goinfrastructure/mq/kafka_producer.go
  • 移动 service/trend/infrastructure/mq/kafka.goinfrastructure/mq/kafka_consumer.go
func NewKafkaConsumer(brokers []string, topic, groupID string) (*KafkaConsumer, error) {

添加一个给消费组的新参数 groupID

  • 添加配置项,service/trend/infrastructure/config/config.go:
@@ -26,4 +26,5 @@ type CacheConfig struct {
 type MQConfig struct {
        Brokers []string `json:"brokers" yaml:"brokers"`
        Topic   string   `json:"topic" yaml:"topic"`
+       GroupID string   `json:"group_id" yaml:"group_id"`
 }
  • 填入配置值,service/trend/config.yml:
@@ -7,4 +7,5 @@ cache:
 mq:
   brokers:
     - localhost:9094
   topic: "lr-book-searches"
+  group_id: "trend-svr"
  • 调整 import 路径,service/trend/application/consumer/trend.go:
@@ -7,16 +7,17 @@ import (
        "context"
        "encoding/json"
 
+       topgw "literank.com/event-books/domain/gateway"
        "literank.com/event-books/domain/model"
        "literank.com/event-books/service/trend/domain/gateway"
 )
 
 type TrendConsumer struct {
        trendManager  gateway.TrendManager
-       eventConsumer gateway.TrendEventConsumer
+       eventConsumer topgw.EventConsumer
 }
 
-func NewTrendConsumer(t gateway.TrendManager, e gateway.TrendEventConsumer) *TrendConsumer {
+func NewTrendConsumer(t gateway.TrendManager, e topgw.EventConsumer) *TrendConsumer {
        return &TrendConsumer{trendManager: t, eventConsumer: e}
 }
 
@@ -33,6 +34,6 @@ func (c *TrendConsumer) Start(ctx context.Context) {
        })
 }
 
-func (c *TrendConsumer) EventConsumer() gateway.TrendEventConsumer {
+func (c *TrendConsumer) EventConsumer() topgw.EventConsumer {
        return c.eventConsumer
 }
  • 调整 import 路径,service/trend/application/wire_helper.go:
@@ -4,10 +4,11 @@ Package application provides all common structures and functions of the applicat
 package application
 
 import (
+       topgw "literank.com/event-books/domain/gateway"
+       "literank.com/event-books/infrastructure/mq"
        "literank.com/event-books/service/trend/domain/gateway"
        "literank.com/event-books/service/trend/infrastructure/cache"
        "literank.com/event-books/service/trend/infrastructure/config"
-       "literank.com/event-books/service/trend/infrastructure/mq"
 )
 
 // WireHelper is the helper for dependency injection
@@ -19,7 +20,7 @@ type WireHelper struct {
 // NewWireHelper constructs a new WireHelper
 func NewWireHelper(c *config.Config) (*WireHelper, error) {
        kv := cache.NewRedisCache(c.Cache.Address, c.Cache.Password, c.Cache.DB)
-       consumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic)
+       consumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic, c.MQ.GroupID)
        if err != nil {
                return nil, err
        }
@@ -35,6 +36,6 @@ func (w *WireHelper) TrendManager() gateway.TrendManager {
 }
 
 // TrendEventConsumer returns an instance of TrendEventConsumer
-func (w *WireHelper) TrendEventConsumer() gateway.TrendEventConsumer {
+func (w *WireHelper) TrendEventConsumer() topgw.EventConsumer {
        return w.consumer
 }
  • 调整 service/trend/domain/gateway/trend_manager.go:
@@ -9,16 +9,8 @@ import (
        "literank.com/event-books/domain/model"
 )
 
-type ConsumeCallback func(key, value []byte) error
-
 // TrendManager manages all trends
 type TrendManager interface {
        CreateTrend(ctx context.Context, t *model.Trend) (uint, error)
        TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error)
 }
-
-// TrendEventConsumer consumes trend events
-type TrendEventConsumer interface {
-       ConsumeEvents(ctx context.Context, callback ConsumeCallback)
-       Stop() error
-}

将共通的 TrendEventConsumerConsumeCallback 类型移出该文件。

  • 将那些类型放到 domain/gateway/event_consumer.go:
package gateway

import "context"

type ConsumeCallback func(key, value []byte) error

// EventConsumer consumes all kinds of events
type EventConsumer interface {
	ConsumeEvents(ctx context.Context, callback ConsumeCallback)
	Stop() error
}

调整 web 服务中的生产者的事件

添加 userID 参数,service/web/application/executor/book_operator.go:

@@ -36,18 +36,19 @@ func (o *BookOperator) CreateBook(ctx context.Context, b *model.Book) (*model.Bo
 }
 
 // GetBooks gets a list of books by offset and keyword, and caches its result if needed
-func (o *BookOperator) GetBooks(ctx context.Context, offset int, query string) ([]*model.Book, error) {
+func (o *BookOperator) GetBooks(ctx context.Context, offset int, userID, query string) ([]*model.Book, error) {
        books, err := o.bookManager.GetBooks(ctx, offset, query)
        if err != nil {
                return nil, err
        }
-       // Send search query and its results
+       // Send a user's search query and its results
        if query != "" {
+               k := query + ":" + userID
                jsonData, err := json.Marshal(books)
                if err != nil {
                        return nil, fmt.Errorf("failed to send event due to %w", err)
                }
-               o.mqHelper.SendEvent(query, jsonData)
+               o.mqHelper.SendEvent(k, jsonData)
        }
        return books, nil
 }

读取 cookie 值并传递给 userIDservice/web/adapter/router.go:

@@ -6,8 +6,10 @@ package adapter
 import (
        "fmt"
        "log"
+       "math/rand"
        "net/http"
        "strconv"
+       "time"
 
        "github.com/gin-gonic/gin"
 
@@ -20,6 +22,7 @@ import (
 const (
        fieldOffset = "o"
        fieldQuery  = "q"
+       fieldUID    = "uid"
 )
 
 // RestHandler handles all restful requests
@@ -54,8 +57,14 @@ func MakeRouter(templates_pattern string, remote *config.RemoteServiceConfig, wi
 
 // Render and show the index page
 func (r *RestHandler) indexPage(c *gin.Context) {
+       userID, err := c.Cookie(fieldUID)
+       if err != nil {
+               // Doesn't exist, make a new one
+               userID = randomString(5)
+               c.SetCookie(fieldUID, userID, 3600*24*30, "/", "", false, false)
+       }
        q := c.Query(fieldQuery)
-       books, err := r.bookOperator.GetBooks(c, 0, q)
+       books, err := r.bookOperator.GetBooks(c, 0, userID, q)
        if err != nil {
                c.String(http.StatusNotFound, "failed to get books")
                return
@@ -87,7 +96,7 @@ func (r *RestHandler) getBooks(c *gin.Context) {
                }
                offset = value
        }
-       books, err := r.bookOperator.GetBooks(c, offset, c.Query(fieldQuery))
+       books, err := r.bookOperator.GetBooks(c, offset, "", c.Query(fieldQuery))
        if err != nil {
                fmt.Printf("Failed to get books: %v\n", err)
                c.JSON(http.StatusNotFound, gin.H{"error": "failed to get books"})
@@ -112,3 +121,14 @@ func (r *RestHandler) createBook(c *gin.Context) {
        }
        c.JSON(http.StatusCreated, book)
 }
+
+func randomString(length int) string {
+       source := rand.NewSource(time.Now().UnixNano())
+       random := rand.New(source)
+       const charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+       result := make([]byte, length)
+       for i := range result {
+               result[i] = charset[random.Intn(len(charset))]
+       }
+       return string(result)
+}

调整热搜服务中的消费者

因为 key 的格式有所变化,此处热搜服务需要适配变化。

更新 service/trend/application/consumer/trend.go:

@@ -6,6 +6,7 @@ package consumer
 import (
        "context"
        "encoding/json"
+       "strings"
 
        topgw "literank.com/event-books/domain/gateway"
        "literank.com/event-books/domain/model"
@@ -23,8 +24,10 @@ func NewTrendConsumer(t gateway.TrendManager, e topgw.EventConsumer) *TrendConsu
 
 func (c *TrendConsumer) Start(ctx context.Context) {
        c.eventConsumer.ConsumeEvents(ctx, func(key, data []byte) error {
+               parts := strings.Split(string(key), ":")
+               query := parts[0]
                t := &model.Trend{
-                       Query: string(key),
+                       Query: query,
                }
                if err := json.Unmarshal(data, &t.Books); err != nil {
                        return err

推荐服务的消费者

添加 service/recommendation/domain/model/interest.go:

package model

// Interest represents a user's interest in a book.
type Interest struct {
	UserID string  `json:"user_id" bson:"user_id"`
	Title  string  `json:"title"`
	Author string  `json:"author"`
	Score  float32 `json:"score"`
}

添加 service/recommendation/domain/gateway/interest_manager.go:

/*
Package gateway contains all domain gateways.
*/
package gateway

import (
	"context"

	"literank.com/event-books/service/recommendation/domain/model"
)

// InterestManager manages all interests
type InterestManager interface {
	IncreaseInterest(ctx context.Context, i *model.Interest) error
	ListInterests(ctx context.Context, userID string) ([]*model.Interest, error)
}

安装 mongo 依赖:

go get -u go.mongodb.org/mongo-driver/mongo

准备 mongoDB 数据库:

  • 在机器上安装 mongoDB 并启动它。
  • 创建一个库,名为 lr_event_rec.
  • 在需要的 collections 上创建索引。

实现 InterestManagerservice/recommendation/infrastructure/database/mongo.go:

/*
Package database does all db persistence implementations.
*/
package database

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"

	"literank.com/event-books/service/recommendation/domain/model"
)

const (
	collReview = "interests"
)

// MongoPersistence runs all mongoDB operations
type MongoPersistence struct {
	db       *mongo.Database
	coll     *mongo.Collection
	pageSize int
}

// NewMongoPersistence constructs a new MongoPersistence
func NewMongoPersistence(mongoURI, dbName string, pageSize int) (*MongoPersistence, error) {
	client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(mongoURI))
	if err != nil {
		return nil, err
	}
	db := client.Database(dbName)
	coll := db.Collection(collReview)
	return &MongoPersistence{db, coll, pageSize}, nil
}

// GetReview gets a review by ID
func (m *MongoPersistence) IncreaseInterest(ctx context.Context, i *model.Interest) error {
	filter := bson.M{
		"user_id": i.UserID,
		"title":   i.Title,
		"author":  i.Author,
	}
	update := bson.M{"$inc": bson.M{"score": 1}}
	opts := options.Update().SetUpsert(true)

	if _, err := m.coll.UpdateOne(ctx, filter, update, opts); err != nil {
		return err
	}
	return nil
}

// ListInterests lists user interests by a use id
func (m *MongoPersistence) ListInterests(ctx context.Context, userID string) ([]*model.Interest, error) {
	filter := bson.M{"user_id": userID}

	opts := options.Find()
	opts.SetSort(bson.M{"score": -1})
	opts.SetLimit(int64(m.pageSize))

	cursor, err := m.coll.Find(ctx, filter, opts)
	if err != nil {
		return nil, err
	}
	defer cursor.Close(ctx)

	interests := make([]*model.Interest, 0)
	if err := cursor.All(ctx, &interests); err != nil {
		return nil, err
	}
	return interests, nil
}

添加 service/recommendation/infrastructure/config/config.go:

/*
Package config provides config structures and parse funcs.
*/
package config

// Config is the global configuration.
type Config struct {
	App ApplicationConfig `json:"app" yaml:"app"`
	DB  DBConfig          `json:"db" yaml:"db"`
	MQ  MQConfig          `json:"mq" yaml:"mq"`
}

// DBConfig is the configuration of databases.
type DBConfig struct {
	MongoURI    string `json:"mongo_uri" yaml:"mongo_uri"`
	MongoDBName string `json:"mongo_db_name" yaml:"mongo_db_name"`
}

// ApplicationConfig is the configuration of main app.
type ApplicationConfig struct {
	Port     int `json:"port" yaml:"port"`
	PageSize int `json:"page_size" yaml:"page_size"`
}

// MQConfig is the configuration of message queues.
type MQConfig struct {
	Brokers []string `json:"brokers" yaml:"brokers"`
	Topic   string   `json:"topic" yaml:"topic"`
	GroupID string   `json:"group_id" yaml:"group_id"`
}

添加 service/recommendation/config.yml:

app:
  port: 8082
  page_size: 10
db:
  mongo_uri: "mongodb://localhost:27017"
  mongo_db_name: "lr_event_rec"
mq:
  brokers:
    - localhost:9094
  topic: "lr-book-searches"
  group_id: "rec-svr"

添加 service/recommendation/application/consumer/interest.go:

/*
Package consumer handles event-trigger style business logic.
*/
package consumer

import (
	"context"
	"encoding/json"
	"strings"

	topgw "literank.com/event-books/domain/gateway"
	topmodel "literank.com/event-books/domain/model"
	"literank.com/event-books/service/recommendation/domain/gateway"
	"literank.com/event-books/service/recommendation/domain/model"
)

type InterestConsumer struct {
	interestManager gateway.InterestManager
	eventConsumer   topgw.EventConsumer
}

func NewInterestConsumer(t gateway.InterestManager, e topgw.EventConsumer) *InterestConsumer {
	return &InterestConsumer{interestManager: t, eventConsumer: e}
}

func (c *InterestConsumer) Start(ctx context.Context) {
	c.eventConsumer.ConsumeEvents(ctx, func(key, data []byte) error {
		parts := strings.Split(string(key), ":")
		if len(parts) == 1 {
			// No userID, ignore it
			return nil
		}

		var books []*topmodel.Book
		if err := json.Unmarshal(data, &books); err != nil {
			return err
		}
		userID := parts[1]
		for _, book := range books {
			i := &model.Interest{
				UserID: userID,
				Title:  book.Title,
				Author: book.Author,
			}
			if err := c.interestManager.IncreaseInterest(ctx, i); err != nil {
				return err
			}
		}
		return nil
	})
}

func (c *InterestConsumer) EventConsumer() topgw.EventConsumer {
	return c.eventConsumer
}

添加 service/recommendation/main.go:

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"literank.com/event-books/infrastructure/parser"
	"literank.com/event-books/service/recommendation/application"
	"literank.com/event-books/service/recommendation/application/consumer"
	"literank.com/event-books/service/recommendation/infrastructure/config"
)

const configFileName = "config.yml"

func main() {
	// Read the config
	c, err := parser.Parse[config.Config](configFileName)
	if err != nil {
		panic(err)
	}

	// Prepare dependencies
	wireHelper, err := application.NewWireHelper(c)
	if err != nil {
		panic(err)
	}

	// Run the consumer
	tc := consumer.NewInterestConsumer(wireHelper.InterestManager(), wireHelper.TrendEventConsumer())
	eventConsumer := tc.EventConsumer()
	go func() {
		// Shutdown signals
		stopAll := make(chan os.Signal, 1)
		signal.Notify(stopAll, syscall.SIGINT, syscall.SIGTERM)
		<-stopAll
		if err := eventConsumer.Stop(); err != nil {
			log.Panicf("Failed to close consumer group: %v", err)
		}
	}()
	tc.Start(context.Background())
}

启动推荐服务的消费者:

# in service/recommendation
go run main.go

你应该能看到类似如下输出:

2024/04/05 23:56:59 Started to consume events...

在另一个终端中,重启 web 服务并尝试多次搜索关键词”love“和”war“等。

然后,你将在 mongoDB 中看到如下记录:

lr_event_rec> db.interests.find();
[
  {
    _id: ObjectId('66101a027a7027627b271269'),
    author: 'Jane Austen',
    title: 'Pride and Prejudice',
    user_id: 'YVM2P',
    score: 1
  },
  {
    _id: ObjectId('66101a027a7027627b27126b'),
    author: 'Leo Tolstoy',
    title: 'War and Peace',
    user_id: 'YVM2P',
    score: 2
  },
  {
    _id: ObjectId('66101a437a7027627b271299'),
    author: 'Homer',
    title: 'The Odyssey',
    user_id: 'YVM2P',
    score: 1
  }
]

这就表示你的消费者运作正常。

上页下页