事件消费者
调整 domain/model/trend.go:
@@ -1,11 +1,8 @@
package model
-import "time"
-
// Trend represents the structure of a
// trendy query and its related books.
type Trend struct {
- Query string `json:"query"`
- Books []Book `json:"books"`
- CreatedAt time.Time `json:"created_at"`
+ Query string `json:"query"`
+ Books []Book `json:"books"`
}
可以使用 kafka 事件的时间戳来进行时间追踪,此处可以移去时间字段。
调整 service/trend/domain/gateway/trend_manager.go:
@@ -9,8 +9,16 @@ import (
"literank.com/event-books/domain/model"
)
+type ConsumeCallback func(key, value []byte) error
+
// TrendManager manages all trends
type TrendManager interface {
CreateTrend(ctx context.Context, t *model.Trend) (uint, error)
- TopTrends(ctx context.Context, offset int) ([]*model.Trend, error)
+ TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error)
+}
+
+// TrendEventConsumer consumes trend events
+type TrendEventConsumer interface {
+ ConsumeEvents(ctx context.Context, callback ConsumeCallback)
+ Stop() error
}
实现 TrendEventConsumer
,service/trend/infrastructure/mq/kafka.go:
/*
Package mq does all message queue jobs.
*/
package mq
import (
"context"
"fmt"
"log"
"github.com/IBM/sarama"
"literank.com/event-books/service/trend/domain/gateway"
)
const (
groupID = "trend-svr"
)
// KafkaConsumer consumers events from the kafka queue
type KafkaConsumer struct {
cg sarama.ConsumerGroup
topic string
}
// NewKafkaConsumer constructs a new KafkaConsumer
func NewKafkaConsumer(brokers []string, topic string) (*KafkaConsumer, error) {
// Create a new consumer configuration
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
config.Consumer.Offsets.Initial = sarama.OffsetOldest
// Create consumer
fmt.Println(brokers)
consumer, err := sarama.NewConsumerGroup(brokers, groupID, config)
if err != nil {
return nil, err
}
return &KafkaConsumer{consumer, topic}, nil
}
func (k *KafkaConsumer) ConsumeEvents(ctx context.Context, callback gateway.ConsumeCallback) {
consumer := Consumer{callback}
if err := k.cg.Consume(ctx, []string{k.topic}, &consumer); err != nil {
log.Panicf("Failed to start consuming: %v", err)
}
}
func (k *KafkaConsumer) Stop() error {
return k.cg.Close()
}
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
callback gateway.ConsumeCallback
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
log.Println("Started to consume events...")
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
for {
select {
case message, ok := <-claim.Messages():
if !ok {
log.Printf("message channel was closed")
return nil
}
if err := c.callback(message.Key, message.Value); err != nil {
log.Printf("Failed to handle event from [%s] key = %s, timestamp = %v, value = %s, error: %v", message.Topic, string(message.Key), message.Timestamp, string(message.Value), err)
}
session.MarkMessage(message, "")
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
case <-session.Context().Done():
return nil
}
}
}
调整 TrendManager
实现,service/trend/infrastructure/cache/redis.go:
@@ -9,6 +9,7 @@ import (
"fmt"
"github.com/redis/go-redis/v9"
+
"literank.com/event-books/domain/model"
)
@@ -41,12 +42,13 @@ func (r *RedisCache) CreateTrend(ctx context.Context, t *model.Trend) (uint, err
if err != nil {
if err == redis.Nil {
// Member doesn't exist, add it with initial score of 1
- err := r.c.ZAdd(ctx, trendsKey, redis.Z{Score: 1, Member: member}).Err()
+ err = r.c.ZAdd(ctx, trendsKey, redis.Z{Score: 1, Member: member}).Err()
if err != nil {
return 0, err
}
+ } else {
+ return 0, err
}
- return 0, err
}
score, err := r.c.ZIncrBy(ctx, trendsKey, 1, member).Result()
if err != nil {
@@ -65,8 +67,8 @@ func (r *RedisCache) CreateTrend(ctx context.Context, t *model.Trend) (uint, err
return uint(score), nil
}
-func (r *RedisCache) TopTrends(ctx context.Context, offset int) ([]*model.Trend, error) {
- topItems, err := r.c.ZRevRangeWithScores(ctx, trendsKey, 0, int64(offset)).Result()
+func (r *RedisCache) TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error) {
+ topItems, err := r.c.ZRevRangeWithScores(ctx, trendsKey, 0, int64(pageSize)-1).Result()
if err != nil {
return nil, err
}
@@ -81,11 +83,17 @@ func (r *RedisCache) TopTrends(ctx context.Context, offset int) ([]*model.Trend,
}
k := queryKeyPrefix + query
value, err := r.c.Get(ctx, k).Result()
- if err != nil && err != redis.Nil {
- return nil, err
- }
- if err := json.Unmarshal([]byte(value), &t.Books); err != nil {
+ if err != nil {
+ if err == redis.Nil {
+ t.Books = make([]model.Book, 0)
+ trends = append(trends, t)
+ continue
+ }
return nil, err
+ } else {
+ if err := json.Unmarshal([]byte(value), &t.Books); err != nil {
+ return nil, err
+ }
}
trends = append(trends, t)
}
调整 service/trend/application/wire_helper.go:
@@ -7,18 +7,25 @@ import (
"literank.com/event-books/service/trend/domain/gateway"
"literank.com/event-books/service/trend/infrastructure/cache"
"literank.com/event-books/service/trend/infrastructure/config"
+ "literank.com/event-books/service/trend/infrastructure/mq"
)
// WireHelper is the helper for dependency injection
type WireHelper struct {
- kvStore *cache.RedisCache
+ kvStore *cache.RedisCache
+ consumer *mq.KafkaConsumer
}
// NewWireHelper constructs a new WireHelper
func NewWireHelper(c *config.Config) (*WireHelper, error) {
kv := cache.NewRedisCache(c.Cache.Address, c.Cache.Password, c.Cache.DB)
+ consumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic)
+ if err != nil {
+ return nil, err
+ }
return &WireHelper{
- kvStore: kv,
+ kvStore: kv,
+ consumer: consumer,
}, nil
}
@@ -26,3 +33,8 @@ func NewWireHelper(c *config.Config) (*WireHelper, error) {
func (w *WireHelper) TrendManager() gateway.TrendManager {
return w.kvStore
}
+
+// TrendEventConsumer returns an instance of TrendEventConsumer
+func (w *WireHelper) TrendEventConsumer() gateway.TrendEventConsumer {
+ return w.consumer
+}
添加配置项,service/trend/infrastructure/config/config.go:
@@ -7,6 +7,7 @@ package config
type Config struct {
App ApplicationConfig `json:"app" yaml:"app"`
Cache CacheConfig `json:"cache" yaml:"cache"`
+ MQ MQConfig `json:"mq" yaml:"mq"`
}
// ApplicationConfig is the configuration of main app.
@@ -20,3 +21,9 @@ type CacheConfig struct {
Password string `json:"password" yaml:"password"`
DB int `json:"db" yaml:"db"`
}
+
+// MQConfig is the configuration of message queues.
+type MQConfig struct {
+ Brokers []string `json:"brokers" yaml:"brokers"`
+ Topic string `json:"topic" yaml:"topic"`
+}
填入配置值,service/web/config.yml:
@@ -3,4 +3,8 @@ app:
cache:
address: localhost:6379
password: test_pass
- db: 0
+ db: 0
+mq:
+ brokers:
+ - localhost:9094
+ topic: "lr-book-searches"
此处使用
9094
端口是为了从外部访问跑在 docker 容器内的 kafka。
详情阅读 “Accessing Apache Kafka with internal and external clients“配置一节,bitnami/kafka
镜像。
添加 service/trend/application/consumer/trend.go:
/*
Package consumer handles event-trigger style business logic.
*/
package consumer
import (
"context"
"encoding/json"
"literank.com/event-books/domain/model"
"literank.com/event-books/service/trend/domain/gateway"
)
type TrendConsumer struct {
trendManager gateway.TrendManager
eventConsumer gateway.TrendEventConsumer
}
func NewTrendConsumer(t gateway.TrendManager, e gateway.TrendEventConsumer) *TrendConsumer {
return &TrendConsumer{trendManager: t, eventConsumer: e}
}
func (c *TrendConsumer) Start(ctx context.Context) {
c.eventConsumer.ConsumeEvents(ctx, func(key, data []byte) error {
t := &model.Trend{
Query: string(key),
}
if err := json.Unmarshal(data, &t.Books); err != nil {
return err
}
_, err := c.trendManager.CreateTrend(ctx, t)
return err
})
}
func (c *TrendConsumer) EventConsumer() gateway.TrendEventConsumer {
return c.eventConsumer
}
调整 service/trend/application/executor/trend_operator.go:
@@ -20,12 +20,7 @@ func NewTrendOperator(t gateway.TrendManager) *TrendOperator {
return &TrendOperator{trendManager: t}
}
-// CreateTrend creates a new trend
-func (o *TrendOperator) CreateTrend(ctx context.Context, t *model.Trend) (uint, error) {
- return o.trendManager.CreateTrend(ctx, t)
-}
-
// TopTrends gets the top trends order by hits in descending order
-func (o *TrendOperator) TopTrends(ctx context.Context, offset int) ([]*model.Trend, error) {
- return o.trendManager.TopTrends(ctx, offset)
+func (o *TrendOperator) TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error) {
+ return o.trendManager.TopTrends(ctx, pageSize)
}
调整 service/trend/adapter/router.go:
@@ -14,7 +14,7 @@ import (
)
const (
- fieldOffset = "o"
+ fieldPageSize = "ps"
)
// RestHandler handles all restful requests
@@ -41,19 +41,19 @@ func MakeRouter(wireHelper *application.WireHelper) (*gin.Engine, error) {
// Get all trends
func (r *RestHandler) getTrends(c *gin.Context) {
- offset := 0
- offsetParam := c.Query(fieldOffset)
- if offsetParam != "" {
- value, err := strconv.Atoi(offsetParam)
+ ps := 10
+ psParam := c.Query(fieldPageSize)
+ if psParam != "" {
+ value, err := strconv.Atoi(psParam)
if err != nil {
- c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid offset"})
+ c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid page size"})
return
}
- offset = value
+ ps = value
}
- trends, err := r.trendOperator.TopTrends(c, offset)
+ trends, err := r.trendOperator.TopTrends(c, uint(ps))
if err != nil {
- c.JSON(http.StatusNotFound, gin.H{"error": "failed to get trends"})
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get trends"})
return
}
c.JSON(http.StatusOK, trends)
修改 main.go,使得 consumer 和 router 都能优雅关闭:
@@ -1,11 +1,18 @@
package main
import (
+ "context"
"fmt"
+ "log"
+ "net/http"
+ "os"
+ "os/signal"
+ "syscall"
"literank.com/event-books/infrastructure/parser"
"literank.com/event-books/service/trend/adapter"
"literank.com/event-books/service/trend/application"
+ "literank.com/event-books/service/trend/application/consumer"
"literank.com/event-books/service/trend/infrastructure/config"
)
@@ -24,13 +31,39 @@ func main() {
panic(err)
}
+ // Run the consumer
+ tc := consumer.NewTrendConsumer(wireHelper.TrendManager(), wireHelper.TrendEventConsumer())
+ eventConsumer := tc.EventConsumer()
+ go func() {
+ tc.Start(context.Background())
+ }()
+
// Build main router
r, err := adapter.MakeRouter(wireHelper)
if err != nil {
panic(err)
}
+
+ svr := &http.Server{
+ Addr: fmt.Sprintf(":%d", c.App.Port),
+ Handler: r,
+ }
+
+ // Shutdown signals
+ stopAll := make(chan os.Signal, 1)
+ signal.Notify(stopAll, syscall.SIGINT, syscall.SIGTERM)
+ go func() {
+ <-stopAll
+ if err := eventConsumer.Stop(); err != nil {
+ log.Panicf("Failed to close consumer group: %v", err)
+ }
+ if err := svr.Shutdown(context.Background()); err != nil {
+ log.Panicf("Failed to shutdown Gin server: %v", err)
+ }
+ }()
+
// Run the server on the specified port
- if err := r.Run(fmt.Sprintf(":%d", c.App.Port)); err != nil {
+ if err := svr.ListenAndServe(); err != nil && err != http.ErrServerClosed {
panic(err)
}
}
Kafka 消费者和 gin 服务器可理解成是运行在同一个进程中的 2 个“线程”。你需要捕获系统信号 SIGINT
和 SIGTERM
,并手动优雅地关停它们两个。
启动 Web 服务和热搜服务。尝试在 Web 服务的首页 http://localhost:8080/ 中搜索关键词,如“love”、“war” 和 “wealth”。
然后你应该能够从热搜服务的网址 (http://localhost:8081/trends) 中看到类似以下的一些结果:
[
{
"query": "love",
"books": [
{
"id": 4,
"title": "Pride and Prejudice",
"author": "Jane Austen",
"published_at": "1813-01-28",
"description": "A classic novel exploring the themes of love, reputation, and social class in Georgian England.",
"created_at": "2024-04-02T21:02:59.314+08:00"
},
{
"id": 10,
"title": "War and Peace",
"author": "Leo Tolstoy",
"published_at": "1869-01-01",
"description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
"created_at": "2024-04-02T21:02:59.42+08:00"
}
]
},
{
"query": "war",
"books": [
{
"id": 10,
"title": "War and Peace",
"author": "Leo Tolstoy",
"published_at": "1869-01-01",
"description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
"created_at": "2024-04-02T21:02:59.42+08:00"
},
{
"id": 12,
"title": "The Odyssey",
"author": "Homer",
"published_at": "8th Century BC",
"description": "An ancient Greek epic poem attributed to Homer, detailing the journey of Odysseus after the Trojan War.",
"created_at": "2024-04-02T21:02:59.455+08:00"
}
]
},
{
"query": "wealth",
"books": [
{
"id": 1,
"title": "The Great Gatsby",
"author": "F. Scott Fitzgerald",
"published_at": "1925-04-10",
"description": "A novel depicting the opulent lives of wealthy Long Island residents during the Jazz Age.",
"created_at": "2024-04-02T20:36:01.015+08:00"
}
]
}
]