Resolve race between OnReadDone(ok=false) and IsCancelled (#26245)

* Resolve and test race between OnReadDone(ok=false) and IsCancelled

* Fix retry case

* Fix health check case

* Address reviewer comments.

* Add TODO requested by markdroth
Vijay Pai, committed via GitHub 4 years ago
parent 113cd9546d
commit 0f80378a21
16 changed files (changed line counts in parentheses):

  1. include/grpcpp/impl/codegen/server_callback_handlers.h (6)
  2. include/grpcpp/impl/codegen/server_context.h (5)
  3. src/core/ext/filters/client_channel/health/health_check_client.cc (2)
  4. src/core/ext/filters/client_channel/health/health_check_client.h (6)
  5. src/core/ext/filters/client_channel/retry_filter.cc (1)
  6. src/core/ext/transport/chttp2/transport/chttp2_transport.cc (6)
  7. src/core/ext/transport/chttp2/transport/internal.h (1)
  8. src/core/ext/transport/inproc/inproc_transport.cc (5)
  9. src/core/lib/surface/call.cc (7)
  10. src/core/lib/surface/call.h (6)
  11. src/core/lib/transport/transport.h (2)
  12. src/cpp/server/server_context.cc (10)
  13. test/cpp/end2end/end2end_test.cc (3)
  14. test/cpp/end2end/test_service_impl.cc (5)
  15. test/cpp/end2end/test_service_impl.h (1)
  16. test/cpp/microbenchmarks/bm_chttp2_transport.cc (1)
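Why this matters: with the callback API, a read could complete with ok=false because the client cancelled, yet IsCancelled() could still report false because the cancellation had not yet been observed by the completion op. This commit plumbs a call_failed_before_recv_message bit up from the transports so that the callback handlers can mark the server context cancelled before invoking OnReadDone. Below is a minimal sketch of the server pattern this guarantees; EchoRequest, EchoResponse, and the reactor are hypothetical illustrations, not part of this diff:

  class EchoReactor : public grpc::ServerBidiReactor<EchoRequest, EchoResponse> {
   public:
    explicit EchoReactor(grpc::CallbackServerContext* ctx) : ctx_(ctx) {
      StartRead(&request_);
    }
    void OnReadDone(bool ok) override {
      if (!ok) {
        // With this fix, a read that failed because the client cancelled is
        // reliably visible here; previously this check could race and
        // return false.
        Finish(ctx_->IsCancelled() ? grpc::Status::CANCELLED
                                   : grpc::Status::OK);
        return;
      }
      response_.set_message(request_.message());
      StartWrite(&response_);
    }
    void OnWriteDone(bool ok) override {
      if (ok) StartRead(&request_);
    }
    void OnDone() override { delete this; }

   private:
    grpc::CallbackServerContext* const ctx_;
    EchoRequest request_;
    EchoResponse response_;
  };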

@@ -384,6 +384,9 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
     read_tag_.Set(
         call_.call(),
         [this, reactor](bool ok) {
+          if (GPR_UNLIKELY(!ok)) {
+            ctx_->MaybeMarkCancelledOnRead();
+          }
           reactor->OnReadDone(ok);
           this->MaybeDone(/*inlineable_ondone=*/true);
         },
@@ -831,6 +834,9 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
     read_tag_.Set(
         call_.call(),
         [this, reactor](bool ok) {
+          if (GPR_UNLIKELY(!ok)) {
+            ctx_->MaybeMarkCancelledOnRead();
+          }
           reactor->OnReadDone(ok);
           this->MaybeDone(/*inlineable_ondone=*/true);
         },

@@ -434,6 +434,8 @@ class ServerContextBase {
     message_allocator_state_ = allocator_state;
   }
 
+  void MaybeMarkCancelledOnRead();
+
   struct CallWrapper {
     ~CallWrapper();
@@ -524,6 +526,9 @@ class ServerContextBase {
   typename std::aligned_storage<sizeof(Reactor), alignof(Reactor)>::type
       default_reactor_;
   std::atomic_bool default_reactor_used_{false};
+
+  std::atomic_bool marked_cancelled_{false};
+
   std::unique_ptr<TestServerCallbackUnary> test_unary_;
 };

@@ -352,6 +352,7 @@ void HealthCheckClient::CallState::StartCall() {
   batch_.recv_initial_metadata = true;
   // Add recv_message op.
   payload_.recv_message.recv_message = &recv_message_;
+  payload_.recv_message.call_failed_before_recv_message = nullptr;
   // recv_message callback takes ref, handled manually.
   call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
   payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
@@ -478,6 +479,7 @@ void HealthCheckClient::CallState::DoneReadingRecvMessage(
   // callbacks from the original batch have completed yet.
   recv_message_batch_.payload = &payload_;
   payload_.recv_message.recv_message = &recv_message_;
+  payload_.recv_message.call_failed_before_recv_message = nullptr;
   payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
       &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
   recv_message_batch_.recv_message = true;

@@ -128,14 +128,14 @@ class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
     grpc_slice_buffer recv_message_buffer_;
     Atomic<bool> seen_response_{false};
 
-    // True if the cancel_stream batch has been started.
-    Atomic<bool> cancelled_{false};
-
     // recv_trailing_metadata
     grpc_metadata_batch recv_trailing_metadata_;
     grpc_transport_stream_stats collect_stats_;
     grpc_closure recv_trailing_metadata_ready_;
 
+    // True if the cancel_stream batch has been started.
+    Atomic<bool> cancelled_{false};
+
     // Closure for call stack destruction.
     grpc_closure after_call_stack_destruction_;
   };

@@ -1650,6 +1650,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::
   ++call_attempt_->started_recv_message_count_;
   batch_.recv_message = true;
   batch_.payload->recv_message.recv_message = &call_attempt_->recv_message_;
+  batch_.payload->recv_message.call_failed_before_recv_message = nullptr;
   GRPC_CLOSURE_INIT(&call_attempt_->recv_message_ready_, RecvMessageReady, this,
                     grpc_schedule_on_exec_ctx);
   batch_.payload->recv_message.recv_message_ready =

@@ -1582,6 +1582,8 @@ static void perform_stream_op_locked(void* stream_op,
     GPR_ASSERT(!s->pending_byte_stream);
     s->recv_message_ready = op_payload->recv_message.recv_message_ready;
     s->recv_message = op_payload->recv_message.recv_message;
+    s->call_failed_before_recv_message =
+        op_payload->recv_message.call_failed_before_recv_message;
     if (s->id != 0) {
       if (!s->read_closed) {
         before = s->frame_storage.length +
@@ -1947,6 +1949,10 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* /*t*/,
       null_then_sched_closure(&s->recv_message_ready);
     } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
       *s->recv_message = nullptr;
+      if (s->call_failed_before_recv_message != nullptr) {
+        *s->call_failed_before_recv_message =
+            (s->published_metadata[1] != GRPC_METADATA_PUBLISHED_AT_CLOSE);
+      }
       null_then_sched_closure(&s->recv_message_ready);
     }
     GRPC_ERROR_UNREF(error);

@@ -556,6 +556,7 @@ struct grpc_chttp2_stream {
   grpc_closure* recv_initial_metadata_ready = nullptr;
   bool* trailing_metadata_available = nullptr;
   grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
+  bool* call_failed_before_recv_message = nullptr;
   grpc_closure* recv_message_ready = nullptr;
   grpc_metadata_batch* recv_trailing_metadata;
   grpc_closure* recv_trailing_metadata_finished = nullptr;

@@ -487,6 +487,11 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
   if (s->recv_message_op) {
     INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %s", s,
                grpc_error_std_string(error).c_str());
+    if (s->recv_message_op->payload->recv_message
+            .call_failed_before_recv_message != nullptr) {
+      *s->recv_message_op->payload->recv_message
+           .call_failed_before_recv_message = true;
+    }
     grpc_core::ExecCtx::Run(
         DEBUG_LOCATION,
         s->recv_message_op->payload->recv_message.recv_message_ready,

@@ -223,6 +223,7 @@ struct grpc_call {
   grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;
 
   grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
+  bool call_failed_before_recv_message = false;
   grpc_byte_buffer** receiving_buffer = nullptr;
   grpc_slice receiving_slice = grpc_empty_slice();
   grpc_closure receiving_slice_ready;
@@ -1845,6 +1846,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
         stream_op->recv_message = true;
         call->receiving_buffer = op->data.recv_message.recv_message;
         stream_op_payload->recv_message.recv_message = &call->receiving_stream;
+        stream_op_payload->recv_message.call_failed_before_recv_message =
+            &call->call_failed_before_recv_message;
         GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
                           receiving_stream_ready_in_call_combiner, bctl,
                           grpc_schedule_on_exec_ctx);
@@ -2014,6 +2017,10 @@ grpc_compression_algorithm grpc_call_compression_for_level(
   return algo;
 }
 
+bool grpc_call_failed_before_recv_message(grpc_call* c) {
+  return c->call_failed_before_recv_message;
+}
+
 const char* grpc_call_error_to_string(grpc_call_error error) {
   switch (error) {
     case GRPC_CALL_ERROR:

@@ -120,6 +120,12 @@ size_t grpc_call_get_initial_size_estimate();
 grpc_compression_algorithm grpc_call_compression_for_level(
     grpc_call* call, grpc_compression_level level);
 
+/* Returns whether or not the call's receive message operation failed because
+   of an error (as opposed to a graceful end-of-stream). */
+/* TODO(markdroth): This is currently available only to the C++ API.
+   Move to surface API if requested by other languages. */
+bool grpc_call_failed_before_recv_message(grpc_call* c);
+
 extern grpc_core::TraceFlag grpc_call_error_trace;
 extern grpc_core::TraceFlag grpc_compression_trace;

@@ -295,6 +295,8 @@ struct grpc_transport_stream_op_batch_payload {
     // containing a received message.
     // Will be NULL if trailing metadata is received instead of a message.
     grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message = nullptr;
+    // Whether the recv_message op failed for a reason other than a clean end-of-stream.
+    bool* call_failed_before_recv_message = nullptr;
     /** Should be enqueued when one message is ready to be processed. */
     grpc_closure* recv_message_ready = nullptr;
   } recv_message;
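Note for transport implementations: the new out-parameter is optional (it may be null) and, when present, should be written before recv_message_ready is scheduled, as the chttp2 and inproc changes above do. A hedged sketch of that contract; FailPendingRecvMessage is a hypothetical helper, not an API introduced by this diff:

  void FailPendingRecvMessage(
      grpc_transport_stream_op_batch_payload* payload) {
    *payload->recv_message.recv_message = nullptr;  // no message delivered
    if (payload->recv_message.call_failed_before_recv_message != nullptr) {
      // Signal that the missing message is due to a failure, not a clean
      // end-of-stream.
      *payload->recv_message.call_failed_before_recv_message = true;
    }
    grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                            payload->recv_message.recv_message_ready,
                            GRPC_ERROR_NONE);
  }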

@@ -19,6 +19,7 @@
 #include <grpcpp/impl/codegen/server_context.h>
 
 #include <algorithm>
 #include <atomic>
+#include <utility>
 
 #include <grpc/compression.h>
@@ -324,10 +325,17 @@ void ServerContextBase::TryCancel() const {
   }
 }
 
+void ServerContextBase::MaybeMarkCancelledOnRead() {
+  if (grpc_call_failed_before_recv_message(call_.call)) {
+    marked_cancelled_.store(true, std::memory_order_release);
+  }
+}
+
 bool ServerContextBase::IsCancelled() const {
   if (completion_tag_) {
     // When using callback API, this result is always valid.
-    return completion_op_->CheckCancelledAsync();
+    return marked_cancelled_.load(std::memory_order_acquire) ||
+           completion_op_->CheckCancelledAsync();
   } else if (has_notify_when_done_tag_) {
     // When using async API, the result is only valid
     // if the tag has already been delivered at the completion queue
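A note on the ordering above: the store(release) in MaybeMarkCancelledOnRead pairs with the load(acquire) in IsCancelled(), so the flag is visible even when IsCancelled() is polled from a different application thread than the one running the read callback. Reduced to its essentials (names are hypothetical):

  #include <atomic>

  std::atomic_bool marked_cancelled{false};

  // Runs on gRPC's read-completion callback (see the handler hunks above).
  void OnFailedRead() {
    marked_cancelled.store(true, std::memory_order_release);
    // ... reactor->OnReadDone(false) then runs user code ...
  }

  // May run on any application thread.
  bool IsCancelledSnapshot() {
    return marked_cancelled.load(std::memory_order_acquire);
  }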

@@ -1268,6 +1268,9 @@ TEST_P(End2endTest, ClientCancelsBidi) {
   ClientContext context;
   std::string msg("hello");
 
+  // Send client_try_cancel value in the client metadata
+  context.AddMetadata(kClientTryCancelRequest, std::to_string(1));
+
   auto stream = stub_->BidiStream(&context);
 
   request.set_message(msg + "0");
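For reference, the client-side sequence this test then performs (a sketch; the real test's exact reads, writes, and assertions differ):

  EchoRequest request;
  EchoResponse response;
  request.set_message("hello0");
  stream->Write(request);   // server now has a read outstanding
  EXPECT_TRUE(stream->Read(&response));
  context.TryCancel();      // fails the server's pending read, so the server
                            // sees OnReadDone(ok=false) with IsCancelled() true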

@@ -551,6 +551,8 @@ CallbackTestServiceImpl::BidiStream(
           kServerTryCancelRequest, ctx->client_metadata(), DO_NOT_CANCEL);
       server_write_last_ = internal::GetIntValueFromMetadata(
           kServerFinishAfterNReads, ctx->client_metadata(), 0);
+      client_try_cancel_ = static_cast<bool>(internal::GetIntValueFromMetadata(
+          kClientTryCancelRequest, ctx->client_metadata(), 0));
       if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
         internal::ServerTryCancelNonblocking(ctx);
       } else {
@@ -589,6 +591,8 @@ CallbackTestServiceImpl::BidiStream(
             return;
           }
         }
+      } else if (client_try_cancel_) {
+        EXPECT_TRUE(ctx_->IsCancelled());
       }
 
       if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
@@ -629,6 +633,7 @@ CallbackTestServiceImpl::BidiStream(
     bool finished_{false};
     bool setup_done_{false};
     std::thread finish_thread_;
+    bool client_try_cancel_ = false;
   };
 
   return new Reactor(context);

@@ -42,6 +42,7 @@ namespace testing {
 const int kServerDefaultResponseStreamsToSend = 3;
 const char* const kServerResponseStreamsToSend = "server_responses_to_send";
 const char* const kServerTryCancelRequest = "server_try_cancel";
+const char* const kClientTryCancelRequest = "client_try_cancel";
 const char* const kDebugInfoTrailerKey = "debug-info-bin";
 const char* const kServerFinishAfterNReads = "server_finish_after_n_reads";
 const char* const kServerUseCoalescingApi = "server_use_coalescing_api";

@@ -606,6 +606,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
     op.on_complete = do_nothing.get();
     op.recv_message = true;
     op.payload->recv_message.recv_message = &recv_stream;
+    op.payload->recv_message.call_failed_before_recv_message = nullptr;
     op.payload->recv_message.recv_message_ready = drain_start.get();
     s->Op(&op);
     f.PushInput(grpc_slice_ref(incoming_data));
