事件消费者
因为推荐服务的消费者和热搜服务的消费者有很多共通之处,故此处可以将共通部分移到更高层的目录以实现代码复用。
重组文件
- 重命名 infrastructure/mq/kafka.go 为 infrastructure/mq/kafka_producer.go。
- 移动 service/trend/infrastructure/mq/kafka.go 到 infrastructure/mq/kafka_consumer.go。
func NewKafkaConsumer(brokers []string, topic, groupID string) (*KafkaConsumer, error) {
添加一个给消费组的新参数 groupID
。
- 添加配置项,service/trend/infrastructure/config/config.go:
@@ -26,4 +26,5 @@ type CacheConfig struct {
type MQConfig struct {
Brokers []string `json:"brokers" yaml:"brokers"`
Topic string `json:"topic" yaml:"topic"`
+ GroupID string `json:"group_id" yaml:"group_id"`
}
- 填入配置值,service/trend/config.yml:
@@ -7,4 +7,5 @@ cache:
mq:
brokers:
- localhost:9094
topic: "lr-book-searches"
+ group_id: "trend-svr"
- 调整 import 路径,service/trend/application/consumer/trend.go:
@@ -7,16 +7,17 @@ import (
"context"
"encoding/json"
+ topgw "literank.com/event-books/domain/gateway"
"literank.com/event-books/domain/model"
"literank.com/event-books/service/trend/domain/gateway"
)
type TrendConsumer struct {
trendManager gateway.TrendManager
- eventConsumer gateway.TrendEventConsumer
+ eventConsumer topgw.EventConsumer
}
-func NewTrendConsumer(t gateway.TrendManager, e gateway.TrendEventConsumer) *TrendConsumer {
+func NewTrendConsumer(t gateway.TrendManager, e topgw.EventConsumer) *TrendConsumer {
return &TrendConsumer{trendManager: t, eventConsumer: e}
}
@@ -33,6 +34,6 @@ func (c *TrendConsumer) Start(ctx context.Context) {
})
}
-func (c *TrendConsumer) EventConsumer() gateway.TrendEventConsumer {
+func (c *TrendConsumer) EventConsumer() topgw.EventConsumer {
return c.eventConsumer
}
- 调整 import 路径,service/trend/application/wire_helper.go:
@@ -4,10 +4,11 @@ Package application provides all common structures and functions of the applicat
package application
import (
+ topgw "literank.com/event-books/domain/gateway"
+ "literank.com/event-books/infrastructure/mq"
"literank.com/event-books/service/trend/domain/gateway"
"literank.com/event-books/service/trend/infrastructure/cache"
"literank.com/event-books/service/trend/infrastructure/config"
- "literank.com/event-books/service/trend/infrastructure/mq"
)
// WireHelper is the helper for dependency injection
@@ -19,7 +20,7 @@ type WireHelper struct {
// NewWireHelper constructs a new WireHelper
func NewWireHelper(c *config.Config) (*WireHelper, error) {
kv := cache.NewRedisCache(c.Cache.Address, c.Cache.Password, c.Cache.DB)
- consumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic)
+ consumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic, c.MQ.GroupID)
if err != nil {
return nil, err
}
@@ -35,6 +36,6 @@ func (w *WireHelper) TrendManager() gateway.TrendManager {
}
// TrendEventConsumer returns an instance of TrendEventConsumer
-func (w *WireHelper) TrendEventConsumer() gateway.TrendEventConsumer {
+func (w *WireHelper) TrendEventConsumer() topgw.EventConsumer {
return w.consumer
}
- 调整 service/trend/domain/gateway/trend_manager.go:
@@ -9,16 +9,8 @@ import (
"literank.com/event-books/domain/model"
)
-type ConsumeCallback func(key, value []byte) error
-
// TrendManager manages all trends
type TrendManager interface {
CreateTrend(ctx context.Context, t *model.Trend) (uint, error)
TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error)
}
-
-// TrendEventConsumer consumes trend events
-type TrendEventConsumer interface {
- ConsumeEvents(ctx context.Context, callback ConsumeCallback)
- Stop() error
-}
将共通的 TrendEventConsumer
和 ConsumeCallback
类型移出该文件。
- 将那些类型放到 domain/gateway/event_consumer.go:
package gateway
import "context"
type ConsumeCallback func(key, value []byte) error
// EventConsumer consumes all kinds of events
type EventConsumer interface {
ConsumeEvents(ctx context.Context, callback ConsumeCallback)
Stop() error
}
调整 web 服务中的生产者的事件
添加 userID
参数,service/web/application/executor/book_operator.go:
@@ -36,18 +36,19 @@ func (o *BookOperator) CreateBook(ctx context.Context, b *model.Book) (*model.Bo
}
// GetBooks gets a list of books by offset and keyword, and caches its result if needed
-func (o *BookOperator) GetBooks(ctx context.Context, offset int, query string) ([]*model.Book, error) {
+func (o *BookOperator) GetBooks(ctx context.Context, offset int, userID, query string) ([]*model.Book, error) {
books, err := o.bookManager.GetBooks(ctx, offset, query)
if err != nil {
return nil, err
}
- // Send search query and its results
+ // Send a user's search query and its results
if query != "" {
+ k := query + ":" + userID
jsonData, err := json.Marshal(books)
if err != nil {
return nil, fmt.Errorf("failed to send event due to %w", err)
}
- o.mqHelper.SendEvent(query, jsonData)
+ o.mqHelper.SendEvent(k, jsonData)
}
return books, nil
}
读取 cookie 值并传递给 userID
,service/web/adapter/router.go:
@@ -6,8 +6,10 @@ package adapter
import (
"fmt"
"log"
+ "math/rand"
"net/http"
"strconv"
+ "time"
"github.com/gin-gonic/gin"
@@ -20,6 +22,7 @@ import (
const (
fieldOffset = "o"
fieldQuery = "q"
+ fieldUID = "uid"
)
// RestHandler handles all restful requests
@@ -54,8 +57,14 @@ func MakeRouter(templates_pattern string, remote *config.RemoteServiceConfig, wi
// Render and show the index page
func (r *RestHandler) indexPage(c *gin.Context) {
+ userID, err := c.Cookie(fieldUID)
+ if err != nil {
+ // Doesn't exist, make a new one
+ userID = randomString(5)
+ c.SetCookie(fieldUID, userID, 3600*24*30, "/", "", false, false)
+ }
q := c.Query(fieldQuery)
- books, err := r.bookOperator.GetBooks(c, 0, q)
+ books, err := r.bookOperator.GetBooks(c, 0, userID, q)
if err != nil {
c.String(http.StatusNotFound, "failed to get books")
return
@@ -87,7 +96,7 @@ func (r *RestHandler) getBooks(c *gin.Context) {
}
offset = value
}
- books, err := r.bookOperator.GetBooks(c, offset, c.Query(fieldQuery))
+ books, err := r.bookOperator.GetBooks(c, offset, "", c.Query(fieldQuery))
if err != nil {
fmt.Printf("Failed to get books: %v\n", err)
c.JSON(http.StatusNotFound, gin.H{"error": "failed to get books"})
@@ -112,3 +121,14 @@ func (r *RestHandler) createBook(c *gin.Context) {
}
c.JSON(http.StatusCreated, book)
}
+
+func randomString(length int) string {
+ source := rand.NewSource(time.Now().UnixNano())
+ random := rand.New(source)
+ const charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+ result := make([]byte, length)
+ for i := range result {
+ result[i] = charset[random.Intn(len(charset))]
+ }
+ return string(result)
+}
调整热搜服务中的消费者
因为 key
的格式有所变化,此处热搜服务需要适配变化。
更新 service/trend/application/consumer/trend.go:
@@ -6,6 +6,7 @@ package consumer
import (
"context"
"encoding/json"
+ "strings"
topgw "literank.com/event-books/domain/gateway"
"literank.com/event-books/domain/model"
@@ -23,8 +24,10 @@ func NewTrendConsumer(t gateway.TrendManager, e topgw.EventConsumer) *TrendConsu
func (c *TrendConsumer) Start(ctx context.Context) {
c.eventConsumer.ConsumeEvents(ctx, func(key, data []byte) error {
+ parts := strings.Split(string(key), ":")
+ query := parts[0]
t := &model.Trend{
- Query: string(key),
+ Query: query,
}
if err := json.Unmarshal(data, &t.Books); err != nil {
return err
推荐服务的消费者
添加 service/recommendation/domain/model/interest.go:
package model
// Interest represents a user's interest in a book.
type Interest struct {
UserID string `json:"user_id" bson:"user_id"`
Title string `json:"title"`
Author string `json:"author"`
Score float32 `json:"score"`
}
添加 service/recommendation/domain/gateway/interest_manager.go:
/*
Package gateway contains all domain gateways.
*/
package gateway
import (
"context"
"literank.com/event-books/service/recommendation/domain/model"
)
// InterestManager manages all interests
type InterestManager interface {
IncreaseInterest(ctx context.Context, i *model.Interest) error
ListInterests(ctx context.Context, userID string) ([]*model.Interest, error)
}
安装 mongo
依赖:
go get -u go.mongodb.org/mongo-driver/mongo
准备 mongoDB 数据库:
- 在机器上安装 mongoDB 并启动它。
- 创建一个库,名为
lr_event_rec
.- 在需要的 collections 上创建索引。
实现 InterestManager
,service/recommendation/infrastructure/database/mongo.go:
/*
Package database does all db persistence implementations.
*/
package database
import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"literank.com/event-books/service/recommendation/domain/model"
)
const (
collReview = "interests"
)
// MongoPersistence runs all mongoDB operations
type MongoPersistence struct {
db *mongo.Database
coll *mongo.Collection
pageSize int
}
// NewMongoPersistence constructs a new MongoPersistence
func NewMongoPersistence(mongoURI, dbName string, pageSize int) (*MongoPersistence, error) {
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(mongoURI))
if err != nil {
return nil, err
}
db := client.Database(dbName)
coll := db.Collection(collReview)
return &MongoPersistence{db, coll, pageSize}, nil
}
// GetReview gets a review by ID
func (m *MongoPersistence) IncreaseInterest(ctx context.Context, i *model.Interest) error {
filter := bson.M{
"user_id": i.UserID,
"title": i.Title,
"author": i.Author,
}
update := bson.M{"$inc": bson.M{"score": 1}}
opts := options.Update().SetUpsert(true)
if _, err := m.coll.UpdateOne(ctx, filter, update, opts); err != nil {
return err
}
return nil
}
// ListInterests lists user interests by a use id
func (m *MongoPersistence) ListInterests(ctx context.Context, userID string) ([]*model.Interest, error) {
filter := bson.M{"user_id": userID}
opts := options.Find()
opts.SetSort(bson.M{"score": -1})
opts.SetLimit(int64(m.pageSize))
cursor, err := m.coll.Find(ctx, filter, opts)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
interests := make([]*model.Interest, 0)
if err := cursor.All(ctx, &interests); err != nil {
return nil, err
}
return interests, nil
}
添加 service/recommendation/infrastructure/config/config.go:
/*
Package config provides config structures and parse funcs.
*/
package config
// Config is the global configuration.
type Config struct {
App ApplicationConfig `json:"app" yaml:"app"`
DB DBConfig `json:"db" yaml:"db"`
MQ MQConfig `json:"mq" yaml:"mq"`
}
// DBConfig is the configuration of databases.
type DBConfig struct {
MongoURI string `json:"mongo_uri" yaml:"mongo_uri"`
MongoDBName string `json:"mongo_db_name" yaml:"mongo_db_name"`
}
// ApplicationConfig is the configuration of main app.
type ApplicationConfig struct {
Port int `json:"port" yaml:"port"`
PageSize int `json:"page_size" yaml:"page_size"`
}
// MQConfig is the configuration of message queues.
type MQConfig struct {
Brokers []string `json:"brokers" yaml:"brokers"`
Topic string `json:"topic" yaml:"topic"`
GroupID string `json:"group_id" yaml:"group_id"`
}
添加 service/recommendation/config.yml:
app:
port: 8082
page_size: 10
db:
mongo_uri: "mongodb://localhost:27017"
mongo_db_name: "lr_event_rec"
mq:
brokers:
- localhost:9094
topic: "lr-book-searches"
group_id: "rec-svr"
添加 service/recommendation/application/consumer/interest.go:
/*
Package consumer handles event-trigger style business logic.
*/
package consumer
import (
"context"
"encoding/json"
"strings"
topgw "literank.com/event-books/domain/gateway"
topmodel "literank.com/event-books/domain/model"
"literank.com/event-books/service/recommendation/domain/gateway"
"literank.com/event-books/service/recommendation/domain/model"
)
type InterestConsumer struct {
interestManager gateway.InterestManager
eventConsumer topgw.EventConsumer
}
func NewInterestConsumer(t gateway.InterestManager, e topgw.EventConsumer) *InterestConsumer {
return &InterestConsumer{interestManager: t, eventConsumer: e}
}
func (c *InterestConsumer) Start(ctx context.Context) {
c.eventConsumer.ConsumeEvents(ctx, func(key, data []byte) error {
parts := strings.Split(string(key), ":")
if len(parts) == 1 {
// No userID, ignore it
return nil
}
var books []*topmodel.Book
if err := json.Unmarshal(data, &books); err != nil {
return err
}
userID := parts[1]
for _, book := range books {
i := &model.Interest{
UserID: userID,
Title: book.Title,
Author: book.Author,
}
if err := c.interestManager.IncreaseInterest(ctx, i); err != nil {
return err
}
}
return nil
})
}
func (c *InterestConsumer) EventConsumer() topgw.EventConsumer {
return c.eventConsumer
}
添加 service/recommendation/main.go:
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"literank.com/event-books/infrastructure/parser"
"literank.com/event-books/service/recommendation/application"
"literank.com/event-books/service/recommendation/application/consumer"
"literank.com/event-books/service/recommendation/infrastructure/config"
)
const configFileName = "config.yml"
func main() {
// Read the config
c, err := parser.Parse[config.Config](configFileName)
if err != nil {
panic(err)
}
// Prepare dependencies
wireHelper, err := application.NewWireHelper(c)
if err != nil {
panic(err)
}
// Run the consumer
tc := consumer.NewInterestConsumer(wireHelper.InterestManager(), wireHelper.TrendEventConsumer())
eventConsumer := tc.EventConsumer()
go func() {
// Shutdown signals
stopAll := make(chan os.Signal, 1)
signal.Notify(stopAll, syscall.SIGINT, syscall.SIGTERM)
<-stopAll
if err := eventConsumer.Stop(); err != nil {
log.Panicf("Failed to close consumer group: %v", err)
}
}()
tc.Start(context.Background())
}
启动推荐服务的消费者:
# in service/recommendation
go run main.go
你应该能看到类似如下输出:
2024/04/05 23:56:59 Started to consume events...
在另一个终端中,重启 web 服务并尝试多次搜索关键词”love“和”war“等。
然后,你将在 mongoDB 中看到如下记录:
lr_event_rec> db.interests.find();
[
{
_id: ObjectId('66101a027a7027627b271269'),
author: 'Jane Austen',
title: 'Pride and Prejudice',
user_id: 'YVM2P',
score: 1
},
{
_id: ObjectId('66101a027a7027627b27126b'),
author: 'Leo Tolstoy',
title: 'War and Peace',
user_id: 'YVM2P',
score: 2
},
{
_id: ObjectId('66101a437a7027627b271299'),
author: 'Homer',
title: 'The Odyssey',
user_id: 'YVM2P',
score: 1
}
]
这就表示你的消费者运作正常。