Merge pull request #22065 from vjpai/completion_op_conditional_inline

Be careful about when to inline user's OnDone server callback
pull/22098/head
Vijay Pai 5 years ago committed by GitHub
commit 5064076e42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 262
      include/grpcpp/impl/codegen/server_callback_handlers.h
  2. 52
      include/grpcpp/impl/codegen/server_callback_impl.h
  3. 2
      include/grpcpp/impl/codegen/server_context_impl.h
  4. 56
      src/cpp/server/server_callback.cc
  5. 17
      test/cpp/end2end/test_service_impl.cc

@ -117,9 +117,19 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
class ServerCallbackUnaryImpl : public ServerCallbackUnary {
public:
void Finish(::grpc::Status s) override {
// A callback that only contains a call to MaybeDone can be run as an
// inline callback regardless of whether or not OnDone is inlineable
// because if the actual OnDone callback needs to be scheduled, MaybeDone
// is responsible for dispatching to an executor thread if needed. Thus,
// when setting up the finish_tag_, we can set its own callback to
// inlineable.
finish_tag_.Set(
call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
reactor_.load(std::memory_order_relaxed)->InternalInlineable());
call_.call(),
[this](bool) {
this->MaybeDone(
reactor_.load(std::memory_order_relaxed)->InternalInlineable());
},
&finish_ops_, /*can_inline=*/true);
finish_ops_.set_core_cq_tag(&finish_tag_);
if (!ctx_->sent_initial_metadata_) {
@ -144,13 +154,19 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
this->Ref();
// The callback for this function should not be marked inline because it
// is directly invoking a user-controlled reaction
// (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
// thread. However, any OnDone needed after that can be inlined because it
// is already running on an executor thread.
meta_tag_.Set(call_.call(),
[this](bool ok) {
reactor_.load(std::memory_order_relaxed)
->OnSendInitialMetadataDone(ok);
MaybeDone();
ServerUnaryReactor* reactor =
reactor_.load(std::memory_order_relaxed);
reactor->OnSendInitialMetadataDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
},
&meta_ops_, false);
&meta_ops_, /*can_inline=*/false);
meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
@ -184,22 +200,20 @@ class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
reactor_.store(reactor, std::memory_order_relaxed);
this->BindReactor(reactor);
this->MaybeCallOnCancel(reactor);
this->MaybeDone();
this->MaybeDone(reactor->InternalInlineable());
}
const RequestType* request() { return allocator_state_->request(); }
ResponseType* response() { return allocator_state_->response(); }
void MaybeDone() override {
if (GPR_UNLIKELY(this->Unref() == 1)) {
reactor_.load(std::memory_order_relaxed)->OnDone();
grpc_call* call = call_.call();
auto call_requester = std::move(call_requester_);
allocator_state_->Release();
this->~ServerCallbackUnaryImpl(); // explicitly call destructor
::grpc::g_core_codegen_interface->grpc_call_unref(call);
call_requester();
}
void CallOnDone() override {
reactor_.load(std::memory_order_relaxed)->OnDone();
grpc_call* call = call_.call();
auto call_requester = std::move(call_requester_);
allocator_state_->Release();
this->~ServerCallbackUnaryImpl(); // explicitly call destructor
::grpc::g_core_codegen_interface->grpc_call_unref(call);
call_requester();
}
ServerReactor* reactor() override {
@ -255,8 +269,13 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
static_cast<::grpc_impl::CallbackServerContext*>(
param.server_context),
param.call, std::move(param.call_requester));
// Inlineable OnDone can be false in the CompletionOp callback because there
// is no read reactor that has an inlineable OnDone; this only applies to
// the DefaultReactor (which is unary).
param.server_context->BeginCompletionOp(
param.call, [reader](bool) { reader->MaybeDone(); }, reader);
param.call,
[reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
reader);
ServerReadReactor<RequestType>* reactor = nullptr;
if (param.status.ok()) {
@ -287,8 +306,17 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
public:
void Finish(::grpc::Status s) override {
finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
false);
// A finish tag with only MaybeDone can have its callback inlined
// regardless even if OnDone is not inlineable because this callback just
// checks a ref and then decides whether or not to dispatch OnDone.
finish_tag_.Set(call_.call(),
[this](bool) {
// Inlineable OnDone can be false here because there is
// no read reactor that has an inlineable OnDone; this
// only applies to the DefaultReactor (which is unary).
this->MaybeDone(/*inlineable_ondone=*/false);
},
&finish_ops_, /*can_inline=*/true);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
@ -311,13 +339,17 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
this->Ref();
// The callback for this function should not be inlined because it invokes
// a user-controlled reaction, but any resulting OnDone can be inlined in
// the executor to which this callback is dispatched.
meta_tag_.Set(call_.call(),
[this](bool ok) {
reactor_.load(std::memory_order_relaxed)
->OnSendInitialMetadataDone(ok);
MaybeDone();
ServerReadReactor<RequestType>* reactor =
reactor_.load(std::memory_order_relaxed);
reactor->OnSendInitialMetadataDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
},
&meta_ops_, false);
&meta_ops_, /*can_inline=*/false);
meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
@ -344,31 +376,35 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
void SetupReactor(ServerReadReactor<RequestType>* reactor) {
reactor_.store(reactor, std::memory_order_relaxed);
// The callback for this function should not be inlined because it invokes
// a user-controlled reaction, but any resulting OnDone can be inlined in
// the executor to which this callback is dispatched.
read_tag_.Set(call_.call(),
[this](bool ok) {
reactor_.load(std::memory_order_relaxed)->OnReadDone(ok);
MaybeDone();
[this, reactor](bool ok) {
reactor->OnReadDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
},
&read_ops_, false);
&read_ops_, /*can_inline=*/false);
read_ops_.set_core_cq_tag(&read_tag_);
this->BindReactor(reactor);
this->MaybeCallOnCancel(reactor);
this->MaybeDone();
// Inlineable OnDone can be false here because there is no read
// reactor that has an inlineable OnDone; this only applies to the
// DefaultReactor (which is unary).
this->MaybeDone(/*inlineable_ondone=*/false);
}
~ServerCallbackReaderImpl() {}
ResponseType* response() { return &resp_; }
void MaybeDone() override {
if (GPR_UNLIKELY(this->Unref() == 1)) {
reactor_.load(std::memory_order_relaxed)->OnDone();
grpc_call* call = call_.call();
auto call_requester = std::move(call_requester_);
this->~ServerCallbackReaderImpl(); // explicitly call destructor
::grpc::g_core_codegen_interface->grpc_call_unref(call);
call_requester();
}
void CallOnDone() override {
reactor_.load(std::memory_order_relaxed)->OnDone();
grpc_call* call = call_.call();
auto call_requester = std::move(call_requester_);
this->~ServerCallbackReaderImpl(); // explicitly call destructor
::grpc::g_core_codegen_interface->grpc_call_unref(call);
call_requester();
}
ServerReactor* reactor() override {
@ -419,8 +455,13 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
param.server_context),
param.call, static_cast<RequestType*>(param.request),
std::move(param.call_requester));
// Inlineable OnDone can be false in the CompletionOp callback because there
// is no write reactor that has an inlineable OnDone; this only applies to
// the DefaultReactor (which is unary).
param.server_context->BeginCompletionOp(
param.call, [writer](bool) { writer->MaybeDone(); }, writer);
param.call,
[writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
writer);
ServerWriteReactor<ResponseType>* reactor = nullptr;
if (param.status.ok()) {
@ -467,8 +508,17 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
public:
void Finish(::grpc::Status s) override {
finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
false);
// A finish tag with only MaybeDone can have its callback inlined
// regardless even if OnDone is not inlineable because this callback just
// checks a ref and then decides whether or not to dispatch OnDone.
finish_tag_.Set(call_.call(),
[this](bool) {
// Inlineable OnDone can be false here because there is
// no write reactor that has an inlineable OnDone; this
// only applies to the DefaultReactor (which is unary).
this->MaybeDone(/*inlineable_ondone=*/false);
},
&finish_ops_, /*can_inline=*/true);
finish_ops_.set_core_cq_tag(&finish_tag_);
if (!ctx_->sent_initial_metadata_) {
@ -486,13 +536,17 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
this->Ref();
// The callback for this function should not be inlined because it invokes
// a user-controlled reaction, but any resulting OnDone can be inlined in
// the executor to which this callback is dispatched.
meta_tag_.Set(call_.call(),
[this](bool ok) {
reactor_.load(std::memory_order_relaxed)
->OnSendInitialMetadataDone(ok);
MaybeDone();
ServerWriteReactor<ResponseType>* reactor =
reactor_.load(std::memory_order_relaxed);
reactor->OnSendInitialMetadataDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
},
&meta_ops_, false);
&meta_ops_, /*can_inline=*/false);
meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
@ -547,31 +601,34 @@ class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
reactor_.store(reactor, std::memory_order_relaxed);
write_tag_.Set(
call_.call(),
[this](bool ok) {
reactor_.load(std::memory_order_relaxed)->OnWriteDone(ok);
MaybeDone();
},
&write_ops_, false);
// The callback for this function should not be inlined because it invokes
// a user-controlled reaction, but any resulting OnDone can be inlined in
// the executor to which this callback is dispatched.
write_tag_.Set(call_.call(),
[this, reactor](bool ok) {
reactor->OnWriteDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
},
&write_ops_, /*can_inline=*/false);
write_ops_.set_core_cq_tag(&write_tag_);
this->BindReactor(reactor);
this->MaybeCallOnCancel(reactor);
this->MaybeDone();
// Inlineable OnDone can be false here because there is no write
// reactor that has an inlineable OnDone; this only applies to the
// DefaultReactor (which is unary).
this->MaybeDone(/*inlineable_ondone=*/false);
}
~ServerCallbackWriterImpl() { req_->~RequestType(); }
const RequestType* request() { return req_; }
void MaybeDone() override {
if (GPR_UNLIKELY(this->Unref() == 1)) {
reactor_.load(std::memory_order_relaxed)->OnDone();
grpc_call* call = call_.call();
auto call_requester = std::move(call_requester_);
this->~ServerCallbackWriterImpl(); // explicitly call destructor
::grpc::g_core_codegen_interface->grpc_call_unref(call);
call_requester();
}
void CallOnDone() override {
reactor_.load(std::memory_order_relaxed)->OnDone();
grpc_call* call = call_.call();
auto call_requester = std::move(call_requester_);
this->~ServerCallbackWriterImpl(); // explicitly call destructor
::grpc::g_core_codegen_interface->grpc_call_unref(call);
call_requester();
}
ServerReactor* reactor() override {
@ -620,8 +677,13 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
static_cast<::grpc_impl::CallbackServerContext*>(
param.server_context),
param.call, std::move(param.call_requester));
// Inlineable OnDone can be false in the CompletionOp callback because there
// is no bidi reactor that has an inlineable OnDone; this only applies to
// the DefaultReactor (which is unary).
param.server_context->BeginCompletionOp(
param.call, [stream](bool) { stream->MaybeDone(); }, stream);
param.call,
[stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
stream);
ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
if (param.status.ok()) {
@ -652,8 +714,17 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
: public ServerCallbackReaderWriter<RequestType, ResponseType> {
public:
void Finish(::grpc::Status s) override {
finish_tag_.Set(call_.call(), [this](bool) { MaybeDone(); }, &finish_ops_,
false);
// A finish tag with only MaybeDone can have its callback inlined
// regardless even if OnDone is not inlineable because this callback just
// checks a ref and then decides whether or not to dispatch OnDone.
finish_tag_.Set(call_.call(),
[this](bool) {
// Inlineable OnDone can be false here because there is
// no bidi reactor that has an inlineable OnDone; this
// only applies to the DefaultReactor (which is unary).
this->MaybeDone(/*inlineable_ondone=*/false);
},
&finish_ops_, /*can_inline=*/true);
finish_ops_.set_core_cq_tag(&finish_tag_);
if (!ctx_->sent_initial_metadata_) {
@ -671,13 +742,17 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
this->Ref();
// The callback for this function should not be inlined because it invokes
// a user-controlled reaction, but any resulting OnDone can be inlined in
// the executor to which this callback is dispatched.
meta_tag_.Set(call_.call(),
[this](bool ok) {
reactor_.load(std::memory_order_relaxed)
->OnSendInitialMetadataDone(ok);
MaybeDone();
ServerBidiReactor<RequestType, ResponseType>* reactor =
reactor_.load(std::memory_order_relaxed);
reactor->OnSendInitialMetadataDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
},
&meta_ops_, false);
&meta_ops_, /*can_inline=*/false);
meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
@ -733,35 +808,38 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
reactor_.store(reactor, std::memory_order_relaxed);
write_tag_.Set(
call_.call(),
[this](bool ok) {
reactor_.load(std::memory_order_relaxed)->OnWriteDone(ok);
MaybeDone();
},
&write_ops_, false);
// The callbacks for these functions should not be inlined because they
// invoke user-controlled reactions, but any resulting OnDones can be
// inlined in the executor to which a callback is dispatched.
write_tag_.Set(call_.call(),
[this, reactor](bool ok) {
reactor->OnWriteDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
},
&write_ops_, /*can_inline=*/false);
write_ops_.set_core_cq_tag(&write_tag_);
read_tag_.Set(call_.call(),
[this](bool ok) {
reactor_.load(std::memory_order_relaxed)->OnReadDone(ok);
MaybeDone();
[this, reactor](bool ok) {
reactor->OnReadDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
},
&read_ops_, false);
&read_ops_, /*can_inline=*/false);
read_ops_.set_core_cq_tag(&read_tag_);
this->BindReactor(reactor);
this->MaybeCallOnCancel(reactor);
this->MaybeDone();
}
void MaybeDone() override {
if (GPR_UNLIKELY(this->Unref() == 1)) {
reactor_.load(std::memory_order_relaxed)->OnDone();
grpc_call* call = call_.call();
auto call_requester = std::move(call_requester_);
this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
::grpc::g_core_codegen_interface->grpc_call_unref(call);
call_requester();
}
// Inlineable OnDone can be false here because there is no bidi
// reactor that has an inlineable OnDone; this only applies to the
// DefaultReactor (which is unary).
this->MaybeDone(/*inlineable_ondone=*/false);
}
void CallOnDone() override {
reactor_.load(std::memory_order_relaxed)->OnDone();
grpc_call* call = call_.call();
auto call_requester = std::move(call_requester_);
this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
::grpc::g_core_codegen_interface->grpc_call_unref(call);
call_requester();
}
ServerReactor* reactor() override {

@ -73,11 +73,33 @@ class ServerCallbackCall {
public:
virtual ~ServerCallbackCall() {}
// This object is responsible for tracking when it is safe to call
// OnCancel. This function should not be called until after the method handler
// is done and the RPC has completed with a cancellation. This is tracked by
// counting how many of these conditions have been met and calling OnCancel
// when none remain unmet.
// This object is responsible for tracking when it is safe to call OnDone and
// OnCancel. OnDone should not be called until the method handler is complete,
// Finish has been called, the ServerContext CompletionOp (which tracks
// cancellation or successful completion) has completed, and all outstanding
// Read/Write actions have seen their reactions. OnCancel should not be called
// until after the method handler is done and the RPC has completed with a
// cancellation. This is tracked by counting how many of these conditions have
// been met and calling OnCancel when none remain unmet.
// Public versions of MaybeDone: one where we don't know the reactor in
// advance (used for the ServerContext CompletionOp), and one for where we
// know the inlineability of the OnDone reaction. You should set the inline
// flag to true if either the Reactor is InternalInlineable() or if this
// callback is already being forced to run dispatched to an executor
// (typically because it contains additional work than just the MaybeDone).
void MaybeDone() {
if (GPR_UNLIKELY(Unref() == 1)) {
ScheduleOnDone(reactor()->InternalInlineable());
}
}
void MaybeDone(bool inline_ondone) {
if (GPR_UNLIKELY(Unref() == 1)) {
ScheduleOnDone(inline_ondone);
}
}
// Fast version called with known reactor passed in, used from derived
// classes, typically in non-cancel case
@ -101,14 +123,17 @@ class ServerCallbackCall {
/// Increases the reference count
void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
/// Decreases the reference count and returns the previous value
int Unref() {
return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
}
private:
virtual ServerReactor* reactor() = 0;
virtual void MaybeDone() = 0;
// CallOnDone performs the work required at completion of the RPC: invoking
// the OnDone function and doing all necessary cleanup. This function is only
// ever invoked on a fully-Unref'fed ServerCallbackCall.
virtual void CallOnDone() = 0;
// If the OnDone reaction is inlineable, execute it inline. Otherwise send it
// to an executor.
void ScheduleOnDone(bool inline_ondone);
// If the OnCancel reaction is inlineable, execute it inline. Otherwise send
// it to an executor.
@ -121,6 +146,11 @@ class ServerCallbackCall {
1, std::memory_order_acq_rel) == 1;
}
/// Decreases the reference count and returns the previous value
int Unref() {
return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
}
std::atomic_int on_cancel_conditions_remaining_{2};
std::atomic_int callbacks_outstanding_{
3}; // reserve for start, Finish, and CompletionOp

@ -474,7 +474,7 @@ class ServerContextBase {
::grpc::Status status() const { return status_; }
private:
void MaybeDone() override {}
void CallOnDone() override {}
::grpc_impl::internal::ServerReactor* reactor() override {
return reactor_;
}

@ -24,27 +24,59 @@
namespace grpc_impl {
namespace internal {
void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) {
if (inline_ondone) {
CallOnDone();
} else {
// Unlike other uses of closure, do not Ref or Unref here since at this
// point, all the Ref'fing and Unref'fing is done for this call.
grpc_core::ExecCtx exec_ctx;
struct ClosureWithArg {
grpc_closure closure;
ServerCallbackCall* call;
explicit ClosureWithArg(ServerCallbackCall* call_arg) : call(call_arg) {
GRPC_CLOSURE_INIT(&closure,
[](void* void_arg, grpc_error*) {
ClosureWithArg* arg =
static_cast<ClosureWithArg*>(void_arg);
arg->call->CallOnDone();
delete arg;
},
this, grpc_schedule_on_exec_ctx);
}
};
ClosureWithArg* arg = new ClosureWithArg(this);
grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE);
}
}
void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) {
if (reactor->InternalInlineable()) {
reactor->OnCancel();
} else {
// Ref to make sure that the closure executes before the whole call gets
// destructed, and Unref within the closure.
Ref();
grpc_core::ExecCtx exec_ctx;
struct ClosureArg {
struct ClosureWithArg {
grpc_closure closure;
ServerCallbackCall* call;
ServerReactor* reactor;
ClosureWithArg(ServerCallbackCall* call_arg, ServerReactor* reactor_arg)
: call(call_arg), reactor(reactor_arg) {
GRPC_CLOSURE_INIT(&closure,
[](void* void_arg, grpc_error*) {
ClosureWithArg* arg =
static_cast<ClosureWithArg*>(void_arg);
arg->reactor->OnCancel();
arg->call->MaybeDone();
delete arg;
},
this, grpc_schedule_on_exec_ctx);
}
};
ClosureArg* arg = new ClosureArg{this, reactor};
grpc_core::Executor::Run(GRPC_CLOSURE_CREATE(
[](void* void_arg, grpc_error*) {
ClosureArg* arg =
static_cast<ClosureArg*>(void_arg);
arg->reactor->OnCancel();
arg->call->MaybeDone();
delete arg;
},
arg, nullptr),
GRPC_ERROR_NONE);
ClosureWithArg* arg = new ClosureWithArg(this, reactor);
grpc_core::Executor::Run(&arg->closure, GRPC_ERROR_NONE);
}
}

@ -839,7 +839,15 @@ CallbackTestServiceImpl::BidiStream(
}
setup_done_ = true;
}
void OnDone() override { delete this; }
void OnDone() override {
{
// Use the same lock as finish to make sure that OnDone isn't inlined.
std::lock_guard<std::mutex> l(finish_mu_);
EXPECT_TRUE(finished_);
finish_thread_.join();
}
delete this;
}
void OnCancel() override {
EXPECT_TRUE(setup_done_);
EXPECT_TRUE(ctx_->IsCancelled());
@ -878,8 +886,12 @@ CallbackTestServiceImpl::BidiStream(
void FinishOnce(const Status& s) {
std::lock_guard<std::mutex> l(finish_mu_);
if (!finished_) {
Finish(s);
finished_ = true;
// Finish asynchronously to make sure that there are no deadlocks.
finish_thread_ = std::thread([this, s] {
std::lock_guard<std::mutex> l(finish_mu_);
Finish(s);
});
}
}
@ -892,6 +904,7 @@ CallbackTestServiceImpl::BidiStream(
std::mutex finish_mu_;
bool finished_{false};
bool setup_done_{false};
std::thread finish_thread_;
};
return new Reactor(context);

Loading…
Cancel
Save