» Go:使用Kafka构建事件驱动微服务 » 3. 消费者:热搜服务 » 3.3 事件消费者

事件消费者

调整 domain/model/trend.go:

@@ -1,11 +1,8 @@
 package model
 
-import "time"
-
 // Trend represents the structure of a
 // trendy query and its related books.
 type Trend struct {
-       Query     string    `json:"query"`
-       Books     []Book    `json:"books"`
-       CreatedAt time.Time `json:"created_at"`
+       Query string `json:"query"`
+       Books []Book `json:"books"`
 }

可以使用 kafka 事件的时间戳来进行时间追踪,此处可以移去时间字段。

调整 service/trend/domain/gateway/trend_manager.go:

@@ -9,8 +9,16 @@ import (
        "literank.com/event-books/domain/model"
 )
 
+type ConsumeCallback func(key, value []byte) error
+
 // TrendManager manages all trends
 type TrendManager interface {
        CreateTrend(ctx context.Context, t *model.Trend) (uint, error)
-       TopTrends(ctx context.Context, offset int) ([]*model.Trend, error)
+       TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error)
+}
+
+// TrendEventConsumer consumes trend events
+type TrendEventConsumer interface {
+       ConsumeEvents(ctx context.Context, callback ConsumeCallback)
+       Stop() error
 }

实现 TrendEventConsumer,service/trend/infrastructure/mq/kafka.go:

/*
Package mq does all message queue jobs.
*/
package mq

import (
	"context"
	"fmt"
	"log"

	"github.com/IBM/sarama"

	"literank.com/event-books/service/trend/domain/gateway"
)

const (
	// groupID is the consumer group name passed to sarama.NewConsumerGroup.
	groupID = "trend-svr"
)

// KafkaConsumer consumes events from the kafka queue.
type KafkaConsumer struct {
	// cg is the sarama consumer group used to consume and to close.
	cg    sarama.ConsumerGroup
	// topic is the single topic handed to cg.Consume.
	topic string
}

// NewKafkaConsumer constructs a new KafkaConsumer that joins the consumer
// group groupID on the given brokers and will read from topic.
//
// The group is configured with the round-robin rebalance strategy and with
// sarama.OffsetOldest as its initial offset.
// It returns a wrapped error if the consumer group cannot be created.
func NewKafkaConsumer(brokers []string, topic string) (*KafkaConsumer, error) {
	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	cg, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		// Carry the broker list in the error instead of printing it to stdout,
		// so the information shows up exactly when it is needed.
		return nil, fmt.Errorf("creating kafka consumer group %q on brokers %v: %w", groupID, brokers, err)
	}
	return &KafkaConsumer{cg: cg, topic: topic}, nil
}

// ConsumeEvents joins the consumer group and hands every message of the
// configured topic to callback. It blocks until ctx is cancelled or the
// consumer group is closed via Stop, so callers normally run it in its own
// goroutine.
//
// Consume returns whenever a consumer session ends (for example after a
// server-side rebalance), so it is called in a loop to re-join the group;
// a single call would stop consuming for good once the first session ends.
// Errors that are not caused by shutdown still panic, as before.
func (k *KafkaConsumer) ConsumeEvents(ctx context.Context, callback gateway.ConsumeCallback) {
	handler := &Consumer{callback: callback}
	topics := []string{k.topic}
	for {
		if err := k.cg.Consume(ctx, topics, handler); err != nil {
			// A closed group (Stop was called) or a cancelled context is a
			// normal shutdown, not a failure worth crashing the process for.
			if err == sarama.ErrClosedConsumerGroup || ctx.Err() != nil {
				return
			}
			log.Panicf("Failed to start consuming: %v", err)
		}
		// The session ended cleanly; re-join only if nobody asked us to stop.
		if ctx.Err() != nil {
			return
		}
	}
}

// Stop closes the underlying consumer group and returns the error
// reported by Close, if any.
func (k *KafkaConsumer) Stop() error {
	return k.cg.Close()
}

// Consumer represents a Sarama consumer group consumer.
// KafkaConsumer.ConsumeEvents passes a pointer to it as the handler
// argument of ConsumerGroup.Consume.
type Consumer struct {
	// callback is invoked with the key and value of each consumed message.
	callback gateway.ConsumeCallback
}

// Setup runs when a new session begins, ahead of any ConsumeClaim call.
// It only logs that consumption has started and always returns nil.
func (c *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
	log.Println("Started to consume events...")
	return nil
}

