Fixing RouteGuide's C++ Reactor example. (#29404)

* Fixing RouteGuide's C++ Reactor example.

The current method involves locking and unlocking a mutex from different
threads, which isn't allowed. Changing the strategy a bit to address
this.

* Automated change: Fix sanity tests

* Switching to absl::Mutex to annotate usage properly.

* Actually, let's not cover the examples with sanity checks.
reviewable/pr29474/r1^2
Nicolas Noble 3 years ago committed by GitHub
parent 0ba3c59672
commit eb2ae7a0cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 66
      examples/cpp/route_guide/route_guide_callback_server.cc
  2. 1
      tools/run_tests/sanity/check_absl_mutex.sh

@ -196,30 +196,32 @@ class RouteGuideImpl final : public RouteGuide::CallbackService {
CallbackServerContext* context) override { CallbackServerContext* context) override {
class Chatter : public grpc::ServerBidiReactor<RouteNote, RouteNote> { class Chatter : public grpc::ServerBidiReactor<RouteNote, RouteNote> {
public: public:
Chatter(std::mutex* mu, std::vector<RouteNote>* received_notes) Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes)
: mu_(mu), received_notes_(received_notes) { : mu_(mu), received_notes_(received_notes) {
StartRead(&note_); StartRead(&note_);
} }
void OnDone() override { void OnDone() override { delete this; }
// Collect the read_starter thread if needed
if (read_starter_.joinable()) {
read_starter_.join();
}
delete this;
}
void OnReadDone(bool ok) override { void OnReadDone(bool ok) override {
if (ok) { if (ok) {
// We may need to wait an arbitary amount of time on this mutex // Unlike the other example in this directory that's not using
// and we cannot delay the reaction, so start it in a thread // the reactor pattern, we can't grab a local lock to secure the
// Collect the previous read_starter thread if needed // access to the notes vector, because the reactor will most likely
if (read_starter_.joinable()) { // make us jump threads, so we'll have to use a different locking
read_starter_.join(); // strategy. We'll grab the lock locally to build a copy of the
} // list of nodes we're going to send, then we'll grab the lock
read_starter_ = std::thread([this] { // again to append the received note to the existing vector.
mu_->lock(); mu_->Lock();
notes_iterator_ = received_notes_->begin(); std::copy_if(received_notes_->begin(), received_notes_->end(),
NextWrite(); std::back_inserter(to_send_notes_),
}); [this](const RouteNote& note) {
return note.location().latitude() ==
note_.location().latitude() &&
note.location().longitude() ==
note_.location().longitude();
});
mu_->Unlock();
notes_iterator_ = to_send_notes_.begin();
NextWrite();
} else { } else {
Finish(Status::OK); Finish(Status::OK);
} }
@ -228,33 +230,29 @@ class RouteGuideImpl final : public RouteGuide::CallbackService {
private: private:
void NextWrite() { void NextWrite() {
while (notes_iterator_ != received_notes_->end()) { if (notes_iterator_ != to_send_notes_.end()) {
const RouteNote& n = *notes_iterator_; StartWrite(&*notes_iterator_);
notes_iterator_++; notes_iterator_++;
if (n.location().latitude() == note_.location().latitude() && } else {
n.location().longitude() == note_.location().longitude()) { mu_->Lock();
StartWrite(&n); received_notes_->push_back(note_);
return; mu_->Unlock();
} StartRead(&note_);
} }
// Didn't write anything, so all done with this note
received_notes_->push_back(note_);
mu_->unlock();
StartRead(&note_);
} }
RouteNote note_; RouteNote note_;
std::mutex* mu_; absl::Mutex* mu_;
std::vector<RouteNote>* received_notes_; std::vector<RouteNote>* received_notes_;
std::vector<RouteNote> to_send_notes_;
std::vector<RouteNote>::iterator notes_iterator_; std::vector<RouteNote>::iterator notes_iterator_;
std::thread read_starter_;
}; };
return new Chatter(&mu_, &received_notes_); return new Chatter(&mu_, &received_notes_);
} }
private: private:
std::vector<Feature> feature_list_; std::vector<Feature> feature_list_;
std::mutex mu_; absl::Mutex mu_;
std::vector<RouteNote> received_notes_; std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_);
}; };
void RunServer(const std::string& db_path) { void RunServer(const std::string& db_path) {

@ -27,7 +27,6 @@ find . \( \( -name "*.cc" \) -or \( -name "*.h" \) \) \
-a \( \( -wholename "./src/*" \) \ -a \( \( -wholename "./src/*" \) \
-or \( -wholename "./include/*" \) \ -or \( -wholename "./include/*" \) \
-or \( -wholename "./test/*" \) \ -or \( -wholename "./test/*" \) \
-or \( -wholename "./examples/*" \) \) \
-a -not -wholename "./include/grpcpp/impl/codegen/sync.h" \ -a -not -wholename "./include/grpcpp/impl/codegen/sync.h" \
-a -not -wholename "./src/core/lib/gprpp/sync.h" \ -a -not -wholename "./src/core/lib/gprpp/sync.h" \
-a -not -wholename "./src/core/lib/gpr/sync_abseil.cc" \ -a -not -wholename "./src/core/lib/gpr/sync_abseil.cc" \

Loading…
Cancel
Save