实现 RouteGuide
首先让我们看看如何创建一个 RouteGuide
服务器。
RouteGuide
服务有两个主要部分:
- 实现从我们的服务定义生成的服务接口:执行我们服务的实际“工作”。
- 运行一个 gRPC 服务器来监听来自客户端的请求并返回服务的响应。
你可以在 examples/cpp/route_guide/route_guide_server.cc
中找到示例 RouteGuide
服务器代码。
让我们拆解一下它是如何工作的。
如你所见,server 有一个 RouteGuideImpl
类。它实现了生成的 RouteGuide::Service
接口:
class RouteGuideImpl final : public RouteGuide::Service {
...
}
此处我们正在实现 RouteGuide
的同步版本,也是我们 gRPC 服务器的默认行为。你还可以实现异步接口 RouteGuide::AsyncService
,以进一步控制服务器的线程行为。本教程暂不会讨论这个。
简单 RPC
RouteGuideImpl
实现了所有的服务方法。让我们首先看看最简单的类型,GetFeature
。它从客户端获取一个 Point
,并返回其数据库中对应的要素信息 Feature
。
Status GetFeature(ServerContext* context, const Point* point,
Feature* feature) override {
feature->set_name(GetFeatureName(*point, feature_list_));
feature->mutable_location()->CopyFrom(*point);
return Status::OK;
}
该方法接收一个用于 RPC 的 context 对象,客户端的 Point
protocol buffer 请求,以及一个用于填充响应信息的 Feature
protocol buffer。
在该方法中,我们填充好 Feature
,然后 return
一个 OK
状态,告诉 gRPC 我们已经处理完 RPC,Feature
可以发送给客户端。
注意,所有服务方法都可以(也会被)同时从多个线程调用。你必须确保你的方法实现是线程安全的。在我们的示例中,feature_list_
在构建后永远不会更改,因此它被设计得就很安全。但是,如果在服务的生命周期内 feature_list_
会发生变化,我们就需要 synchronize 对这个成员的访问。
服务端流式 RPC
现在让我们看看一个稍复杂的流式 RPC。ListFeatures
是一个服务端流式RPC。我们需要向客户端发送多个 Feature
。
Status ListFeatures(ServerContext* context, const Rectangle* rectangle,
ServerWriter<Feature>* writer) override {
auto lo = rectangle->lo();
auto hi = rectangle->hi();
long left = std::min(lo.longitude(), hi.longitude());
long right = std::max(lo.longitude(), hi.longitude());
long top = std::max(lo.latitude(), hi.latitude());
long bottom = std::min(lo.latitude(), hi.latitude());
for (const Feature& f : feature_list_) {
if (f.location().longitude() >= left &&
f.location().longitude() <= right &&
f.location().latitude() >= bottom &&
f.location().latitude() <= top) {
writer->Write(f);
}
}
return Status::OK;
}
如你所见,与之前方法参数中简单的请求和响应对象不同,这次我们有了一个 Rectangle
请求对象(用于在其中查找 Feature
)和一个特殊的 ServerWriter
对象。
在该方法中,我们填充多个 Feature
对象,并使用 Write()
方法将它们写入 ServerWriter
。最后,我们 return Status::OK
来告诉 gRPC 我们已经完成了写入响应。
客户端流式 RPC
客户端流式方法 RecordRoute
与之前大体相似,除了这次的参数是一个 ServerReader
,而不是一个简单的请求对象。我们使用 ServerReader
的 Read()
方法从客户端流式获取一系列 Point
,直到没有更多消息为止:server 需要检查每次 Read()
调用的返回值,如果是 true
,则表示流依然良好,可以继续读取;如果它是 false
,则表示流已经结束。
Status RecordRoute(ServerContext* context, ServerReader<Point>* reader,
RouteSummary* summary) override {
...
while (reader->Read(&point)) {
// 处理客户端输入
...
}
...
return Status::OK;
}
双向流式 RPC
最后,让我们看看双向流式 RPC RouteChat()
。
Status RouteChat(ServerContext* context,
ServerReaderWriter<RouteNote, RouteNote>* stream) override {
RouteNote note;
while (stream->Read(¬e)) {
std::unique_lock<std::mutex> lock(mu_);
for (const RouteNote& n : received_notes_) {
if (n.location().latitude() == note.location().latitude() &&
n.location().longitude() == note.location().longitude()) {
stream->Write(n);
}
}
received_notes_.push_back(note);
}
return Status::OK;
}
这次我们得到了一个 ServerReaderWriter
流,可用于读取和写入消息。
这里的读取和写入语法与我们的客户端流式和服务端流式方法完全相同。虽然每一方始终以消息被写入的顺序获取另一方的消息,但客户端和服务端都可以以任意顺序读取和写入,因为流操作完全独立。
注意,因为 received_notes_
是一个实例变量,可以被多个线程访问。故,此处使用一个 mutex lock 来保证互斥访问。