» Go:使用Kafka构建事件驱动微服务 » 4. 消费者:推荐服务 » 4.3 Gin API 服务器

Gin API 服务器

添加 service/recommendation/application/executor/interest_operator.go:

/*
Package executor handles request-response style business logic.
*/
package executor

import (
	"context"

	"literank.com/event-books/service/recommendation/domain/gateway"
	"literank.com/event-books/service/recommendation/domain/model"
)

// InterestOperator handles trend input/output and proxies operations to the interest manager.
type InterestOperator struct {
	interestManager gateway.InterestManager
}

// NewInterestOperator constructs a new InterestOperator
func NewInterestOperator(t gateway.InterestManager) *InterestOperator {
	return &InterestOperator{interestManager: t}
}

// TopTrends gets the top trends order by hits in descending order
func (o *InterestOperator) InterestsForUser(ctx context.Context, userID string) ([]*model.Interest, error) {
	return o.interestManager.ListInterests(ctx, userID)
}

添加 service/recommendation/adapter/router.go:

/*
Package adapter adapts to all kinds of framework or protocols.
*/
package adapter

import (
	"net/http"

	"github.com/gin-gonic/gin"

	"literank.com/event-books/service/recommendation/application"
	"literank.com/event-books/service/recommendation/application/executor"
)

const (
	fieldUID = "uid"
)

// RestHandler handles all restful requests
type RestHandler struct {
	interestOperator *executor.InterestOperator
}

func newRestHandler(wireHelper *application.WireHelper) *RestHandler {
	return &RestHandler{
		interestOperator: executor.NewInterestOperator(wireHelper.InterestManager()),
	}
}

// MakeRouter makes the main router
func MakeRouter(wireHelper *application.WireHelper) (*gin.Engine, error) {
	rest := newRestHandler(wireHelper)
	// Create a new Gin router
	r := gin.Default()

	// Define a health endpoint handler
	r.GET("/recommendations", rest.getInterests)
	return r, nil
}

// Get all trends
func (r *RestHandler) getInterests(c *gin.Context) {
	uid := c.Query(fieldUID)
	if uid == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "Empty uid is not allowed"})
		return
	}
	trends, err := r.interestOperator.InterestsForUser(c, uid)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get interests for user " + uid})
		return
	}
	c.JSON(http.StatusOK, trends)
}

主文件中也启动 gin server,service/recommendation/main.go:

@@ -2,12 +2,15 @@ package main
 
 import (
        "context"
+       "fmt"
        "log"
+       "net/http"
        "os"
        "os/signal"
        "syscall"
 
        "literank.com/event-books/infrastructure/parser"
+       "literank.com/event-books/service/recommendation/adapter"
        "literank.com/event-books/service/recommendation/application"
        "literank.com/event-books/service/recommendation/application/consumer"
        "literank.com/event-books/service/recommendation/infrastructure/config"
@@ -32,13 +35,36 @@ func main() {
        tc := consumer.NewInterestConsumer(wireHelper.InterestManager(), wireHelper.TrendEventConsumer())
        eventConsumer := tc.EventConsumer()
        go func() {
-               // Shutdown signals
-               stopAll := make(chan os.Signal, 1)
-               signal.Notify(stopAll, syscall.SIGINT, syscall.SIGTERM)
+               tc.Start(context.Background())
+
+       }()
+
+       // Build main router
+       r, err := adapter.MakeRouter(wireHelper)
+       if err != nil {
+               panic(err)
+       }
+
+       svr := &http.Server{
+               Addr:    fmt.Sprintf(":%d", c.App.Port),
+               Handler: r,
+       }
+
+       // Shutdown signals
+       stopAll := make(chan os.Signal, 1)
+       signal.Notify(stopAll, syscall.SIGINT, syscall.SIGTERM)
+       go func() {
                <-stopAll
                if err := eventConsumer.Stop(); err != nil {
                        log.Panicf("Failed to close consumer group: %v", err)
                }
+               if err := svr.Shutdown(context.Background()); err != nil {
+                       log.Panicf("Failed to shutdown Gin server: %v", err)
+               }
        }()
-       tc.Start(context.Background())
+
+       // Run the server on the specified port
+       if err := svr.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+               panic(err)
+       }
 }

启动服务,并尝试用 curl 访问 http://localhost:8082/recommendations?uid=YVM2P

“YVM2P“ 是本教程的 cookie uid 的值。你应该替换成你自己浏览器中的值。

结果如下:

[
  {
    "user_id": "YVM2P",
    "title": "War and Peace",
    "author": "Leo Tolstoy",
    "score": 2
  },
  {
    "user_id": "YVM2P",
    "title": "Pride and Prejudice",
    "author": "Jane Austen",
    "score": 1
  },
  {
    "user_id": "YVM2P",
    "title": "The Odyssey",
    "author": "Homer",
    "score": 1
  }
]