调用服务方法
在 gRPC-Go 中,RPCs 以阻塞/同步模式运行,这意味着 RPC 调用会等待服务器响应,要么返回响应,要么返回错误。
简单 RPC
调用简单 RPC GetFeature
几乎和调用本地方法一样简单。
feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
...
}
如你所见,我们在之前得到的 stub(存根)上调用方法。在方法参数中,我们创建并填充一个请求 protocol buffer 对象(此处是 Point
)。我们还传递了一个 context.Context
对象,以便在必要时更改 RPC 的行为,例如在进行中超时/取消 RPC。如果调用没有返回错误,那么我们可以从第一个返回值中读取来自服务器的响应信息。
log.Println(feature)
服务端流式 RPC
如下是调用服务器端流式方法 ListFeatures
的代码,它返回一个包含系列地理信息 Feature
的流。你可能会发现与前文的服务端流式处理逻辑类似。流式 RPC 在两端都以类似的方式实现。
rect := &pb.Rectangle{ ... } // 构建一个 pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
...
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
log.Println(feature)
}
与简单 RPC 一样,我们将 context 和请求对象传递给方法。不一样的是,我们得到的不是一个响应对象,而是一个 RouteGuide_ListFeaturesClient
实例。客户端可以使用 RouteGuide_ListFeaturesClient
流来读取服务器的响应。
我们使用 RouteGuide_ListFeaturesClient
的 Recv()
方法重复读取服务器的响应到一个 protocol buffer 对象(此处是 Feature
)中,直到没有更多的消息为止:客户端需要在每次调用后检查从 Recv()
返回的错误 err
。如果是 nil
,则流仍然有效,可以继续读取;如果是 io.EOF
,则消息流已结束;否则,肯定发生了 RPC 错误,并通过 err
表示了出来。
客户端流式 RPC
客户端流式方法 RecordRoute
与服务器端方法类似,不同之处在于我们只传递了 context 对象给方法,并获得了一个 RouteGuide_RecordRouteClient
流。我们可以用它来写入和读取消息。
// 创建随机数量的随机点
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // 遍历至少两个点
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
if err := stream.Send(point); err != nil {
log.Fatalf("%v.Send(%v) = %v", stream, point, err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)
RouteGuide_RecordRouteClient
有一个 Send()
方法。我们可以用其向服务器发送请求。一旦我们使用 Send()
将客户端的请求写入流中,我们需要调用 CloseAndRecv()
来告诉 gRPC 我们已经完成了写入,并期望收到一个响应。我们从 CloseAndRecv()
返回的 err
中获取我们的 RPC 状态。如果状态是 nil
,那么 CloseAndRecv()
的第一个返回值就是一个有效的服务器响应。
双向流式 RPC
最后,让我们看看双向流式 RPC RouteChat()
。与 RecordRoute
类似,我们只将 context 对象传递给方法,并获得一个流,我们可以用其写入和读取消息。但是,这次当服务器仍然在向它消息流写入消息时我们就可以通过方法的流来返回值。
stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
这里的读取和写入语法与客户端流式方法非常相似,不同之处在于一旦完成调用,我们使用流的 CloseSend()
方法。尽管两端始终按照其写入顺序收到另一方的消息,但是客户端和服务端都可以按任意顺序读取和写入 - 各自的流完全独立运行。