// Cleanup runs when a session ends, after every ConsumeClaim goroutine has
// exited. There is nothing to release here, so it always returns nil.
func (c *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim runs the consumer loop for one claim: every message read from
// the claim is passed to the callback and then marked as consumed. The loop
// ends, returning nil, when the claim's message channel is closed or when the
// session context is done.
//
// A callback error is only logged; the message is still marked afterwards.
//
// NOTE: keep this loop in the calling goroutine, do not spawn another one.
// ConsumeClaim itself is already called within a goroutine, see:
// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	messages := claim.Messages()
	// Returning once the session context is done is required. If not, kafka
	// rebalances raise `ErrRebalanceInProgress` or
	// `read tcp <ip>:<port>: i/o timeout`. See:
	// https://github.com/IBM/sarama/issues/1192
	sessionDone := session.Context().Done()

	for {
		select {
		case <-sessionDone:
			return nil
		case msg, open := <-messages:
			if !open {
				log.Printf("message channel was closed")
				return nil
			}
			err := c.callback(msg.Key, msg.Value)
			if err != nil {
				log.Printf("Failed to handle event from [%s] key = %s, timestamp = %v, value = %s, error: %v", msg.Topic, string(msg.Key), msg.Timestamp, string(msg.Value), err)
			}
			session.MarkMessage(msg, "")
		}
	}
}

调整 TrendManager 实现,service/trend/infrastructure/cache/redis.go:

@@ -9,6 +9,7 @@ import (
        "fmt"
 
        "github.com/redis/go-redis/v9"
+
        "literank.com/event-books/domain/model"
 )
 
