实现 RouteGuide
首先让我们看看如何创建一个 RouteGuide
服务器。
RouteGuide
服务有两个主要部分:
- 实现从我们的服务定义生成的服务接口:执行我们服务的实际“工作”。
- 运行一个 gRPC 服务器来监听来自客户端的请求并将它们分派给对应的服务实现。
你可以在 server/server.go
中找到示例 RouteGuide
服务器代码。
让我们拆解一下它是如何工作的。
如你所见,服务器有一个 routeGuideServer
结构体。它实现了生成的 RouteGuideServer
接口:
type routeGuideServer struct {
...
}
...
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
...
}
...
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
...
}
...
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
...
}
...
简单 RPC
routeGuideServer
实现了所有的服务方法。让我们首先看看最简单的类型,GetFeature
。它从客户端获取一个 Point
,并返回其数据库中对应的要素信息 Feature
。
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
return feature, nil
}
}
// 未找到要素,返回一个无名要素
return &pb.Feature{Location: point}, nil
}
该方法接收一个用于 RPC 的 context 对象以及客户端的 Point
protocol buffer 请求。它返回一个带有 Feature
protocol buffer 对象和一个 error
的 的响应信息。
在该方法中,我们填充好 Feature
,然后和 nil
空错误一起返回,告诉 gRPC 我们已经处理完 RPC,Feature
可以发送给客户端。
服务端流式 RPC
ListFeatures
是一个服务端流式RPC。我们需要向客户端发送多个 Feature
。
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
如你所见,与之前方法参数中简单的请求和响应对象不同,这次我们有了一个 Rectangle
请求对象(用于在其中查找 Feature
)和一个特殊的 RouteGuide_ListFeaturesServer
对象,用于写入响应。
在该方法中,我们填充多个 Feature
对象,并使用 Send()
方法将它们写入 RouteGuide_ListFeaturesServer
。最后,与之前的简单RPC一样,我们返回一个 nil
错误,告诉 gRPC 我们已经完成了写入响应。如果在此调用中发生任何错误,我们将返回一个非 nil
错误;gRPC 层自动将它转为适当的 RPC 状态发回。
客户端流式 RPC
现在让我们看看略复杂一点的东西:客户端流式方法 RecordRoute
。我们从客户端流式获取一系列 Point
,并返回一个包含有关它们行程信息的 RouteSummary
。如你所见,此方法完全没有来自客户端的请求参数。它只有一个 RouteGuide_RecordRouteServer
流,服务器可以使用它来读取和写入消息 - 它可以使用其 Recv()
方法接收客户端消息,并使用其 SendAndClose()
方法返回其单个响应。
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}
在该方法体中,我们使用 RouteGuide_RecordRouteServer
的 Recv()
方法重复读取客户端的请求到一个请求对象(此处是一个 Point
),直到没有更多的消息:服务器需要在每次调用后检查从 Recv()
返回的错误。如果这是 nil
,则表示流依然良好,它可以继续读取;如果它是 io.EOF
,则表示消息流已经结束,服务器可以返回其 RouteSummary
。
如果它有任何其他值,我们将错误“原样”返回,以便将它交由 gRPC 层转换为适当的RPC状态。
双向流式 RPC
最后,让我们看看双向流式 RPC RouteChat()
。
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
... // 寻找要发送给客户端的 notes
for _, note := range s.routeNotes[key] {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
这次我们获得了一个 RouteGuide_RouteChatServer
流,与客户端流式示例中类似,也可以用于读取和写入消息。
然而,这次我们通过方法的流返回值时,客户端仍然在持续将消息写入它的消息流。
这里读取和写入的语法与客户端流式方法非常相似,除了服务器使用流的 Send()
方法而不是 SendAndClose()
。
因为它正在写入多个响应。虽然双方始终以写入顺序接收另一方的消息,但是客户端和服务端都可以按任意顺序读取和写入,因为各自的流完全独立运行。