/* * * Copyright 2021 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include #include #include #include #include #include #include #include #include #include #include #include "helper.h" #ifdef BAZEL_BUILD #include "examples/protos/route_guide.grpc.pb.h" #else #include "route_guide.grpc.pb.h" #endif using grpc::CallbackServerContext; using grpc::Server; using grpc::ServerBuilder; using grpc::Status; using routeguide::Feature; using routeguide::Point; using routeguide::Rectangle; using routeguide::RouteGuide; using routeguide::RouteNote; using routeguide::RouteSummary; using std::chrono::system_clock; float ConvertToRadians(float num) { return num * 3.1415926 / 180; } // The formula is based on http://mathforum.org/library/drmath/view/51879.html float GetDistance(const Point& start, const Point& end) { const float kCoordFactor = 10000000.0; float lat_1 = start.latitude() / kCoordFactor; float lat_2 = end.latitude() / kCoordFactor; float lon_1 = start.longitude() / kCoordFactor; float lon_2 = end.longitude() / kCoordFactor; float lat_rad_1 = ConvertToRadians(lat_1); float lat_rad_2 = ConvertToRadians(lat_2); float delta_lat_rad = ConvertToRadians(lat_2 - lat_1); float delta_lon_rad = ConvertToRadians(lon_2 - lon_1); float a = pow(sin(delta_lat_rad / 2), 2) + cos(lat_rad_1) * cos(lat_rad_2) * pow(sin(delta_lon_rad / 2), 2); float c = 2 * atan2(sqrt(a), sqrt(1 - a)); int R = 6371000; // metres return R * c; } std::string GetFeatureName(const Point& point, const std::vector& feature_list) { for (const Feature& f : feature_list) { if (f.location().latitude() == point.latitude() && f.location().longitude() == point.longitude()) { return f.name(); } } return ""; } class RouteGuideImpl final : public RouteGuide::CallbackService { public: explicit RouteGuideImpl(const std::string& db) { routeguide::ParseDb(db, &feature_list_); } grpc::ServerUnaryReactor* GetFeature(CallbackServerContext* context, const Point* point, Feature* feature) override { feature->set_name(GetFeatureName(*point, feature_list_)); feature->mutable_location()->CopyFrom(*point); auto* reactor = context->DefaultReactor(); reactor->Finish(Status::OK); return reactor; } grpc::ServerWriteReactor* ListFeatures( CallbackServerContext* context, const routeguide::Rectangle* rectangle) override { class Lister : public grpc::ServerWriteReactor { public: Lister(const routeguide::Rectangle* rectangle, const std::vector* feature_list) : left_((std::min)(rectangle->lo().longitude(), rectangle->hi().longitude())), right_((std::max)(rectangle->lo().longitude(), rectangle->hi().longitude())), top_((std::max)(rectangle->lo().latitude(), rectangle->hi().latitude())), bottom_((std::min)(rectangle->lo().latitude(), rectangle->hi().latitude())), feature_list_(feature_list), next_feature_(feature_list_->begin()) { NextWrite(); } void OnDone() override { delete this; } void OnWriteDone(bool /*ok*/) override { NextWrite(); } private: void NextWrite() { while (next_feature_ != feature_list_->end()) { const Feature& f = *next_feature_; next_feature_++; if (f.location().longitude() >= left_ && f.location().longitude() <= right_ && f.location().latitude() >= bottom_ && f.location().latitude() <= top_) { StartWrite(&f); return; } } // Didn't write anything, all is done. Finish(Status::OK); } const long left_; const long right_; const long top_; const long bottom_; const std::vector* feature_list_; std::vector::const_iterator next_feature_; }; return new Lister(rectangle, &feature_list_); } grpc::ServerReadReactor* RecordRoute(CallbackServerContext* context, RouteSummary* summary) override { class Recorder : public grpc::ServerReadReactor { public: Recorder(RouteSummary* summary, const std::vector* feature_list) : start_time_(system_clock::now()), summary_(summary), feature_list_(feature_list) { StartRead(&point_); } void OnDone() { delete this; } void OnReadDone(bool ok) { if (ok) { point_count_++; if (!GetFeatureName(point_, *feature_list_).empty()) { feature_count_++; } if (point_count_ != 1) { distance_ += GetDistance(previous_, point_); } previous_ = point_; StartRead(&point_); } else { summary_->set_point_count(point_count_); summary_->set_feature_count(feature_count_); summary_->set_distance(static_cast(distance_)); auto secs = std::chrono::duration_cast( system_clock::now() - start_time_); summary_->set_elapsed_time(secs.count()); Finish(Status::OK); } } private: system_clock::time_point start_time_; RouteSummary* summary_; const std::vector* feature_list_; Point point_; int point_count_ = 0; int feature_count_ = 0; float distance_ = 0.0; Point previous_; }; return new Recorder(summary, &feature_list_); } grpc::ServerBidiReactor* RouteChat( CallbackServerContext* context) override { class Chatter : public grpc::ServerBidiReactor { public: Chatter(std::mutex* mu, std::vector* received_notes) : mu_(mu), received_notes_(received_notes) { StartRead(¬e_); } void OnDone() override { // Collect the read_starter thread if needed if (read_starter_.joinable()) { read_starter_.join(); } delete this; } void OnReadDone(bool ok) override { if (ok) { // We may need to wait an arbitary amount of time on this mutex // and we cannot delay the reaction, so start it in a thread // Collect the previous read_starter thread if needed if (read_starter_.joinable()) { read_starter_.join(); } read_starter_ = std::thread([this] { mu_->lock(); notes_iterator_ = received_notes_->begin(); NextWrite(); }); } else { Finish(Status::OK); } } void OnWriteDone(bool /*ok*/) override { NextWrite(); } private: void NextWrite() { while (notes_iterator_ != received_notes_->end()) { const RouteNote& n = *notes_iterator_; notes_iterator_++; if (n.location().latitude() == note_.location().latitude() && n.location().longitude() == note_.location().longitude()) { StartWrite(&n); return; } } // Didn't write anything, so all done with this note received_notes_->push_back(note_); mu_->unlock(); StartRead(¬e_); } RouteNote note_; std::mutex* mu_; std::vector* received_notes_; std::vector::iterator notes_iterator_; std::thread read_starter_; }; return new Chatter(&mu_, &received_notes_); } private: std::vector feature_list_; std::mutex mu_; std::vector received_notes_; }; void RunServer(const std::string& db_path) { std::string server_address("0.0.0.0:50051"); RouteGuideImpl service(db_path); ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); std::cout << "Server listening on " << server_address << std::endl; server->Wait(); } int main(int argc, char** argv) { // Expect only arg: --db_path=path/to/route_guide_db.json. std::string db = routeguide::GetDbFileContent(argc, argv); RunServer(db); return 0; }