@@ -41,12 +42,13 @@ func (r *RedisCache) CreateTrend(ctx context.Context, t *model.Trend) (uint, err
        if err != nil {
                if err == redis.Nil {
                        // Member doesn't exist, add it with initial score of 1
-                       err := r.c.ZAdd(ctx, trendsKey, redis.Z{Score: 1, Member: member}).Err()
+                       err = r.c.ZAdd(ctx, trendsKey, redis.Z{Score: 1, Member: member}).Err()
                        if err != nil {
                                return 0, err
                        }
+               } else {
+                       return 0, err
                }
-               return 0, err
        }
        score, err := r.c.ZIncrBy(ctx, trendsKey, 1, member).Result()
        if err != nil {
@@ -65,8 +67,8 @@ func (r *RedisCache) CreateTrend(ctx context.Context, t *model.Trend) (uint, err
        return uint(score), nil
 }
 
-func (r *RedisCache) TopTrends(ctx context.Context, offset int) ([]*model.Trend, error) {
-       topItems, err := r.c.ZRevRangeWithScores(ctx, trendsKey, 0, int64(offset)).Result()
+func (r *RedisCache) TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error) {
+       topItems, err := r.c.ZRevRangeWithScores(ctx, trendsKey, 0, int64(pageSize)-1).Result()
        if err != nil {
                return nil, err
        }
@@ -81,11 +83,17 @@ func (r *RedisCache) TopTrends(ctx context.Context, offset int) ([]*model.Trend,
                }
                k := queryKeyPrefix + query
                value, err := r.c.Get(ctx, k).Result()
-               if err != nil && err != redis.Nil {
-                       return nil, err
-               }
-               if err := json.Unmarshal([]byte(value), &t.Books); err != nil {
+               if err != nil {
+                       if err == redis.Nil {
+                               t.Books = make([]model.Book, 0)
+                               trends = append(trends, t)
+                               continue
+                       }
                        return nil, err
+               } else {
+                       if err := json.Unmarshal([]byte(value), &t.Books); err != nil {
+                               return nil, err
+                       }
                }
                trends = append(trends, t)
        }

调整 service/trend/application/wire_helper.go:

@@ -7,18 +7,25 @@ import (
        "literank.com/event-books/service/trend/domain/gateway"
        "literank.com/event-books/service/trend/infrastructure/cache"
        "literank.com/event-books/service/trend/infrastructure/config"
+       "literank.com/event-books/service/trend/infrastructure/mq"
 )
 
 // WireHelper is the helper for dependency injection
 type WireHelper struct {
-       kvStore *cache.RedisCache
+       kvStore  *cache.RedisCache
+       consumer *mq.KafkaConsumer
 }
 
 // NewWireHelper constructs a new WireHelper
 func NewWireHelper(c *config.Config) (*WireHelper, error) {
        kv := cache.NewRedisCache(c.Cache.Address, c.Cache.Password, c.Cache.DB)
+       consumer, err := mq.NewKafkaConsumer(c.MQ.Brokers, c.MQ.Topic)
+       if err != nil {
+               return nil, err
+       }
        return &WireHelper{
-               kvStore: kv,
+               kvStore:  kv,
+               consumer: consumer,
        }, nil
 }
 
@@ -26,3 +33,8 @@ func NewWireHelper(c *config.Config) (*WireHelper, error) {
 func (w *WireHelper) TrendManager() gateway.TrendManager {
        return w.kvStore
 }
+
+// TrendEventConsumer returns an instance of TrendEventConsumer
+func (w *WireHelper) TrendEventConsumer() gateway.TrendEventConsumer {
+       return w.consumer
+}

添加配置项,service/trend/infrastructure/config/config.go:

@@ -7,6 +7,7 @@ package config
 type Config struct {
        App   ApplicationConfig `json:"app" yaml:"app"`
        Cache CacheConfig       `json:"cache" yaml:"cache"`
+       MQ    MQConfig          `json:"mq" yaml:"mq"`
 }
 
 // ApplicationConfig is the configuration of main app.
@@ -20,3 +21,9 @@ type CacheConfig struct {
        Password string `json:"password" yaml:"password"`
        DB       int    `json:"db" yaml:"db"`
 }
+
+// MQConfig is the configuration of message queues.
+type MQConfig struct {
+       Brokers []string `json:"brokers" yaml:"brokers"`
+       Topic   string   `json:"topic" yaml:"topic"`
+}

填入配置值,service/trend/config.yml:

@@ -3,4 +3,8 @@ app:
 cache:
   address: localhost:6379
   password: test_pass
-  db: 0
+  db: 0
+mq:
+  brokers:
+    - localhost:9094
+  topic: "lr-book-searches"

此处使用 9094 端口是为了从外部访问跑在 docker 容器内的 kafka。
详情请阅读 bitnami/kafka 镜像文档中的 “Accessing Apache Kafka with internal and external clients” 配置一节。

添加 service/trend/application/consumer/trend.go:

/*
Package consumer handles event-trigger style business logic.
*/
package consumer

import (
	"context"
	"encoding/json"

	"literank.com/event-books/domain/model"
	"literank.com/event-books/service/trend/domain/gateway"
)

// TrendConsumer connects a trend event consumer to the trend manager:
// Start turns each consumed event into a CreateTrend call.
type TrendConsumer struct {
	// trendManager receives one CreateTrend call per consumed event.
	trendManager  gateway.TrendManager
	// eventConsumer is the event source driven by Start.
	eventConsumer gateway.TrendEventConsumer
}

// NewTrendConsumer builds a TrendConsumer that stores trends through t and
// reads events from e.
func NewTrendConsumer(t gateway.TrendManager, e gateway.TrendEventConsumer) *TrendConsumer {
	tc := new(TrendConsumer)
	tc.trendManager = t
	tc.eventConsumer = e
	return tc
}

// Start consumes trend events with the given context. For every event the
// key is used as the query, the value is decoded as a JSON list of books,
// and the resulting trend is handed to the trend manager's CreateTrend.
// A decode or CreateTrend error is returned to the event consumer.
func (c *TrendConsumer) Start(ctx context.Context) {
	handle := func(key, data []byte) error {
		var books []model.Book
		if decodeErr := json.Unmarshal(data, &books); decodeErr != nil {
			return decodeErr
		}
		trend := &model.Trend{
			Query: string(key),
			Books: books,
		}
		_, createErr := c.trendManager.CreateTrend(ctx, trend)
		return createErr
	}
	c.eventConsumer.ConsumeEvents(ctx, handle)
}

// EventConsumer returns the underlying trend event consumer, e.g. so the
// caller can Stop it.
func (c *TrendConsumer) EventConsumer() gateway.TrendEventConsumer {
	return c.eventConsumer
}

调整 service/trend/application/executor/trend_operator.go:

@@ -20,12 +20,7 @@ func NewTrendOperator(t gateway.TrendManager) *TrendOperator {
        return &TrendOperator{trendManager: t}
 }
 
-// CreateTrend creates a new trend
-func (o *TrendOperator) CreateTrend(ctx context.Context, t *model.Trend) (uint, error) {
-       return o.trendManager.CreateTrend(ctx, t)
-}
-
 // TopTrends gets the top trends order by hits in descending order
-func (o *TrendOperator) TopTrends(ctx context.Context, offset int) ([]*model.Trend, error) {
-       return o.trendManager.TopTrends(ctx, offset)
+func (o *TrendOperator) TopTrends(ctx context.Context, pageSize uint) ([]*model.Trend, error) {
+       return o.trendManager.TopTrends(ctx, pageSize)
 }

调整 service/trend/adapter/router.go:

@@ -14,7 +14,7 @@ import (
 )
 
 const (
-       fieldOffset = "o"
+       fieldPageSize = "ps"
 )
 
 // RestHandler handles all restful requests
@@ -41,19 +41,19 @@ func MakeRouter(wireHelper *application.WireHelper) (*gin.Engine, error) {
 
 // Get all trends
 func (r *RestHandler) getTrends(c *gin.Context) {
-       offset := 0
-       offsetParam := c.Query(fieldOffset)
-       if offsetParam != "" {
-               value, err := strconv.Atoi(offsetParam)
+       ps := 10
+       psParam := c.Query(fieldPageSize)
+       if psParam != "" {
+               value, err := strconv.Atoi(psParam)
                if err != nil {
-                       c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid offset"})
+                       c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid page size"})
                        return
                }
-               offset = value
+               ps = value
        }
-       trends, err := r.trendOperator.TopTrends(c, offset)
+       trends, err := r.trendOperator.TopTrends(c, uint(ps))
        if err != nil {
-               c.JSON(http.StatusNotFound, gin.H{"error": "failed to get trends"})
+               c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get trends"})
                return
        }
        c.JSON(http.StatusOK, trends)

修改 main.go,使得 consumer 和 router 都能优雅关闭:

@@ -1,11 +1,18 @@
 package main
 
 import (
+       "context"
        "fmt"
+       "log"
+       "net/http"
+       "os"
+       "os/signal"
+       "syscall"
 
        "literank.com/event-books/infrastructure/parser"
        "literank.com/event-books/service/trend/adapter"
        "literank.com/event-books/service/trend/application"
+       "literank.com/event-books/service/trend/application/consumer"
        "literank.com/event-books/service/trend/infrastructure/config"
 )
 
@@ -24,13 +31,39 @@ func main() {
                panic(err)
        }
 
+       // Run the consumer
+       tc := consumer.NewTrendConsumer(wireHelper.TrendManager(), wireHelper.TrendEventConsumer())
+       eventConsumer := tc.EventConsumer()
+       go func() {
+               tc.Start(context.Background())
+       }()
+
        // Build main router
        r, err := adapter.MakeRouter(wireHelper)
        if err != nil {
                panic(err)
        }
+
+       svr := &http.Server{
+               Addr:    fmt.Sprintf(":%d", c.App.Port),
+               Handler: r,
+       }
+
+       // Shutdown signals
+       stopAll := make(chan os.Signal, 1)
+       signal.Notify(stopAll, syscall.SIGINT, syscall.SIGTERM)
+       go func() {
+               <-stopAll
+               if err := eventConsumer.Stop(); err != nil {
+                       log.Panicf("Failed to close consumer group: %v", err)
+               }
+               if err := svr.Shutdown(context.Background()); err != nil {
+                       log.Panicf("Failed to shutdown Gin server: %v", err)
+               }
+       }()
+
        // Run the server on the specified port
-       if err := r.Run(fmt.Sprintf(":%d", c.App.Port)); err != nil {
+       if err := svr.ListenAndServe(); err != nil && err != http.ErrServerClosed {
                panic(err)
        }
 }

Kafka 消费者和 gin 服务器可理解成是运行在同一个进程中的 2 个“线程”。你需要捕获系统信号 SIGINT 和 SIGTERM,并手动优雅地关停它们两个。

启动 Web 服务和热搜服务。尝试在 Web 服务的首页 http://localhost:8080/ 中搜索关键词,如“love”、“war” 和 “wealth”。

然后你应该能够从热搜服务的网址 (http://localhost:8081/trends) 中看到类似以下的一些结果:

[
  {
    "query": "love",
    "books": [
      {
        "id": 4,
        "title": "Pride and Prejudice",
        "author": "Jane Austen",
        "published_at": "1813-01-28",
        "description": "A classic novel exploring the themes of love, reputation, and social class in Georgian England.",
        "created_at": "2024-04-02T21:02:59.314+08:00"
      },
      {
        "id": 10,
        "title": "War and Peace",
        "author": "Leo Tolstoy",
        "published_at": "1869-01-01",
        "description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
        "created_at": "2024-04-02T21:02:59.42+08:00"
      }
    ]
  },
  {
    "query": "war",
    "books": [
      {
        "id": 10,
        "title": "War and Peace",
        "author": "Leo Tolstoy",
        "published_at": "1869-01-01",
        "description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
        "created_at": "2024-04-02T21:02:59.42+08:00"
      },
      {
        "id": 12,
        "title": "The Odyssey",
        "author": "Homer",
        "published_at": "8th Century BC",
        "description": "An ancient Greek epic poem attributed to Homer, detailing the journey of Odysseus after the Trojan War.",
        "created_at": "2024-04-02T21:02:59.455+08:00"
      }
    ]
  },
  {
    "query": "wealth",
    "books": [
      {
        "id": 1,
        "title": "The Great Gatsby",
        "author": "F. Scott Fitzgerald",
        "published_at": "1925-04-10",
        "description": "A novel depicting the opulent lives of wealthy Long Island residents during the Jazz Age.",
        "created_at": "2024-04-02T20:36:01.015+08:00"
      }
    ]
  }
]
上页下页